Re: Reading too many files

2022-10-03 Thread Sid
Are you trying to run on the cloud?

On Mon, 3 Oct 2022, 21:55 Sachit Murarka,  wrote:

> Hello,
>
> I am reading a very large number of files (more than 25,000 Parquet files) in
> Spark 3.2. It is not giving any error in the logs, but after spark.read.parquet
> it is not able to proceed any further.
> Can anyone please suggest if there is any property to improve the parallelism
> of the reads?
>
> Kind Regards,
> Sachit Murarka
>
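For reference, a minimal PySpark sketch of the settings that are commonly tuned when
listing and reading a very large number of Parquet files (the path below is
hypothetical; the config keys are standard Spark SQL options):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # switch to distributed (job-based) file listing once more than 32 paths are involved
        .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
        # number of tasks used for the distributed file listing
        .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "1000")
        # upper bound on bytes packed into a single read partition
        .config("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))
        .getOrCreate()
    )

    # hypothetical path containing 25,000+ Parquet files
    df = spark.read.parquet("s3://bucket/prefix/")
    print(df.rdd.getNumPartitions())

Whether these help depends on where the job actually stalls (file listing on the
driver versus the read itself), so checking the Spark UI first is worthwhile.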


Re: Splittable or not?

2022-09-19 Thread Sid
Cool. Thanks, everyone for the reply.

On Sat, Sep 17, 2022 at 9:50 PM Enrico Minack 
wrote:

> If with "won't affect the performance" you mean "parquet is splittable
> though it uses snappy", then yes. Splittable files allow for optimal
> parallelization, which "won't affect performance".
>
> When Spark writes data, it already splits the data into multiple files (here,
> parquet files). So even if each individual file were not splittable, your data
> would have been split already. Splittable parquet files allow for more
> granularity (more splitting of your data), in case those files are big.
>
> Enrico
>
>
> Am 14.09.22 um 21:57 schrieb Sid:
>
> Okay, so you mean to say that parquet compresses the denormalized data
> using snappy, so it won't affect the performance,
>
> whereas using snappy alone (without a splittable container format) would
> affect the performance.
>
> Am I correct?
>
> On Thu, 15 Sep 2022, 01:08 Amit Joshi,  wrote:
>
>> Hi Sid,
>>
>> Snappy itself is not splittable. But a format that contains the actual
>> data, like parquet (which is basically divided into row groups), can be
>> compressed using snappy.
>> This works because the blocks (pages) of the parquet format inside the file
>> can be independently compressed using snappy.
>>
>> Thanks
>> Amit
>>
>> On Wed, Sep 14, 2022 at 8:14 PM Sid  wrote:
>>
>>> Hello experts,
>>>
>>> I know that Gzip and snappy files are not splittable, i.e. the data won't be
>>> distributed into multiple blocks; rather, Spark would try to load the data
>>> into a single partition/block.
>>>
>>> So, my question is when I write the parquet data via spark it gets
>>> stored at the destination with something like *part*.snappy.parquet*
>>>
>>> So, when I read this data will it affect my performance?
>>>
>>> Please help me if there is any understanding gap.
>>>
>>> Thanks,
>>> Sid
>>>
>>
>
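To make the point above concrete, a small PySpark sketch (paths are hypothetical) that
writes snappy-compressed Parquet and then checks how many read partitions Spark creates;
because Parquet row groups can be decompressed independently, the snappy compression
inside the file does not prevent splitting:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # write some data as snappy-compressed Parquet (snappy is the default codec)
    spark.range(0, 10_000_000) \
        .write.option("compression", "snappy") \
        .mode("overwrite").parquet("/tmp/snappy_parquet_demo")

    # read it back: Spark splits the input by row groups / maxPartitionBytes,
    # so a large part-*.snappy.parquet file does not end up in a single partition
    df = spark.read.parquet("/tmp/snappy_parquet_demo")
    print("read partitions:", df.rdd.getNumPartitions())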


Re: Splittable or not?

2022-09-14 Thread Sid
Okay, so you mean to say that parquet compresses the denormalized data using
snappy, so it won't affect the performance,

whereas using snappy alone (without a splittable container format) would affect
the performance.

Am I correct?

On Thu, 15 Sep 2022, 01:08 Amit Joshi,  wrote:

> Hi Sid,
>
> Snappy itself is not splittable. But a format that contains the actual
> data, like parquet (which is basically divided into row groups), can be
> compressed using snappy.
> This works because the blocks (pages) of the parquet format inside the file
> can be independently compressed using snappy.
>
> Thanks
> Amit
>
> On Wed, Sep 14, 2022 at 8:14 PM Sid  wrote:
>
>> Hello experts,
>>
>> I know that Gzip and snappy files are not splittable, i.e. the data won't be
>> distributed into multiple blocks; rather, Spark would try to load the data
>> into a single partition/block.
>>
>> So, my question is when I write the parquet data via spark it gets stored
>> at the destination with something like *part*.snappy.parquet*
>>
>> So, when I read this data will it affect my performance?
>>
>> Please help me if there is any understanding gap.
>>
>> Thanks,
>> Sid
>>
>


Re: Long running task in spark

2022-09-14 Thread Sid
Try spark.driver.maxResultSize=0

On Mon, 12 Sep 2022, 09:46 rajat kumar,  wrote:

> Hello Users,
>
> My 2 tasks are running forever. One of them gave a java heap space error.
> I have 10 joins, and all the tables are big. I understand this is data skewness.
> Apart from changes at the code level, is there any property which can be used
> in the Spark config?
>
>
> I am using Spark 2, hence AQE cannot be used.
>
>
> Thanks
> Rajat
>
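Since AQE is not available in Spark 2, the knobs usually tried first are plain
shuffle and broadcast settings plus manual skew handling; a hedged sketch of the
config side (the values are workload-dependent examples, not recommendations):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.sql.shuffle.partitions", "2000")                         # more, smaller shuffle partitions
        .config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # broadcast dimension tables up to ~50 MB
        .getOrCreate()
    )
    # For the hot keys themselves, salting or isolating the skewed keys into a
    # separate join remains a code-level change (see the salting threads later
    # in this digest).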


Splittable or not?

2022-09-14 Thread Sid
Hello experts,

I know that Gzip and snappy files are not splittable, i.e. the data won't be
distributed into multiple blocks; rather, Spark would try to load the data into
a single partition/block.

So, my question is when I write the parquet data via spark it gets stored
at the destination with something like *part*.snappy.parquet*

So, when I read this data will it affect my performance?

Please help me if there is any understanding gap.

Thanks,
Sid


Joins internally

2022-08-11 Thread Sid
Hi Team,

Assume we have a large dataset and that sort merge join is the default join
that Spark applies to this dataset.

Now, I want to understand the internal working of joins.

How does this join, or any join, work?

Assume that the data is already shuffled and sorted on the basis of the keys.

So let's say that Table A has two partitions, A & B, where the data is hashed
based on the key's hash value and sorted within the partitions.

So my question is: how does Spark know which partition from Table A has to be
joined or searched against which partition from Table B?

TIA,
Sid
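For illustration, a small PySpark sketch of how this pairing can be observed: both sides
of a sort merge join are shuffled with the same hash partitioner into the same number of
partitions (spark.sql.shuffle.partitions, 200 by default), so partition i of one side
only ever needs to be merged with partition i of the other. The column names below are
placeholders.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # force a sort merge join

    a = spark.range(0, 1000).withColumnRenamed("id", "key")
    b = spark.range(0, 1000).withColumnRenamed("id", "key")

    joined = a.join(b, "key")
    # the plan shows Exchange hashpartitioning(key, 200) on *both* sides:
    # each side is hashed with the same partitioner into the same number of
    # partitions, so matching keys always land in partitions with the same index
    joined.explain()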


Re: Salting technique doubt

2022-08-03 Thread Sid
Hi Everyone,

Thanks a lot for your answers. It helped me a lot to clear the concept :)

Best,
Sid

On Mon, Aug 1, 2022 at 12:17 AM Vinod KC  wrote:

> Hi Sid,
> This example code with output will add some more clarity
>
> spark-shell --conf spark.sql.shuffle.partitions=3 --conf
>> spark.sql.autoBroadcastJoinThreshold=-1
>>
>>
>> scala> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.DataFrame
>>
>> scala> import org.apache.spark.sql.functions.{array, concat, explode,
>> floor, lit, rand}
>> import org.apache.spark.sql.functions.{array, concat, explode, floor,
>> lit, rand}
>>
>>
>>
>> scala>  import spark.implicits._
>> import spark.implicits._
>>
>> scala>
>>
>> scala>  val df1 = Seq(
>>  | ("x", "bc"),
>>  | ("x", "ce"),
>>  | ("x", "ab"),
>>  | ("x", "ef"),
>>  | ("x", "gh"),
>>  | ("y", "hk"),
>>  | ("z", "jk")
>>  |   ).toDF("t1c1","t1c2")
>> df1: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
>>
>> scala>  df1.show(10,false)
>> +++
>> |t1c1|t1c2|
>> +++
>> |x   |bc  |
>> |x   |ce  |
>> |x   |ab  |
>> |x   |ef  |
>> |x   |gh  |
>> |y   |hk  |
>> |z   |jk  |
>> +++
>>
>>
>> scala> val df2 = Seq(
>>  | ("x", "gkl"),
>>  | ("y", "nmb"),
>>  | ("z", "qwe")
>>  |   ).toDF("t2c1","t2c2")
>> df2: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string]
>>
>> scala>df2.show(10,false)
>> +++
>> |t2c1|t2c2|
>> +++
>> |x   |gkl |
>> |y   |nmb |
>> |z   |qwe |
>> +++
>>
>>
>> scala>
>>  |   def applySalt(leftTable: DataFrame, leftCol: String, rightTable:
>> DataFrame) = {
>>  |
>>  | var df1 = leftTable
>>  |   .withColumn(leftCol, concat(
>>  | leftTable.col(leftCol), lit("_"), lit(floor(rand(123456) *
>> 10))))
>>  | var df2 = rightTable
>>  |   .withColumn("explodedCol",
>>  | explode(
>>  |   array((0 to 10).map(lit(_)): _ *)
>>  | ))
>>  |
>>  | (df1, df2)
>>  |   }
>> applySalt: (leftTable: org.apache.spark.sql.DataFrame, leftCol: String,
>> rightTable: org.apache.spark.sql.DataFrame)(org.apache.spark.sql.DataFrame,
>> org.apache.spark.sql.DataFrame)
>>
>> scala>   val (df3, df4) = applySalt(df1, "t1c1", df2)
>> df3: org.apache.spark.sql.DataFrame = [t1c1: string, t1c2: string]
>> df4: org.apache.spark.sql.DataFrame = [t2c1: string, t2c2: string ... 1
>> more field]
>>
>> scala>
>>
>> scala>   df3.show(100, false)
>> +++
>> |t1c1|t1c2|
>> +++
>> |x_4 |bc  |
>> |x_8 |ce  |
>> |x_3 |ab  |
>> |x_0 |ef  |
>> |x_6 |gh  |
>> |y_9 |hk  |
>> |z_7 |jk  |
>> +++
>>
>>
>> scala>   df4.show(100, false)
>> +++---+
>> |t2c1|t2c2|explodedCol|
>> +++---+
>> |x   |gkl |0  |
>> |x   |gkl |1  |
>> |x   |gkl |2  |
>> |x   |gkl |3  |
>> |x   |gkl |4  |
>> |x   |gkl |5  |
>> |x   |gkl |6  |
>> |x   |gkl |7  |
>> |x   |gkl |8  |
>> |x   |gkl |9  |
>> |x   |gkl |10 |
>> |y   |nmb |0  |
>> |y   |nmb |1  |
>> |y   |nmb |2  |
>> |y   |nmb |3  |
>> |y   |nmb |4  |
>> |y   |nmb |5  |
>> |y   |nmb |6  |
>> |y   |nmb |7  |
>> |y   |nmb |8  |
>> |y   |nmb |9  |
>> |y   |nmb |10 |
>> |z   |qwe |0  |
>> |z   |qwe |1  |
>> |z   |qwe |2  |
>> |z   |qwe |3  |
>> |z   |qwe |4  |
>> |z   |qwe |5  |
>> |z   |qwe |6  |
>> |z   |qwe |7  |
>> |z   |qwe |8  |
>> |z   |qwe |9  |
>> |z   |qwe |10 |
>> +++---+
>>
>>
>> scala>   // join after eliminating data skewness
>>
>> scala
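The archived message above is cut off before the final step; a minimal PySpark rendering
of the whole flow, ending with the join between the salted left side and the exploded
right side (same data and column names as the Scala example; the seed and the 0..9 salt
range are taken from it):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import concat, lit, col, floor, rand, explode, array

    spark = SparkSession.builder.getOrCreate()

    df1 = spark.createDataFrame(
        [("x", "bc"), ("x", "ce"), ("x", "ab"), ("x", "ef"),
         ("x", "gh"), ("y", "hk"), ("z", "jk")], ["t1c1", "t1c2"])
    df2 = spark.createDataFrame([("x", "gkl"), ("y", "nmb"), ("z", "qwe")], ["t2c1", "t2c2"])

    # salt the skewed left side with a suffix 0..9, explode the right side 0..10
    df3 = df1.withColumn("t1c1", concat(col("t1c1"), lit("_"), floor(rand(123456) * 10)))
    df4 = df2.withColumn("explodedCol", explode(array(*[lit(i) for i in range(11)])))

    # join the salted key against right_key + "_" + salt, then drop the helper column
    joined = df3.join(
        df4, df3["t1c1"] == concat(df4["t2c1"], lit("_"), col("explodedCol")), "inner"
    ).drop("explodedCol")

    joined.show(100, truncate=False)

The inner join produces the same matches as the unsalted join, because every salted
left key has a corresponding exploded row on the right.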

Re: Salting technique doubt

2022-07-31 Thread Sid
Hi Amit,

Thanks for your reply. However, your answer doesn't seem different from
what I have explained.

My question is: after salting, if the keys are different (as in my example),
then post-join there would be no results, assuming the join type is inner join,
because even though the keys are segregated into different partitions based on
the unique salted keys, they do not match, since x_1/x_2 != x_8/x_9.

How do you ensure that the results still match?

Best,
Sid

On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi 
wrote:

> Hi Sid,
>
> Salting is normally a technique to add random characters to existing
> values.
> In big data we can use salting to deal with the skewness.
> Salting in a join can be used as:
> * Table A-*
> Col1, join_col , where join_col values are {x1, x2, x3}
> x1
> x1
> x1
> x2
> x2
> x3
>
> *Table B-*
> join_col, Col3, where the join_col values are {x1, x2}
> x1
> x2
>
> *Problem: *Let's say that for table A, the data is skewed on x1.
> Now salting goes like this.  *Salt value = 2*
> For
> *table A, *create a new col with values by salting join col
> *New_Join_Col*
> x1_1
> x1_2
> x1_1
> x2_1
> x2_2
> x3_1
>
> For *Table B,*
> Change the join_col to all possible values of the salt.
> join_col
> x1_1
> x1_2
> x2_1
> x2_2
>
> And then join it like
> tableA.join(tableB, tableA.new_join_col == tableB.join_col)
>
> Let me know if you have any questions.
>
> Regards
> Amit Joshi
>
>
> On Sat, Jul 30, 2022 at 7:16 PM Sid  wrote:
>
>> Hi Team,
>>
>> I was trying to understand the Salting technique for the column where
>> there would be a huge load on a single partition because of the same keys.
>>
>> I referred to one youtube video with the below understanding:
>>
>> So, using the salting technique we can actually change the joining column
>> values by appending some random number in a specified range.
>>
>> So, suppose I have these two values in a partition of two different
>> tables:
>>
>> Table A:
>> Partition1:
>> x
>> .
>> .
>> .
>> x
>>
>> Table B:
>> Partition1:
>> x
>> .
>> .
>> .
>> x
>>
>> After Salting it would be something like the below:
>>
>> Table A:
>> Partition1:
>> x_1
>>
>> Partition 2:
>> x_2
>>
>> Table B:
>> Partition1:
>> x_3
>>
>> Partition 2:
>> x_8
>>
>> Now, when I inner join these two tables after salting in order to avoid
>> data skewness problems, I won't get a match since the keys are different
>> after applying salting techniques.
>>
>> So how does this resolve the data skewness issue, or is there some gap in my
>> understanding?
>>
>> Could anyone help me in layman's terms?
>>
>> TIA,
>> Sid
>>
>


Salting technique doubt

2022-07-30 Thread Sid
Hi Team,

I was trying to understand the Salting technique for the column where there
would be a huge load on a single partition because of the same keys.

I referred to one youtube video with the below understanding:

So, using the salting technique we can actually change the joining column
values by appending some random number in a specified range.

So, suppose I have these two values in a partition of two different tables:

Table A:
Partition1:
x
.
.
.
x

Table B:
Partition1:
x
.
.
.
x

After Salting it would be something like the below:

Table A:
Partition1:
x_1

Partition 2:
x_2

Table B:
Partition1:
x_3

Partition 2:
x_8

Now, when I inner join these two tables after salting in order to avoid
data skewness problems, I won't get a match since the keys are different
after applying salting techniques.

So how does this resolve the data skewness issue, or is there some gap in my
understanding?

Could anyone help me in layman's terms?

TIA,
Sid


How use pattern matching in spark

2022-07-12 Thread Sid
Hi Team,

I have a dataset like the below one in .dat file:

13/07/2022abc
PWJ   PWJABC 513213217ABC GM20 05. 6/20/39
#01000count

Now I want to extract the header and trailer records, which I was able to do.
Now, from the header, I need to extract the date and match it with the current
system date. Also, for the trailer record, I need to match the number of actual
data rows (i.e. 1 in my case) against the value mentioned in the last row.
That is a kind of pattern matching, so that I can find '1' in the last row and
confirm that the actual record count and the value in the trailer record match
each other.

How can I do this? Any links would be helpful. I think regex pattern
matching should help.

Also, I will be getting 3 formats for now, i.e. CSV, .DAT and .TXT files.

As per my understanding, I could do the validation for all these 3 file formats
using spark.read.text().rdd and performing the intended operations on RDDs
(just the validation part).

Therefore, I wanted to understand: is there any better way to achieve this?

Thanks,
Sid
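One way to sketch this in PySpark (a hedged example; the file path, the date format in
the header, and the trailer layout are assumptions based on the sample above): read the
file as plain text, take the first and last lines, pull the date out of the header with
a regex, and compare the trailer count with the number of data rows.

    import re
    from datetime import datetime
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    lines = spark.read.text("/path/to/file.dat").rdd.map(lambda r: r[0])
    all_lines = lines.collect()          # fine for validation-sized files; avoid for huge data
    header, body, trailer = all_lines[0], all_lines[1:-1], all_lines[-1]

    # header starts with a dd/MM/yyyy date, e.g. "13/07/2022abc"
    m = re.match(r"(\d{2}/\d{2}/\d{4})", header)
    header_date = datetime.strptime(m.group(1), "%d/%m/%Y").date() if m else None
    date_ok = header_date == datetime.today().date()

    # trailer like "#01000count": assume the record count is the digits after '#'
    m = re.match(r"#(\d+)", trailer)
    trailer_count = int(m.group(1)) if m else -1
    count_ok = trailer_count == len(body)

    print("header date matches today:", date_ok, "| trailer count matches:", count_ok)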


Re: How reading works?

2022-07-12 Thread Sid
Yeah, I understood that now.

Thanks for the explanation, Bjorn.

Sid

On Wed, Jul 6, 2022 at 1:46 AM Bjørn Jørgensen 
wrote:

> Ehh.. What is "*duplicate column*" ? I don't think Spark supports that.
>
> duplicate column = duplicate rows
>
>
> tir. 5. jul. 2022 kl. 22:13 skrev Bjørn Jørgensen <
> bjornjorgen...@gmail.com>:
>
>> "*but I am getting the issue of the duplicate column which was present
>> in the old dataset.*"
>>
>> So you have answered your question!
>>
>> spark.read.option("multiline","true").json("path").filter(
>> col("edl_timestamp")>last_saved_timestamp) As you have figured out,
>> Spark reads all the json files in "path" and then filters.
>>
>> There are some file formats that can apply filters before reading the files.
>> The one that I know about is Parquet, as this link explains: Spark:
>> Understand the Basic of Pushed Filter and Partition Filter Using Parquet
>> File
>> <https://medium.com/@songkunjump/spark-understand-the-basic-of-pushed-filter-and-partition-filter-using-parquet-file-3e5789e260bd>
>>
>>
>>
>>
>>
>> tir. 5. jul. 2022 kl. 21:21 skrev Sid :
>>
>>> Hi Team,
>>>
>>> I still need help in understanding how reading works exactly?
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Mon, Jun 20, 2022 at 2:23 PM Sid  wrote:
>>>
>>>> Hi Team,
>>>>
>>>> Can somebody help?
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Sun, Jun 19, 2022 at 3:51 PM Sid  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I already have a partitioned JSON dataset in s3 like the below:
>>>>>
>>>>> edl_timestamp=2022090800
>>>>>
>>>>> Now, the problem is, in the earlier 10 days of data collection there
>>>>> was a duplicate columns issue due to which we couldn't read the data.
>>>>>
>>>>> Now the latest 10 days of data are proper. So, I am trying to do
>>>>> something like the below:
>>>>>
>>>>>
>>>>> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>>>>>
>>>>> but I am getting the issue of the duplicate column which was present
>>>>> in the old dataset. So, I am trying to understand how the spark reads the
>>>>> data. Does it read the full dataset and then filter on the basis of the last saved
>>>>> timestamp, or does it read only what is required? If the second case is
>>>>> true, then it should have read the data since the latest data is correct.
>>>>>
>>>>> So just trying to understand. Could anyone help here?
>>>>>
>>>>> Thanks,
>>>>> Sid
>>>>>
>>>>>
>>>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
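Along the lines of the pushed-filter and partition-filter discussion above, a hedged
PySpark sketch (path, timestamp values and schema below are hypothetical): when the data
is laid out as edl_timestamp=... directories, supplying an explicit schema avoids opening
the old, broken files for schema inference, and the filter on the partition column prunes
their directories so they are never parsed.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.getOrCreate()

    # hypothetical layout: s3://bucket/data/edl_timestamp=YYYYMMDDHH/...
    last_saved_timestamp = "2022090800"
    schema = "id string, payload string"          # assumed data columns; partition column is added automatically

    # option 1: rely on partition pruning with a user-supplied schema
    df = (spark.read.schema(schema)
          .option("multiline", "true")
          .json("s3://bucket/data/")
          .filter(col("edl_timestamp") > last_saved_timestamp))

    # option 2: be explicit and pass only the good partition directories,
    # keeping the partition column via basePath
    good_paths = [f"s3://bucket/data/edl_timestamp={ts}" for ts in ["2022090900", "2022091000"]]
    df2 = (spark.read.schema(schema)
           .option("basePath", "s3://bucket/data/")
           .option("multiline", "true")
           .json(good_paths))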


Re: How reading works?

2022-07-05 Thread Sid
Hi Team,

I still need help in understanding how reading works exactly?

Thanks,
Sid

On Mon, Jun 20, 2022 at 2:23 PM Sid  wrote:

> Hi Team,
>
> Can somebody help?
>
> Thanks,
> Sid
>
> On Sun, Jun 19, 2022 at 3:51 PM Sid  wrote:
>
>> Hi,
>>
>> I already have a partitioned JSON dataset in s3 like the below:
>>
>> edl_timestamp=2022090800
>>
>> Now, the problem is, in the earlier 10 days of data collection there was
>> a duplicate columns issue due to which we couldn't read the data.
>>
>> Now the latest 10 days of data are proper. So, I am trying to do
>> something like the below:
>>
>>
>> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>>
>> but I am getting the issue of the duplicate column which was present in
>> the old dataset. So, I am trying to understand how the spark reads the
>> data. Does it read the full dataset and then filter on the basis of the last saved
>> timestamp, or does it read only what is required? If the second case is
>> true, then it should have read the data since the latest data is correct.
>>
>> So just trying to understand. Could anyone help here?
>>
>> Thanks,
>> Sid
>>
>>
>>


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-02 Thread Sid
So, as per the discussion, the shuffle stage output is also stored on disk and
not in memory?

On Sat, Jul 2, 2022 at 8:44 PM krexos  wrote:

>
> thanks a lot!
>
> --- Original Message ---
> On Saturday, July 2nd, 2022 at 6:07 PM, Sean Owen 
> wrote:
>
> I think that is more accurate yes. Though, shuffle files are local, not on
> distributed storage too, which is an advantage. MR also had map only
> transforms and chained mappers, but harder to use. Not impossible but you
> could also say Spark just made it easier to do the more efficient thing.
>
> On Sat, Jul 2, 2022, 9:34 AM krexos  wrote:
>
>>
>> You said Spark performs IO only when reading data and writing final data
>> to the disk. I though by that you meant that it only reads the input files
>> of the job and writes the output of the whole job to the disk, but in
>> reality spark does store intermediate results on disk, just in less places
>> than MR
>>
>> --- Original Message ---
>> On Saturday, July 2nd, 2022 at 5:27 PM, Sid 
>> wrote:
>>
>> I have explained the same thing in a very layman's terms. Go through it
>> once.
>>
>> On Sat, 2 Jul 2022, 19:45 krexos,  wrote:
>>
>>>
>>> I think I understand where Spark saves IO.
>>>
>>> in MR we have map -> reduce -> map -> reduce -> map -> reduce ...
>>>
>>> which writes results do disk at the end of each such "arrow",
>>>
>>> on the other hand in spark we have
>>>
>>> map -> reduce + map -> reduce + map -> reduce ...
>>>
>>> which saves about 2 times the IO
>>>
>>> thanks everyone,
>>> krexos
>>>
>>> --- Original Message ---
>>> On Saturday, July 2nd, 2022 at 1:35 PM, krexos 
>>> wrote:
>>>
>>> Hello,
>>>
>>> One of the main "selling points" of Spark is that unlike Hadoop
>>> map-reduce that persists intermediate results of its computation to HDFS
>>> (disk), Spark keeps all its results in memory. I don't understand this as
>>> in reality when a Spark stage finishes it writes all of the data into
>>> shuffle files stored on the disk
>>> <https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md>.
>>> How then is this an improvement on map-reduce?
>>>
>>> Image from https://youtu.be/7ooZ4S7Ay6Y
>>>
>>>
>>> thanks!
>>>
>>>
>>>
>>
>


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-02 Thread Sid
I have explained the same thing in very layman's terms. Please go through it
once.

On Sat, 2 Jul 2022, 19:45 krexos,  wrote:

>
> I think I understand where Spark saves IO.
>
> in MR we have map -> reduce -> map  -> reduce -> map -> reduce ...
>
> which writes results do disk at the end of each such "arrow",
>
> on the other hand in spark we have
>
> map -> reduce + map -> reduce + map -> reduce ...
>
> which saves about 2 times the IO
>
> thanks everyone,
> krexos
>
> --- Original Message ---
> On Saturday, July 2nd, 2022 at 1:35 PM, krexos 
> wrote:
>
> Hello,
>
> One of the main "selling points" of Spark is that unlike Hadoop map-reduce
> that persists intermediate results of its computation to HDFS (disk), Spark
> keeps all its results in memory. I don't understand this as in reality when
> a Spark stage finishes it writes all of the data into shuffle files
> stored on the disk
> .
> How then is this an improvement on map-reduce?
>
> Image from https://youtu.be/7ooZ4S7Ay6Y
>
>
> thanks!
>
>
>


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-02 Thread Sid
Hi Krexos,

If I understand correctly, you are asking: if Spark also involves disk I/O,
then how is it an advantage over MapReduce?

Basically, each MapReduce phase writes every intermediate result to the disk,
so on average a job involves about 6 rounds of disk I/O, whereas Spark (assuming
it has enough memory to store the intermediate results) on average involves
about 3 times less disk I/O, i.e. only while reading the input data and writing
the final data to the disk.

Thanks,
Sid

On Sat, 2 Jul 2022, 17:58 krexos,  wrote:

> Hello,
>
> One of the main "selling points" of Spark is that unlike Hadoop map-reduce
> that persists intermediate results of its computation to HDFS (disk), Spark
> keeps all its results in memory. I don't understand this as in reality when
> a Spark stage finishes it writes all of the data into shuffle files
> stored on the disk
> <https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md>.
> How then is this an improvement on map-reduce?
>
> Image from https://youtu.be/7ooZ4S7Ay6Y
>
>
> thanks!
>


Re: Glue is serverless? how?

2022-06-28 Thread Sid
Thanks for your insights everyone :)

Much appreciated.

On Tue, Jun 28, 2022 at 6:27 PM finkel  wrote:

> The cake is a lie ©
> Actually, serverless things run on servers. Vendors are just trying to
> create such an API that abstracts you from the hardware and even the
> container where your software runs.
>
> Of course, it's just marketing from some point of view, but sometimes
> it actually can bring some bonuses to your development, like the border
> between Dev and Ops disappearing completely.
>
> On 22/06/26 01:04PM, Bjørn Jørgensen wrote:
> > https://en.m.wikipedia.org/wiki/Serverless_computing
> >
> >
> >
> > søn. 26. jun. 2022, 10:26 skrev Sid :
> >
> > > Hi Team,
> > >
> > > I am developing a spark job in glue and have read that glue is
> serverless.
> > > I know that using glue studio we can autoscale the workers. However, I
> want
> > > to understand how it is serverless?
> > >
> > > We specify the number of workers in the configuration for our job. Then
> > > what is that which is managed by Glue and we don't have to worry about
> the
> > > underlying infrastructure?
> > >
> > > Please help me to understand in layman's terms.
> > >
> > > Thanks,
> > > Sid
> > >
>
> --
> Regards,
> Pasha
>
> Big Data Tools @ JetBrains
>


Understanding about joins in spark

2022-06-27 Thread Sid
Hi Team,

As per my understanding (assume it to be a large dataset): when we apply joins,
data from different executors is shuffled in such a way that the same "keys"
land in one partition.

So, this is done for both of the dataframes, right?  For e.g.: key A for df1
will be sorted and kept in one partition, and key A for df2 will be sorted and
kept in another partition, and then the two will be compared and merged?

I know that for a shuffle hash join, the keys for both data frames are merged
under a single partition, since the smaller data is copied to each and every
executor.

Also, where is the join operation performed? On another worker node, or is it
performed on the driver side?

Somebody, please help me to understand this by correcting me w.r.t my
points or just adding an explanation to it.

TIA,
Sid


Glue is serverless? how?

2022-06-26 Thread Sid
Hi Team,

I am developing a spark job in glue and have read that glue is serverless.
I know that using glue studio we can autoscale the workers. However, I want
to understand how it is serverless?

We specify the number of workers in the configuration for our job. Then
what exactly is managed by Glue, such that we don't have to worry about the
underlying infrastructure?

Please help me to understand in layman's terms.

Thanks,
Sid


Re: Spark Doubts

2022-06-25 Thread Sid
Hi Tufan,

Thanks for the answers. However, by the second point, I meant to ask where
my code would reside. Will it be copied to all the executors, since the code
size would be small, or will it be maintained on the driver's side? I know
that the driver converts the code to a DAG and, when an action is called, it
is submitted to the DAG scheduler and so on...

Thanks,
Sid

On Sat, Jun 25, 2022 at 12:34 PM Tufan Rakshit  wrote:

> Please find the answers inline.
> 1) Can I apply predicate pushdown filters if I have data stored in S3 or
> it should be used only while reading from DBs?
> It can be applied on S3 if you store the data in parquet, csv, json or avro
> format. It does not depend on the DB; it is supported in object stores like S3
> as well.
>
> 2) While running the data in distributed form, is my code copied to each
> and every executor. As per me, it should be the case since code.zip would
> be smaller in size to be copied on each worker node.
> If you are trying to join two datasets out of which one is small, Spark by
> default would try to broadcast the smaller dataset to the other executors
> rather than going for a sort merge join. There is a property which is enabled
> by default from Spark 3.1; the limit for the smaller dataframe to be broadcast
> is 10 MB, and it can also be changed to a higher value with config.
>
> 3) Also my understanding of shuffling of data is " It is moving one
> partition to another partition or moving data(keys) of one partition to
> another partition of those keys. It increases memory since before shuffling
> it copies the data in the memory and then transfers to another partition".
> Is it correct? If not, please correct me.
>
> It depends on the context of distributed computing, as your data does not sit
> on one machine, nor on one disk. Shuffle is involved when you trigger
> operations like group-by or sort, as it involves bringing all of the same keys
> onto one executor to do the computation; or, when a sort merge join is
> triggered, both datasets are sorted, and this sort is a global sort, not a
> partition-wise sort. Yes, it is a memory-intensive operation; if you see a lot
> of shuffle involved, it is best to use SSDs (M5d-based machines in AWS).
> For really big jobs, where TBs worth of data have to be joined, it is not
> possible to do all of the operations in memory (RAM).
>
>
> Hope that helps .
>
> Best
> Tufan
>
>
>
> On Sat, 25 Jun 2022 at 08:43, Sid  wrote:
>
>> Hi Team,
>>
>> I have various doubts as below:
>>
>> 1) Can I apply predicate pushdown filters if I have data stored in S3 or
>> it should be used only while reading from DBs?
>>
>> 2) While running the data in distributed form, is my code copied to each
>> and every executor. As per me, it should be the case since code.zip would
>> be smaller in size to be copied on each worker node.
>>
>> 3) Also my understanding of shuffling of data is " It is moving one
>> partition to another partition or moving data(keys) of one partition to
>> another partition of those keys. It increases memory since before shuffling
>> it copies the data in the memory and then transfers to another partition".
>> Is it correct? If not, please correct me.
>>
>> Please help me to understand these things in layman's terms if my
>> assumptions are not correct.
>>
>> Thanks,
>> Sid
>>
>
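On question 1 above (predicate pushdown against files in S3), a small hedged sketch:
pushdown is a property of the file format rather than of a database, so for Parquet on
S3 the pushed filters show up in the physical plan. The path and column name below are
hypothetical.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.parquet("s3://bucket/events/")   # hypothetical Parquet dataset
    filtered = df.filter(col("country") == "DE")

    # the physical plan lists PushedFilters: [IsNotNull(country), EqualTo(country,DE)],
    # i.e. the predicate is handed to the Parquet reader (row-group statistics)
    # instead of being applied only after a full scan
    filtered.explain()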


Spark Doubts

2022-06-24 Thread Sid
Hi Team,

I have various doubts as below:

1) Can I apply predicate pushdown filters if I have data stored in S3, or
should they be used only while reading from DBs?

2) While running the job in distributed form, is my code copied to each
and every executor? As per my understanding, it should be the case, since
code.zip would be small enough in size to be copied to each worker node.

3) Also, my understanding of shuffling of data is: "It is moving one
partition to another partition, or moving the data (keys) of one partition to
another partition for those keys. It increases memory usage since, before
shuffling, it copies the data in memory and then transfers it to another
partition". Is that correct? If not, please correct me.

Please help me to understand these things in layman's terms if my
assumptions are not correct.

Thanks,
Sid


Re: Need help with the configuration for AWS glue jobs

2022-06-23 Thread Sid
Where can I find information on the size of the datasets supported by AWS
Glue? I didn't see it in the documentation.

Also, if I want to process TBs of data, e.g. 1 TB, what should the ideal
EMR cluster configuration be?

Could you please guide me on this?

Thanks,
Sid.


On Thu, 23 Jun 2022, 23:44 Gourav Sengupta, 
wrote:

> Please use EMR, Glue is not made for heavy processing jobs.
>
> On Thu, Jun 23, 2022 at 6:36 AM Sid  wrote:
>
>> Hi Team,
>>
>> Could anyone help me in the below problem:
>>
>>
>> https://stackoverflow.com/questions/72724999/how-to-calculate-number-of-g-1-workers-in-aws-glue-for-processing-1tb-data
>>
>> Thanks,
>> Sid
>>
>


Need help with the configuration for AWS glue jobs

2022-06-22 Thread Sid
Hi Team,

Could anyone help me in the below problem:

https://stackoverflow.com/questions/72724999/how-to-calculate-number-of-g-1-workers-in-aws-glue-for-processing-1tb-data

Thanks,
Sid


Re: Will it lead to OOM error?

2022-06-22 Thread Sid
Thanks all for your answers. Much appreciated.

On Thu, Jun 23, 2022 at 6:07 AM Yong Walt  wrote:

> We have many cases like this. it won't cause OOM.
>
> Thanks
>
> On Wed, Jun 22, 2022 at 8:28 PM Sid  wrote:
>
>> I have a 150TB CSV file.
>>
>> I have a total of 100 TB RAM and 100TB disk. So If I do something like
>> this
>>
>> spark.read.option("header","true").csv(filepath).show(false)
>>
>> Will it lead to an OOM error since it doesn't have enough memory? or it
>> will spill data onto the disk and process it?
>>
>> Thanks,
>> Sid
>>
>


Re: Will it lead to OOM error?

2022-06-22 Thread Sid
Hi Enrico,

Thanks for the insights.

Could you please help me understand, with an example, the case of compressed
files where a file wouldn't be split into partitions and would put the load on
a single partition, which might lead to an OOM error?

Thanks,
Sid

On Wed, Jun 22, 2022 at 6:40 PM Enrico Minack 
wrote:

> The RAM and disk memory consumption depends on what you do with the data
> after reading it.
>
> Your particular action will read 20 lines from the first partition and
> show them. So it will not use any RAM or disk, no matter how large the CSV
> is.
>
> If you do a count instead of a show, it will iterate over each partition and
> return a count per partition, so not much RAM is needed here either.
>
> If you do some real processing of the data, the required RAM and disk again
> depend on the shuffles involved and the intermediate results that need to be
> stored in RAM or on disk.
>
> Enrico
>
>
> Am 22.06.22 um 14:54 schrieb Deepak Sharma:
>
> It will spill to disk if everything can’t be loaded in memory .
>
>
> On Wed, 22 Jun 2022 at 5:58 PM, Sid  wrote:
>
>> I have a 150TB CSV file.
>>
>> I have a total of 100 TB RAM and 100TB disk. So If I do something like
>> this
>>
>> spark.read.option("header","true").csv(filepath).show(false)
>>
>> Will it lead to an OOM error since it doesn't have enough memory? or it
>> will spill data onto the disk and process it?
>>
>> Thanks,
>> Sid
>>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
>
>


Will it lead to OOM error?

2022-06-22 Thread Sid
I have a 150TB CSV file.

I have a total of 100 TB RAM and 100TB disk. So If I do something like this

spark.read.option("header","true").csv(filepath).show(false)

Will it lead to an OOM error since it doesn't have enough memory, or will it
spill the data onto the disk and process it?

Thanks,
Sid


Re: Spark Doubts

2022-06-21 Thread Sid
Hi,

Thanks for your answers. Much appreciated

I know that we can cache a data frame in memory or on disk, but I want to
understand where the data frame resides by default when it is loaded
initially.


Thanks,
Sid

On Wed, Jun 22, 2022 at 6:10 AM Yong Walt  wrote:

> These are the basic concepts in spark :)
> You may take a bit time to read this small book:
> https://cloudcache.net/resume/PDDWS2-V2.pdf
>
> regards
>
>
> On Wed, Jun 22, 2022 at 3:17 AM Sid  wrote:
>
>> Hi Team,
>>
>> I have a few doubts about the below questions:
>>
>> 1) data frame will reside where? memory? disk? memory allocation about
>> data frame?
>> 2) How do you configure each partition?
>> 3) Is there any way to calculate the exact partitions needed to load a
>> specific file?
>>
>> Thanks,
>> Sid
>>
>


Spark Doubts

2022-06-21 Thread Sid
Hi Team,

I have a few doubts about the below questions:

1) Where will a data frame reside? In memory or on disk? How is memory
allocated for a data frame?
2) How do you configure each partition?
3) Is there any way to calculate the exact number of partitions needed to load
a specific file?

Thanks,
Sid
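On question 3, a rough way to estimate the number of input partitions Spark will create
for a splittable file is to divide the file size by spark.sql.files.maxPartitionBytes
(128 MB by default); a hedged sketch that ignores openCostInBytes and multi-file effects:

    import math

    def estimate_partitions(file_size_bytes: int,
                            max_partition_bytes: int = 128 * 1024 * 1024) -> int:
        """Rough estimate of read partitions for a single splittable file."""
        return max(1, math.ceil(file_size_bytes / max_partition_bytes))

    # e.g. a 10 GB CSV with default settings -> about 80 partitions
    print(estimate_partitions(10 * 1024**3))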


Re: How reading works?

2022-06-20 Thread Sid
Hi Team,

Can somebody help?

Thanks,
Sid

On Sun, Jun 19, 2022 at 3:51 PM Sid  wrote:

> Hi,
>
> I already have a partitioned JSON dataset in s3 like the below:
>
> edl_timestamp=2022090800
>
> Now, the problem is, in the earlier 10 days of data collection there was a
> duplicate columns issue due to which we couldn't read the data.
>
> Now the latest 10 days of data are proper. So, I am trying to do
> something like the below:
>
>
> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>
> but I am getting the issue of the duplicate column which was present in
> the old dataset. So, I am trying to understand how the spark reads the
> data. Does it read the full dataset and then filter on the basis of the last saved
> timestamp, or does it read only what is required? If the second case is
> true, then it should have read the data since the latest data is correct.
>
> So just trying to understand. Could anyone help here?
>
> Thanks,
> Sid
>
>
>


How reading works?

2022-06-19 Thread Sid
Hi,

I already have a partitioned JSON dataset in s3 like the below:

edl_timestamp=2022090800

Now, the problem is, in the earlier 10 days of data collection there was a
duplicate columns issue due to which we couldn't read the data.

Now the latest 10 days of data are proper. So, I am trying to do
something like the below:

spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)

but I am getting the issue of the duplicate column which was present in the
old dataset. So, I am trying to understand how Spark reads the data.
Does it read the full dataset and then filter on the basis of the last saved
timestamp, or does it read only what is required? If the second case is true, then it
should have read the data since the latest data is correct.

So just trying to understand. Could anyone help here?

Thanks,
Sid


Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
What do you mean by overkill here?

I tried the below way to iterate over 4k records under a while loop. However,
it runs only for the first record. What could be wrong here? I am going through
a few SO posts where users found the below approach faster than the withColumn
approach:

finalDF = finalDF.select("meta").rdd.map(
lambda x: call_to_cust_bulk_api(policyUrl, x[0])).toDF()


On Mon, Jun 13, 2022 at 4:13 PM Gourav Sengupta 
wrote:

> Hi,
>
> >> spark.range(1).createOrReplaceTempView("test")
> >> maximum_records_per_api_call = 40
> >> batch_count = spark.sql("SELECT * FROM test").count() /
> maximum_records_per_api_call
> >> spark.sql("SELECT id, mod(monotonically_increasing_id(), batch_count)
> batch_id FROM
> test").repartitionByRange("batch_id").createOrReplaceTempView("test_batch")
>
>
> the above code should be able to then be run with a udf as long as we are
> able to control the parallelism with the help of executor count and task
> cpu configuration.
>
> But once again, this is just an unnecessary overkill.
>
>
> Regards,
> Gourav Sengupta
>
> On Mon, Jun 13, 2022 at 10:41 AM Sid  wrote:
>
>> Hi Gourav,
>>
>> Could you please provide me with some examples?
>>
>> On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> try to use mod of a monotonically increasing field and then use
>>> repartitionbyrange function, and see whether SPARK automatically serialises
>>> it based on the number of executors that you put in the job.
>>>
>>> But once again, this is kind of an overkill, for fetching data from a
>>> API, creating a simple python program works quite well.
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Mon, Jun 13, 2022 at 9:28 AM Sid  wrote:
>>>
>>>> Hi Gourav,
>>>>
>>>> Do you have any examples or links, please? That would help me to
>>>> understand.
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I think that serialising data using spark is an overkill, why not use
>>>>> normal python.
>>>>>
>>>>> Also have you tried repartition by range, that way you can use modulus
>>>>> operator to batch things up?
>>>>>
>>>>> Regards,
>>>>> Gourav
>>>>>
>>>>>
>>>>> On Mon, Jun 13, 2022 at 8:37 AM Sid  wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I am trying to hit the POST APIs for the very first time using
>>>>>> Pyspark.
>>>>>>
>>>>>> My end goal is to achieve is something like the below:
>>>>>>
>>>>>>
>>>>>>1.  Generate the data
>>>>>>2. Send the data in the batch of 4k records in one batch since
>>>>>>the API can accept the 4k records at once.
>>>>>>3. The record would be as the below:
>>>>>>4.
>>>>>>
>>>>>>{
>>>>>>"Token": "",
>>>>>>"CustomerName": "",
>>>>>>"Object": "",
>>>>>>"Data": [{"A":"1"},{"A":"2"}]
>>>>>>}
>>>>>>
>>>>>>5. Token will be generated first then it would be passed to the
>>>>>>'Token' key in the data.
>>>>>>
>>>>>> For the above goal, I initially wrote something like the below which
>>>>>> gives a heap error because the data frame is getting created on the 
>>>>>> driver
>>>>>> side, and the size of the records is a minimum of 1M.
>>>>>>df = modifiedData # Assume it to be query results stored
>>>>>> as a DF
>>>>>>
>>>>>> df = df.withColumn("uniqueID", lit("1"))
>>>>>>
>>>>>> df = df.withColumn("row_num", row_number().over(
>>>>>>
>>>>>> Window.partitionBy(col("uniqueID")).orderBy(col("

Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
Hi Gourav,

Could you please provide me with some examples?

On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta 
wrote:

> Hi,
>
> try to use mod of a monotonically increasing field and then use
> repartitionbyrange function, and see whether SPARK automatically serialises
> it based on the number of executors that you put in the job.
>
> But once again, this is kind of an overkill, for fetching data from a API,
> creating a simple python program works quite well.
>
>
> Regards,
> Gourav
>
> On Mon, Jun 13, 2022 at 9:28 AM Sid  wrote:
>
>> Hi Gourav,
>>
>> Do you have any examples or links, please? That would help me to
>> understand.
>>
>> Thanks,
>> Sid
>>
>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>> I think that serialising data using spark is an overkill, why not use
>>> normal python.
>>>
>>> Also have you tried repartition by range, that way you can use modulus
>>> operator to batch things up?
>>>
>>> Regards,
>>> Gourav
>>>
>>>
>>> On Mon, Jun 13, 2022 at 8:37 AM Sid  wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am trying to hit the POST APIs for the very first time using Pyspark.
>>>>
>>>> My end goal is to achieve is something like the below:
>>>>
>>>>
>>>>1.  Generate the data
>>>>2. Send the data in the batch of 4k records in one batch since the
>>>>API can accept the 4k records at once.
>>>>3. The record would be as the below:
>>>>4.
>>>>
>>>>{
>>>>"Token": "",
>>>>"CustomerName": "",
>>>>"Object": "",
>>>>"Data": [{"A":"1"},{"A":"2"}]
>>>>}
>>>>
>>>>5. Token will be generated first then it would be passed to the
>>>>'Token' key in the data.
>>>>
>>>> For the above goal, I initially wrote something like the below which
>>>> gives a heap error because the data frame is getting created on the driver
>>>> side, and the size of the records is a minimum of 1M.
>>>>df = modifiedData # Assume it to be query results stored as
>>>> a DF
>>>>
>>>> df = df.withColumn("uniqueID", lit("1"))
>>>>
>>>> df = df.withColumn("row_num", row_number().over(
>>>>
>>>> Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>>> ))
>>>> tokenUrl = ""
>>>> # tokenUrl = ""
>>>> policyUrl = ""
>>>> tokenBody = {"Username": "", "Password": "",
>>>> "CustomerName": ""}
>>>>
>>>> def get_token(url, payload):
>>>> try:
>>>> print("Getting Token")
>>>> response = requests.request("POST", url,
>>>> data=payload)
>>>> data = response.json()
>>>> if data['ErrorDescription'] == 'Success':
>>>> token = data['Token']
>>>> print(":::Token Generated")
>>>> else:
>>>> print('TokenNotGeneratedFrom: ')
>>>> # raise TokenNotGeneratedFrom(500, 'Token not
>>>> Generated from ')
>>>> return token
>>>> except Exception as e:
>>>> print('TokenNotGeneratedFrom: ' + str(e))
>>>> # raise TokenNotGeneratedFrom(500, str(e))
>>>>
>>>> def call_to_cust_bulk_api(url, payload):
>>>> print("Calling Bulk API")
>>>> try:
>>>> # TODO: write code...
>>>> headers = {'content-type': 'application/json'}
>>>> print(":::jsn load")

Re: Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
Hi Gourav,

Do you have any examples or links, please? That would help me to understand.

Thanks,
Sid

On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta 
wrote:

> Hi,
> I think that serialising data using spark is an overkill, why not use
> normal python.
>
> Also have you tried repartition by range, that way you can use modulus
> operator to batch things up?
>
> Regards,
> Gourav
>
>
> On Mon, Jun 13, 2022 at 8:37 AM Sid  wrote:
>
>> Hi Team,
>>
>> I am trying to hit the POST APIs for the very first time using Pyspark.
>>
>> My end goal is to achieve is something like the below:
>>
>>
>>1.  Generate the data
>>2. Send the data in the batch of 4k records in one batch since the
>>API can accept the 4k records at once.
>>3. The record would be as the below:
>>4.
>>
>>{
>>"Token": "",
>>"CustomerName": "",
>>"Object": "",
>>"Data": [{"A":"1"},{"A":"2"}]
>>}
>>
>>5. Token will be generated first then it would be passed to the
>>'Token' key in the data.
>>
>> For the above goal, I initially wrote something like the below which
>> gives a heap error because the data frame is getting created on the driver
>> side, and the size of the records is a minimum of 1M.
>>df = modifiedData # Assume it to be query results stored as a
>> DF
>>
>> df = df.withColumn("uniqueID", lit("1"))
>>
>> df = df.withColumn("row_num", row_number().over(
>>
>> Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>> ))
>> tokenUrl = ""
>> # tokenUrl = ""
>> policyUrl = ""
>> tokenBody = {"Username": "", "Password": "", "CustomerName":
>> ""}
>>
>> def get_token(url, payload):
>> try:
>> print("Getting Token")
>> response = requests.request("POST", url, data=payload)
>> data = response.json()
>> if data['ErrorDescription'] == 'Success':
>> token = data['Token']
>> print(":::Token Generated")
>> else:
>> print('TokenNotGeneratedFrom: ')
>> # raise TokenNotGeneratedFrom(500, 'Token not
>> Generated from ')
>> return token
>> except Exception as e:
>> print('TokenNotGeneratedFrom: ' + str(e))
>> # raise TokenNotGeneratedFrom(500, str(e))
>>
>> def call_to_cust_bulk_api(url, payload):
>> print("Calling Bulk API")
>> try:
>> # TODO: write code...
>> headers = {'content-type': 'application/json'}
>> print(":::jsn load")
>> # print(json.dumps(payload))
>> # print(payload)
>> response = requests.post(url,
>> data=json.dumps(payload), headers=headers)
>> # print(json.dumps(payload))
>> data = response.json()
>> return data
>> except Exception as e:
>> print('ExceptionInPushingDataTo: ' + str(e))
>> # raise ExceptionInPushingDataTo(500, str(e))
>>
>> total_count = df.count()
>> i = 1
>> while i < total_count:
>> rangeNum = i + 3999
>> print("Range Num:::")
>> print(rangeNum)
>> df1 = df.filter((col("row_num") >= i) & (col("row_num")
>> <= rangeNum))
>> df1.cache()
>> maxValue =
>> df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>> finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>> print("finalDF count:::", finalDF.count())
>>

Redesign approach for hitting the APIs using PySpark

2022-06-13 Thread Sid
Hi Team,

I am trying to hit the POST APIs for the very first time using Pyspark.

My end goal is to achieve is something like the below:


   1.  Generate the data
   2. Send the data in the batch of 4k records in one batch since the API
   can accept the 4k records at once.
   3. The record would be as the below:
   4.

   {
   "Token": "",
   "CustomerName": "",
   "Object": "",
   "Data": [{"A":"1"},{"A":"2"}]
   }

   5. Token will be generated first then it would be passed to the 'Token'
   key in the data.

For the above goal, I initially wrote something like the below which gives
a heap error because the data frame is getting created on the driver side,
and the size of the records is a minimum of 1M.
   df = modifiedData # Assume it to be query results stored as a DF

df = df.withColumn("uniqueID", lit("1"))

df = df.withColumn("row_num", row_number().over(
Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
))
tokenUrl = ""
# tokenUrl = ""
policyUrl = ""
tokenBody = {"Username": "", "Password": "", "CustomerName": ""}

def get_token(url, payload):
try:
print("Getting Token")
response = requests.request("POST", url, data=payload)
data = response.json()
if data['ErrorDescription'] == 'Success':
token = data['Token']
print(":::Token Generated")
else:
print('TokenNotGeneratedFrom: ')
# raise TokenNotGeneratedFrom(500, 'Token not
Generated from ')
return token
except Exception as e:
print('TokenNotGeneratedFrom: ' + str(e))
# raise TokenNotGeneratedFrom(500, str(e))

def call_to_cust_bulk_api(url, payload):
print("Calling Bulk API")
try:
# TODO: write code...
headers = {'content-type': 'application/json'}
print(":::jsn load")
# print(json.dumps(payload))
# print(payload)
response = requests.post(url, data=json.dumps(payload),
headers=headers)
# print(json.dumps(payload))
data = response.json()
return data
except Exception as e:
print('ExceptionInPushingDataTo: ' + str(e))
# raise ExceptionInPushingDataTo(500, str(e))

total_count = df.count()
i = 1
while i < total_count:
rangeNum = i + 3999
print("Range Num:::")
print(rangeNum)
df1 = df.filter((col("row_num") >= i) & (col("row_num") <=
rangeNum))
df1.cache()
maxValue =
df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
print("finalDF count:::", finalDF.count())
token = get_token(tokenUrl, tokenBody)

result =
json.loads((finalDF.toPandas().to_json(orient="records")))
# token = get_token(tokenUrl, tokenBody)
custRequestBody = {
"Token": token,
"CustomerName": "",
"Object": "",
"Data": result
}

# print("Customer Request Body::")
# print(json.dumps(custRequestBody))
response = call_to_cust_bulk_api(policyUrl, custRequestBody)
print(response)
finalDFStatus = finalDF.withColumn("edl_timestamp",
to_timestamp(lit(F.TimeNow()))).withColumn(
"status_for_each_batch",
lit(str(response)))


print("Max Value:::")
print(maxValue)
print("Next I:::")
i = rangeNum + 1
print(i)

This is my very first approach to hitting APIs with Spark. So, could anyone
please help me to redesign the approach, or share some links or references
with which I can go into the depth of this and correct myself? How can I scale
this?


Any help is much appreciated.

TIA,
Sid
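One hedged way to restructure this (the batch size and payload fields are taken from the
description above; the endpoint URLs, token fields and the source dataframe df are
placeholders, and requests must be available on the executors): assign a batch id on the
executors and push each batch from foreachPartition, so no full toPandas() collect ever
happens on the driver.

    import json
    import requests
    from pyspark.sql.functions import monotonically_increasing_id, floor, col

    BATCH_SIZE = 4000                               # the API accepts up to 4k records per call
    policy_url = "https://example.com/bulk"         # hypothetical endpoint
    token_url = "https://example.com/token"         # hypothetical endpoint
    token_body = {"Username": "...", "Password": "...", "CustomerName": "..."}

    def get_token():
        # same idea as the original get_token(url, payload), kept minimal here
        return requests.post(token_url, data=token_body).json()["Token"]

    def push_partition(rows):
        # runs on the executors; serialises rows and posts them in chunks
        buffer = []
        def flush():
            payload = {"Token": get_token(), "CustomerName": "", "Object": "", "Data": buffer}
            requests.post(policy_url, data=json.dumps(payload),
                          headers={"content-type": "application/json"})
        for row in rows:
            d = row.asDict()
            d.pop("_batch_id", None)
            buffer.append(d)
            if len(buffer) == BATCH_SIZE:
                flush()
                buffer = []
        if buffer:
            flush()

    # spread the rows so each task holds roughly one batch; the generated ids are
    # not contiguous, so batches can be smaller than 4000, which is still valid
    batched = (df.withColumn("_row_id", monotonically_increasing_id())
                 .withColumn("_batch_id", floor(col("_row_id") / BATCH_SIZE))
                 .drop("_row_id")
                 .repartition("_batch_id"))

    batched.foreachPartition(push_partition)

The per-batch API status could be collected by switching to mapPartitions and returning
one status row per posted chunk, if the statuses need to be persisted afterwards.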


Re: API Problem

2022-06-11 Thread Sid
Hi Enrico,

Thanks for helping me to understand the mistakes.

My end goal is to achieve is something like the below:


   1.  Generate the data
   2. Send the data in the batch of 4k records in one batch since the API
   can accept the 4k records at once.
   3. The record would be as the below:
   4.

   {
   "Token": "",
   "CustomerName": "",
   "Object": "",
   "Data": [{"A":"1"},{"A":"2"}]
   }

   5. Token will be generated first then it would be passed to the 'Token'
   key in the data.

For the above goal, I initially wrote something like the below which gives
a heap error because the data frame is getting created on the driver side,
and the size of the records is a minimum of 1M.
   df = modifiedData

df = df.withColumn("uniqueID", lit("1"))

df = df.withColumn("row_num", row_number().over(
Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
))
tokenUrl = ""
# tokenUrl = ""
policyUrl = ""
tokenBody = {"Username": "", "Password": "", "CustomerName": ""}

def get_token(url, payload):
try:
print("Getting Token")
response = requests.request("POST", url, data=payload)
data = response.json()
if data['ErrorDescription'] == 'Success':
token = data['Token']
print(":::Token Generated")
else:
print('TokenNotGeneratedFrom: ')
# raise TokenNotGeneratedFrom(500, 'Token not
Generated from ')
return token
except Exception as e:
print('TokenNotGeneratedFrom: ' + str(e))
# raise TokenNotGeneratedFrom(500, str(e))

def call_to_cust_bulk_api(url, payload):
print("Calling Bulk API")
try:
# TODO: write code...
headers = {'content-type': 'application/json'}
print(":::jsn load")
# print(json.dumps(payload))
# print(payload)
response = requests.post(url, data=json.dumps(payload),
headers=headers)
# print(json.dumps(payload))
data = response.json()
return data
except Exception as e:
print('ExceptionInPushingDataTo: ' + str(e))
# raise ExceptionInPushingDataTo(500, str(e))

total_count = df.count()
i = 1
while i < total_count:
rangeNum = i + 3999
print("Range Num:::")
print(rangeNum)
df1 = df.filter((col("row_num") >= i) & (col("row_num") <=
rangeNum))
df1.cache()
maxValue =
df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
print("finalDF count:::", finalDF.count())
token = get_token(tokenUrl, tokenBody)

result =
json.loads((finalDF.toPandas().to_json(orient="records")))
# token = get_token(tokenUrl, tokenBody)
custRequestBody = {
"Token": token,
"CustomerName": "",
"Object": "",
"Data": result
}

# print("Customer Request Body::")
# print(json.dumps(custRequestBody))
response = call_to_cust_bulk_api(policyUrl, custRequestBody)
print(response)
finalDFStatus = finalDF.withColumn("edl_timestamp",
to_timestamp(lit(F.TimeNow()))).withColumn(
"status_for_each_batch",
lit(str(response)))


print("Max Value:::")
print(maxValue)
print("Next I:::")
i = rangeNum + 1
print(i)

This is my very first approach to hitting the APIs with Spark. So, could
you please help me to redesign the approach, or can share some links or
references using which I can go to the depth of this an

Re: API Problem

2022-06-10 Thread Sid
Hi Enrico,

Thanks for your time. Much appreciated.

I am expecting the payload to be a JSON string per record, like below:

{"A":"some_value","B":"some_value"}

Where A and B are the columns in my dataset.


On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack 
wrote:

> Sid,
>
> just recognized you are using Python API here. Then
> struct(*colsListToBePassed)) should be correct, given it takes a list of
> strings.
>
> Your method call_to_cust_bulk_api takes argument payload, which is a
> Column. This is then used in custRequestBody. That is pretty strange use
> of a column expression. What do you expect print(payload) to be?
>
> I recommend to split that complex command into multiple commands to find
> out what "an error of column not iterable" refers to.
>
> Enrico
>
>
> Am 10.06.22 um 13:39 schrieb Enrico Minack:
>
> Hi Sid,
>
> finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
>  .withColumn("status_for_batch", 
> call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed
>
> You are calling withColumn with the result of call_to_cust_bulk_api as
> the second argument. That result looks like it is of type string. But
> withColumn expects type Column. You can turn that string into a Column
> using lit:
>
> finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
>  .withColumn("status_for_batch", 
> lit(call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed)
>
>
> You are saying that gives you an error of column not iterable. I reckon
> the struct(*colsListToBePassed)) is wrong.
>
> Method struct requires a single string followed by a list of strings.
> Given your colsListToBePassed is a list of strings, this does not work.
> Try:
>
>   struct(colsListToBePassed.head, colsListToBePassed.tail: _*))
>
> Alternatively, struct requires a list of Column, so try this:
>
>   struct(colsListToBePassed.map(col): _*))
>
> The API is pretty clear about the types it expects.
>
>
> If you are still having errors, you better please paste the code and
> error.
>
> Enrico
>
>
>
> Am 09.06.22 um 21:31 schrieb Sid:
>
> Hi Experts,
>
> I am facing one problem while passing a column to the method.  The problem
> is described in detail here:
>
>
> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>
> TIA,
> Sid
>
>
>
>
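For completeness, the pattern this thread converges on might be sketched like this
(hedged; the endpoint, the column list and the finalDF dataframe are placeholders from
the discussion): wrap the API call in a UDF so that it returns a Column, and feed it the
row serialised with to_json(struct(...)).

    import requests
    from pyspark.sql.functions import to_json, struct, col, udf
    from pyspark.sql.types import StringType

    policy_url = "https://example.com/api"     # placeholder endpoint

    @udf(returnType=StringType())
    def call_api(payload_json: str) -> str:
        # payload_json is one row rendered as a JSON string, e.g. {"A":"1","B":"2"}
        resp = requests.post(policy_url, data=payload_json,
                             headers={"content-type": "application/json"})
        return str(resp.status_code)

    cols_to_send = ["A", "B"]                  # placeholder column names
    result = finalDF.withColumn(
        "status_for_batch",
        call_api(to_json(struct(*[col(c) for c in cols_to_send])))
    )

This resolves the "Column vs string" error discussed above; the per-row HTTP call itself
is still better batched, as in the later "Redesign approach" thread.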


Re: API Problem

2022-06-10 Thread Sid
Hi Stelios,

Thank you so much for your help.

If I use lit it gives an error of column not iterable.

Can you suggest a simple way of achieving my use case? I need to send the
entire column record by record to the API in JSON format.


TIA,
Sid

On Fri, Jun 10, 2022 at 2:51 PM Stelios Philippou 
wrote:

> Sid
> Then the issue is with the data, in the way you are creating it for that
> specific column.
>
> call_to_cust_bulk_api(policyUrl,to_json(struct(*colsListToBePassed)))
>
> Perhaps wrap that in a
>
> lit(call_to_cust_bulk_api(policyUrl,to_json(struct(*colsListToBePassed
>
> else you will need to start sending simpler data there to make sure that the 
> API works
>
>
> On Fri, 10 Jun 2022 at 12:15, Sid  wrote:
>
>> Still,  it is giving the same error.
>>
>> On Fri, Jun 10, 2022 at 5:13 AM Sean Owen  wrote:
>>
>>> That repartition seems to do nothing? But yes the key point is use col()
>>>
>>> On Thu, Jun 9, 2022, 9:41 PM Stelios Philippou 
>>> wrote:
>>>
>>>> Perhaps
>>>>
>>>>
>>>> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn("status_for_batch
>>>>
>>>> To
>>>>
>>>> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn(col("status_for_batch")
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 9 Jun 2022, 22:32 Sid,  wrote:
>>>>
>>>>> Hi Experts,
>>>>>
>>>>> I am facing one problem while passing a column to the method.  The
>>>>> problem is described in detail here:
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>>>>>
>>>>> TIA,
>>>>> Sid
>>>>>
>>>>


Re: API Problem

2022-06-10 Thread Sid
Still,  it is giving the same error.

On Fri, Jun 10, 2022 at 5:13 AM Sean Owen  wrote:

> That repartition seems to do nothing? But yes the key point is use col()
>
> On Thu, Jun 9, 2022, 9:41 PM Stelios Philippou  wrote:
>
>> Perhaps
>>
>>
>> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn("status_for_batch
>>
>> To
>>
>> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn(col("status_for_batch")
>>
>>
>>
>>
>> On Thu, 9 Jun 2022, 22:32 Sid,  wrote:
>>
>>> Hi Experts,
>>>
>>> I am facing one problem while passing a column to the method.  The
>>> problem is described in detail here:
>>>
>>>
>>> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>>>
>>> TIA,
>>> Sid
>>>
>>


API Problem

2022-06-09 Thread Sid
Hi Experts,

I am facing one problem while passing a column to the method.  The problem
is described in detail here:

https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark

TIA,
Sid


Re: How the data is distributed

2022-06-07 Thread Sid
Thank you for the information.


On Tue, 7 Jun 2022, 03:21 Sean Owen,  wrote:

> Data is not distributed to executors by anything. If you are processing
> data with Spark, Spark spawns tasks on executors to read chunks of the data
> from wherever it lives (S3, HDFS, etc).
>
>
> On Mon, Jun 6, 2022 at 4:07 PM Sid  wrote:
>
>> Hi experts,
>>
>>
>> When we load any file, I know that based on the information in the spark
>> session about the executors location, status and etc , the data is
>> distributed among the worker nodes and executors.
>>
>> But I have one doubt. Is the data initially loaded on the driver and then
>> it is distributed or it is directly distributed amongst the workers?
>>
>> Thanks,
>> Sid
>>
>


How the data is distributed

2022-06-06 Thread Sid
Hi experts,


When we load any file, I know that, based on the information in the Spark
session about the executors' location, status, etc., the data is distributed
among the worker nodes and executors.

But I have one doubt: is the data initially loaded on the driver and then
distributed, or is it directly distributed amongst the workers?

Thanks,
Sid


Re: Unable to format timestamp values in pyspark

2022-05-30 Thread Sid
Yeah, Stelios. It worked. Could you please post it as an answer so that I
can accept it on the post and can be of help to people?

Thanks,
Sid

On Mon, May 30, 2022 at 4:42 PM Stelios Philippou 
wrote:

> Sid,
>
> According to the error that i am seeing there, this is the Date Format
> issue.
>
> Text '5/1/2019 1:02:16' could not be parsed
>
>
> But your time format is specific as such
>
> 'M/dd/yyyy H:mm:ss')
>
> You can see that the day specified is /1/ but your format is dd, which
> expects two digits.
>
> Please try the following format and let us know
>
> 'M/d/yyyy H:mm:ss'
>
>
>
>
>
> On Mon, 30 May 2022 at 11:05, Sid  wrote:
>
>> Hi Team,
>>
>> I am able to convert to timestamp. However, when I try to filter out the
>> records based on a specific value it gives an error as mentioned in the
>> post. Could you please help me with this?
>>
>>
>> https://stackoverflow.com/questions/72422897/unable-to-format-timestamp-in-pyspark/72423394#72423394
>>
>>
>> Best Regards,
>> Sid
>>
>
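
A small, self-contained sketch of the fix discussed above (the sample
values are made up): single-digit months and days need the single-letter
pattern characters, and the parsed column can then be filtered as a
timestamp.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("5/1/2019 1:02:16",), ("12/15/2019 13:45:00",)], ["ts_raw"])

# 'M' and 'd' accept one- or two-digit values; 'dd' would reject "5/1/2019".
parsed = df.withColumn("ts", F.to_timestamp("ts_raw", "M/d/yyyy H:mm:ss"))

parsed.filter(F.col("ts") >= F.lit("2019-05-01").cast("timestamp")).show(truncate=False)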


Unable to format timestamp values in pyspark

2022-05-30 Thread Sid
Hi Team,

I am able to convert to timestamp. However, when I try to filter out the
records based on a specific value it gives an error as mentioned in the
post. Could you please help me with this?

https://stackoverflow.com/questions/72422897/unable-to-format-timestamp-in-pyspark/72423394#72423394


Best Regards,
Sid


Unable to convert double values

2022-05-29 Thread Sid
Hi Team,

I need help with the below problem:

https://stackoverflow.com/questions/72422872/unable-to-format-double-values-in-pyspark?noredirect=1#comment127940175_72422872


What am I doing wrong?

Thanks,
Siddhesh


Re: Complexity with the data

2022-05-26 Thread Sid
Hi Gourav,

Please find the below link for a detailed understanding.

https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark/72391090#72391090

@Bjørn Jørgensen  :

I was able to read such kind of data using the below code:

spark.read.option("header",True).option("multiline","true").option("escape","\"").csv("sample1.csv")


Also, I have one question about one of my columns. I have one column
with data like below:


[image: image.png]


Have a look at the second record. Should I mark it as a corrupt record?
Or is there any way to process such records?


Thanks,

Sid
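
On the corrupt-record question above, a hedged sketch of one way to keep
such rows for inspection instead of deciding up front. For CSV the
corrupt-record column has to be declared in an explicit schema; the column
names below are placeholders, not the dataset's real ones.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.getOrCreate()

# Placeholder schema; only the extra "_bad_row" field is essential for this pattern.
schema = (StructType()
          .add("col_1", StringType()).add("col_2", StringType())
          .add("col_3", StringType()).add("col_4", StringType())
          .add("_bad_row", StringType()))

df = (spark.read
      .schema(schema)
      .option("header", "true")
      .option("multiLine", "true")
      .option("escape", '"')
      .option("mode", "PERMISSIVE")                     # keep malformed rows instead of failing
      .option("columnNameOfCorruptRecord", "_bad_row")  # raw text of malformed rows lands here
      .csv("sample1.csv"))                              # file name taken from the thread

df = df.cache()   # caching is recommended before filtering on the corrupt-record column

bad_rows = df.filter(F.col("_bad_row").isNotNull())     # inspect / quarantine these separately
good_rows = df.filter(F.col("_bad_row").isNull()).drop("_bad_row")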





On Thu, May 26, 2022 at 10:54 PM Gourav Sengupta 
wrote:

> Hi,
> can you please give us a simple map of what the input is and what the
> output should be like? From your description it looks a bit difficult to
> figure out what exactly or how exactly you want the records actually parsed.
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, May 25, 2022 at 9:08 PM Sid  wrote:
>
>> Hi Experts,
>>
>> I have below CSV data that is getting generated automatically. I can't
>> change the data manually.
>>
>> The data looks like below:
>>
>> 2020-12-12,abc,2000,,INR,
>> 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
>> 2020-12-09,fgh,,software_developer,I only manage the development part.
>>
>> Since I don't have much experience with the other domains.
>>
>> It is handled by the other people.,INR
>> 2020-12-12,abc,2000,,USD,
>>
>> The third record is a problem. Since the value is separated by the new
>> line by the user while filling up the form. So, how do I handle this?
>>
>> There are 6 columns and 4 records in total. These are the sample records.
>>
>> Should I load it as RDD and then may be using a regex should eliminate
>> the new lines? Or how it should be? with ". /n" ?
>>
>> Any suggestions?
>>
>> Thanks,
>> Sid
>>
>


Re: Complexity with the data

2022-05-26 Thread Sid
I am not reading it through pandas. I am using Spark, because when I tried
to use the pandas API that comes under pyspark.pandas, it gave me an
error.

On Thu, May 26, 2022 at 9:52 PM Bjørn Jørgensen 
wrote:

> ok, but how do you read it now?
>
>
> https://github.com/apache/spark/blob/8f610d1b4ce532705c528f3c085b0289b2b17a94/python/pyspark/pandas/namespace.py#L216
> probably have to be updated with the default options. This is so that
> pandas API on spark will be like pandas.
>
> tor. 26. mai 2022 kl. 17:38 skrev Sid :
>
>> I was passing the wrong escape characters due to which I was facing the
>> issue. I have updated the user's answer on my post. Now I am able to load
>> the dataset.
>>
>> Thank you everyone for your time and help!
>>
>> Much appreciated.
>>
>> I have more datasets like this. I hope that would be resolved using this
>> approach :) Fingers crossed.
>>
>> Thanks,
>> Sid
>>
>> On Thu, May 26, 2022 at 8:43 PM Apostolos N. Papadopoulos <
>> papad...@csd.auth.gr> wrote:
>>
>>> Since you cannot create the DF directly, you may try to first create an
>>> RDD of tuples from the file
>>>
>>> and then convert the RDD to a DF by using the toDF() transformation.
>>>
>>> Perhaps you may bypass the issue with this.
>>>
>>> Another thing that I have seen in the example is that you are using ""
>>> as an escape character.
>>>
>>> Can you check if this may cause any issues?
>>>
>>> Regards,
>>>
>>> Apostolos
>>>
>>>
>>>
>>> On 26/5/22 16:31, Sid wrote:
>>>
>>> Thanks for opening the issue, Bjorn. However, could you help me to
>>> address the problem for now with some kind of alternative?
>>>
>>> I am actually stuck in this since yesterday.
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Thu, 26 May 2022, 18:48 Bjørn Jørgensen, 
>>> wrote:
>>>
>>>> Yes, it looks like a bug that we also have in pandas API on spark.
>>>>
>>>> So I have opened a JIRA
>>>> <https://issues.apache.org/jira/browse/SPARK-39304> for this.
>>>>
>>>> tor. 26. mai 2022 kl. 11:09 skrev Sid :
>>>>
>>>>> Hello Everyone,
>>>>>
>>>>> I have posted a question finally with the dataset and the column names.
>>>>>
>>>>> PFB link:
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark
>>>>>
>>>>> Thanks,
>>>>> Sid
>>>>>
>>>>> On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen <
>>>>> bjornjorgen...@gmail.com> wrote:
>>>>>
>>>>>> Sid, dump one of yours files.
>>>>>>
>>>>>>
>>>>>> https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
>>>>>>
>>>>>>
>>>>>>
>>>>>> ons. 25. mai 2022, 23:04 skrev Sid :
>>>>>>
>>>>>>> I have 10 columns with me but in the dataset, I observed that some
>>>>>>> records have 11 columns of data(for the additional column it is marked 
>>>>>>> as
>>>>>>> null). But, how do I handle this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sid
>>>>>>>
>>>>>>> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>>>>>>>
>>>>>>>> How can I do that? Any examples or links, please. So, this works
>>>>>>>> well with pandas I suppose. It's just that I need to convert back to 
>>>>>>>> the
>>>>>>>> spark data frame by providing a schema but since we are using a lower 
>>>>>>>> spark
>>>>>>>> version and pandas won't work in a distributed way in the lower 
>>>>>>>> versions,
>>>>>>>> therefore, was wondering if spark could handle this in a much better 
>>>>>>>> way.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sid
>>>>>>>>
>>>>>>>> On Thu, May 26, 2022 at 2:19 AM Gavin Ray 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>

Re: Complexity with the data

2022-05-26 Thread Sid
I was passing the wrong escape characters due to which I was facing the
issue. I have updated the user's answer on my post. Now I am able to load
the dataset.

Thank you everyone for your time and help!

Much appreciated.

I have more datasets like this. I hope that would be resolved using this
approach :) Fingers crossed.

Thanks,
Sid

On Thu, May 26, 2022 at 8:43 PM Apostolos N. Papadopoulos <
papad...@csd.auth.gr> wrote:

> Since you cannot create the DF directly, you may try to first create an
> RDD of tuples from the file
>
> and then convert the RDD to a DF by using the toDF() transformation.
>
> Perhaps you may bypass the issue with this.
>
> Another thing that I have seen in the example is that you are using "" as
> an escape character.
>
> Can you check if this may cause any issues?
>
> Regards,
>
> Apostolos
>
>
>
> On 26/5/22 16:31, Sid wrote:
>
> Thanks for opening the issue, Bjorn. However, could you help me to address
> the problem for now with some kind of alternative?
>
> I am actually stuck in this since yesterday.
>
> Thanks,
> Sid
>
> On Thu, 26 May 2022, 18:48 Bjørn Jørgensen, 
> wrote:
>
>> Yes, it looks like a bug that we also have in pandas API on spark.
>>
>> So I have opened a JIRA
>> <https://issues.apache.org/jira/browse/SPARK-39304> for this.
>>
>> tor. 26. mai 2022 kl. 11:09 skrev Sid :
>>
>>> Hello Everyone,
>>>
>>> I have posted a question finally with the dataset and the column names.
>>>
>>> PFB link:
>>>
>>>
>>> https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen <
>>> bjornjorgen...@gmail.com> wrote:
>>>
>>>> Sid, dump one of yours files.
>>>>
>>>>
>>>> https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
>>>>
>>>>
>>>>
>>>> ons. 25. mai 2022, 23:04 skrev Sid :
>>>>
>>>>> I have 10 columns with me but in the dataset, I observed that some
>>>>> records have 11 columns of data(for the additional column it is marked as
>>>>> null). But, how do I handle this?
>>>>>
>>>>> Thanks,
>>>>> Sid
>>>>>
>>>>> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>>>>>
>>>>>> How can I do that? Any examples or links, please. So, this works well
>>>>>> with pandas I suppose. It's just that I need to convert back to the spark
>>>>>> data frame by providing a schema but since we are using a lower spark
>>>>>> version and pandas won't work in a distributed way in the lower versions,
>>>>>> therefore, was wondering if spark could handle this in a much better way.
>>>>>>
>>>>>> Thanks,
>>>>>> Sid
>>>>>>
>>>>>> On Thu, May 26, 2022 at 2:19 AM Gavin Ray 
>>>>>> wrote:
>>>>>>
>>>>>>> Forgot to reply-all last message, whoops. Not very good at email.
>>>>>>>
>>>>>>> You need to normalize the CSV with a parser that can escape commas
>>>>>>> inside of strings
>>>>>>> Not sure if Spark has an option for this?
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>>>>>>>
>>>>>>>> Thank you so much for your time.
>>>>>>>>
>>>>>>>> I have data like below which I tried to load by setting multiple
>>>>>>>> options while reading the file but however, but I am not able to
>>>>>>>> consolidate the 9th column data within itself.
>>>>>>>>
>>>>>>>> [image: image.png]
>>>>>>>>
>>>>>>>> I tried the below code:
>>>>>>>>
>>>>>>>> df = spark.read.option("header", "true").option("multiline",
>>>>>>>> "true").option("inferSchema", "true").option("quote",
>>>>>>>>
>>>>>>>>   '"').option(
>>>>>>>> "delimiter", ",").csv("path")
>>

Re: Complexity with the data

2022-05-26 Thread Sid
Thanks for opening the issue, Bjorn. However, could you help me address
the problem for now with some kind of alternative?

I have actually been stuck on this since yesterday.

Thanks,
Sid

On Thu, 26 May 2022, 18:48 Bjørn Jørgensen, 
wrote:

> Yes, it looks like a bug that we also have in pandas API on spark.
>
> So I have opened a JIRA
> <https://issues.apache.org/jira/browse/SPARK-39304> for this.
>
> tor. 26. mai 2022 kl. 11:09 skrev Sid :
>
>> Hello Everyone,
>>
>> I have posted a question finally with the dataset and the column names.
>>
>> PFB link:
>>
>>
>> https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark
>>
>> Thanks,
>> Sid
>>
>> On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen 
>> wrote:
>>
>>> Sid, dump one of yours files.
>>>
>>> https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
>>>
>>>
>>>
>>> ons. 25. mai 2022, 23:04 skrev Sid :
>>>
>>>> I have 10 columns with me but in the dataset, I observed that some
>>>> records have 11 columns of data(for the additional column it is marked as
>>>> null). But, how do I handle this?
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>>>>
>>>>> How can I do that? Any examples or links, please. So, this works well
>>>>> with pandas I suppose. It's just that I need to convert back to the spark
>>>>> data frame by providing a schema but since we are using a lower spark
>>>>> version and pandas won't work in a distributed way in the lower versions,
>>>>> therefore, was wondering if spark could handle this in a much better way.
>>>>>
>>>>> Thanks,
>>>>> Sid
>>>>>
>>>>> On Thu, May 26, 2022 at 2:19 AM Gavin Ray 
>>>>> wrote:
>>>>>
>>>>>> Forgot to reply-all last message, whoops. Not very good at email.
>>>>>>
>>>>>> You need to normalize the CSV with a parser that can escape commas
>>>>>> inside of strings
>>>>>> Not sure if Spark has an option for this?
>>>>>>
>>>>>>
>>>>>> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>>>>>>
>>>>>>> Thank you so much for your time.
>>>>>>>
>>>>>>> I have data like below which I tried to load by setting multiple
>>>>>>> options while reading the file but however, but I am not able to
>>>>>>> consolidate the 9th column data within itself.
>>>>>>>
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> I tried the below code:
>>>>>>>
>>>>>>> df = spark.read.option("header", "true").option("multiline",
>>>>>>> "true").option("inferSchema", "true").option("quote",
>>>>>>>
>>>>>>>   '"').option(
>>>>>>> "delimiter", ",").csv("path")
>>>>>>>
>>>>>>> What else I can do?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sid
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos <
>>>>>>> papad...@csd.auth.gr> wrote:
>>>>>>>
>>>>>>>> Dear Sid,
>>>>>>>>
>>>>>>>> can you please give us more info? Is it true that every line may
>>>>>>>> have a
>>>>>>>> different number of columns? Is there any rule followed by
>>>>>>>>
>>>>>>>> every line of the file? From the information you have sent I cannot
>>>>>>>> fully understand the "schema" of your data.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Apostolos
>>>>>>>>
>>>>>>>>
>>>>>>>> On 25/5/22 23:06, Sid wrote:
>>>>>>>> > Hi Experts,
>>>>>>>> >
>>>>>>>> > I have below CSV data that is getting generated automatically. I
>>>>>>>> can't
>>>>>>>> > change the data manually.
>>>>>>>> >
>>>>>>>> > The data looks like below:
>>>>>>>> >
>>>>>>>> > 2020-12-12,abc,2000,,INR,
>>>>>>>> > 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
>>>>>>>> > 2020-12-09,fgh,,software_developer,I only manage the development
>>>>>>>> part.
>>>>>>>> >
>>>>>>>> > Since I don't have much experience with the other domains.
>>>>>>>> >
>>>>>>>> > It is handled by the other people.,INR
>>>>>>>> > 2020-12-12,abc,2000,,USD,
>>>>>>>> >
>>>>>>>> > The third record is a problem. Since the value is separated by
>>>>>>>> the new
>>>>>>>> > line by the user while filling up the form. So, how do I
>>>>>>>> handle this?
>>>>>>>> >
>>>>>>>> > There are 6 columns and 4 records in total. These are the sample
>>>>>>>> records.
>>>>>>>> >
>>>>>>>> > Should I load it as RDD and then may be using a regex should
>>>>>>>> eliminate
>>>>>>>> > the new lines? Or how it should be? with ". /n" ?
>>>>>>>> >
>>>>>>>> > Any suggestions?
>>>>>>>> >
>>>>>>>> > Thanks,
>>>>>>>> > Sid
>>>>>>>>
>>>>>>>> --
>>>>>>>> Apostolos N. Papadopoulos, Associate Professor
>>>>>>>> Department of Informatics
>>>>>>>> Aristotle University of Thessaloniki
>>>>>>>> Thessaloniki, GREECE
>>>>>>>> tel: ++0030312310991918
>>>>>>>> email: papad...@csd.auth.gr
>>>>>>>> twitter: @papadopoulos_ap
>>>>>>>> web: http://datalab.csd.auth.gr/~apostol
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: Complexity with the data

2022-05-26 Thread Sid
Hello Everyone,

I have posted a question finally with the dataset and the column names.

PFB link:

https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark

Thanks,
Sid

On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen 
wrote:

> Sid, dump one of yours files.
>
> https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
>
>
>
> ons. 25. mai 2022, 23:04 skrev Sid :
>
>> I have 10 columns with me but in the dataset, I observed that some
>> records have 11 columns of data(for the additional column it is marked as
>> null). But, how do I handle this?
>>
>> Thanks,
>> Sid
>>
>> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>>
>>> How can I do that? Any examples or links, please. So, this works well
>>> with pandas I suppose. It's just that I need to convert back to the spark
>>> data frame by providing a schema but since we are using a lower spark
>>> version and pandas won't work in a distributed way in the lower versions,
>>> therefore, was wondering if spark could handle this in a much better way.
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Thu, May 26, 2022 at 2:19 AM Gavin Ray  wrote:
>>>
>>>> Forgot to reply-all last message, whoops. Not very good at email.
>>>>
>>>> You need to normalize the CSV with a parser that can escape commas
>>>> inside of strings
>>>> Not sure if Spark has an option for this?
>>>>
>>>>
>>>> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>>>>
>>>>> Thank you so much for your time.
>>>>>
>>>>> I have data like below which I tried to load by setting multiple
>>>>> options while reading the file but however, but I am not able to
>>>>> consolidate the 9th column data within itself.
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> I tried the below code:
>>>>>
>>>>> df = spark.read.option("header", "true").option("multiline",
>>>>> "true").option("inferSchema", "true").option("quote",
>>>>>
>>>>> '"').option(
>>>>> "delimiter", ",").csv("path")
>>>>>
>>>>> What else I can do?
>>>>>
>>>>> Thanks,
>>>>> Sid
>>>>>
>>>>>
>>>>> On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos <
>>>>> papad...@csd.auth.gr> wrote:
>>>>>
>>>>>> Dear Sid,
>>>>>>
>>>>>> can you please give us more info? Is it true that every line may have
>>>>>> a
>>>>>> different number of columns? Is there any rule followed by
>>>>>>
>>>>>> every line of the file? From the information you have sent I cannot
>>>>>> fully understand the "schema" of your data.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Apostolos
>>>>>>
>>>>>>
>>>>>> On 25/5/22 23:06, Sid wrote:
>>>>>> > Hi Experts,
>>>>>> >
>>>>>> > I have below CSV data that is getting generated automatically. I
>>>>>> can't
>>>>>> > change the data manually.
>>>>>> >
>>>>>> > The data looks like below:
>>>>>> >
>>>>>> > 2020-12-12,abc,2000,,INR,
>>>>>> > 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
>>>>>> > 2020-12-09,fgh,,software_developer,I only manage the development
>>>>>> part.
>>>>>> >
>>>>>> > Since I don't have much experience with the other domains.
>>>>>> >
>>>>>> > It is handled by the other people.,INR
>>>>>> > 2020-12-12,abc,2000,,USD,
>>>>>> >
>>>>>> > The third record is a problem. Since the value is separated by the
>>>>>> new
>>>>>> > line by the user while filling up the form. So, how do I
>>>>>> handle this?
>>>>>> >
>>>>>> > There are 6 columns and 4 records in total. These are the sample
>>>>>> records.
>>>>>> >
>>>>>> > Should I load it as RDD and then may be using a regex should
>>>>>> eliminate
>>>>>> > the new lines? Or how it should be? with ". /n" ?
>>>>>> >
>>>>>> > Any suggestions?
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Sid
>>>>>>
>>>>>> --
>>>>>> Apostolos N. Papadopoulos, Associate Professor
>>>>>> Department of Informatics
>>>>>> Aristotle University of Thessaloniki
>>>>>> Thessaloniki, GREECE
>>>>>> tel: ++0030312310991918
>>>>>> email: papad...@csd.auth.gr
>>>>>> twitter: @papadopoulos_ap
>>>>>> web: http://datalab.csd.auth.gr/~apostol
>>>>>>
>>>>>>
>>>>>>
>>>>>>


Re: Complexity with the data

2022-05-25 Thread Sid
I have 10 columns with me, but in the dataset I observed that some records
have 11 columns of data (for the additional column it is marked as null).
But how do I handle this?

Thanks,
Sid

On Thu, May 26, 2022 at 2:22 AM Sid  wrote:

> How can I do that? Any examples or links, please. So, this works well with
> pandas I suppose. It's just that I need to convert back to the spark data
> frame by providing a schema but since we are using a lower spark version
> and pandas won't work in a distributed way in the lower versions,
> therefore, was wondering if spark could handle this in a much better way.
>
> Thanks,
> Sid
>
> On Thu, May 26, 2022 at 2:19 AM Gavin Ray  wrote:
>
>> Forgot to reply-all last message, whoops. Not very good at email.
>>
>> You need to normalize the CSV with a parser that can escape commas inside
>> of strings
>> Not sure if Spark has an option for this?
>>
>>
>> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>>
>>> Thank you so much for your time.
>>>
>>> I have data like below which I tried to load by setting multiple options
>>> while reading the file but however, but I am not able to consolidate the
>>> 9th column data within itself.
>>>
>>> [image: image.png]
>>>
>>> I tried the below code:
>>>
>>> df = spark.read.option("header", "true").option("multiline",
>>> "true").option("inferSchema", "true").option("quote",
>>>
>>>   '"').option(
>>> "delimiter", ",").csv("path")
>>>
>>> What else I can do?
>>>
>>> Thanks,
>>> Sid
>>>
>>>
>>> On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos <
>>> papad...@csd.auth.gr> wrote:
>>>
>>>> Dear Sid,
>>>>
>>>> can you please give us more info? Is it true that every line may have a
>>>> different number of columns? Is there any rule followed by
>>>>
>>>> every line of the file? From the information you have sent I cannot
>>>> fully understand the "schema" of your data.
>>>>
>>>> Regards,
>>>>
>>>> Apostolos
>>>>
>>>>
>>>> On 25/5/22 23:06, Sid wrote:
>>>> > Hi Experts,
>>>> >
>>>> > I have below CSV data that is getting generated automatically. I
>>>> can't
>>>> > change the data manually.
>>>> >
>>>> > The data looks like below:
>>>> >
>>>> > 2020-12-12,abc,2000,,INR,
>>>> > 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
>>>> > 2020-12-09,fgh,,software_developer,I only manage the development part.
>>>> >
>>>> > Since I don't have much experience with the other domains.
>>>> >
>>>> > It is handled by the other people.,INR
>>>> > 2020-12-12,abc,2000,,USD,
>>>> >
>>>> > The third record is a problem. Since the value is separated by the
>>>> new
>>>> > line by the user while filling up the form. So, how do I handle this?
>>>> >
>>>> > There are 6 columns and 4 records in total. These are the sample
>>>> records.
>>>> >
>>>> > Should I load it as RDD and then may be using a regex should
>>>> eliminate
>>>> > the new lines? Or how it should be? with ". /n" ?
>>>> >
>>>> > Any suggestions?
>>>> >
>>>> > Thanks,
>>>> > Sid
>>>>
>>>> --
>>>> Apostolos N. Papadopoulos, Associate Professor
>>>> Department of Informatics
>>>> Aristotle University of Thessaloniki
>>>> Thessaloniki, GREECE
>>>> tel: ++0030312310991918
>>>> email: papad...@csd.auth.gr
>>>> twitter: @papadopoulos_ap
>>>> web: http://datalab.csd.auth.gr/~apostol
>>>>
>>>>
>>>>
>>>>


Re: Complexity with the data

2022-05-25 Thread Sid
How can I do that? Any examples or links, please. So, this works well with
pandas, I suppose. It's just that I need to convert back to the Spark data
frame by providing a schema, but since we are using a lower Spark version
and pandas won't work in a distributed way in those versions, I was
wondering if Spark could handle this in a much better way.

Thanks,
Sid

On Thu, May 26, 2022 at 2:19 AM Gavin Ray  wrote:

> Forgot to reply-all last message, whoops. Not very good at email.
>
> You need to normalize the CSV with a parser that can escape commas inside
> of strings
> Not sure if Spark has an option for this?
>
>
> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>
>> Thank you so much for your time.
>>
>> I have data like below which I tried to load by setting multiple options
>> while reading the file but however, but I am not able to consolidate the
>> 9th column data within itself.
>>
>> [image: image.png]
>>
>> I tried the below code:
>>
>> df = spark.read.option("header", "true").option("multiline",
>> "true").option("inferSchema", "true").option("quote",
>>
>> '"').option(
>> "delimiter", ",").csv("path")
>>
>> What else I can do?
>>
>> Thanks,
>> Sid
>>
>>
>> On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos <
>> papad...@csd.auth.gr> wrote:
>>
>>> Dear Sid,
>>>
>>> can you please give us more info? Is it true that every line may have a
>>> different number of columns? Is there any rule followed by
>>>
>>> every line of the file? From the information you have sent I cannot
>>> fully understand the "schema" of your data.
>>>
>>> Regards,
>>>
>>> Apostolos
>>>
>>>
>>> On 25/5/22 23:06, Sid wrote:
>>> > Hi Experts,
>>> >
>>> > I have below CSV data that is getting generated automatically. I can't
>>> > change the data manually.
>>> >
>>> > The data looks like below:
>>> >
>>> > 2020-12-12,abc,2000,,INR,
>>> > 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
>>> > 2020-12-09,fgh,,software_developer,I only manage the development part.
>>> >
>>> > Since I don't have much experience with the other domains.
>>> >
>>> > It is handled by the other people.,INR
>>> > 2020-12-12,abc,2000,,USD,
>>> >
>>> > The third record is a problem. Since the value is separated by the new
>>> > line by the user while filling up the form. So, how do I handle this?
>>> >
>>> > There are 6 columns and 4 records in total. These are the sample
>>> records.
>>> >
>>> > Should I load it as RDD and then may be using a regex should eliminate
>>> > the new lines? Or how it should be? with ". /n" ?
>>> >
>>> > Any suggestions?
>>> >
>>> > Thanks,
>>> > Sid
>>>
>>> --
>>> Apostolos N. Papadopoulos, Associate Professor
>>> Department of Informatics
>>> Aristotle University of Thessaloniki
>>> Thessaloniki, GREECE
>>> tel: ++0030312310991918
>>> email: papad...@csd.auth.gr
>>> twitter: @papadopoulos_ap
>>> web: http://datalab.csd.auth.gr/~apostol
>>>
>>>
>>>
>>>


Re: Complexity with the data

2022-05-25 Thread Sid
Thank you so much for your time.

I have data like below, which I tried to load by setting multiple options
while reading the file, but I am not able to consolidate the
9th column data within itself.

[image: image.png]

I tried the below code:

df = spark.read.option("header", "true").option("multiline",
"true").option("inferSchema", "true").option("quote",

  '"').option(
"delimiter", ",").csv("path")

What else I can do?

Thanks,
Sid


On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos <
papad...@csd.auth.gr> wrote:

> Dear Sid,
>
> can you please give us more info? Is it true that every line may have a
> different number of columns? Is there any rule followed by
>
> every line of the file? From the information you have sent I cannot
> fully understand the "schema" of your data.
>
> Regards,
>
> Apostolos
>
>
> On 25/5/22 23:06, Sid wrote:
> > Hi Experts,
> >
> > I have below CSV data that is getting generated automatically. I can't
> > change the data manually.
> >
> > The data looks like below:
> >
> > 2020-12-12,abc,2000,,INR,
> > 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
> > 2020-12-09,fgh,,software_developer,I only manage the development part.
> >
> > Since I don't have much experience with the other domains.
> >
> > It is handled by the other people.,INR
> > 2020-12-12,abc,2000,,USD,
> >
> > The third record is a problem. Since the value is separated by the new
> > line by the user while filling up the form. So, how do I handle this?
> >
> > There are 6 columns and 4 records in total. These are the sample records.
> >
> > Should I load it as RDD and then may be using a regex should eliminate
> > the new lines? Or how it should be? with ". /n" ?
> >
> > Any suggestions?
> >
> > Thanks,
> > Sid
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol
>
>
>
>


Complexity with the data

2022-05-25 Thread Sid
Hi Experts,

I have below CSV data that is getting generated automatically. I can't
change the data manually.

The data looks like below:

2020-12-12,abc,2000,,INR,
2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
2020-12-09,fgh,,software_developer,I only manage the development part.

Since I don't have much experience with the other domains.

It is handled by the other people.,INR
2020-12-12,abc,2000,,USD,

The third record is a problem, since the value was split across a new line
by the user while filling up the form. So, how do I handle this?

There are 6 columns and 4 records in total. These are the sample records.

Should I load it as an RDD and then maybe use a regex to eliminate the
new lines? Or how should it be done? With ". /n" ?

Any suggestions?

Thanks,
Sid
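
A hedged sketch of the option combination that later resolved this thread,
assuming the multi-line value is wrapped in quotes in the raw file (without
quotes around the field, no CSV parser can tell the embedded newline from a
record separator). The path is a placeholder.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .option("header", "true")
      .option("multiLine", "true")   # let a quoted field span several physical lines
      .option("quote", '"')
      .option("escape", '"')         # the escape character that worked later in the thread
      .csv("path/to/form_data.csv")) # placeholder path

df.show(truncate=False)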


Count() action leading to errors | Pyspark

2022-05-06 Thread Sid
Hi Team,

I am trying to display the counts of the DF which is created by running one
Spark SQL query with a CTE pattern.

Everything is working as expected. I was able to write the DF to Postgres
RDS. However, when I am trying to display the counts using a simple count()
action it leads to the below error:

py4j.protocol.Py4JJavaError: An error occurred while calling o321.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
1301 in stage 35.0 failed 4 times, most recent failure: Lost task 1301.3 in
stage 35.0 (TID 7889, 10.100.6.148, executor 1):
java.io.FileNotFoundException: File not present on S3
It is possible the underlying files have been updated. You can explicitly
invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
in SQL or by recreating the Dataset/DataFrame involved.


So, I tried something like the below:

print(modifiedData.repartition(modifiedData.rdd.getNumPartitions()).count())

So, there are 80 partitions being formed for this DF, and the count written
to the table is 92,665. However, it didn't match the count displayed after
repartitioning, which was 91,183.

Not sure why there is this gap.

Why are the counts not matching? Also, what could be the possible reason for
that simple count() error?

Environment:
AWS GLUE 1.X
10 workers
Spark 2.4.3

Thanks,
Sid
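
A hedged sketch of one way to make both actions see the same snapshot (the
query and sink below are stand-ins): persist the DataFrame before the first
action, so the count and the write do not each re-read S3 files that may
have changed in between.

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

modifiedData = spark.range(0, 100000).withColumnRenamed("id", "order_id")  # stand-in for the CTE query

modifiedData.persist(StorageLevel.MEMORY_AND_DISK)

print(modifiedData.count())                               # first action materializes the cache
modifiedData.write.mode("overwrite").parquet("/tmp/out")  # stand-in for the Postgres write
modifiedData.unpersist()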


Re: Dealing with large number of small files

2022-04-27 Thread Sid
Yes,


It created a list of records separated by commas, and it was created faster
as well.

On Wed, 27 Apr 2022, 13:42 Gourav Sengupta, 
wrote:

> Hi,
> did that result in valid JSON in the output file?
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 26, 2022 at 8:18 PM Sid  wrote:
>
>> I have .txt files with JSON inside it. It is generated by some API calls
>> by the Client.
>>
>> On Wed, Apr 27, 2022 at 12:39 AM Bjørn Jørgensen <
>> bjornjorgen...@gmail.com> wrote:
>>
>>> What is that you have? Is it txt files or json files?
>>> Or do you have txt files with JSON inside?
>>>
>>>
>>>
>>> tir. 26. apr. 2022 kl. 20:41 skrev Sid :
>>>
>>>> Thanks for your time, everyone :)
>>>>
>>>> Much appreciated.
>>>>
>>>> I solved it using jq utility since I was dealing with JSON. I have
>>>> solved it using below script:
>>>>
>>>> find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Sid
>>>>
>>>>
>>>> On Tue, Apr 26, 2022 at 9:37 PM Bjørn Jørgensen <
>>>> bjornjorgen...@gmail.com> wrote:
>>>>
>>>>> and the bash script seems to read txt files not json
>>>>>
>>>>> for f in Agent/*.txt; do cat ${f} >> merged.json;done;
>>>>>
>>>>>
>>>>>
>>>>> tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> what is the version of spark are you using? And where is the data
>>>>>> stored.
>>>>>>
>>>>>> I am not quite sure that just using a bash script will help because
>>>>>> concatenating all the files into a single file creates a valid JSON.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav
>>>>>>
>>>>>> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Can somebody help me with the below problem?
>>>>>>>
>>>>>>>
>>>>>>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sid
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Bjørn Jørgensen
>>>>> Vestre Aspehaug 4, 6010 Ålesund
>>>>> Norge
>>>>>
>>>>> +47 480 94 297
>>>>>
>>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>


Re: Dealing with large number of small files

2022-04-26 Thread Sid
I have .txt files with JSON inside them. They are generated by some API
calls by the Client.

On Wed, Apr 27, 2022 at 12:39 AM Bjørn Jørgensen 
wrote:

> What is that you have? Is it txt files or json files?
> Or do you have txt files with JSON inside?
>
>
>
> tir. 26. apr. 2022 kl. 20:41 skrev Sid :
>
>> Thanks for your time, everyone :)
>>
>> Much appreciated.
>>
>> I solved it using jq utility since I was dealing with JSON. I have solved
>> it using below script:
>>
>> find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt
>>
>>
>> Thanks,
>>
>> Sid
>>
>>
>> On Tue, Apr 26, 2022 at 9:37 PM Bjørn Jørgensen 
>> wrote:
>>
>>> and the bash script seems to read txt files not json
>>>
>>> for f in Agent/*.txt; do cat ${f} >> merged.json;done;
>>>
>>>
>>>
>>> tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
>>> gourav.sengu...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> what is the version of spark are you using? And where is the data
>>>> stored.
>>>>
>>>> I am not quite sure that just using a bash script will help because
>>>> concatenating all the files into a single file creates a valid JSON.
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Can somebody help me with the below problem?
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Sid
>>>>>
>>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: Dealing with large number of small files

2022-04-26 Thread Sid
Thanks for your time, everyone :)

Much appreciated.

I solved it using the jq utility, since I was dealing with JSON, with the
below script:

find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt


Thanks,

Sid


On Tue, Apr 26, 2022 at 9:37 PM Bjørn Jørgensen 
wrote:

> and the bash script seems to read txt files not json
>
> for f in Agent/*.txt; do cat ${f} >> merged.json;done;
>
>
>
> tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
> gourav.sengu...@gmail.com>:
>
>> Hi,
>>
>> what is the version of spark are you using? And where is the data stored.
>>
>> I am not quite sure that just using a bash script will help because
>> concatenating all the files into a single file creates a valid JSON.
>>
>> Regards,
>> Gourav
>>
>> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>>
>>> Hello,
>>>
>>> Can somebody help me with the below problem?
>>>
>>>
>>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>>
>>>
>>> Thanks,
>>> Sid
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Dealing with large number of small files

2022-04-26 Thread Sid
Hello,

Can somebody help me with the below problem?

https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark


Thanks,
Sid
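
For comparison, a hedged sketch of doing the same merge in Spark itself,
assuming each .txt file holds one (possibly multi-line) JSON document; the
glob matches the Agent/*.txt pattern used in the thread's bash script, and
the output path is a placeholder.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark lists and reads all matching small files in one job.
df = (spark.read
      .option("multiLine", "true")
      .json("Agent/*.txt"))

# Rewrite as a handful of larger files to avoid small-file overhead downstream.
df.coalesce(4).write.mode("overwrite").json("Agent_merged/")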


When should we cache / persist ? After or Before Actions?

2022-04-21 Thread Sid
Hi Folks,

I am working on Spark Dataframe API where I am doing following thing:

1) df = spark.sql("some sql on huge dataset").persist()
2) df1 = df.count()
3) df.repartition().write.mode().parquet("")


AFAIK, persist should be used after the count statement, if it is needed at
all, since Spark is lazily evaluated and if I call any action it will
recompute the above code, so there is no use in persisting it before the action.

Therefore, it should be something like the below that should give better
performance.
1) df= spark.sql("some sql on huge dataset")
2) df1 = df.count()
3) df.persist()
4) df.repartition().write.mode().parquet("")

So please help me to understand how it should be done exactly, and why, if
I am not correct.

Thanks,
Sid
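
For reference, a minimal sketch of the ordering that lets both actions reuse
the cache (table name and paths are placeholders): persist() is lazy, so
calling it before the first action means count() materializes the cache and
the write reuses it, whereas calling it only after count() means the write
still recomputes the full query.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.range(0, 1000000).createOrReplaceTempView("big_table")   # stand-in for the huge dataset

df = spark.sql("SELECT id, id % 10 AS bucket FROM big_table")

df.persist()                                                   # lazy: nothing cached yet
cnt = df.count()                                               # first action fills the cache
df.repartition(8).write.mode("overwrite").parquet("/tmp/out")  # served from the cache
df.unpersist()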


Cannot compare columns directly in IF...ELSE statement

2022-03-25 Thread Sid
Hi Team,

I need help with the below problem:

https://stackoverflow.com/questions/71613292/how-to-use-columns-in-if-else-condition-in-pyspark

Thanks,
Sid
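
The linked question is usually answered with F.when/otherwise, since a
Python if/else cannot branch on a Column. A minimal sketch with made-up
column names:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(100, 20), (50, 80)], ["salary", "bonus"])

# when/otherwise builds the conditional as a column expression evaluated per row.
df = df.withColumn(
    "flag",
    F.when(F.col("salary") > F.col("bonus"), "high").otherwise("low")
)
df.show()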


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
Basically, I am trying two different approaches for the same problem and my
concern is how it will behave in the case of big data if you talk about
millions of records. Which one would be faster? Is using windowing
functions a better way since it will load the entire dataset into a single
window and do the operations?

On Mon, Feb 28, 2022 at 12:26 AM Sean Owen  wrote:

> Those queries look like they do fairly different things. One is selecting
> top employees by salary, the other is ... selecting where there are less
> than 3 distinct salaries or something.
> Not sure what the intended comparison is then; these are not equivalent
> ways of doing the same thing, or does not seem so as far as I can see.
>
> On Sun, Feb 27, 2022 at 12:30 PM Sid  wrote:
>
>> My bad.
>>
>> Aggregation Query:
>>
>> # Write your MySQL query statement below
>>
>>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
>> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
>> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
>> ORDER by E.DepartmentId, E.Salary DESC
>>
>> Time Taken: 1212 ms
>>
>> Windowing Query:
>>
>> select Department,Employee,Salary from (
>> select d.name as Department, e.name as Employee,e.salary as
>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>> rnk<=3
>>
>> Time Taken: 790 ms
>>
>> Thanks,
>> Sid
>>
>>
>> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>>
>>> Those two queries are identical?
>>>
>>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am aware that if windowing functions are used, then at first it loads
>>>> the entire dataset into one window,scans and then performs the other
>>>> mentioned operations for that particular window which could be slower when
>>>> dealing with trillions / billions of records.
>>>>
>>>> I did a POC where I used an example to find the max 3 highest salary
>>>> for an employee per department. So, I wrote a below queries and compared
>>>> the time for it:
>>>>
>>>> Windowing Query:
>>>>
>>>> select Department,Employee,Salary from (
>>>> select d.name as Department, e.name as Employee,e.salary as
>>>> Salary,dense_rank() over(partition by d.name order by e.salary desc)
>>>> as rnk from Department d join Employee e on e.departmentId=d.id ) a
>>>> where rnk<=3
>>>>
>>>> Time taken: 790 ms
>>>>
>>>> Aggregation Query:
>>>>
>>>> select Department,Employee,Salary from (
>>>> select d.name as Department, e.name as Employee,e.salary as
>>>> Salary,dense_rank() over(partition by d.name order by e.salary desc)
>>>> as rnk from Department d join Employee e on e.departmentId=d.id ) a
>>>> where rnk<=3
>>>>
>>>> Time taken: 1212 ms
>>>>
>>>> But as per my understanding, the aggregation should have run faster.
>>>> So, my whole point is if the dataset is huge I should force some kind of
>>>> map reduce jobs like we have an option called df.groupby().reduceByGroups()
>>>>
>>>> So I think the aggregation query is taking more time since the dataset
>>>> size here is smaller and as we all know that map reduce works faster when
>>>> there is a huge volume of data. Haven't tested it yet on big data but
>>>> needed some expert guidance over here.
>>>>
>>>> Please correct me if I am wrong.
>>>>
>>>> TIA,
>>>> Sid
>>>>
>>>>
>>>>
>>>>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
Hi Enrico,

Thanks for your time :)

Consider a huge data volume scenario, If I don't use any keywords like
distinct, which one would be faster ? Window with partitionBy or normal SQL
aggregation methods? and how does df.groupBy().reduceByGroups() work
internally ?

Thanks,
Sid

On Mon, Feb 28, 2022 at 12:59 AM Enrico Minack 
wrote:

> Sid,
>
> Your Aggregation Query selects all employees where less than three
> distinct salaries exist that are larger. So, both queries seem to do the
> same.
>
> The Windowing Query is explicit in what it does: give me the rank for
> salaries per department in the given order and pick the top 3 per
> department.
>
> The Aggregation Query is trying to get to this conclusion by constructing
> some comparison. The former is the better approach, the second scales badly
> as this is done by counting distinct salaries that are larger than each
> salary in E. This looks like a Cartesian product of Employees. You make
> this very hard to optimize or execute by the query engine.
>
> And as you say, your example is very small, so this will not give any
> insights into big data.
>
> Enrico
>
>
> Am 27.02.22 um 19:30 schrieb Sid:
>
> My bad.
>
> Aggregation Query:
>
> # Write your MySQL query statement below
>
>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
> ORDER by E.DepartmentId, E.Salary DESC
>
> Time Taken: 1212 ms
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time Taken: 790 ms
>
> Thanks,
> Sid
>
>
> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>
>> Those two queries are identical?
>>
>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I am aware that if windowing functions are used, then at first it loads
>>> the entire dataset into one window,scans and then performs the other
>>> mentioned operations for that particular window which could be slower when
>>> dealing with trillions / billions of records.
>>>
>>> I did a POC where I used an example to find the max 3 highest salary for
>>> an employee per department. So, I wrote a below queries and compared the
>>> time for it:
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 790 ms
>>>
>>> Aggregation Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 1212 ms
>>>
>>> But as per my understanding, the aggregation should have run faster. So,
>>> my whole point is if the dataset is huge I should force some kind of map
>>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>>
>>> So I think the aggregation query is taking more time since the dataset
>>> size here is smaller and as we all know that map reduce works faster when
>>> there is a huge volume of data. Haven't tested it yet on big data but
>>> needed some expert guidance over here.
>>>
>>> Please correct me if I am wrong.
>>>
>>> TIA,
>>> Sid
>>>
>>>
>>>
>>>
>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
My bad.

Aggregation Query:

# Write your MySQL query statement below

   SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
   WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
ORDER by E.DepartmentId, E.Salary DESC

Time Taken: 1212 ms

Windowing Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as
Salary,dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time Taken: 790 ms

Thanks,
Sid


On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:

> Those two queries are identical?
>
> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>
>> Hi Team,
>>
>> I am aware that if windowing functions are used, then at first it loads
>> the entire dataset into one window,scans and then performs the other
>> mentioned operations for that particular window which could be slower when
>> dealing with trillions / billions of records.
>>
>> I did a POC where I used an example to find the max 3 highest salary for
>> an employee per department. So, I wrote a below queries and compared the
>> time for it:
>>
>> Windowing Query:
>>
>> select Department,Employee,Salary from (
>> select d.name as Department, e.name as Employee,e.salary as
>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>> rnk<=3
>>
>> Time taken: 790 ms
>>
>> Aggregation Query:
>>
>> select Department,Employee,Salary from (
>> select d.name as Department, e.name as Employee,e.salary as
>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>> rnk<=3
>>
>> Time taken: 1212 ms
>>
>> But as per my understanding, the aggregation should have run faster. So,
>> my whole point is if the dataset is huge I should force some kind of map
>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>
>> So I think the aggregation query is taking more time since the dataset
>> size here is smaller and as we all know that map reduce works faster when
>> there is a huge volume of data. Haven't tested it yet on big data but
>> needed some expert guidance over here.
>>
>> Please correct me if I am wrong.
>>
>> TIA,
>> Sid
>>
>>
>>
>>


Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sid
Hi Team,

I am aware that if windowing functions are used, then it first loads the
entire dataset into one window, scans it, and then performs the other mentioned
operations for that particular window, which could be slower when dealing
with trillions / billions of records.

I did a POC where I used an example to find the top 3 highest salaries per
department. So, I wrote the below queries and compared the time taken
for each:

Windowing Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as
Salary,dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time taken: 790 ms

Aggregation Query:

select Department,Employee,Salary from (
select d.name as Department, e.name as Employee,e.salary as
Salary,dense_rank() over(partition by d.name order by e.salary desc) as rnk
from Department d join Employee e on e.departmentId=d.id ) a where rnk<=3

Time taken: 1212 ms

But as per my understanding, the aggregation should have run faster. So, my
whole point is that if the dataset is huge, I should force some kind of
map-reduce job, like the df.groupby().reduceByGroups() option.

So I think the aggregation query is taking more time since the dataset size
here is smaller, and as we all know, map reduce works faster when there is a
huge volume of data. I haven't tested it yet on big data but needed some
expert guidance over here.

Please correct me if I am wrong.

TIA,
Sid
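
For completeness, the same top-3-per-department logic expressed with the
DataFrame API (tiny made-up tables): the window is partitioned by
department, so Spark shuffles by department and ranks within each partition
rather than pulling everything into one global window.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

dept = spark.createDataFrame([(1, "IT"), (2, "Sales")], ["id", "name"])
emp = spark.createDataFrame(
    [(1, "e1", 100, 1), (2, "e2", 90, 1), (3, "e3", 80, 1), (4, "e4", 70, 1), (5, "e5", 60, 2)],
    ["id", "name", "salary", "departmentId"])

w = Window.partitionBy("Department").orderBy(F.col("Salary").desc())

top3 = (emp.join(dept, emp.departmentId == dept.id)
        .select(dept["name"].alias("Department"),
                emp["name"].alias("Employee"),
                emp["salary"].alias("Salary"))
        .withColumn("rnk", F.dense_rank().over(w))
        .filter("rnk <= 3")
        .drop("rnk"))

top3.show()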


Re: Spark Explain Plan and Joins

2022-02-23 Thread Sid
I actually went through the sort-merge algorithm and found that it compares
the two key values, resets the respective pointer to the last matched
position, and then continues comparing and fetching the records.

Could you please go through

https://www.hadoopinrealworld.com/how-does-shuffle-sort-merge-join-work-in-spark/

where the author mentions that it doesn't scan the entire table, since the
keys are sorted.

Can you help me understand this in my case, or point out any understanding
gap on my end?

Also, regarding the other case where the joining columns are not naturally
sorted, like string columns: are the string columns sorted as well? My
question might be a duplicate, but I want to understand what happens when
there is non-numeric data in the joining keys.
Thanks,
Sid


On Thu, 24 Feb 2022, 02:45 Mich Talebzadeh, 
wrote:

> Yes correct because sort-merge can only work for equijoins. The point
> being that join columns are sortable in each DF.
>
>
> In a sort-merge join, the optimizer sorts the first DF by its join
> columns, sorts the second DF by its join columns, and then merges the
> intermediate result sets together. As matches are found, they are put into
> the final result set. Think of the following two tables as two dataframes
> on top of say Hive tables or Oracle tables etc
>
>
> [image: 0467_001]
>
>
> So sort-merge joins can be effective when lack of data selectivity renders 
> broadcast
> Nested Loop Join
> <https://www.hadoopinrealworld.com/how-to-avoid-a-broadcast-nested-loop-join-in-spark/>
> inefficient, or when both of the row sources are quite large as I believe
> in your example.
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 23 Feb 2022 at 20:30, Sid  wrote:
>
>>  From what I understood, you are asking whether sort-merge can be used in
>> either of the conditions? If my understanding is correct then yes because
>> it supports equi joins. Please correct me if I'm wrong.
>>
>> On Thu, Feb 24, 2022 at 1:49 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK let me put this question to you if I may
>>>
>>> What is the essence for sort-merge assuming we have a SARG WHERE D.deptno
>>> = E.deptno? Can we have a sort-merge for  WHERE D.deptno >= E.deptno!
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 23 Feb 2022 at 20:07, Sid  wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Thanks for the link. I will go through it. I have two doubts regarding
>>>> sort-merge join.
>>>>
>>>> 1) I came across one article where it mentioned that it is a better
>>>> join technique since it doesn't have to scan the entire tables since the
>>>> keys are sorted. If I have keys like 1,2,4,10 and other lists as
>>>> 1,2,3,4,5,6,7,8,9,10. In this case I will get data for keys 1,2,4,10 as the
>>>> output if I talk about the inner join. So, how does it work exactly in my
>>>> case? Assume these datasets as a huge dataset.
>>>>
>>>> 2) If I don't have sortable keys but still I have a huge dataset and
>>>> need to join then in this case what can I do? Suppose I have a "Department"
>>>> column and need to join with the other table based on "Department". So, can
>>>> I sort the string as well? What does it exactly mean by non-sortable keys?
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Wed, Feb 23, 2022 at 

Re: Spark Explain Plan and Joins

2022-02-23 Thread Sid
From what I understood, you are asking whether sort-merge can be used in
either of the conditions. If my understanding is correct, then yes, because
it supports equi-joins. Please correct me if I'm wrong.

On Thu, Feb 24, 2022 at 1:49 AM Mich Talebzadeh 
wrote:

> OK let me put this question to you if I may
>
> What is the essence for sort-merge assuming we have a SARG WHERE D.deptno
> = E.deptno? Can we have a sort-merge for  WHERE D.deptno >= E.deptno!
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 23 Feb 2022 at 20:07, Sid  wrote:
>
>> Hi Mich,
>>
>> Thanks for the link. I will go through it. I have two doubts regarding
>> sort-merge join.
>>
>> 1) I came across one article where it mentioned that it is a better join
>> technique since it doesn't have to scan the entire tables since the keys
>> are sorted. If I have keys like 1,2,4,10 and other lists as
>> 1,2,3,4,5,6,7,8,9,10. In this case I will get data for keys 1,2,4,10 as the
>> output if I talk about the inner join. So, how does it work exactly in my
>> case? Assume these datasets as a huge dataset.
>>
>> 2) If I don't have sortable keys but still I have a huge dataset and need
>> to join then in this case what can I do? Suppose I have a "Department"
>> column and need to join with the other table based on "Department". So, can
>> I sort the string as well? What does it exactly mean by non-sortable keys?
>>
>> Thanks,
>> Sid
>>
>> On Wed, Feb 23, 2022 at 11:46 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Sid,
>>>
>>> For now, with regard to point 2
>>>
>>> 2) Predicate push down under the optimized logical plan. Could you
>>> please help me to understand the predicate pushdown with some other simple
>>> example?
>>>
>>>
>>> Please see this good explanation with examples
>>>
>>>
>>> Using Spark predicate push down in Spark SQL queries
>>> <https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/sparkPredicatePushdown.html>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 23 Feb 2022 at 17:57, Sid  wrote:
>>>
>>>> Hi,
>>>>
>>>> Can you help me with my doubts? Any links would also be helpful.
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Wed, Feb 23, 2022 at 1:22 AM Sid Kal  wrote:
>>>>
>>>>> Hi Mich / Gourav,
>>>>>
>>>>> Thanks for your time :) Much appreciated. I went through the article
>>>>> shared by Mich about the query execution plan. I pretty much understood
>>>>> most of the things till now except the two things below.
>>>>> 1) HashAggregate in the plan? Does this always indicate "group by"
>>>>> columns?
>>>>> 2) Predicate push down under the optimized logical plan. Could you
>>>>> please help me to understand the predicate pushdown with some other simple
>>>>> example?
>>>>>
>>>>>
>>>>> On Mon, Feb 21, 2022 at 1:52 PM Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think that the best option is to use the SPARK UI. In SPARK 3.x the
>>>>>> UI and its additional settings are fantasti
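For what it's worth, predicate pushdown is easiest to see against a columnar source such as Parquet. A minimal sketch (the dataset path and the deptno column are made up for illustration):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical Parquet dataset with a 'deptno' column.
df = spark.read.parquet("/tmp/employees_parquet")

# The equality filter is pushed into the Parquet scan itself, so row groups
# whose column statistics rule out deptno = 10 are skipped before Spark
# materialises them.
df.filter(F.col("deptno") == 10).explain()

In the physical plan, the FileScan node carries a PushedFilters entry such as [IsNotNull(deptno), EqualTo(deptno,10)]; that entry is what the "predicate pushdown" in the optimized plan refers to.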

Re: Spark Explain Plan and Joins

2022-02-23 Thread Sid
Hi Mich,

Thanks for the link. I will go through it. I have two doubts regarding
sort-merge join.

1) I came across one article which mentioned that it is a better join
technique because it doesn't have to scan the entire tables, since the keys
are sorted. If I have keys like 1,2,4,10 in one table and
1,2,3,4,5,6,7,8,9,10 in the other, an inner join will give me the data for
keys 1,2,4,10 as the output. So, how does it work exactly in my case?
Assume both datasets are huge.

2) If I don't have sortable keys but still have a huge dataset to join,
what can I do in this case? Suppose I have a "Department" column and need
to join with the other table based on "Department". So, can I sort a string
as well? What exactly is meant by non-sortable keys?

Thanks,
Sid

On Wed, Feb 23, 2022 at 11:46 PM Mich Talebzadeh 
wrote:

> Hi Sid,
>
> For now, with regard to point 2
>
> 2) Predicate push down under the optimized logical plan. Could you please
> help me to understand the predicate pushdown with some other simple example?
>
>
> Please see this good explanation with examples
>
>
> Using Spark predicate push down in Spark SQL queries
> <https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/sparkPredicatePushdown.html>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 23 Feb 2022 at 17:57, Sid  wrote:
>
>> Hi,
>>
>> Can you help me with my doubts? Any links would also be helpful.
>>
>> Thanks,
>> Sid
>>
>> On Wed, Feb 23, 2022 at 1:22 AM Sid Kal  wrote:
>>
>>> Hi Mich / Gourav,
>>>
>>> Thanks for your time :) Much appreciated. I went through the article
>>> shared by Mich about the query execution plan. I pretty much understood
>>> most of the things till now except the two things below.
>>> 1) HashAggregate in the plan? Does this always indicate "group by"
>>> columns?
>>> 2) Predicate push down under the optimized logical plan. Could you
>>> please help me to understand the predicate pushdown with some other simple
>>> example?
>>>
>>>
>>> On Mon, Feb 21, 2022 at 1:52 PM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think that the best option is to use the SPARK UI. In SPARK 3.x the
>>>> UI and its additional settings are fantastic. Try to also see the settings
>>>> for Adaptive Query Execution in SPARK, under certain conditions it really
>>>> works wonders.
>>>>
>>>> For certain long queries, the way you are finally triggering the action
>>>> of query execution, and whether you are using SPARK Dataframes or SPARK
>>>> SQL, and the settings in SPARK (look at the settings for SPARK 3.x) and a
>>>> few other aspects you will see that the plan is quite cryptic and difficult
>>>> to read sometimes.
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Sun, Feb 20, 2022 at 7:32 PM Sid Kal  wrote:
>>>>
>>>>> Hi Gourav,
>>>>>
>>>>> Right now I am just trying to understand the query execution plan by
>>>>> executing a simple join example via Spark SQL. The overall goal is to
>>>>> understand these plans so that going forward if my query runs slow due to
>>>>> data skewness or some other issues, I should be able to atleast understand
>>>>> what exactly is happening at the master and slave sides like map reduce.
>>>>>
>>>>> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> what are you trying to achieve by this?
>>>>>>
>>>>>> If there is a performance deterioration, try to collect the query
>>>>>> execution run time statistics from SPARK SQL. They can be seen from the
>>>>>> SPARK SQL UI and available over API's in case I am not wrong.
>>>>
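On the second doubt above: strings sort perfectly well, so an equi-join on a "Department" column can still use sort-merge. A small hedged sketch to see it in the plan (broadcast is disabled only so these tiny demo tables don't take the broadcast path instead):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Rule out broadcast so even tiny demo tables show the sort-merge path.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

emp = spark.createDataFrame([(1, "Sales"), (2, "HR")], ["id", "Department"])
dept = spark.createDataFrame([("Sales", "NY"), ("HR", "SF")], ["Department", "city"])

# An equi-join on a string key still shows SortMergeJoin in the physical plan.
emp.join(dept, "Department").explain()

On the first doubt: sorting does not skip the matching work entirely; it lets the merge walk the two sorted streams once instead of probing repeatedly, so keys that exist on only one side (3, 5, 6, ... in the example) are passed over cheaply. And on the earlier question about WHERE D.deptno >= E.deptno: Spark's sort-merge join needs at least one equality key, so a purely non-equi condition falls back to a nested-loop style join instead.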

Re: Unable to display JSON records with null values

2022-02-23 Thread Sid
Okay. So what should I do if I get such data?

On Wed, Feb 23, 2022 at 11:59 PM Sean Owen  wrote:

> There is no record "345" here it seems, right? it's not that it exists and
> has null fields; it's invalid w.r.t. the schema that the rest suggests.
>
> On Wed, Feb 23, 2022 at 11:57 AM Sid  wrote:
>
>> Hello experts,
>>
>> I have a JSON data like below:
>>
>> [
>>   {
>> "123": {
>>   "Party1": {
>> "FIRSTNAMEBEN": "ABC",
>> "ALIASBEN": "",
>> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
>> "DATEOFBIRTH": "7/Oct/1969"
>>   },
>>   "Party2": {
>> "FIRSTNAMEBEN": "ABCC",
>> "ALIASBEN": "",
>> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
>> "DATEOFBIRTH": "7/Oct/1969"
>>   }
>> },
>> "GeneratedTime": "2022-01-30 03:09:26"
>>   },
>>   {
>> "456": {
>>   "Party1": {
>> "FIRSTNAMEBEN": "ABCD",
>> "ALIASBEN": "",
>> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
>> "DATEOFBIRTH": "7/Oct/1969"
>>   },
>>   "Party2": {
>> "FIRSTNAMEBEN": "ABCDD",
>> "ALIASBEN": "",
>> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
>> "DATEOFBIRTH": "7/Oct/1969"
>>   },
>>   "Party3": {
>> "FIRSTNAMEBEN": "ABCDDE",
>> "ALIASBEN": "",
>> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
>> "DATEOFBIRTH": "7/Oct/1969"
>>   }
>> },
>> "GeneratedTime": "2022-01-30 03:09:26"
>>   },
>>   {
>> "345": {
>>
>>
>> },
>> "GeneratedTime": "2022-01-30 03:09:26"
>>   }
>> ]
>>
>> However, when I try to display this JSON using the code below, it doesn't
>> show the blank records. In my case I don't get any record for 345 since it
>> is empty, but I want to display it in the final flattened dataset.
>>
>> val df = spark.read.option("multiline",
>> true).json("/home/siddhesh/Documents/nested_json.json")
>>
>> Spark version:3.1.1
>>
>> Thanks,
>> Sid
>>
>
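One way to keep the empty "345" entry around is to read the file with an explicit schema instead of relying on schema inference, so the third array element still becomes a row (with all-null struct fields) that an outer-style flattening can carry through. A minimal sketch, assuming the party layout shown above and fixed top-level keys:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Shared shape of every PartyN object in the file.
party = StructType([
    StructField("FIRSTNAMEBEN", StringType()),
    StructField("ALIASBEN", StringType()),
    StructField("RELATIONSHIPTYPE", StringType()),
    StructField("DATEOFBIRTH", StringType()),
])
entry = StructType([
    StructField("Party1", party),
    StructField("Party2", party),
    StructField("Party3", party),
])
schema = StructType([
    StructField("123", entry),
    StructField("456", entry),
    StructField("345", entry),
    StructField("GeneratedTime", StringType()),
])

df = (spark.read.option("multiline", True)
      .schema(schema)
      .json("/home/siddhesh/Documents/nested_json.json"))
df.show(truncate=False)   # the "345" row is present; its struct fields are simply null

If the top-level keys ("123", "456", "345", ...) are not known in advance, hard-coding them like this will not scale; in that case it is usually easier to reshape the source so the key becomes a value (e.g. {"id": "345", "parties": {...}}) before reading it with Spark.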


Re: Spark Explain Plan and Joins

2022-02-23 Thread Sid
Hi,

Can you help me with my doubts? Any links would also be helpful.

Thanks,
Sid

On Wed, Feb 23, 2022 at 1:22 AM Sid Kal  wrote:

> Hi Mich / Gourav,
>
> Thanks for your time :) Much appreciated. I went through the article
> shared by Mich about the query execution plan. I pretty much understood
> most of the things till now except the two things below.
> 1) HashAggregate in the plan? Does this always indicate "group by" columns?
> 2) Predicate push down under the optimized logical plan. Could you please
> help me to understand the predicate pushdown with some other simple example?
>
>
> On Mon, Feb 21, 2022 at 1:52 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> I think that the best option is to use the SPARK UI. In SPARK 3.x the UI
>> and its additional settings are fantastic. Try to also see the settings for
>> Adaptive Query Execution in SPARK, under certain conditions it really works
>> wonders.
>>
>> For certain long queries, the way you are finally triggering the action
>> of query execution, and whether you are using SPARK Dataframes or SPARK
>> SQL, and the settings in SPARK (look at the settings for SPARK 3.x) and a
>> few other aspects you will see that the plan is quite cryptic and difficult
>> to read sometimes.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Feb 20, 2022 at 7:32 PM Sid Kal  wrote:
>>
>>> Hi Gourav,
>>>
>>> Right now I am just trying to understand the query execution plan by
>>> executing a simple join example via Spark SQL. The overall goal is to
>>> understand these plans so that going forward if my query runs slow due to
>>> data skewness or some other issues, I should be able to atleast understand
>>> what exactly is happening at the master and slave sides like map reduce.
>>>
>>> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> what are you trying to achieve by this?
>>>>
>>>> If there is a performance deterioration, try to collect the query
>>>> execution run time statistics from SPARK SQL. They can be seen from the
>>>> SPARK SQL UI and available over API's in case I am not wrong.
>>>>
>>>> Please ensure that you are not trying to over automate things.
>>>>
>>>> Reading how to understand the plans may be good depending on what you
>>>> are trying to do.
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Sat, Feb 19, 2022 at 10:00 AM Sid Kal 
>>>> wrote:
>>>>
>>>>> I wrote a query like below and I am trying to understand its query
>>>>> execution plan.
>>>>>
>>>>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a
>>>>> join df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>>>> == Parsed Logical Plan ==
>>>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>>>:- 'SubqueryAlias a
>>>>>:  +- 'UnresolvedRelation [df], [], false
>>>>>+- 'SubqueryAlias b
>>>>>   +- 'UnresolvedRelation [df1], [], false
>>>>>
>>>>> == Analyzed Logical Plan ==
>>>>> CustomerID: int, CustomerName: string, state: string
>>>>> Project [CustomerID#640, CustomerName#641, state#988]
>>>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>>>:- SubqueryAlias a
>>>>>:  +- SubqueryAlias df
>>>>>: +-
>>>>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>>>>> csv
>>>>>+- SubqueryAlias b
>>>>>   +- SubqueryAlias df1
>>>>>  +-
>>>>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>>>>> csv
>>>>>

Unable to display JSON records with null values

2022-02-23 Thread Sid
Hello experts,

I have a JSON data like below:

[
  {
"123": {
  "Party1": {
"FIRSTNAMEBEN": "ABC",
"ALIASBEN": "",
"RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
"DATEOFBIRTH": "7/Oct/1969"
  },
  "Party2": {
"FIRSTNAMEBEN": "ABCC",
"ALIASBEN": "",
"RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
"DATEOFBIRTH": "7/Oct/1969"
  }
},
"GeneratedTime": "2022-01-30 03:09:26"
  },
  {
"456": {
  "Party1": {
"FIRSTNAMEBEN": "ABCD",
"ALIASBEN": "",
"RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
"DATEOFBIRTH": "7/Oct/1969"
  },
  "Party2": {
"FIRSTNAMEBEN": "ABCDD",
"ALIASBEN": "",
"RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
"DATEOFBIRTH": "7/Oct/1969"
  },
  "Party3": {
"FIRSTNAMEBEN": "ABCDDE",
"ALIASBEN": "",
"RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
"DATEOFBIRTH": "7/Oct/1969"
  }
},
"GeneratedTime": "2022-01-30 03:09:26"
  },
  {
"345": {


},
"GeneratedTime": "2022-01-30 03:09:26"
  }
]

However, when I try to display this JSON using the code below, it doesn't
show the blank records. In my case I don't get any record for 345 since it
is empty, but I want to display it in the final flattened dataset.

val df = spark.read.option("multiline",
true).json("/home/siddhesh/Documents/nested_json.json")

Spark version:3.1.1

Thanks,
Sid


Re: Loading .xlsx and .xls files using pyspark

2022-02-23 Thread Sid
Cool. The problem here is that I have to run the Spark jobs on Glue ETL,
which supports Spark 2.4.3, and I don't think this distributed pandas
support was added in that version. As far as I know, it was added in
version 3.2.

So how can I do it in Spark 2.4.3? Correct me if I'm wrong.


On Wed, Feb 23, 2022 at 8:28 PM Bjørn Jørgensen 
wrote:

> You will. Pandas API on spark that `imported with from pyspark import
> pandas as ps` is not pandas but an API that is using pyspark under.
>
> ons. 23. feb. 2022 kl. 15:54 skrev Sid :
>
>> Hi Bjørn,
>>
>> Thanks for your reply. This doesn't help while loading huge datasets; I
>> won't be able to get Spark's distributed loading of the file this way.
>>
>> Thanks,
>> Sid
>>
>> On Wed, Feb 23, 2022 at 7:38 PM Bjørn Jørgensen 
>> wrote:
>>
>>> from pyspark import pandas as ps
>>>
>>>
>>> ps.read_excel?
>>> "Support both `xls` and `xlsx` file extensions from a local filesystem
>>> or URL"
>>>
>>> pdf = ps.read_excel("file")
>>>
>>> df = pdf.to_spark()
>>>
>>> ons. 23. feb. 2022 kl. 14:57 skrev Sid :
>>>
>>>> Hi Gourav,
>>>>
>>>> Thanks for your time.
>>>>
>>>> I am worried about the distribution of data in case of a huge dataset
>>>> file. Is Koalas still a better option to go ahead with? If yes, how can I
>>>> use it with Glue ETL jobs? Do I have to pass some kind of external jars for
>>>> it?
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Wed, Feb 23, 2022 at 7:22 PM Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> this looks like a very specific and exact problem in its scope.
>>>>>
>>>>> Do you think that you can load the data into panda dataframe and load
>>>>> it back to SPARK using PANDAS UDF?
>>>>>
>>>>> Koalas is now natively integrated with SPARK, try to see if you can
>>>>> use those features.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Gourav
>>>>>
>>>>> On Wed, Feb 23, 2022 at 1:31 PM Sid  wrote:
>>>>>
>>>>>> I have an excel file which unfortunately cannot be converted to CSV
>>>>>> format and I am trying to load it using pyspark shell.
>>>>>>
>>>>>> I tried invoking the below pyspark session with the jars provided.
>>>>>>
>>>>>> pyspark --jars
>>>>>> /home/siddhesh/Downloads/spark-excel_2.12-0.14.0.jar,/home/siddhesh/Downloads/xmlbeans-5.0.3.jar,/home/siddhesh/Downloads/commons-collections4-4.4.jar,/home/siddhesh/Downloads/poi-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-schemas-4.1.2.jar,/home/siddhesh/Downloads/slf4j-log4j12-1.7.28.jar,/home/siddhesh/Downloads/log4j-1.2-api-2.17.1.jar
>>>>>>
>>>>>> and below is the code to read the excel file:
>>>>>>
>>>>>> df = spark.read.format("excel") \
>>>>>>  .option("dataAddress", "'Sheet1'!") \
>>>>>>  .option("header", "true") \
>>>>>>  .option("inferSchema", "true") \
>>>>>> .load("/home/.../Documents/test_excel.xlsx")
>>>>>>
>>>>>> It is giving me the below error message:
>>>>>>
>>>>>>  java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>>>>>>
>>>>>> I tried several Jars for this error but no luck. Also, what would be
>>>>>> the efficient way to load it?
>>>>>>
>>>>>> Thanks,
>>>>>> Sid
>>>>>>
>>>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
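For Spark 2.4.x specifically, the pandas API is not bundled as pyspark.pandas (that arrived in 3.2), but the standalone Koalas package exposes the same read_excel entry point. A hedged sketch, assuming extra Python packages can be installed on the Glue job (for example via --additional-python-modules where the Glue version supports it) and that the path is hypothetical:

# pip install koalas openpyxl   (Koalas works on Spark 2.4+)
import databricks.koalas as ks

# read_excel still parses the workbook through pandas on a single node;
# only the resulting frame is distributed once it is converted to Spark.
kdf = ks.read_excel("s3://my-bucket/input/test_excel.xlsx")
sdf = kdf.to_spark()
sdf.printSchema()

Either way the workbook itself is parsed on one machine; the distribution only helps for whatever processing follows the load.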


Re: Loading .xlsx and .xls files using pyspark

2022-02-23 Thread Sid
Hi Bjørn,

Thanks for your reply. This doesn't help while loading huge datasets; I
won't be able to get Spark's distributed loading of the file this way.

Thanks,
Sid

On Wed, Feb 23, 2022 at 7:38 PM Bjørn Jørgensen 
wrote:

> from pyspark import pandas as ps
>
>
> ps.read_excel?
> "Support both `xls` and `xlsx` file extensions from a local filesystem or
> URL"
>
> pdf = ps.read_excel("file")
>
> df = pdf.to_spark()
>
> ons. 23. feb. 2022 kl. 14:57 skrev Sid :
>
>> Hi Gourav,
>>
>> Thanks for your time.
>>
>> I am worried about the distribution of data in case of a huge dataset
>> file. Is Koalas still a better option to go ahead with? If yes, how can I
>> use it with Glue ETL jobs? Do I have to pass some kind of external jars for
>> it?
>>
>> Thanks,
>> Sid
>>
>> On Wed, Feb 23, 2022 at 7:22 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> this looks like a very specific and exact problem in its scope.
>>>
>>> Do you think that you can load the data into panda dataframe and load it
>>> back to SPARK using PANDAS UDF?
>>>
>>> Koalas is now natively integrated with SPARK, try to see if you can use
>>> those features.
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Wed, Feb 23, 2022 at 1:31 PM Sid  wrote:
>>>
>>>> I have an excel file which unfortunately cannot be converted to CSV
>>>> format and I am trying to load it using pyspark shell.
>>>>
>>>> I tried invoking the below pyspark session with the jars provided.
>>>>
>>>> pyspark --jars
>>>> /home/siddhesh/Downloads/spark-excel_2.12-0.14.0.jar,/home/siddhesh/Downloads/xmlbeans-5.0.3.jar,/home/siddhesh/Downloads/commons-collections4-4.4.jar,/home/siddhesh/Downloads/poi-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-schemas-4.1.2.jar,/home/siddhesh/Downloads/slf4j-log4j12-1.7.28.jar,/home/siddhesh/Downloads/log4j-1.2-api-2.17.1.jar
>>>>
>>>> and below is the code to read the excel file:
>>>>
>>>> df = spark.read.format("excel") \
>>>>  .option("dataAddress", "'Sheet1'!") \
>>>>  .option("header", "true") \
>>>>  .option("inferSchema", "true") \
>>>> .load("/home/.../Documents/test_excel.xlsx")
>>>>
>>>> It is giving me the below error message:
>>>>
>>>>  java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>>>>
>>>> I tried several Jars for this error but no luck. Also, what would be
>>>> the efficient way to load it?
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: Loading .xlsx and .xls files using pyspark

2022-02-23 Thread Sid
Hi Gourav,

Thanks for your time.

I am worried about the distribution of data in case of a huge dataset file.
Is Koalas still a better option to go ahead with? If yes, how can I use it
with Glue ETL jobs? Do I have to pass some kind of external jars for it?

Thanks,
Sid

On Wed, Feb 23, 2022 at 7:22 PM Gourav Sengupta 
wrote:

> Hi,
>
> this looks like a very specific and exact problem in its scope.
>
> Do you think that you can load the data into panda dataframe and load it
> back to SPARK using PANDAS UDF?
>
> Koalas is now natively integrated with SPARK, try to see if you can use
> those features.
>
>
> Regards,
> Gourav
>
> On Wed, Feb 23, 2022 at 1:31 PM Sid  wrote:
>
>> I have an excel file which unfortunately cannot be converted to CSV
>> format and I am trying to load it using pyspark shell.
>>
>> I tried invoking the below pyspark session with the jars provided.
>>
>> pyspark --jars
>> /home/siddhesh/Downloads/spark-excel_2.12-0.14.0.jar,/home/siddhesh/Downloads/xmlbeans-5.0.3.jar,/home/siddhesh/Downloads/commons-collections4-4.4.jar,/home/siddhesh/Downloads/poi-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-schemas-4.1.2.jar,/home/siddhesh/Downloads/slf4j-log4j12-1.7.28.jar,/home/siddhesh/Downloads/log4j-1.2-api-2.17.1.jar
>>
>> and below is the code to read the excel file:
>>
>> df = spark.read.format("excel") \
>>  .option("dataAddress", "'Sheet1'!") \
>>  .option("header", "true") \
>>  .option("inferSchema", "true") \
>> .load("/home/.../Documents/test_excel.xlsx")
>>
>> It is giving me the below error message:
>>
>>  java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>>
>> I tried several Jars for this error but no luck. Also, what would be the
>> efficient way to load it?
>>
>> Thanks,
>> Sid
>>
>


Loading .xlsx and .xls files using pyspark

2022-02-23 Thread Sid
I have an excel file which unfortunately cannot be converted to CSV format
and I am trying to load it using pyspark shell.

I tried invoking the below pyspark session with the jars provided.

pyspark --jars
/home/siddhesh/Downloads/spark-excel_2.12-0.14.0.jar,/home/siddhesh/Downloads/xmlbeans-5.0.3.jar,/home/siddhesh/Downloads/commons-collections4-4.4.jar,/home/siddhesh/Downloads/poi-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-schemas-4.1.2.jar,/home/siddhesh/Downloads/slf4j-log4j12-1.7.28.jar,/home/siddhesh/Downloads/log4j-1.2-api-2.17.1.jar

and below is the code to read the excel file:

df = spark.read.format("excel") \
 .option("dataAddress", "'Sheet1'!") \
 .option("header", "true") \
 .option("inferSchema", "true") \
.load("/home/.../Documents/test_excel.xlsx")

It is giving me the below error message:

 java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager

I tried several Jars for this error but no luck. Also, what would be the
efficient way to load it?

Thanks,
Sid
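For the record, org/apache/logging/log4j/LogManager lives in the log4j-api jar, which is not in the list above (log4j-1.2-api is only the compatibility bridge), so adding log4j-api-2.17.1.jar alongside the others would likely clear that particular NoClassDefFoundError. If the machine can reach Maven Central, an easier route is to let --packages resolve spark-excel and its transitive dependencies instead of listing jars by hand, e.g.:

pyspark --packages com.crealytics:spark-excel_2.12:0.14.0

The read itself can then stay exactly as written with format("excel").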


Re: Spark Explain Plan and Joins

2022-02-22 Thread Sid Kal
Hi Mich / Gourav,

Thanks for your time :) Much appreciated. I went through the article shared
by Mich about the query execution plan. I pretty much understood most of
the things till now except the two things below.
1) HashAggregate in the plan? Does this always indicate "group by" columns?
2) Predicate push down under the optimized logical plan. Could you please
help me to understand the predicate pushdown with some other simple example?


On Mon, Feb 21, 2022 at 1:52 PM Gourav Sengupta 
wrote:

> Hi,
>
> I think that the best option is to use the SPARK UI. In SPARK 3.x the UI
> and its additional settings are fantastic. Try to also see the settings for
> Adaptive Query Execution in SPARK, under certain conditions it really works
> wonders.
>
> For certain long queries, the way you are finally triggering the action of
> query execution, and whether you are using SPARK Dataframes or SPARK SQL,
> and the settings in SPARK (look at the settings for SPARK 3.x) and a few
> other aspects you will see that the plan is quite cryptic and difficult to
> read sometimes.
>
> Regards,
> Gourav Sengupta
>
> On Sun, Feb 20, 2022 at 7:32 PM Sid Kal  wrote:
>
>> Hi Gourav,
>>
>> Right now I am just trying to understand the query execution plan by
>> executing a simple join example via Spark SQL. The overall goal is to
>> understand these plans so that going forward if my query runs slow due to
>> data skewness or some other issues, I should be able to atleast understand
>> what exactly is happening at the master and slave sides like map reduce.
>>
>> On Sun, Feb 20, 2022 at 9:06 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> what are you trying to achieve by this?
>>>
>>> If there is a performance deterioration, try to collect the query
>>> execution run time statistics from SPARK SQL. They can be seen from the
>>> SPARK SQL UI and available over API's in case I am not wrong.
>>>
>>> Please ensure that you are not trying to over automate things.
>>>
>>> Reading how to understand the plans may be good depending on what you
>>> are trying to do.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Sat, Feb 19, 2022 at 10:00 AM Sid Kal  wrote:
>>>
>>>> I wrote a query like below and I am trying to understand its query
>>>> execution plan.
>>>>
>>>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a
>>>> join df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>>>> == Parsed Logical Plan ==
>>>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>>>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>>>:- 'SubqueryAlias a
>>>>:  +- 'UnresolvedRelation [df], [], false
>>>>+- 'SubqueryAlias b
>>>>   +- 'UnresolvedRelation [df1], [], false
>>>>
>>>> == Analyzed Logical Plan ==
>>>> CustomerID: int, CustomerName: string, state: string
>>>> Project [CustomerID#640, CustomerName#641, state#988]
>>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>>:- SubqueryAlias a
>>>>:  +- SubqueryAlias df
>>>>: +-
>>>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>>>> csv
>>>>+- SubqueryAlias b
>>>>   +- SubqueryAlias df1
>>>>  +-
>>>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>>>> csv
>>>>
>>>> == Optimized Logical Plan ==
>>>> Project [CustomerID#640, CustomerName#641, state#988]
>>>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>>>:- Project [CustomerID#640, CustomerName#641]
>>>>:  +- Filter isnotnull(CustomerID#640)
>>>>: +-
>>>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMob
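On the HashAggregate question above: it appears for any hash-based aggregation, not only an explicit GROUP BY; a distinct, for instance, is planned as an aggregate too. A quick sketch to compare the plans:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).withColumn("dept", F.col("id") % 3)

df.groupBy("dept").count().explain()     # HashAggregate from an explicit group-by
df.select("dept").distinct().explain()   # HashAggregate again: distinct is an aggregate

Both plans show a partial HashAggregate before the Exchange and a final one after it, which is why a single aggregation often appears twice in the physical plan.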

Re: Spark Explain Plan and Joins

2022-02-20 Thread Sid
Thank you so much for your reply, Mich. I will go through it. However, I
want to understand how to read this plan. If I face any errors, or if I
want to see how Spark is cost-optimizing, how should I approach it?

Could you explain it in layman's terms?

Thanks,
Sid

On Sun, 20 Feb 2022, 17:50 Mich Talebzadeh, 
wrote:

> Do a Google search on *sort-merge spark*. There are plenty of notes on
> the topic and examples. NLJ, Sort-merge and Hash-joins and derivatives are
> common join algorithms in database systems. These were not created by
> Spark.  At a given time, there are reasons why one specific join is
> preferred over the other. Over time the underlying data volume may change
> and that could result in the optimizer opting for another type of join.
>
>
> For some relatively recent discussion on Spark Join Strategies have a look
> at here
> <https://blog.clairvoyantsoft.com/apache-spark-join-strategies-e4ebc7624b06>
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 19 Feb 2022 at 10:00, Sid Kal  wrote:
>
>> I wrote a query like below and I am trying to understand its query
>> execution plan.
>>
>> >>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join
>> df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
>> == Parsed Logical Plan ==
>> 'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
>> +- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
>>:- 'SubqueryAlias a
>>:  +- 'UnresolvedRelation [df], [], false
>>+- 'SubqueryAlias b
>>   +- 'UnresolvedRelation [df1], [], false
>>
>> == Analyzed Logical Plan ==
>> CustomerID: int, CustomerName: string, state: string
>> Project [CustomerID#640, CustomerName#641, state#988]
>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>:- SubqueryAlias a
>>:  +- SubqueryAlias df
>>: +-
>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>> csv
>>+- SubqueryAlias b
>>   +- SubqueryAlias df1
>>  +-
>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>> csv
>>
>> == Optimized Logical Plan ==
>> Project [CustomerID#640, CustomerName#641, state#988]
>> +- Join Inner, (CustomerID#640 = CustomerID#978)
>>:- Project [CustomerID#640, CustomerName#641]
>>:  +- Filter isnotnull(CustomerID#640)
>>: +-
>> Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
>> csv
>>+- Project [CustomerID#978, State#988]
>>   +- Filter isnotnull(CustomerID#978)
>>  +-
>> Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
>> csv
>>
>> == Physical Plan ==
>> *(5) Project [CustomerID#640, CustomerName#641, state#988]
>> +- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
>>:- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(CustomerID#640, 200),
>> ENSURE_REQUIREMENTS, [id=#451]
>>: +- *(1) Filter isnotnull(CustomerID#640)
>>:+- FileScan csv [CustomerID#640,CustomerName#641] Batched:
>> false, DataFilters: [isnotnull(CustomerID#640)], Format: C

Spark Explain Plan and Joins

2022-02-19 Thread Sid Kal
I wrote a query like below and I am trying to understand its query
execution plan.

>>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join
df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")
== Parsed Logical Plan ==
'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
+- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation [df], [], false
   +- 'SubqueryAlias b
  +- 'UnresolvedRelation [df1], [], false

== Analyzed Logical Plan ==
CustomerID: int, CustomerName: string, state: string
Project [CustomerID#640, CustomerName#641, state#988]
+- Join Inner, (CustomerID#640 = CustomerID#978)
   :- SubqueryAlias a
   :  +- SubqueryAlias df
   : +-
Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
csv
   +- SubqueryAlias b
  +- SubqueryAlias df1
 +-
Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
csv

== Optimized Logical Plan ==
Project [CustomerID#640, CustomerName#641, state#988]
+- Join Inner, (CustomerID#640 = CustomerID#978)
   :- Project [CustomerID#640, CustomerName#641]
   :  +- Filter isnotnull(CustomerID#640)
   : +-
Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655]
csv
   +- Project [CustomerID#978, State#988]
  +- Filter isnotnull(CustomerID#978)
 +-
Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993]
csv

== Physical Plan ==
*(5) Project [CustomerID#640, CustomerName#641, state#988]
+- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
   :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(CustomerID#640, 200),
ENSURE_REQUIREMENTS, [id=#451]
   : +- *(1) Filter isnotnull(CustomerID#640)
   :+- FileScan csv [CustomerID#640,CustomerName#641] Batched:
false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location:
InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no],
PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema:
struct
   +- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(CustomerID#978, 200),
ENSURE_REQUIREMENTS, [id=#459]
 +- *(3) Filter isnotnull(CustomerID#978)
+- FileScan csv [CustomerID#978,State#988] Batched: false,
DataFilters: [isnotnull(CustomerID#978)], Format: CSV, Location:
InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no],
PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema:
struct

I understand some of the pieces: Project is like the select clause, and
Filter corresponds to whatever filters we use in the query. Where can I
look for the cost optimization in this plan? Suppose in the future my query
takes longer to execute; by looking at this plan, how can I figure out what
exactly is happening and what needs to be modified in the query? Also, I
can see from the plan that Spark uses sort-merge join by default, but when
does it opt for a Sort-Merge Join and when for a Shuffle-Hash Join?

Thanks,
Sid
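On the last question: with default settings Spark prefers sort-merge for equi-joins (spark.sql.join.preferSortMergeJoin is true). Shuffle-hash is chosen only when that preference is turned off and one side is small enough to build a per-partition hash map, or when it is requested explicitly with a join hint (available from Spark 3.0). A hedged sketch of the hint:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Disable broadcast so the small demo tables don't take the broadcast path.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

a = spark.range(1000).withColumnRenamed("id", "CustomerID")
b = spark.range(1000).withColumnRenamed("id", "CustomerID")

# Forces ShuffledHashJoin instead of the default SortMergeJoin.
a.join(b.hint("SHUFFLE_HASH"), "CustomerID").explain()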


Re: How to delete the record

2022-01-27 Thread Sid Kal
Thanks Mich and Sean for your time

On Fri, 28 Jan 2022, 00:53 Mich Talebzadeh, 
wrote:

> Yes I believe so.
>
> Check this article of mine dated early 2019 but will have some relevance
> to what I am implying.
>
>
> https://www.linkedin.com/pulse/real-time-data-streaming-big-typical-use-cases-talebzadeh-ph-d-/
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Jan 2022 at 18:46, Sid Kal  wrote:
>
>> Okay sounds good.
>>
>> So,  below two options would help me to capture CDC changes:
>>
>> 1) Delta lake
>> 2) Maintaining snapshot of records with some indicators and timestamp.
>>
>> Correct me if I'm wrong
>>
>> Thanks,
>> Sid
>>
>> On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, 
>> wrote:
>>
>>> There are two ways of doing it.
>>>
>>>
>>>1. Through snapshot offered meaning an immutable snapshot of the
>>>state of the table at a given version. For example, the state
>>><https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a Delta
>>>table
>>><https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at
>>>the version
>>><https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
>>>2. Creating your own versioning. Taking your example you define the
>>>target storage *with two added columns, namely:* op_type INT
>>>(1-inset,2-update,3-delete) and op_timeTIMESTAMP .
>>>Your example record will be
>>>
>>>
>>> id   op_type  op_time
>>>
>>> 11 
>>>
>>> 13 
>>>
>>>
>>>df = rdd.toDF(). \
>>>
>>> withColumnRenamed("_1", "ID"). \
>>>
>>> withColumnRenamed("_2", "CLUSTERED"). \
>>>
>>> withColumnRenamed("_3", "SCATTERED"). \
>>>
>>> withColumnRenamed("_4", "RANDOMISED"). \
>>>
>>> withColumnRenamed("_5", "RANDOM_STRING"). \
>>>
>>> withColumnRenamed("_6", "SMALL_VC"). \
>>>
>>> withColumnRenamed("_7", "PADDING"). \
>>>
>>> withColumn("op_type", lit(1)). \
>>>
>>> withColumn("op_time", current_timestamp())
>>>
>>> Then  you can look at all records that were created and subsequently
>>> deleted and at what time
>>>
>>>
>>> SELECT ID, op_time FROM my_table WHERE op_type in (1,3)
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 27 Jan 2022 at 17:54, Sid Kal  wrote:
>>>
>>>> Hi Sean,
>>>>
>>>> So you mean if I use those file formats it will do the work of CDC
>>>> automatically or I would have to handle it via code ?
>>>>
>>>> Hi Mich,
>>>>
>>>> Not sure if I understood you. Let me try to explain my scenario.
>>>> Suppose there is a Id "1" which is inserted today, so I transformed and
>>>> ingested it. Now suppose if this user id is deleted from the source itself.
>>>> Then how can I delete it in my transformed db
>>>> ?
>>>>
>>>>
>>>>
>>>> On Thu, 27 Jan 2022, 22:44 Sean Owen,  wrote:
>>>>
>

Re: How to delete the record

2022-01-27 Thread Sid Kal
Okay sounds good.

So, the two options below would help me to capture CDC changes:

1) Delta Lake
2) Maintaining a snapshot of records with indicator and timestamp columns.

Correct me if I'm wrong

Thanks,
Sid

On Thu, 27 Jan 2022, 23:59 Mich Talebzadeh, 
wrote:

> There are two ways of doing it.
>
>
>1. Through snapshot offered meaning an immutable snapshot of the state
>of the table at a given version. For example, the state
><https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a Delta
>table <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at
>the version
><https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
>2. Creating your own versioning. Taking your example you define the
>target storage *with two added columns, namely:* op_type INT
>(1-insert, 2-update, 3-delete) and op_time TIMESTAMP.
>Your example record will be
>
>
> id   op_type  op_time
>
> 1    1
>
> 1    3
>
>
>df = rdd.toDF(). \
>
> withColumnRenamed("_1", "ID"). \
>
> withColumnRenamed("_2", "CLUSTERED"). \
>
> withColumnRenamed("_3", "SCATTERED"). \
>
> withColumnRenamed("_4", "RANDOMISED"). \
>
> withColumnRenamed("_5", "RANDOM_STRING"). \
>
> withColumnRenamed("_6", "SMALL_VC"). \
>
> withColumnRenamed("_7", "PADDING"). \
>
> withColumn("op_type", lit(1)). \
>
> withColumn("op_time", current_timestamp())
>
> Then  you can look at all records that were created and subsequently
> deleted and at what time
>
>
> SELECT ID, op_time FROM my_table WHERE op_type in (1,3)
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Jan 2022 at 17:54, Sid Kal  wrote:
>
>> Hi Sean,
>>
>> So you mean if I use those file formats it will do the work of CDC
>> automatically or I would have to handle it via code ?
>>
>> Hi Mich,
>>
>> Not sure if I understood you. Let me try to explain my scenario. Suppose
>> there is a Id "1" which is inserted today, so I transformed and ingested
>> it. Now suppose if this user id is deleted from the source itself. Then how
>> can I delete it in my transformed db
>> ?
>>
>>
>>
>> On Thu, 27 Jan 2022, 22:44 Sean Owen,  wrote:
>>
>>> This is what storage engines like Delta, Hudi, Iceberg are for. No need
>>> to manage it manually or use a DBMS. These formats allow deletes, upserts,
>>> etc of data, using Spark, on cloud storage.
>>>
>>> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Where ETL data is stored?
>>>>
>>>>
>>>>
>>>> *But now the main problem is when the record at the source is deleted,
>>>> it should be deleted in my final transformed record too.*
>>>>
>>>>
>>>> If your final sync (storage) is data warehouse, it should be soft
>>>> flagged with op_type (Insert/Update/Delete) and op_time (timestamp).
>>>>
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 27 Jan 2022 at 15:48, Sid Kal  wrote:
>>>>
>>>>> I am using Spark incremental approach for bringing the latest data
>>>>> everyday. Everything works fine.
>>>>>
>>>>> But now the main problem is when the record at the source is deleted,
>>>>> it should be deleted in my final transformed record too.
>>>>>
>>>>> How do I capture such changes and change my table too ?
>>>>>
>>>>> Best regards,
>>>>> Sid
>>>>>
>>>>>


Re: How to delete the record

2022-01-27 Thread Sid Kal
Hi Sean,

So you mean that if I use those file formats, they will do the work of CDC
automatically, or would I have to handle it via code?

Hi Mich,

Not sure if I understood you. Let me try to explain my scenario. Suppose
there is an id "1" which is inserted today, so I transformed and ingested
it. Now suppose this user id is deleted from the source itself. How can I
then delete it in my transformed DB?



On Thu, 27 Jan 2022, 22:44 Sean Owen,  wrote:

> This is what storage engines like Delta, Hudi, Iceberg are for. No need to
> manage it manually or use a DBMS. These formats allow deletes, upserts, etc
> of data, using Spark, on cloud storage.
>
> On Thu, Jan 27, 2022 at 10:56 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Where ETL data is stored?
>>
>>
>>
>> *But now the main problem is when the record at the source is deleted, it
>> should be deleted in my final transformed record too.*
>>
>>
>> If your final sync (storage) is data warehouse, it should be soft flagged
>> with op_type (Insert/Update/Delete) and op_time (timestamp).
>>
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 27 Jan 2022 at 15:48, Sid Kal  wrote:
>>
>>> I am using Spark incremental approach for bringing the latest data
>>> everyday. Everything works fine.
>>>
>>> But now the main problem is when the record at the source is deleted, it
>>> should be deleted in my final transformed record too.
>>>
>>> How do I capture such changes and change my table too ?
>>>
>>> Best regards,
>>> Sid
>>>
>>>
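To make Sean's point concrete: with a table stored in one of those formats, the incremental batch (including the delete flag DMS emits) can be applied with a single MERGE, so a row deleted at the source is deleted in the transformed table too. A hedged sketch with Delta Lake; it needs the delta-spark package on the classpath, and the paths, the id column, and the assumption that the DMS CDC output carries an Op column (I/U/D) are all specific to this example:

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (SparkSession.builder
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

target = DeltaTable.forPath(spark, "s3://my-bucket/transformed/customers")
updates = spark.read.parquet("s3://my-bucket/dms/customers_incremental")

(target.alias("t")
 .merge(updates.alias("s"), "t.id = s.id")
 .whenMatchedDelete(condition="s.Op = 'D'")       # source delete -> delete in target
 .whenMatchedUpdateAll(condition="s.Op != 'D'")   # updates overwrite the existing row
 .whenNotMatchedInsertAll(condition="s.Op != 'D'")
 .execute())

The soft-delete approach Mich describes is the alternative when the target must keep history: instead of physically removing the row, write it again with op_type = 3 and an op_time.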


Re: How to delete the record

2022-01-27 Thread Sid Kal
Hi Mich,

Thanks for your time.

Data is stored in S3 via DMS and is read by the Spark jobs.

How can I mark a record as a soft delete?

Any small snippet / link / example. Anything would help.

Thanks,
Sid

On Thu, 27 Jan 2022, 22:26 Mich Talebzadeh, 
wrote:

> Where ETL data is stored?
>
>
>
> *But now the main problem is when the record at the source is deleted, it
> should be deleted in my final transformed record too.*
>
>
> If your final sync (storage) is data warehouse, it should be soft flagged
> with op_type (Insert/Update/Delete) and op_time (timestamp).
>
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Jan 2022 at 15:48, Sid Kal  wrote:
>
>> I am using Spark incremental approach for bringing the latest data
>> everyday. Everything works fine.
>>
>> But now the main problem is when the record at the source is deleted, it
>> should be deleted in my final transformed record too.
>>
>> How do I capture such changes and change my table too ?
>>
>> Best regards,
>> Sid
>>
>>


How to delete the record

2022-01-27 Thread Sid Kal
I am using a Spark incremental approach for bringing in the latest data
every day. Everything works fine.

But now the main problem is that when a record at the source is deleted, it
should be deleted in my final transformed record too.

How do I capture such changes and update my table accordingly?

Best regards,
Sid


Re: Python code crashing on ReduceByKey if I return custom class object

2014-10-27 Thread sid
I have implemented the advice and now the issue is resolved.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-code-crashing-on-ReduceByKey-if-I-return-custom-class-object-tp17317p17426.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Python code crashing on ReduceByKey if I return custom class object

2014-10-26 Thread sid
Hi, I am new to Spark and I am trying to use PySpark.

I am trying to find the mean of 128-dimensional vectors present in a file.

Below is the code


from cStringIO import StringIO

class testing:
    def __str__(self):
        file_str = StringIO()
        file_str.write(str(self.first))
        file_str.write(" ")
        for n in self.vector:
            file_str.write(str(n))
            file_str.write(" ")
        file_str.write(self.filename)
        return file_str.getvalue()

    def __init__(self, txt=""):
        self.vector = [0.0]*128
        if len(txt)==0:
            return
        i=0
        for n in txt.split():
            if i==0:
                self.key = float(n)
                i = i+1
                continue
            if i<128:
                self.vector[i-1]=float(n)
                i = i+1
                continue
            break

    def addVec(self, r):
        a = testing()
        for n in xrange(0,128):
            a.vector[n] = self.vector[n] + r.vector[n]
        return a

def InitializeAndReturnPair(string):
    vec = testing(string)
    return vec.key, vec


from pyspark import SparkConf, SparkContext
conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s)).cache()
# Below line is throwing error
print output.reduceByKey(lambda a,b : addVec(a,b)).collect()

-

I am not able to figure out where I am missing out , I tried changing the
serializer but still getting similar error.

Error

Spark assembly has been built with Hive, including Datanucleus jars on
classpath Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties 14/10/25 05:02:39 WARN Utils:
Your hostname, Sid resolves to a loopback address: 127.0.1.1; using
192.168.0.15 instead (on interface wlan0) 14/10/25 05:02:39 WARN Utils: Set
SPARK_LOCAL_IP if you need to bind to another address 14/10/25 05:02:39 INFO
SecurityManager: Changing view acls to: sid, 14/10/25 05:02:39 INFO
SecurityManager: Changing modify acls to: sid, 14/10/25 05:02:39 INFO
SecurityManager: SecurityManager: authentication disabled; ui acls disabled;
users with view permissions: Set(sid, ); users with modify permissions:
Set(sid, ) 14/10/25 05:02:40 INFO Slf4jLogger: Slf4jLogger started 14/10/25
05:02:40 INFO Remoting: Starting remoting 14/10/25 05:02:40 INFO Remoting:
Remoting started; listening on addresses
:[akka.tcp://sparkDriver@192.168.0.15:52124] 14/10/25 05:02:40 INFO
Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@192.168.0.15:52124] 14/10/25 05:02:40 INFO Utils:
Successfully started service 'sparkDriver' on port 52124. 14/10/25 05:02:40
INFO SparkEnv: Registering MapOutputTracker 14/10/25 05:02:40 INFO SparkEnv:
Registering BlockManagerMaster 14/10/25 05:02:40 INFO DiskBlockManager:
Created local directory at /tmp/spark-local-20141025050240-56c5 14/10/25
05:02:40 INFO Utils: Successfully started service 'Connection manager for
block manager' on port 51705. 14/10/25 05:02:40 INFO ConnectionManager:
Bound socket to port 51705 with id = ConnectionManagerId(192.168.0.15,51705)
14/10/25 05:02:40 INFO MemoryStore: MemoryStore started with capacity 265.4
MB 14/10/25 05:02:40 INFO BlockManagerMaster: Trying to register
BlockManager 14/10/25 05:02:40 INFO BlockManagerMasterActor: Registering
block manager 192.168.0.15:51705 with 265.4 MB RAM 14/10/25 05:02:40 INFO
BlockManagerMaster: Registered BlockManager 14/10/25 05:02:40 INFO
HttpFileServer: HTTP File server directory is
/tmp/spark-ce172226-4732-4633-aa45-1cbd45b7ec98 14/10/25 05:02:40 INFO
HttpServer: Starting HTTP Server 14/10/25 05:02:40 INFO Utils: Successfully
started service 'HTTP file server' on port 55111. 14/10/25 05:02:40 INFO
Utils: Successfully started service 'SparkUI' on port 4040. 14/10/25
05:02:40 INFO SparkUI: Started SparkUI at http ://192.168.0.15:4040 14/10/25
05:02:41 WARN NativeCodeLoader: Unable to load native-hadoop library for
your platform... using builtin-java classes where applicable 14/10/25
05:02:41 INFO Utils: Copying /home/sid/Downloads/spark/pdsWork/AskHelp.py to
/tmp/spark-80005f48-e41d-48ff-b249-bdf87130de5d/AskHelp.py 14/10/25 05:02:41
INFO SparkContext: Added file
file:/home/sid/Downloads/spark/pdsWork/AskHelp.py at http
://192.168.0.15:55111/files/AskHelp.py with timestamp 1414227761547 14/10/25
05:02:41 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@192.168.0.15:52124/user/HeartbeatReceiver 14/10/25
05:02:41 INFO MemoryStore: ensureFreeSpace(163705) called with curMem=0,
maxMem=278302556 14/10/25 05:02:41 INFO MemoryStore: Block broadcast_0
stored as values in memory (estimated size 159.9
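The advice that resolved this is not quoted in the thread, but two things in the snippet above stand out: addVec is a method of the class, so the reducer needs a.addVec(b) rather than the bare addVec(a, b), and a custom class defined in the driver script cannot be unpickled on the workers unless it lives in a module shipped to them (for example via sc.addPyFile). A hedged alternative that sidesteps both issues by keeping each record as a plain (key, vector) pair, assuming the line layout described above (key followed by 128 floats):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("Mean of 128-dim vectors")
sc = SparkContext(conf=conf)

def parse(line):
    parts = line.split()
    key = float(parts[0])
    vec = [float(x) for x in parts[1:129]]   # the 128 components after the key
    return key, (vec, 1)                     # carry a count so the mean can be taken later

def add(a, b):
    (va, na), (vb, nb) = a, b
    return [x + y for x, y in zip(va, vb)], na + nb

sums = sc.textFile("input.txt").map(parse).reduceByKey(add)
means = sums.mapValues(lambda vc: [x / vc[1] for x in vc[0]])
print(means.collect())

Built-in types pickle cleanly between the driver and the workers, which is why this version avoids the serialization trouble entirely.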