Re: Build SPARK from source with SBT failed

2023-03-07 Thread Tufan Rakshit
I use M1 Apple silicon with Java 11 from Zulu, and run SBT-based build jobs
in Kubernetes.

Best
Tufan

On Tue, 7 Mar 2023 at 16:11, Sean Owen  wrote:

> No, it's that JAVA_HOME wasn't set to .../Home. It is simply not finding
> javac, as the error shows. Zulu supports M1.
>
> On Tue, Mar 7, 2023 at 9:05 AM Artemis User 
> wrote:
>
>> Looks like the Maven build did find javac, it just can't run it.  So it's
>> not a path problem but a compatibility problem.  Are you doing this on a
>> Mac with M1/M2?  I don't think that the Zulu JDK supports Apple silicon.   Your
>> best option would be to use Homebrew to install the dev tools (including
>> OpenJDK) on the Mac.  On Ubuntu, it still seems to be a compatibility problem.  Try
>> to use apt to install your dev tools; don't do it manually.  If you
>> manually install the JDK, it doesn't install hardware-optimized JVM libraries.
>>
>> On 3/7/23 8:21 AM, ckgppl_...@sina.cn wrote:
>>
>> No, I haven't installed the Apple Developer Tools. I have installed Zulu
>> OpenJDK 11.0.17 manually.
>> So do I need to install the Apple Developer Tools?
>> ----- Original Message -----
>> From: Sean Owen
>> To: ckgppl_...@sina.cn
>> Cc: user
>> Subject: Re: Build SPARK from source with SBT failed
>> Date: 2023-03-07 20:58
>>
>> This says you don't have the java compiler installed. Did you install the
>> Apple Developer Tools package?
>>
>> On Tue, Mar 7, 2023 at 1:42 AM  wrote:
>>
>> Hello,
>>
>> I have tried to build the Spark source code with SBT in my local dev
>> environment (macOS 13.2.1), but it reported the following error:
>> [error] java.io.IOException: Cannot run program
>> "/Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/bin/javac" (in
>> directory "/Users/username/spark-remotemaster"): error=2, No such file or
>> directory
>>
>> [error] at
>> java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
>>
>> [error] at
>> java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
>>
>> [error] at
>> scala.sys.process.ProcessBuilderImpl$Simple.run(ProcessBuilderImpl.scala:75)
>> [error] at
>> scala.sys.process.ProcessBuilderImpl$AbstractBuilder.run(ProcessBuilderImpl.scala:106)
>>
>> I need to export JAVA_HOME to make it run successfully, but if I use Maven
>> then I don't need to export JAVA_HOME. I have also tried to build Spark
>> with SBT in an Ubuntu x86_64 environment; it reported a similar error.
>>
>> The official Spark
>> documentation doesn't mention that JAVA_HOME needs to be exported, so I
>> think this is a bug that needs a documentation or script change. Please
>> correct me if I am wrong.
>>
>> Thanks
>>
>> Liang
>>
>>
>>


Re: Help with Shuffle Read performance

2022-09-29 Thread Tufan Rakshit
That's total nonsense; EMR is total crap. Use Kubernetes, I will help you.
Can you please provide the size of the shuffle files that are getting
generated in each task?
What's the total number of partitions that you have?
What machines are you using? Are you using SSDs?

Best
Tufan

On Thu, 29 Sept 2022 at 20:28, Gourav Sengupta 
wrote:

> Hi,
>
> why not use EMR or Dataproc? Kubernetes does not provide any benefit at
> all for work at this scale. It is a classic case of over-engineering and
> over-complication just for the heck of it.
>
> Also, if you are in AWS, I think Redshift Spectrum or Athena is way more
> optimal for 90% of use cases.
>
> Regards,
> Gourav
>
> On Thu, Sep 29, 2022 at 7:13 PM Igor Calabria 
> wrote:
>
>> Hi Everyone,
>>
>> I'm running Spark 3.2 on Kubernetes and have a job with a decently sized
>> shuffle of almost 4 TB. The relevant cluster config is as follows:
>>
>> - 30 executors: 16 physical cores each, configured with 32 cores for Spark
>> - 128 GB RAM
>> - shuffle.partitions is 18k, which gives me tasks of around 150~180MB
>>
>> The job runs fine, but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map phase (reading data from S3 and
>> writing the shuffle data) CPU usage, disk throughput and network usage are
>> as expected, but during the reduce phase they get really low. The main
>> bottleneck seems to be reading shuffle data from other nodes; task
>> statistics report values ranging from 25s to several minutes (the task
>> sizes are really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance a
>> little, but not enough to saturate the cluster resources.
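>>
>> For reference, a minimal sketch of how these settings are applied when the
>> session is built (the app name is a placeholder and the values are just
>> what I'm experimenting with, not recommendations; the defaults are 48m and
>> 1 respectively):
>>
>> import org.apache.spark.sql.SparkSession
>>
>> val spark = SparkSession.builder()
>>   .appName("shuffle-heavy-job")
>>   .config("spark.sql.shuffle.partitions", "18000")        // ~150-180MB tasks
>>   .config("spark.reducer.maxSizeInFlight", "96m")         // default is 48m
>>   .config("spark.shuffle.io.numConnectionsPerPeer", "2")  // default is 1
>>   .getOrCreate()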
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to scale the machines up vertically and use
>> fewer nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>


Re: [EXTERNAL] Partial data with ADLS Gen2

2022-07-24 Thread Tufan Rakshit
Just use Delta 

Best 
Tufan
Sent from my iPhone

> On 24 Jul 2022, at 12:20, Shay Elbaz  wrote:
> 
> 
> This is a known issue. Apache Iceberg, Hudi and Delta Lake are among the 
> possible solutions.
> Alternatively, instead of writing the output directly to the "official" 
> location, write it to some staging directory. Once the job is done, 
> rename the staging dir to the official location.
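> 
> A rough sketch of that staging pattern (the paths and run id are made up,
> and `df`/`spark` are assumed to be in scope; with the hierarchical
> namespace of ADLS Gen2 the directory rename is a fast metadata operation):
> 
> import org.apache.hadoop.fs.{FileSystem, Path}
> 
> val staging  = new Path("abfss://data@account.dfs.core.windows.net/staging/run-123")
> val official = new Path("abfss://data@account.dfs.core.windows.net/official/dt=2022-07-24")
> 
> // The job only ever writes to the staging directory.
> df.write.mode("overwrite").parquet(staging.toString)
> 
> // Publish only after the whole job has succeeded; safe to retry after a failure.
> val fs = FileSystem.get(official.toUri, spark.sparkContext.hadoopConfiguration)
> if (fs.exists(official)) fs.delete(official, true)
> fs.rename(staging, official)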
> From: kineret M 
> Sent: Sunday, July 24, 2022 1:06 PM
> To: user@spark.apache.org 
> Subject: [EXTERNAL] Partial data with ADLS Gen2
>  
> I have a Spark batch application writing to ADLS Gen2 (hierarchical
> namespace).
> When designing the application I was sure that Spark would perform a global
> commit once the job is committed, but what it really does is commit on each
> task: once a task completes writing, its output moves from temp to the
> target storage. So if the batch fails we have partial data, and on retry we
> get data duplications.
> Our scale is really huge, so rolling back (deleting data) is not an option
> for us; the search would take a lot of time.
> Is there any "built-in" solution, something we can use out of the box?
> 
> Thanks. 


Re: Question regarding how to make Spark Scala evenly divide the Spark job between executors

2022-07-17 Thread Tufan Rakshit
Hey,
Could you provide some pseudocode?
Also, what kind of machine are you using per executor? How many cores per
executor?
What's the size of the input data and what's the size of the output?
What kind of errors are you getting?

Best
Tufan

On Sun, 17 Jul 2022 at 00:31, Orkhan Dadashov 
wrote:

> Hi,
>
> I am working on a project, and as resources I have provided 40 executors
> and 14 GB of memory per executor.
>
> I am trying to optimize my Spark job in such a way that Spark will evenly
> distribute the work between the executors.
>
> Could you please give me some advice?
>
> Kind regards,
>
>
>


Re: [Building] Building with JDK11

2022-07-15 Thread Tufan Rakshit
Maybe try IntelliJ or some other IDE with SBT. Maven has always been magical
for me.

Best
Tufan

On Sat, 16 Jul 2022 at 00:11, Sean Owen  wrote:

> Java 8 binaries are probably on your PATH
>
> On Fri, Jul 15, 2022, 5:01 PM Szymon Kuryło 
> wrote:
>
>> Hello,
>>
>> I'm trying to build a Java 11 Spark distro using the
>> dev/make-distribution.sh script.
>> I have set JAVA_HOME to point to the JDK 11 location, and I've also set the
>> java.version property in pom.xml to 11, effectively also setting
>> `maven.compiler.source` and `maven.compiler.target`.
>> When inspecting classes from the `dist` directory with `javap -v`, I find
>> that the class major version is 52, which is specific to JDK 8. Am I missing
>> something? Is there a reliable way to set the JDK used in the build process?
>>
>> Thanks,
>> Szymon K.
>>
>


Re: about cpu cores

2022-07-11 Thread Tufan Rakshit
So on average, for every 4 cores you get back about 3.6 cores in YARN, but
you can only use 3. In Kubernetes you get back 3.6 and can also use 3.6.

Best
Tufan

On Mon, 11 Jul 2022 at 11:02, Yong Walt  wrote:

> We were using Yarn. thanks.
>
> On Sun, Jul 10, 2022 at 9:02 PM Tufan Rakshit  wrote:
>
>> It mainly depends on your cluster manager: YARN or Kubernetes?
>> Best
>> Tufan
>>
>> On Sun, 10 Jul 2022 at 14:38, Sean Owen  wrote:
>>
>>> Jobs consist of tasks, each of which consumes a core (can be set to >1
>>> too, but that's a different story). If there are more tasks ready to
>>> execute than available cores, some tasks simply wait.
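>>>
>>> A tiny worked example of that arithmetic (numbers from the question below;
>>> spark.task.cpus is left at its default of 1):
>>>
>>> val totalCores  = 128                        // total cores in the cluster
>>> val cpusPerTask = 1                          // spark.task.cpus (default)
>>> val slots       = totalCores / cpusPerTask   // 128 tasks can run at once
>>> // Submitting, say, 200 single-core tasks means 128 run concurrently and
>>> // the remaining 72 wait in the scheduler queue until a slot frees up.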
>>>
>>> On Sun, Jul 10, 2022 at 3:31 AM Yong Walt  wrote:
>>>
>>>> Given that my Spark cluster has 128 cores in total, if the jobs I
>>>> submit to the cluster (each job assigned only one core) number over 128,
>>>> what will happen?
>>>>
>>>> Thank you.
>>>>
>>>


Re: about cpu cores

2022-07-10 Thread Tufan Rakshit
It mainly depends on your cluster manager: YARN or Kubernetes?
Best
Tufan

On Sun, 10 Jul 2022 at 14:38, Sean Owen  wrote:

> Jobs consist of tasks, each of which consumes a core (can be set to >1
> too, but that's a different story). If there are more tasks ready to
> execute than available cores, some tasks simply wait.
>
> On Sun, Jul 10, 2022 at 3:31 AM Yong Walt  wrote:
>
>> Given that my Spark cluster has 128 cores in total, if the jobs I submit
>> to the cluster (each job assigned only one core) number over 128, what
>> will happen?
>>
>> Thank you.
>>
>


Re: Migration from Spark 2.4.0 to Spark 3.1.1 caused SortMergeJoin to change to BroadcastHashJoin

2022-07-06 Thread Tufan Rakshit
There are a few options:
1. Make sure your driver has enough memory to broadcast the smaller
dataframe.
2. Change the config, e.g. "spark.sql.autoBroadcastJoinThreshold": "2g"
(just an example value).
3. Use a hint in the join to pick the strategy explicitly (see the sketch
just below); you need to scroll a bit down on
https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html
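
A minimal sketch of option 3, assuming the two tables are available as
DataFrames `bigDf` and `smallDf` (or as views `big` and `small`) joined on
`id`; all of these names are made up. The MERGE hint asks for a sort-merge
join regardless of the broadcast threshold, and setting the threshold to -1
rules automatic broadcast out entirely:

// DataFrame API: request a sort-merge join via a hint
bigDf.join(smallDf.hint("merge"), "id")

// SQL form of the same hint
spark.sql("""
  SELECT /*+ MERGE(small) */ *
  FROM big JOIN small ON big.id = small.id
""")

// Or disable automatic broadcast joins altogether
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")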

I hope this helps.
Best
Tufan

On Wed, 6 Jul 2022 at 17:11, igor cabral uchoa
 wrote:

> Hi all, I hope everyone is doing well.
>
> I'm currently working on a Spark migration project that aims to migrate
> all Spark SQL pipelines for Spark 3.x version and take advantage of all
> performance improvements on it. My company is using Spark 2.4.0 but we are
> targeting to use officially the 3.1.1 for all Spark SQL data pipelines but 
> *without
> AQE enabled yet*. The primary goal is to keep everything the same but use
> the newest version. Later on, we can easily enable AQE for all data
> pipelines.
>
> After migrating some pipelines, we discovered a slight query plan change
> in the version upgrade. We found out that instead of SortMergeJoin it is
> using the BroadcastHashJoin to do the join between the tables of my query.
> Not only this, but the BroadcastExchange operation is occurring on the big
> table side, which seems strange from my perspective.
>
> You can see some snapshots and a better explanation of the problem here:
> https://stackoverflow.com/questions/72793116/migration-from-spark-2-4-0-to-spark-3-1-1-caused-sortmergejoin-to-change-to-broa
>
> I'm setting `spark.sql.adaptive.enabled` to false,
> `spark.sql.autoBroadcastJoinThreshold` to 10MB, and
> `spark.sql.shuffle.partitions` to 200, but apparently just changing from
> Spark 2 to 3 for this query has changed the query plan and degraded the
> performance. In this specific scenario, we are facing a "Could not execute
> broadcast in 300 secs" error.
>
> Do you guys have any clue on why this is happening? My questions are:
>
> - Why has Spark 3 changed the join approach in this situation, given that
> AQE is disabled and spark.sql.autoBroadcastJoinThreshold is much smaller
> than the dataset size?
> - Is this the expected behavior, or could this represent a potential bug
> in Spark 3.x?
>
> Please, let me know your thoughts. I appreciate all the help in advance.
>


Re: Spark Doubts

2022-06-25 Thread Tufan Rakshit
Please find the answers inline.
1) Can I apply predicate pushdown filters if I have data stored in S3, or
should they be used only while reading from DBs?
It can be applied on S3 if you store the data in Parquet, CSV, JSON or Avro
format. It does not depend on the DB; it is supported on object stores like
S3 as well.
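
A small sketch of what that looks like against S3 (the bucket, path and
column names are made up, and `spark` is assumed to be in scope); with
Parquet the filter shows up under "PushedFilters" in the physical plan:

import org.apache.spark.sql.functions.col

val events = spark.read
  .parquet("s3a://my-bucket/events/")              // hypothetical location
  .filter(col("event_date") === "2022-06-25")      // pushdown candidate

events.explain()   // look for something like PushedFilters: [IsNotNull(event_date), EqualTo(event_date,2022-06-25)]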

2) While running the data in distributed form, is my code copied to each
and every executor? As I understand it, that should be the case, since
code.zip would be small enough to be copied to each worker node.
If you are trying to join two datasets where one is small, Spark by default
will try to broadcast the smaller dataset to the executors rather than
going for a sort-merge join. There is a property for this which is enabled
by default from Spark 3.1; the limit for the smaller dataframe to be
broadcast is 10 MB, and it can be changed to a higher value with config.
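
To illustrate the two knobs involved (the dataframe names are made up and
`spark` is assumed to be in scope):

import org.apache.spark.sql.functions.broadcast

// Raise (or lower) the automatic broadcast threshold; the default is 10 MB.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")

// Or request a broadcast join explicitly, regardless of the threshold.
val joined = bigDf.join(broadcast(smallDf), "customer_id")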

3) Also, my understanding of shuffling of data is: "It is moving one
partition to another partition, or moving data (keys) of one partition to
another partition for those keys. It increases memory use since, before
shuffling, it copies the data in memory and then transfers it to another
partition." Is this correct? If not, please correct me.

It depends on the context of distributed computing, as your data does not
sit in one machine, nor on one disk. A shuffle is involved when you trigger
operations like group-by or sort, since they involve bringing all the data
for a key onto one executor to do the computation; likewise, when a
sort-merge join is triggered, both datasets are sorted, and this sort is a
global sort, not a partition-wise sort. Yes, it is a memory-intensive
operation; if you see a lot of shuffle involved, it is best to use SSDs
(M5d-based machines in AWS), since for really big jobs where TBs worth of
data have to be joined it is not possible to do everything in RAM.
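
A quick way to see the shuffle in a plan (`df` and the column name are made
up; the Exchange node marks the shuffle boundary):

df.groupBy("country").count().explain()
// The physical plan contains an Exchange hashpartitioning(country, 200) node:
// data for the same key is moved across the network into the same partition
// before the final aggregation.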


Hope that helps.

Best
Tufan



On Sat, 25 Jun 2022 at 08:43, Sid  wrote:

> Hi Team,
>
> I have various doubts as below:
>
> 1) Can I apply predicate pushdown filters if I have data stored in S3, or
> should they be used only while reading from DBs?
>
> 2) While running the data in distributed form, is my code copied to each
> and every executor? As I understand it, that should be the case, since
> code.zip would be small enough to be copied to each worker node.
>
> 3) Also, my understanding of shuffling of data is: "It is moving one
> partition to another partition, or moving data (keys) of one partition to
> another partition for those keys. It increases memory use since, before
> shuffling, it copies the data in memory and then transfers it to another
> partition." Is this correct? If not, please correct me.
>
> Please help me to understand these things in layman's terms if my
> assumptions are not correct.
>
> Thanks,
> Sid
>