Convert timestamp to unix milliseconds

2021-08-04 Thread Tzahi File
Hi All,

I'm using Spark 2.4 and trying to convert a timestamp column to unix time with
milliseconds using the unix_timestamp function.

I tried casting the result to double: cast(unix_timestamp(timestamp) as double).
I also tried the timestamp format "yyyy-MM-dd HH:mm:ss.SSS", but no matter what
I tested I kept getting the unix time in seconds.
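
For reference, a minimal PySpark sketch of the two attempts above (the sample
value and the "ts" column name are just placeholders):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2021-08-04 10:15:30.123",)], ["ts_str"]) \
          .withColumn("ts", F.col("ts_str").cast("timestamp"))

# Attempt 1: cast the unix_timestamp result to double -- still whole seconds,
# since unix_timestamp() truncates before the cast happens.
df = df.withColumn("unix_sec", F.unix_timestamp("ts").cast("double"))

# Attempt 2: pass an explicit format with fractional seconds -- same result.
df = df.withColumn("unix_fmt",
                   F.unix_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss.SSS"))

df.show(truncate=False)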

Looking for any new ideas.


Thanks.


Re: Spark performance over S3

2021-04-07 Thread Tzahi File
Hi Hariharan,

Thanks for your reply.

In both cases we are writing the data to S3. The difference is that in the
first case we read the data from S3, and in the second we read it from HDFS.
We are using the ListObjectsV2 API in S3A
<https://issues.apache.org/jira/browse/HADOOP-13421>.

The S3 bucket and the cluster are located in the same AWS region.
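
For completeness, this is how we select the list API version (a sketch;
fs.s3a.list.version is the switch introduced in HADOOP-13421, and 2 is already
the default on recent Hadoop versions):

from pyspark.sql import SparkSession

# Use the V2 ListObjects API in S3A (HADOOP-13421).
spark = (SparkSession.builder
         .config("spark.hadoop.fs.s3a.list.version", "2")
         .getOrCreate())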



On Wed, Apr 7, 2021 at 2:12 PM Hariharan  wrote:

> Hi Tzahi,
>
> Comparing the first two cases:
>
>- > reads the parquet files from S3 and also writes to S3, it takes 22
>min
>- > reads the parquet files from S3 and writes to its local hdfs, it
>takes the same amount of time (±22 min)
>
>
> It looks like most of the time is being spent in reading, and the time
> spent in writing is likely negligible (probably you're not writing much
> output?)
>
> Can you clarify what is the difference between these two?
>
> > reads the parquet files from S3 and writes to its local hdfs, it takes
> the same amount of time (±22 min)?
> > reads the parquet files from S3 (they were copied into the hdfs before)
> and writes to its local hdfs, the job took 7 min
>
> In the second case, was the data read from hdfs or s3?
>
> Regarding the point from the post you linked to:
> 1. Enhanced networking does make a difference
> <https://laptrinhx.com/hadoop-with-enhanced-networking-on-aws-1893465489/>,
> but it should be automatically enabled if you're using a compatible
> instance type and an AWS AMI. However if you're using a custom AMI, you
> might want to check if it's enabled for you.
> 2. VPC endpoints also can make a difference in performance - at least that
> used to be the case a few years ago. Maybe that has changed now.
>
> Couple of other things you might want to check:
> 1. If your bucket is versioned, you may want to check if you're using the 
> ListObjectsV2
> API in S3A <https://issues.apache.org/jira/browse/HADOOP-13421>.
> 2. Also check these recommendations from Cloudera
> <https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.5/bk_cloud-data-access/content/s3-performance.html>
> for optimal use of S3A.
>
> Thanks,
> Hariharan
>
>
>
> On Wed, Apr 7, 2021 at 12:15 AM Tzahi File  wrote:
>
>> Hi All,
>>
>> We have a spark cluster on aws ec2 that has 60 X i3.4xlarge.
>>
>> The spark job running on that cluster reads from an S3 bucket and writes
>> to that bucket.
>>
>> the bucket and the ec2 run in the same region.
>>
>> As part of our efforts to reduce the runtime of our spark jobs we found
>> there's serious latency when reading from S3.
>>
>> When the job:
>>
>>- reads the parquet files from S3 and also writes to S3, it takes 22
>>min
>>- reads the parquet files from S3 and writes to its local hdfs, it
>>takes the same amount of time (±22 min)
>>- reads the parquet files from S3 (they were copied into the hdfs
>>before) and writes to its local hdfs, the job took 7 min
>>
>> the spark job has the following S3-related configuration:
>>
>>- spark.hadoop.fs.s3a.connection.establish.timeout=5000
>>- spark.hadoop.fs.s3a.connection.maximum=200
>>
>> when reading from S3 we tried to increase the
>> spark.hadoop.fs.s3a.connection.maximum config param from 200 to 400 or 900
>> but it didn't reduce the S3 latency.
>>
>> Do you have any idea for the cause of the read latency from S3?
>>
>> I saw this post
>> <https://aws.amazon.com/premiumsupport/knowledge-center/s3-transfer-data-bucket-instance/>
>>  to
>> improve the transfer speed, is something here relevant?
>>
>>
>> Thanks,
>> Tzahi
>>
>

-- 
Tzahi File
Data Engineers Team Lead


Spark performance over S3

2021-04-06 Thread Tzahi File
Hi All,

We have a Spark cluster on AWS EC2 with 60 i3.4xlarge instances.

The Spark job running on that cluster reads from an S3 bucket and writes back
to the same bucket.

The bucket and the EC2 instances are in the same region.

As part of our efforts to reduce the runtime of our Spark jobs, we found that
there is serious latency when reading from S3.

When the job:

   - reads the parquet files from S3 and also writes to S3, it takes 22 min
   - reads the parquet files from S3 and writes to its local hdfs, it takes
   the same amount of time (±22 min)
   - reads the parquet files from S3 (they were copied into the hdfs
   before) and writes to its local hdfs, the job took 7 min

The Spark job has the following S3-related configuration:

   - spark.hadoop.fs.s3a.connection.establish.timeout=5000
   - spark.hadoop.fs.s3a.connection.maximum=200

When reading from S3 we tried increasing the
spark.hadoop.fs.s3a.connection.maximum config param from 200 to 400 and even
900, but it didn't reduce the S3 latency.
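
For reference, a sketch of the session configuration we're experimenting with
(the fadvise and readahead settings are extra knobs from the S3A documentation
that we have not confirmed help yet):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3-read-test")
         .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
         .config("spark.hadoop.fs.s3a.connection.maximum", "400")
         # random access is suggested for columnar formats like Parquet
         .config("spark.hadoop.fs.s3a.experimental.input.fadvise", "random")
         .config("spark.hadoop.fs.s3a.readahead.range", "1M")
         .getOrCreate())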

Do you have any idea for the cause of the read latency from S3?

I saw this post
<https://aws.amazon.com/premiumsupport/knowledge-center/s3-transfer-data-bucket-instance/>
about improving the transfer speed; is anything there relevant?


Thanks,
Tzahi


Spark Parquet file size

2020-11-10 Thread Tzahi File
Hi,

We have many Spark jobs that create lots of small files. To improve analysts'
read performance, I'm testing for the optimal Parquet file size.
From what I've read, the optimal file size is around 1 GB, and not less than
128 MB, depending on the size of the data.

I took one process to examine. It uses shuffle partitions = 600, which creates
files of about 11 MB each. I added a repartition step to create fewer files,
~12 files of about 600 MB each. After testing it (select * from table where ...)
I saw that the old version (with more, smaller files) ran faster than the new
one. I tried increasing the number of files to 40 (~130 MB each), and it still
runs slower.
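
For reference, a sketch of the repartition step I added before the write (the
toy DataFrame, the target count, and the output path are placeholders I tune
per run):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(10000000).withColumn("value", F.rand())  # stand-in for the job output

# Collapse the ~600 shuffle outputs into fewer, larger parquet files.
num_output_files = 40  # tried ~12 (~600 MB each) and 40 (~130 MB each)
(df.repartition(num_output_files)
   .write
   .mode("overwrite")
   .parquet("s3://my-bucket/optimized-table/"))  # placeholder path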

I'd appreciate hearing about your experience with file sizes, and how to
optimize the number and size of files.

Thanks,
Tzahi


Re: Merging Parquet Files

2020-08-31 Thread Tzahi File
You are right.

In general this job should handle very small files and create a single output
file of less than 100 MB.
In other cases I would need to create multiple files of around 100 MB each.
The issue with reducing the number of partitions is that it hurts the ETL's
performance, while this job is only meant to be a side job.
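
What I have in mind for the multi-file case is roughly this (a sketch; the toy
data and size estimates are assumptions, and I'd lean on repartition rather
than coalesce precisely because of the parallelism concern above):

import math
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(5000000).withColumn("value", F.rand())  # stand-in for the small-files input

target_bytes = 100 * 1024 * 1024           # ~100 MB per output file
estimated_input_bytes = 750 * 1024 * 1024  # placeholder estimate of total input size
num_files = max(1, math.ceil(estimated_input_bytes / target_bytes))

# repartition() adds a shuffle but keeps the upstream stages at full
# parallelism; coalesce() avoids the shuffle but can shrink the parallelism
# of the earlier stages.
(df.repartition(num_files)
   .write
   .mode("overwrite")
   .parquet("s3://my-bucket/merged/"))  # placeholder path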




On Mon, Aug 31, 2020 at 5:52 PM Jörn Franke  wrote:

> Why only one file?
> I would go more for files of specific size, eg data is split in 1gb files.
> The reason is also that if you need to transfer it (eg to other clouds etc)
> - having a large file of several terabytes is bad.
>
> It depends on your use case but you might look also at partitions etc.
>
> Am 31.08.2020 um 16:17 schrieb Tzahi File :
>
> 
> Hi,
>
> I would like to develop a process that merges parquet files.
> My first intention was to develop it with PySpark using coalesce(1) -  to
> create only 1 file.
> This process is going to run on a huge amount of files.
> I wanted your advice on what is the best way to implement it (PySpark
> isn't a must).
>
>
> Thanks,
> Tzahi
>
>

-- 
Tzahi File
Data Engineer


Adding Partitioned Fields to the File

2020-08-31 Thread Tzahi File
Hi,

I'm using PySpark to write a df to S3 in Parquet format.
I would like the partitioned columns to appear inside the data files as well.
What is the best way to do this?

e.g. df.write.partitionBy('day','hour')
desired file columns -> day, hour, time, name
and not just time, name
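
The workaround I'm considering (a sketch; it just duplicates the partition
columns under new names so the copies survive inside the data files, since
Spark drops the partitionBy columns from them; the data and path are
placeholders):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("2020-08-31", 10, "12:30", "foo")],
    ["day", "hour", "time", "name"])           # toy stand-in for the real df

(df.withColumn("day_in_file", F.col("day"))    # duplicated columns stay in the files
   .withColumn("hour_in_file", F.col("hour"))
   .write
   .partitionBy("day", "hour")
   .mode("overwrite")
   .parquet("s3://my-bucket/output/"))         # placeholder path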


Thanks!
Tzahi


Merging Parquet Files

2020-08-31 Thread Tzahi File
Hi,

I would like to develop a process that merges parquet files.
My first intention was to develop it with PySpark using coalesce(1), to create
only one output file.
This process is going to run on a huge number of files.
I wanted your advice on the best way to implement it (PySpark isn't a must).
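
For reference, the naive version I started from (a sketch; the paths are
placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the many small parquet files and rewrite them as a single file.
# coalesce(1) funnels everything through one task, which is my main concern.
(spark.read.parquet("s3://my-bucket/small-files/")
      .coalesce(1)
      .write
      .mode("overwrite")
      .parquet("s3://my-bucket/merged/"))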


Thanks,
Tzahi


Re: Getting PySpark Partitions Locations

2020-06-25 Thread Tzahi File
I don't want to run a distinct query on the partitioned columns; the df
contains over 1 billion records.
I just want to know which partitions were created.
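
What I'd like is something closer to listing the output directories directly.
A rough sketch through the JVM Hadoop FileSystem API (s3_output is the same
output path as in the write, and _jvm/_jsc are Spark internals, so this is
just the direction I'm thinking of):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
s3_output = "s3://my-bucket/output/"  # placeholder, same path as the write

# Walk day=*/hour=*/country=* directories instead of scanning the data itself.
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = Path(s3_output).getFileSystem(spark._jsc.hadoopConfiguration())

for day in fs.listStatus(Path(s3_output)):
    if not day.isDirectory():   # skip _SUCCESS and similar files
        continue
    for hour in fs.listStatus(day.getPath()):
        for country in fs.listStatus(hour.getPath()):
            print(country.getPath().toString())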

On Thu, Jun 25, 2020 at 4:04 PM Jörn Franke  wrote:

> By doing a select on the df ?
>
> Am 25.06.2020 um 14:52 schrieb Tzahi File :
>
> 
> Hi,
>
> I'm using pyspark to write df to s3, using the following command:
> "df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".
>
> Is there any way to get the partitions created?
> e.g.
> day=2020-06-20/hour=1/country=US
> day=2020-06-20/hour=2/country=US
> ..
>
> --
> Tzahi File
> Data Engineer
>
>

-- 
Tzahi File
Data Engineer


Getting PySpark Partitions Locations

2020-06-25 Thread Tzahi File
Hi,

I'm using PySpark to write a df to S3, using the following command:
"df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".

Is there any way to get the list of partitions that were created?
e.g.
day=2020-06-20/hour=1/country=US
day=2020-06-20/hour=2/country=US
..

-- 
Tzahi File
Data Engineer


Issue with pyspark query

2020-06-10 Thread Tzahi File
Hi,

This is a general question about moving a Spark SQL query to PySpark; if
needed I will add more details from the error log and the query syntax.
I'm trying to move a Spark SQL query to run through PySpark.
The query syntax and Spark configuration are the same.
For some reason the query fails in PySpark with a Java heap space error.
In the Spark SQL version I'm using insert overwrite partition, while in
PySpark I'm using a DataFrame write to a specific location in S3.
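
For context, a stripped-down sketch of the two variants (the table, column,
and path names here are placeholders, not my real query):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.range(100).selectExpr("id", "id % 10 AS part").createOrReplaceTempView("src")

# Spark SQL variant: insert overwrite into a partitioned table.
spark.sql("""
    CREATE TABLE IF NOT EXISTS tgt (id BIGINT, part BIGINT)
    USING parquet
    PARTITIONED BY (part)
""")
spark.sql("INSERT OVERWRITE TABLE tgt PARTITION (part) SELECT id, part FROM src")

# PySpark variant: write the same result as parquet directly to a location.
(spark.table("src")
      .write
      .mode("overwrite")
      .partitionBy("part")
      .parquet("s3://my-bucket/tgt/"))  # placeholder path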

Are there any configuration differences you think I might need to change?


Thanks,

-- 
Tzahi
Data Engineer


Spark Adaptive configuration

2020-04-22 Thread Tzahi File
Hi,

I saw that Spark has an option to adapt the join and shuffle configuration,
for example "spark.sql.adaptive.shuffle.targetPostShuffleInputSize".

I wanted to know whether you have any experience with this configuration and
how it changed performance for you.

Another question: is there an option to dynamically change the shuffle
partition config during Spark SQL query execution?
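
For context, roughly what I mean (a sketch; the values and toy data are
placeholders to tune, and targetPostShuffleInputSize is the experimental 2.x
setting mentioned above):

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
                 str(128 * 1024 * 1024))
         .getOrCreate())

df = spark.range(1000000).withColumn("key", F.col("id") % 100)

# The shuffle partition count can be changed between statements in one session:
spark.conf.set("spark.sql.shuffle.partitions", "1200")
df.groupBy("key").count().collect()

spark.conf.set("spark.sql.shuffle.partitions", "400")
df.groupBy("key").count().collect()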

Thanks,
Tzahi


Splitting resource in Spark cluster

2019-12-29 Thread Tzahi File
Hi All,

I'm using one Spark cluster that contains 50 nodes of type i3.4xlarge (16
vCores). I'm trying to run 4 Spark SQL queries simultaneously.

The data is split into 10 even partitions, and the 4 queries run on the same
data but on different partitions. I have tried to configure the cluster so
that each job gets the same resources and doesn't interfere with the other
jobs' resources.
When running 1 or 2 queries simultaneously I got much better performance than
with 4 queries, although I expected the same performance.

I'm looking for your advice on how to improve the performance by tuning the
configurations.

I have a total of 15*50 nodes
5 executors per instance
max executors 37
shuffle partition 750
...

From what I understand, with 37 max executors, running 1, 2, 3 or 4 jobs in
parallel should give each job the same number of executors, and thus the same
running time.
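
One direction I'm considering, in case running all four from a single
application is acceptable (a sketch; FAIR scheduling plus one pool per query
is the standard way to let concurrent jobs share the same executors, and the
pool names and toy data are placeholders):

import threading
from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .config("spark.scheduler.mode", "FAIR")
         .getOrCreate())

df = spark.range(10000000).withColumn("part", F.col("id") % 10)

def run_query(pool_name, part_id):
    # setLocalProperty is thread-local, so each query lands in its own pool.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool_name)
    df.filter(F.col("part") == part_id).groupBy("part").count().collect()

threads = [threading.Thread(target=run_query, args=("pool%d" % i, i))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()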


Thanks,
Tzahi


Re: Issue With mod function in Spark SQL

2019-12-17 Thread Tzahi File
No, there are 100M records, both even and odd.

On Tue, Dec 17, 2019 at 8:13 PM Russell Spitzer 
wrote:

> Is there a chance your data is all even or all odd?
>
> On Tue, Dec 17, 2019 at 11:01 AM Tzahi File 
> wrote:
>
>> I have in my spark sql query a calculated field that gets the value if
>> field1 % 3.
>>
>> I'm using this field as a partition so I expected to get 3 partitions in
>> the mentioned case, and I do get. The issue happened with even numbers
>> (instead of 3 - 4,2 ... ).
>> When I tried to use even numbers, for example 4 I got only 2 partitions -
>> 1 and 3.
>> Field 1 datatype is bigint.
>>
>> Do you have any suggestions?
>>
>>
>> --
>> thanks,
>> Tzahi
>>
>

-- 
Tzahi File
Data Engineer


Issue With mod function in Spark SQL

2019-12-17 Thread Tzahi File
My Spark SQL query has a calculated field that gets the value of field1 % 3.

I'm using this field as a partition, so I expected to get 3 partitions in that
case, and I do. The issue happens with even moduli (4, 2, ... instead of 3).
When I try an even modulus, for example 4, I get only 2 partitions: 1 and 3.
field1's datatype is bigint.
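
A minimal reproduction of what I expect (a sketch with synthetic bigint
values):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# With ids 0..99 and field1 % 4, I'd expect four distinct values: 0, 1, 2, 3.
df = spark.range(100).withColumnRenamed("id", "field1") \
          .withColumn("bucket", F.col("field1") % 4)
df.groupBy("bucket").count().orderBy("bucket").show()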

Do you have any suggestions?


-- 
thanks,
Tzahi


Re: Using Percentile in Spark SQL

2019-11-11 Thread Tzahi File
Currently, I'm using the percentile_approx function with Hive.
I'm looking for a better way to run this function, or another way to get the
same result with Spark, but faster and without using gigantic instances.

I'm trying to optimize this job by changing the Spark configuration. Any ideas
on how to approach this would be great (instance type, number of instances,
number of executors, etc.).
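
For reference, the kind of call I'd run in Spark SQL (a sketch on synthetic
data; the accuracy argument of 10000 is just a starting point, trading
precision for speed and memory):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

spark.range(1000000).withColumn("value", F.rand() * 100) \
     .createOrReplaceTempView("t")

spark.sql("""
    SELECT percentile_approx(value, array(0.5, 0.9, 0.99), 10000) AS pct
    FROM t
""").show(truncate=False)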


On Mon, Nov 11, 2019 at 5:16 PM Patrick McCarthy 
wrote:

> Depending on your tolerance for error you could also use
> percentile_approx().
>
> On Mon, Nov 11, 2019 at 10:14 AM Jerry Vinokurov 
> wrote:
>
>> Do you mean that you are trying to compute the percent rank of some data?
>> You can use the SparkSQL percent_rank function for that, but I don't think
>> that's going to give you any improvement over calling the percentRank
>> function on the data frame. Are you currently using a user-defined function
>> for this task? Because I bet that's what's slowing you down.
>>
>> On Mon, Nov 11, 2019 at 9:46 AM Tzahi File 
>> wrote:
>>
>>> Hi,
>>>
>>> Currently, I'm using hive huge cluster(m5.24xl * 40 workers) to run a
>>> percentile function. I'm trying to improve this job by moving it to run
>>> with spark SQL.
>>>
>>> Any suggestions on how to use a percentile function in Spark?
>>>
>>>
>>> Thanks,
>>> --
>>> Tzahi File
>>> Data Engineer
>>>
>>
>>
>> --
>> http://www.google.com/profiles/grapesmoker
>>
>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


-- 
Tzahi File
Data Engineer


Using Percentile in Spark SQL

2019-11-11 Thread Tzahi File
Hi,

Currently, I'm using a huge Hive cluster (m5.24xl * 40 workers) to run a
percentile function. I'm trying to improve this job by moving it to Spark SQL.

Any suggestions on how to use a percentile function in Spark?


Thanks,
-- 
Tzahi File
Data Engineer


Re: Caching tables in spark

2019-08-28 Thread Tzahi File
I mean two separate Spark jobs.



On Wed, Aug 28, 2019 at 2:25 PM Subash Prabakar 
wrote:

> When you mean by process is it two separate spark jobs? Or two stages
> within same spark code?
>
> Thanks
> Subash
>
> On Wed, 28 Aug 2019 at 19:06,  wrote:
>
>> Take a look at this article
>>
>>
>>
>>
>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-caching.html
>>
>>
>>
>> *From:* Tzahi File 
>> *Sent:* Wednesday, August 28, 2019 5:18 AM
>> *To:* user 
>> *Subject:* Caching tables in spark
>>
>>
>>
>> Hi,
>>
>>
>>
>> Looking for your knowledge with some question.
>>
>> I have 2 different processes that read from the same raw data table
>> (around 1.5 TB).
>>
>> Is there a way to read this data once and cache it somehow and to use
>> this data in both processes?
>>
>>
>>
>>
>>
>> Thanks
>>
>> --
>>
>> *Tzahi File*
>> Data Engineer
>>
>>
>>
>>
>

-- 
Tzahi File
Data Engineer


Caching tables in spark

2019-08-28 Thread Tzahi File
Hi,

I'm looking for your knowledge on a question.
I have 2 different processes that read from the same raw data table (around
1.5 TB).
Is there a way to read this data once, cache it somehow, and use it in both
processes?
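
To make the question concrete, a sketch of what the two processes do today
(the toy table and aggregations are placeholders; as far as I understand,
cache() only lives inside a single application, which is exactly what I'd like
to get around):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

raw = spark.range(1000000).withColumn("key", F.col("id") % 100)  # stand-in for the 1.5 TB table
raw.cache()  # helps repeated reads within this application only

# Process A and Process B are currently separate applications, so each one
# re-reads the raw table from scratch before its own aggregation.
result_a = raw.groupBy("key").count()
result_b = raw.groupBy("key").agg(F.max("id"))
result_a.collect()
result_b.collect()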


Thanks
-- 
Tzahi File
Data Engineer


[Spark SQL] failure in query

2019-08-25 Thread Tzahi File
Hi,

I encountered an issue running a Spark SQL query, and I'd be happy to get some
advice.
I'm trying to run a query on a very big data set (around 1.5 TB) and it fails
on every attempt. A template of the query is below:
insert overwrite table partition(part)
select /*+ BROADCAST(c) */
       *,
       row_number() over (partition by request_id order by economic_value DESC) as row_number
from (
    select a, b, c, d, e
    from table  (raw data, ~1.5 TB)
)
left join small_table

The heavy part of this query is the window function.
I'm using 65 spot instances of type 5.4xlarge.
The Spark settings:
--conf spark.driver.memory=10g
--conf spark.sql.shuffle.partitions=1200
--conf spark.executor.memory=22000M
--conf spark.shuffle.service.enabled=false


An example of the errors I get was attached as a screenshot (image.png).


any suggestions?



Thanks!
Tzahi


Re: Performance Issue

2019-01-13 Thread Tzahi File
Hi Gourav,

I tried removing the left join to see how it influences performance; the
difference was only about 3 minutes.
So I'm looking for a solution that decreases the running time more
significantly (the current running time is about 2 hours).

On Sun, Jan 13, 2019 at 1:12 PM Gourav Sengupta 
wrote:

> Hi Tzahi,
>
> I think that SPARK automatically broadcasts with the latest versions, but
> you might have to check with your version. Did you try filtering first and
> then doing the LEFT JOIN?
>
> Regards,
> Gourav Sengupta
>
> On Sun, Jan 13, 2019 at 9:20 AM Tzahi File  wrote:
>
>> Hi Gourav,
>>
>> I just wanted to attach an example of my query so I replaced my fields
>> names with  "select *", I do have an agg fields in my query.
>>
>> What about improving performance with Sparks - like broadcasting or
>> something like that?
>>
>> Thanks,
>> Tzahi
>>
>> On Thu, Jan 10, 2019 at 7:23 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Tzahi,
>>>
>>> by using GROUP BY without any aggregate columns are you just trying to
>>> find out the DISTINCT of the columns ?
>>>
>>> Also it may be of help (I do not know whether the SQL optimiser
>>> automatically takes care of this) to have the LEFT JOIN on a smaller data
>>> set by having joined on the device_id before as a subquery or separate
>>> query. And when you are writing the output of the JOIN between csv_file and
>>> raw_e to ORDER BY the output based on campaign_ID.
>>>
>>> Thanks and Regards,
>>> Gourav Sengupta
>>>
>>>
>>> On Thu, Jan 10, 2019 at 1:13 PM Tzahi File 
>>> wrote:
>>>
>>>> Hi Gourav,
>>>>
>>>> My version of Spark is 2.1.
>>>>
>>>> The data is stored on S3 directory in parquet format.
>>>>
>>>> I sent you an example for a query I would like to run (the raw_e table
>>>> is stored as parquet files and event_day is the partitioned filed):
>>>>
>>>> SELECT *
>>>> FROM (select *
>>>>   from parquet_files.raw_e as re
>>>>   WHERE  re.event_day >= '2018-11-28' AND re.event_day <=
>>>> '2018-12-28')
>>>> JOIN csv_file as g
>>>> ON g.device_id = re.id and g.advertiser_id = re.advertiser_id
>>>> LEFT JOIN campaigns as c
>>>> ON c.campaign_id = re.campaign_id
>>>> GROUP by 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10, 11, 12, 13, 14, 15,
>>>> 16, 17, 18, 19,20,21
>>>>
>>>> Looking forward to any insights.
>>>>
>>>>
>>>> Thanks.
>>>>
>>>> On Wed, Jan 9, 2019 at 8:21 AM Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can you please let us know the SPARK version, and the query, and
>>>>> whether the data is in parquet format or not, and where is it stored?
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>> On Wed, Jan 9, 2019 at 1:53 AM 大啊  wrote:
>>>>>
>>>>>> What is your performance issue?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> At 2019-01-08 22:09:24, "Tzahi File"  wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have some performance issue running SQL query on Spark.
>>>>>>
>>>>>> The query contains one parquet partitioned table (partition by date)
>>>>>> one each partition is about 200gb and simple table with about 100 
>>>>>> records.
>>>>>> The spark cluster is of type m5.2xlarge - 8 cores. I'm using Qubole
>>>>>> interface for running the SQL query.
>>>>>>
>>>>>> After searching after how to improve my query I have added to the
>>>>>> configuration the above settings:
>>>>>> spark.sql.shuffle.partitions=1000
>>>>>> spark.dynamicAllocation.maxExecutors=200
>>>>>>
>>>>>> There wasn't any significant improvement. I'm looking for any ideas
>>>>>> to improve my running time.
>>>>>>
>>>>>>
>>>>>> Thanks!
>>>>>> Tzahi
>>>>>>
>>>>>>
>>>

Re: Performance Issue

2019-01-13 Thread Tzahi File
Hi Gourav,

I just wanted to attach an example of my query, so I replaced my field names
with "select *"; I do have aggregated fields in my query.

What about improving performance with Spark features like broadcasting or
something similar?
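
For example, the kind of thing I have in mind (a sketch with toy stand-ins for
the tables in my query; broadcasting the small campaigns table so the LEFT
JOIN avoids shuffling the big side):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

raw_e = spark.createDataFrame([(1, 10.0), (2, 20.0), (1, 5.0)],
                              ["campaign_id", "economic_value"])
campaigns = spark.createDataFrame([(1, "c1"), (2, "c2")],
                                  ["campaign_id", "campaign_name"])

joined = raw_e.join(F.broadcast(campaigns), on="campaign_id", how="left")
joined.show()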

Thanks,
Tzahi

On Thu, Jan 10, 2019 at 7:23 PM Gourav Sengupta 
wrote:

> Hi Tzahi,
>
> by using GROUP BY without any aggregate columns are you just trying to
> find out the DISTINCT of the columns ?
>
> Also it may be of help (I do not know whether the SQL optimiser
> automatically takes care of this) to have the LEFT JOIN on a smaller data
> set by having joined on the device_id before as a subquery or separate
> query. And when you are writing the output of the JOIN between csv_file and
> raw_e to ORDER BY the output based on campaign_ID.
>
> Thanks and Regards,
> Gourav Sengupta
>
>
> On Thu, Jan 10, 2019 at 1:13 PM Tzahi File  wrote:
>
>> Hi Gourav,
>>
>> My version of Spark is 2.1.
>>
>> The data is stored on S3 directory in parquet format.
>>
>> I sent you an example for a query I would like to run (the raw_e table is
>> stored as parquet files and event_day is the partitioned filed):
>>
>> SELECT *
>> FROM (select *
>>   from parquet_files.raw_e as re
>>   WHERE  re.event_day >= '2018-11-28' AND re.event_day <=
>> '2018-12-28')
>> JOIN csv_file as g
>> ON g.device_id = re.id and g.advertiser_id = re.advertiser_id
>> LEFT JOIN campaigns as c
>> ON c.campaign_id = re.campaign_id
>> GROUP by 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10, 11, 12, 13, 14, 15, 16,
>> 17, 18, 19,20,21
>>
>> Looking forward to any insights.
>>
>>
>> Thanks.
>>
>> On Wed, Jan 9, 2019 at 8:21 AM Gourav Sengupta 
>> wrote:
>>
>>> Hi,
>>>
>>> Can you please let us know the SPARK version, and the query, and whether
>>> the data is in parquet format or not, and where is it stored?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Wed, Jan 9, 2019 at 1:53 AM 大啊  wrote:
>>>
>>>> What is your performance issue?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> At 2019-01-08 22:09:24, "Tzahi File"  wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have some performance issue running SQL query on Spark.
>>>>
>>>> The query contains one parquet partitioned table (partition by date)
>>>> one each partition is about 200gb and simple table with about 100 records.
>>>> The spark cluster is of type m5.2xlarge - 8 cores. I'm using Qubole
>>>> interface for running the SQL query.
>>>>
>>>> After searching after how to improve my query I have added to the
>>>> configuration the above settings:
>>>> spark.sql.shuffle.partitions=1000
>>>> spark.dynamicAllocation.maxExecutors=200
>>>>
>>>> There wasn't any significant improvement. I'm looking for any ideas
>>>> to improve my running time.
>>>>
>>>>
>>>> Thanks!
>>>> Tzahi
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>> --
>> Tzahi File
>> Data Engineer
>>
>

-- 
Tzahi File
Data Engineer


Re: Performance Issue

2019-01-10 Thread Tzahi File
Hi Gourav,

My version of Spark is 2.1.

The data is stored on S3 directory in parquet format.

I sent you an example of the query I would like to run (the raw_e table is
stored as parquet files and event_day is the partitioned field):

SELECT *
FROM (select *
  from parquet_files.raw_e as re
  WHERE  re.event_day >= '2018-11-28' AND re.event_day <= '2018-12-28')
JOIN csv_file as g
ON g.device_id = re.id and g.advertiser_id = re.advertiser_id
LEFT JOIN campaigns as c
ON c.campaign_id = re.campaign_id
GROUP by 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10, 11, 12, 13, 14, 15, 16,
17, 18, 19,20,21

Looking forward to any insights.


Thanks.

On Wed, Jan 9, 2019 at 8:21 AM Gourav Sengupta 
wrote:

> Hi,
>
> Can you please let us know the SPARK version, and the query, and whether
> the data is in parquet format or not, and where is it stored?
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jan 9, 2019 at 1:53 AM 大啊  wrote:
>
>> What is your performance issue?
>>
>>
>>
>>
>>
>> At 2019-01-08 22:09:24, "Tzahi File"  wrote:
>>
>> Hello,
>>
>> I have some performance issue running SQL query on Spark.
>>
>> The query contains one parquet partitioned table (partition by date) one
>> each partition is about 200gb and simple table with about 100 records. The
>> spark cluster is of type m5.2xlarge - 8 cores. I'm using Qubole interface
>> for running the SQL query.
>>
>> After searching after how to improve my query I have added to the
>> configuration the above settings:
>> spark.sql.shuffle.partitions=1000
>> spark.dynamicAllocation.maxExecutors=200
>>
>> There wasn't any significant improvement. I'm looking for any ideas
>> to improve my running time.
>>
>>
>> Thanks!
>> Tzahi
>>
>>
>>
>>
>>
>

-- 
Tzahi File
Data Engineer


Performance Issue

2019-01-08 Thread Tzahi File
Hello,

I have a performance issue running a SQL query on Spark.

The query contains one partitioned Parquet table (partitioned by date), where
each partition is about 200 GB, and a simple table with about 100 records. The
Spark cluster is of type m5.2xlarge (8 cores). I'm using the Qubole interface
for running the SQL query.

After searching for ways to improve my query, I added the following settings
to the configuration:
spark.sql.shuffle.partitions=1000
spark.dynamicAllocation.maxExecutors=200

There wasn't any significant improvement. I'm looking for any ideas
to improve my running time.


Thanks!
Tzahi