Re: Structured Streaming Process Each Records Individually

2024-01-10 Thread Khalid Mammadov
Use foreachBatch or foreach methods:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
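For the use case described below, a minimal foreachBatch sketch could look like this (broker, topic and sink names are placeholders, the JSON path for the @table tag may need adjusting to the real payload, and the per-tag parsing/flattening is left as a stub):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("route-by-tag").getOrCreate()

# requires the spark-sql-kafka package on the classpath
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")   # placeholder broker
       .option("subscribe", "events")                      # placeholder topic
       .load()
       .selectExpr("CAST(value AS STRING) AS json"))

def route_batch(batch_df, batch_id):
    # peek at the routing tag without imposing a full schema on the payload
    tagged = batch_df.withColumn("tbl", F.get_json_object("json", "$['@table']"))
    for row in tagged.select("tbl").distinct().collect():
        name = row["tbl"]
        subset = tagged.filter(F.col("tbl") == name)
        # parse/flatten `subset` per tag here, then write the parent and child tables
        subset.write.mode("append").saveAsTable(name)       # placeholder sink

query = raw.writeStream.foreachBatch(route_batch).start()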

On Wed, 10 Jan 2024, 17:42 PRASHANT L,  wrote:

> Hi
> I have a use case where I need to process json payloads coming from Kafka
> using structured streaming , but thing is json can have different formats ,
> schema is not fixed
> and each json will have a @type tag so based on tag , json has to be
> parsed and loaded to table with tag name  , and if a json has nested sub
> tags , those tags shd go to different table
> so I need to process each json record individually , and determine
> destination tables what would be the best approach
>
>
>> {
>>   "os": "andriod",
>>   "type": "mobile",
>>   "device": {
>>     "warrenty": "3 years",
>>     "replace": "yes"
>>   },
>>   "zones": [
>>     {
>>       "city": "Bangalore",
>>       "state": "KA",
>>       "pin": "577401"
>>     },
>>     {
>>       "city": "Mumbai",
>>       "state": "MH",
>>       "pin": "576003"
>>     }
>>   ],
>>   "@table": "product"
>> }
>
>
> so for the above JSON, there are 3 tables created
> 1. Product (@type): this is the parent table
> 2. product_zones and product_devices: child tables
>


Re: Can not complete the read csv task

2023-10-14 Thread Khalid Mammadov
This command only defines a new DataFrame; in order to see some results you
need to do something like merged_spark_data.show() on a new line.

Regarding the error, I think it's the typical error you get when you run
Spark on Windows. You can suppress it using the winutils tool (Google it or
ChatGPT it to see how).
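As a rough sketch of that workaround (the paths are assumptions, not part of the original thread): place a winutils.exe/hadoop.dll build matching your Hadoop version under a folder such as C:\hadoop\bin, point the environment at it before the JVM starts, and remember that read.csv only defines the DataFrame, so an action such as show() is what actually triggers the read:

import os
# hypothetical location of winutils.exe + hadoop.dll; set before creating the session
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ["PATH"]

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

merged_spark_data = spark.read.csv(
    r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
    header=False)
merged_spark_data.show(5)   # the action that actually runs the read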

On Thu, 12 Oct 2023, 11:58 Kelum Perera,  wrote:

> Dear friends,
>
> I'm trying to get a fresh start with Spark. I tried to read few CSV files
> in a folder, but the task got stuck and not completed as shown in the
> copied content from the terminal.
>
> Can someone help to understand what is going wrong?
>
> Versions;
> java version "11.0.16" 2022-07-19 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.16+11-LTS-199)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.16+11-LTS-199, mixed
> mode)
>
> Python 3.9.13
> Windows 10
>
> Copied from the terminal;
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.5.0
>   /_/
>
> Using Python version 3.9.13 (main, Aug 25 2022 23:51:50)
> Spark context Web UI available at http://LK510FIDSLW4.ey.net:4041
> Spark context available as 'sc' (master = local[*], app id =
> local-1697089858181).
> SparkSession available as 'spark'.
> >>> merged_spark_data =
> spark.read.csv(r"C:\Users\Kelum.Perera\Downloads\data-master\nyse_all\nyse_data\*",
> header=False )
> Exception in thread "globPath-ForkJoinPool-1-worker-115"
> java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
> at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native
> Method)
> at
> org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
> at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
> at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
> at org.apache.hadoop.fs.Globber.listStatus(Globber.java:128)
> at org.apache.hadoop.fs.Globber.doGlob(Globber.java:291)
> at org.apache.hadoop.fs.Globber.glob(Globber.java:202)
> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2124)
> at
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:238)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$3(DataSource.scala:737)
> at
> org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> at
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
>
>
>
> Nothing happens afterwards. I appreciate your kind input to solve this.
>
> Best Regards,
> Kelum Perera
>
>
>
>


Re: PySpark error java.lang.IllegalArgumentException

2023-07-07 Thread Khalid Mammadov
Perhaps that parquet file is corrupted, or a corrupted file has ended up in
that folder? To check, try to read that file with pandas or another tool to
see if you can read it without Spark.
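A minimal sanity check outside Spark might look like this (the path is the one from the error log below; pandas needs pyarrow or fastparquet installed):

import pandas as pd

# read the single file named in the Spark error and inspect it
pdf = pd.read_parquet("/data/202301/account_cycle/account_cycle-202301-53.parquet")
print(pdf.shape)
print(pdf.head())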

On Wed, 5 Jul 2023, 07:25 elango vaidyanathan,  wrote:

>
> Hi team,
>
> Any updates on this below issue
>
> On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan 
> wrote:
>
>>
>>
>> Hi all,
>>
>> I am reading a parquet file like this and it gives 
>> java.lang.IllegalArgumentException.
>> However I can work with other parquet files (such as nyc taxi parquet
>> files) without any issue. I have copied the full error log as well. Can you
>> please check once and let me know how to fix this?
>>
>> import pyspark
>>
>> from pyspark.sql import SparkSession
>>
>> spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
>> "20g").config("spark.driver.memory", "50g").getOrCreate()
>>
>> df=spark.read.parquet("/data/202301/account_cycle")
>>
>> df.printSchema() # worksfine
>>
>> df.count() #worksfine
>>
>> df.show()# getting below error
>>
>> >>> df.show()
>>
>> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
>>
>> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
>>
>> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
>> struct> account_status: string, currency_code: string, opened_dt: date ... 30 more
>> fields>
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values
>> in memory (estimated size 540.6 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
>> bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
>> memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString
>> at NativeMethodAccessorImpl.java:0
>>
>> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin
>> packing, max size: 134217728 bytes, open cost is considered as scanning
>> 4194304 bytes.
>>
>> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
>> NativeMethodAccessorImpl.java:0
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
>> NativeMethodAccessorImpl.java:0) with 1 output partitions
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
>> (showString at NativeMethodAccessorImpl.java:0)
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
>> (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
>> which has no missing parents
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values
>> in memory (estimated size 38.1 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
>> bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
>> memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
>>
>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast
>> at DAGScheduler.scala:1478
>>
>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
>> ResultStage 14 (MapPartitionsRDD[42] at showString at
>> NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
>> Vector(0))
>>
>> 23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1
>> tasks resource profile 0
>>
>> 23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0
>> (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
>> taskResourceAssignments Map()
>>
>> 23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
>>
>> 23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
>> file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
>> 0-134217728, partition values: [empty row]
>>
>> 23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0
>> (TID 48)
>>
>> java.lang.IllegalArgumentException
>>
>> at java.nio.Buffer.limit(Buffer.java:275)
>>
>> at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
>>
>> at
>> org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
>>
>> at
>> org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
>>
>> at java.io.DataInputStream.readFully(DataInputStream.java:195)
>>
>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>
>> at
>> org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>>
>> at
>> org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>>
>> at
>> org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
>>
>> at
>> 

Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-04-01 Thread Khalid Mammadov
Hey AN-TRUONG

I have got some articles about this subject that should help.
E.g.
https://khalidmammadov.github.io/spark/spark_internals_rdd.html

Also check other Spark Internals on web.

Regards
Khalid

On Fri, 31 Mar 2023, 16:29 AN-TRUONG Tran Phan, 
wrote:

> Thank you for your information,
>
> I have tracked the spark history server on port 18080 and the spark UI on
> port 4040. I see the result of these two tools as similar right?
>
> I want to know what each Task ID (Example Task ID 0, 1, 3, 4, 5, ) in
> the images does, is it possible?
> https://i.stack.imgur.com/Azva4.png
>
> Best regards,
>
> An - Truong
>
>
> On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh 
> wrote:
>
>> Are you familiar with spark GUI default on port 4040?
>>
>> have a look.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
>> tr.phan.tru...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am learning about Apache Spark and want to know the meaning of each
>>> Task created on the Jobs recorded on Spark history.
>>>
>>> For example, the application I write creates 17 jobs, in which job 0
>>> runs for 10 minutes, there are 2384 small tasks and I want to learn about
>>> the meaning of these 2384, is it possible?
>>>
>>> I found a picture of DAG in the Jobs and want to know the relationship
>>> between DAG and Task, is it possible (Specifically from the attached file
>>> DAG and 2384 tasks below)?
>>>
>>> Thank you very much, have a nice day everyone.
>>>
>>> Best regards,
>>>
>>> An-Trường.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Trân Trọng,
>
> An Trường.
>


Re: Running Spark on Kubernetes (GKE) - failing on spark-submit

2023-02-14 Thread Khalid Mammadov
I am not a k8s expert, but I think you have a permission issue. Try 777 as a
test to see if it works.
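As a concrete test of that theory, the upload directory created in the Dockerfile below could be widened from 755 to 777 and the image rebuilt (a quick diagnostic only, not a recommended permanent setting):

RUN mkdir -p /myexample && chmod 777 /myexample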

On Mon, 13 Feb 2023, 21:42 karan alang,  wrote:

> Hello All,
>
> I'm trying to run a simple application on GKE (Kubernetes), and it is
> failing:
> Note : I have spark(bitnami spark chart) installed on GKE using helm
> install
>
> Here is what is done :
> 1. created a docker image using Dockerfile
>
> Dockerfile :
> ```
>
> FROM python:3.7-slim
>
> RUN apt-get update && \
> apt-get install -y default-jre && \
> apt-get install -y openjdk-11-jre-headless && \
> apt-get clean
>
> ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64
>
> RUN pip install pyspark
> RUN mkdir -p /myexample && chmod 755 /myexample
> WORKDIR /myexample
>
> COPY src/StructuredStream-on-gke.py /myexample/StructuredStream-on-gke.py
>
> CMD ["pyspark"]
>
> ```
> Simple pyspark application :
> ```
>
> from pyspark.sql import SparkSession
> spark = 
> SparkSession.builder.appName("StructuredStreaming-on-gke").getOrCreate()
>
> data = [('k1', 123000), ('k2', 234000), ('k3', 456000)]
> df = spark.createDataFrame(data, ('id', 'salary'))
>
> df.show(5, False)
>
> ```
>
> Spark-submit command :
> ```
>
> spark-submit --master k8s://https://34.74.22.140:7077 --deploy-mode
> cluster --name pyspark-example --conf
> spark.kubernetes.container.image=pyspark-example:0.1 --conf
> spark.kubernetes.file.upload.path=/myexample src/StructuredStream-on-gke.py
> ```
>
> Error i get :
> ```
>
> 23/02/13 13:18:27 INFO KubernetesUtils: Uploading file:
> /Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py
> to dest:
> /myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a/StructuredStream-on-gke.py...
>
> Exception in thread "main" org.apache.spark.SparkException: Uploading file
> /Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py
> failed...
>
> at
> org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:296)
>
> at
> org.apache.spark.deploy.k8s.KubernetesUtils$.renameMainAppResource(KubernetesUtils.scala:270)
>
> at
> org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configureForPython(DriverCommandFeatureStep.scala:109)
>
> at
> org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configurePod(DriverCommandFeatureStep.scala:44)
>
> at
> org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:59)
>
> at
> scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
>
> at
> scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
>
> at scala.collection.immutable.List.foldLeft(List.scala:89)
>
> at
> org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)
>
> at
> org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:106)
>
> at
> org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3(KubernetesClientApplication.scala:213)
>
> at
> org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3$adapted(KubernetesClientApplication.scala:207)
>
> at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2622)
>
> at
> org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:207)
>
> at
> org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:179)
>
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
>
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
>
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
>
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Caused by: org.apache.spark.SparkException: Error uploading file
> StructuredStream-on-gke.py
>
> at
> org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:319)
>
> at
> org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:292)
>
> ... 21 more
>
> Caused by: java.io.IOException: Mkdirs failed to create
> /myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:317)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:305)
>
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
>
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
>
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:414)
>
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:387)
>
> at 

Re: How to set a config for a single query?

2023-01-05 Thread Khalid Mammadov
Hi

I believe there is a feature in Spark specifically for this purpose. You can
create a new Spark session and set those configs on it.
Note that it's not the same as creating separate driver processes with
separate sessions; here you will still have the same SparkContext, which
works as the backend for two or more Spark sessions and does all the heavy
work.

*spark.newSession()*

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.SparkSession.newSession.html#pyspark.sql.SparkSession.newSession
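A minimal sketch of that approach (assuming an existing session named spark; the query body is a placeholder): each query gets its own isolated session, so the flag set on one does not leak into another, while they all share the same underlying SparkContext:

# session dedicated to query A, with its own SQL conf
spark_a = spark.newSession()
spark_a.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
# note: the DataFrame must be built from spark_a for the per-session conf to apply
query_a_df = spark_a.range(1000).selectExpr("id % 10 AS k").groupBy("k").count()
query_a_df.collect()

# other sessions/threads keep their own (default) value of the flag
print(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))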

Hope this helps
Khalid


On Wed, 4 Jan 2023, 00:25 Felipe Pessoto,  wrote:

> Hi,
>
>
>
> In Scala is it possible to set a config value to a single query?
>
>
>
> I could set/unset the value, but it won’t work for multithreading
> scenarios.
>
>
>
> Example:
>
>
>
> spark.sql.adaptive.coalescePartitions.enabled = false
>
> queryA_df.collect
>
> spark.sql.adaptive.coalescePartitions.enabled=original value
>
> queryB_df.collect
>
> queryC_df.collect
>
> queryD_df.collect
>
>
>
>
>
> If I execute that block of code multiple times using multiple thread, I
> can end up executing Query A with coalescePartitions.enabled=true, and
> Queries B, C and D with the config set to false, because another thread
> could set it between the executions.
>
>
>
> Is there any good alternative to this?
>
>
>
> Thanks.
>


Re: [Spark Core] [Advanced] [How-to] How to map any external field to job ids spawned by Spark.

2022-12-28 Thread Khalid Mammadov
There is a feature in SparkContext to set local properties
(setLocalProperty) where you can set your request ID, and then, using a
SparkListener instance, read that ID together with the job ID in the
onJobStart event.
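A minimal PySpark sketch of the first half (the property name and request ID are our own choices; the matching SparkListener on the JVM side would read the same key from jobStart.properties in onJobStart):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("request-id-demo").getOrCreate()
sc = spark.sparkContext

request_id = "req-12345"                          # hypothetical upstream API request ID
sc.setLocalProperty("upstream.requestId", request_id)

# any jobs triggered on this thread now carry the property,
# so a listener can map their job IDs back to the request ID
spark.range(1_000_000).count()

sc.setLocalProperty("upstream.requestId", None)   # clear when the request is done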

Hope this helps.

On Tue, 27 Dec 2022, 13:04 Dhruv Toshniwal,
 wrote:

> TL;Dr -
> how-to-map-external-request-ids-to-spark-job-ids-for-spark-instrumentation
> 
>
> Hi team,
>
> We are the engineering team of Mindtickle Inc. and we have a use-case
> where we want to store a map of request Ids (unique API call ID) to Spark
> Job Ids. Architecturally, we have created a system where our users use
> various Analytics tools on the frontend which in turn run Spark Jobs
> internally and then serve computed data back to them. We receive various
> API calls from upstream and serve it via Apache Spark computing on the
> backend.
> However, as our customer base has grown, we have come to receive lots of
> parallel requests. We have observed that Spark Jobs take different time for
> the same API requests from upstream. Therefore, for Spark instrumentation
> purposes we wish to maintain a map of requestID generated at our end to the
> job IDs that Spark internally generates in relation to these requestIDs.
> This will enable us to go back in time via the history server or custom
> SparkListeners to debug and improve our system. Any leads in this direction
> would be greatly appreciated. I would love to explain our use case in
> greater detail if required.
>
> Thanks and Regards,
> Dhruv Toshniwal
> SDE-2
> Mindtickle Inc.
>


Re: Moving to Spark 3x from Spark2

2022-09-01 Thread Khalid Mammadov
Hi Rajat

There were a lot of changes between those versions, and unfortunately the
only realistic way to assess the impact is to do your own testing.

Most probably you will have to make some changes to your codebase.

Regards
Khalid


On Thu, 1 Sept 2022, 11:45 rajat kumar,  wrote:

> Hello Members,
>
> We want to move to Spark 3 from Spark2.4 .
>
> Are there any changes we need to do at code level which can break the
> existing code?
>
> Will it work by simply changing the version of spark & scala ?
>
> Regards
> Rajat
>


Re: Pyspark and multiprocessing

2022-07-21 Thread Khalid Mammadov
Pool.map requires 2 arguments: the first a function and the second an
iterable (i.e. a list, set, etc.).
Check out the examples in the official docs for how to use it:
https://docs.python.org/3/library/multiprocessing.html
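In other words, pass the function itself plus an iterable of inputs, rather than the result of calling the function once. A minimal sketch (the file paths and helper are hypothetical stand-ins for json_to_norm_with_null):

from functools import partial
from multiprocessing.pool import ThreadPool

def process_file(path, out_dir):
    # stand-in for the real per-file conversion
    return f"{out_dir}/{path.split('/')[-1]}"

paths = ["/data/a.json", "/data/b.json", "/data/c.json"]   # hypothetical inputs

with ThreadPool(4) as pool:
    results = pool.map(partial(process_file, out_dir="/tmp/out"), paths)
print(results)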


On Thu, 21 Jul 2022, 21:25 Bjørn Jørgensen, 
wrote:

> Thank you.
> The reason for using spark local is to test the code, and as in this case
> I find the bottlenecks and fix them before I spinn up a K8S cluster.
>
> I did test it now with
> 16 cores and 10 files
>
> import time
>
> tic = time.perf_counter()
> json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
> '/home/jovyan/notebooks/falk/test/test.json')
> toc = time.perf_counter()
> print(f"Func run in {toc - tic:0.4f} seconds")
>
> Func run in 30.3695 seconds
>
>
> then I stop spark and stat it with setMaster('local[1]')
>
> and now
>
> Func run in 30.8168 seconds
>
>
> Which means that it don`t matter if I run this code on one core or on a
> K8S cluster with 100 cores.
>
> So I tested the same with
>
> from multiprocessing.pool import ThreadPool
> import multiprocessing as mp
>
>
> if __name__ == "__main__":
> tic = time.perf_counter()
> pool = ThreadPool(mp.cpu_count())
> opt =
> pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
> '/home/jovyan/notebooks/falk/test/test.json'))
> toc = time.perf_counter()
> print(f"Func run in {toc - tic:0.4f} seconds")
>
> I get the same files and they are ok.
> But I also get this error
>
> TypeError Traceback (most recent call last)
> Input In [33], in ()  6 tic = time.perf_counter()  7 
> pool = ThreadPool(mp.cpu_count())> 8 opt = 
> pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test", 
> '/home/jovyan/notebooks/falk/test/test.json'))  9 toc = 
> time.perf_counter() 10 print(f"Func run in {toc - tic:0.4f} seconds")
>
>
> TypeError: Pool.map() missing 1 required positional argument: 'iterable'
>
> So any hints on what to change? :)
>
> Spark has the pandas on spark API, and that is realy great. I prefer
> pandas on spark API and pyspark over pandas.
>
> tor. 21. jul. 2022 kl. 09:18 skrev Khalid Mammadov <
> khalidmammad...@gmail.com>:
>
>> One quick observation is that you allocate all your local CPUs to Spark
>> then execute that app with 10 Threads i.e 10 spark apps and so you will
>> need 160cores in total as each will need 16CPUs IMHO. Wouldn't that create
>> CPU bottleneck?
>>
>> Also on the side note, why you need Spark if you use that on local only?
>> Sparks power can only be (mainly) observed in a cluster env.
>> I have achieved great parallelism using pandas and pools on a local
>> machine in the past.
>>
>>
>> On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, 
>> wrote:
>>
>>> I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
>>> They don`t have the same schema, so I have to loop over them one at a
>>> time.
>>>
>>> This works, but is`s very slow. This process takes 5 days!
>>>
>>> So now I have tried to run this functions in a ThreadPool. But it don`t
>>> seems to work.
>>>
>>>
>>> *Start local spark. The system have 16 cores and 64 GB.*
>>>
>>> number_cores = int(multiprocessing.cpu_count())
>>>
>>> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
>>> e.g. 4015976448
>>> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>>>
>>>
>>> def get_spark_session(app_name: str, conf: SparkConf):
>>> conf.setMaster('local[{}]'.format(number_cores))
>>> conf \
>>>   .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>>>   .set("spark.sql.repl.eagerEval.enabled", "True") \
>>>   .set("spark.sql.adaptive.enabled", "True") \
>>>   .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer") \
>>>   .set("spark.sql.repl.eagerEval.maxNumRows", "1")
>>>
>>> return
>>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>
>>> spark = get_spark_session("Falk", SparkConf())
>>>
>>>
>>> *Function to rename columns with \\ *
>>>
>>> # We take a dataframe and return a new one with required changes
>>> def cleanDataFrame(df: DataFrame) -> DataFrame:
>>> # Returns a new sanitized field name (this function can be any

Re: Pyspark and multiprocessing

2022-07-21 Thread Khalid Mammadov
One quick observation is that you allocate all your local CPUs to Spark and
then execute that app with 10 threads, i.e. 10 Spark apps, so you would need
160 cores in total as each will need 16 CPUs IMHO. Wouldn't that create a
CPU bottleneck?

Also, on a side note, why do you need Spark if you use it locally only?
Spark's power can only (mainly) be observed in a cluster environment.
I have achieved great parallelism using pandas and pools on a local machine
in the past.


On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, 
wrote:

> I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
> They don`t have the same schema, so I have to loop over them one at a
> time.
>
> This works, but is`s very slow. This process takes 5 days!
>
> So now I have tried to run this functions in a ThreadPool. But it don`t
> seems to work.
>
>
> *Start local spark. The system have 16 cores and 64 GB.*
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
> e.g. 4015976448
> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
> conf.setMaster('local[{}]'.format(number_cores))
> conf \
>   .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>   .set("spark.sql.repl.eagerEval.enabled", "True") \
>   .set("spark.sql.adaptive.enabled", "True") \
>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer") \
>   .set("spark.sql.repl.eagerEval.maxNumRows", "1")
>
> return
> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
> spark = get_spark_session("Falk", SparkConf())
>
>
> *Function to rename columns with \\ *
>
> # We take a dataframe and return a new one with required changes
> def cleanDataFrame(df: DataFrame) -> DataFrame:
> # Returns a new sanitized field name (this function can be anything
> really)
> def sanitizeFieldName(s: str) -> str:
> return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
> .replace("[", "_").replace("]", "_").replace(".", "_")
>
> # We call this on all fields to create a copy and to perform any
> changes we might
> # want to do to the field.
> def sanitizeField(field: StructField) -> StructField:
> field = copy(field)
> field.name = sanitizeFieldName(field.name)
> # We recursively call cleanSchema on all types
> field.dataType = cleanSchema(field.dataType)
> return field
>
> def cleanSchema(dataType: [DataType]) -> [DateType]:
> dataType = copy(dataType)
> # If the type is a StructType we need to recurse otherwise we can
> return since
> # we've reached the leaf node
> if isinstance(dataType, StructType):
> # We call our sanitizer for all top level fields
> dataType.fields = [sanitizeField(f) for f in dataType.fields]
> elif isinstance(dataType, ArrayType):
> dataType.elementType = cleanSchema(dataType.elementType)
> return dataType
>
> # Now since we have the new schema we can create a new DataFrame by
> using the old Frame's RDD as data and the new schema as the schema for the
> data
> return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>
>
>
> *Function to flatten out a nested dataframe.*
>
>
> from pyspark.sql.types import *
> from pyspark.sql.functions import *
>
>
> def flatten_test(df, sep="_"):
> """Returns a flattened dataframe.
> .. versionadded:: x.X.X
>
> Parameters
> --
> sep : str
> Delimiter for flatted columns. Default `_`
>
> Notes
> -
> Don`t use `.` as `sep`
> It won't work on nested data frames with more than one level.
> And you will have to use `columns.name`.
>
> Flattening Map Types will have to find every key in the column.
> This can be slow.
>
> Examples
> 
>
> data_mixed = [
> {
> "state": "Florida",
> "shortname": "FL",
> "info": {"governor": "Rick Scott"},
> "counties": [
> {"name": "Dade", "population": 12345},
> {"name": "Broward", "population": 4},
> {"name": "Palm Beach", "population": 6},
> ],
> },
> {
> "state": "Ohio",
> "shortname": "OH",
> "info": {"governor": "John Kasich"},
> "counties": [
> {"name": "Summit", "population": 1234},
> {"name": "Cuyahoga", "population": 1337},
> ],
> },
> ]
>
> data_mixed = spark.createDataFrame(data=data_mixed)
>
> data_mixed.printSchema()
>
> root
> |-- counties: array (nullable = true)
> ||-- element: map (containsNull = true)
>  

Re: flatMap for dataframe

2022-02-09 Thread Khalid Mammadov
One way is to split -> explode -> pivot.
These are Column and DataFrame methods.
Here are quick examples from the web:
https://www.google.com/amp/s/sparkbyexamples.com/spark/spark-split-dataframe-column-into-multiple-columns/amp/


https://www.google.com/amp/s/sparkbyexamples.com/spark/explode-spark-array-and-map-dataframe-column/amp/
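A short PySpark sketch of that idea on a dataframe like the one below (explode gives the flatMap-style one-row-per-word result; pivot is only needed if you want values back as columns):

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a few words",), ("ba na ba na",)], ["value"])

# split each line into an array of words, then explode into one row per word
df.select(explode(split("value", " ")).alias("word")).show()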

On Wed, 9 Feb 2022, 01:55 frakass,  wrote:

> Hello
>
> for the RDD I can apply flatMap method:
>
>  >>> sc.parallelize(["a few words","ba na ba na"]).flatMap(lambda x:
> x.split(" ")).collect()
> ['a', 'few', 'words', 'ba', 'na', 'ba', 'na']
>
>
> But for a dataframe table how can I flatMap that as above?
>
>  >>> df.show()
> ++
> |   value|
> ++
> | a few lines|
> |hello world here|
> | ba na ba na|
> ++
>
>
> Thanks
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: why the pyspark RDD API is so slow?

2022-01-31 Thread Khalid Mammadov
Your Scala program does not use any Spark API, hence it is faster than the
others. If you wrote the same code in pure Python I think it would be even
faster than the Scala program, especially taking into account that these two
programs run on a single VM.

Regarding DataFrame vs RDD, I would suggest using DataFrames anyway, since
that has been the recommended approach since Spark 2.0.
The RDD API in PySpark is slow because, as others said, data needs to be
serialised/deserialised between the JVM and Python.

One general note is that Spark is written in Scala and the core runs on the
JVM; Python is a wrapper around the Scala API and most PySpark APIs are
delegated to Scala/JVM to be executed. Hence most big data transformation
tasks will complete in almost the same time, as they (Scala and Python) use
the same API under the hood. That is also why the APIs are very similar and
code is written in the same fashion.
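A small illustration of the difference (a sketch, assuming an existing SparkSession named spark, not the benchmark from the thread): the RDD lambda below runs in Python worker processes, so every element crosses the JVM-to-Python boundary, while the DataFrame version compiles to JVM expressions and keeps the rows on the JVM side:

from pyspark.sql import functions as F

# RDD route: Python lambda, rows serialised to/from Python workers
rdd_total = (spark.sparkContext.parallelize(range(1_000_000))
             .map(lambda x: x * 2)
             .sum())

# DataFrame route: the same computation expressed as JVM-evaluated expressions
df_total = (spark.range(1_000_000)
            .select((F.col("id") * 2).alias("doubled"))
            .agg(F.sum("doubled"))
            .first()[0])

print(rdd_total, df_total)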


On Sun, 30 Jan 2022, 10:10 Bitfox,  wrote:

> Hello list,
>
> I did a comparison for pyspark RDD, scala RDD, pyspark dataframe and a
> pure scala program. The result shows the pyspark RDD is too slow.
>
> For the operations and dataset please see:
>
> https://blog.cloudcache.net/computing-performance-comparison-for-words-statistics/
>
> The result table is below.
> Can you give suggestions on how to optimize the RDD operation?
>
> Thanks a lot.
>
>
> *program* *time*
> scala program 49s
> pyspark dataframe 56s
> scala RDD 1m31s
> pyspark RDD 7m15s
>


Re: What are the most common operators for shuffle in Spark

2022-01-23 Thread Khalid Mammadov
I don't know the actual implementation, but to me the shuffle is still
necessary: each worker reads its data separately and reduces it to a local
distinct set, and these local results then need to be shuffled to find the
actual (global) distinct.
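A quick way to see this for yourself (assuming an existing SparkSession named spark): the physical plan for a distinct typically shows a partial HashAggregate per partition, an Exchange (the shuffle), and then a final HashAggregate:

from pyspark.sql import functions as F

df = spark.range(100).select((F.col("id") % 10).alias("k"))
df.distinct().explain()
# look for "Exchange hashpartitioning(k, ...)" in the printed plan; that is the shuffle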

On Sun, 23 Jan 2022, 17:39 ashok34...@yahoo.com.INVALID,
 wrote:

> Hello,
>
> I know some operators in Spark are expensive because of shuffle.
>
> This document describes shuffle
>
> https://www.educba.com/spark-shuffle/
>
> and says
> More shufflings in numbers are not always bad. Memory constraints and
> other impossibilities can be overcome by shuffling.
>
> In RDD, the below are a few operations and examples of shuffle:
> – subtractByKey
> – groupBy
> – foldByKey
> – reduceByKey
> – aggregateByKey
> – transformations of a join of any type
> – distinct
> – cogroup
> I know some operations like reduceBykey are well known for creating
> shuffle but what I don't understand why distinct operation should cause
> shuffle!
>
>
> Thanking
>
>
>
>
>
>


Re: How to make batch filter

2022-01-02 Thread Khalid Mammadov
I think you will get 1 partition, as you have only one executor/worker (i.e.
your local machine, a single node). But your tasks (the smallest unit of
work in the Spark framework) will be processed in parallel on your 4 cores,
as Spark runs one task per core.

You can also force a repartition if you want by calling the repartition
function on your DataFrame at any time, but processing will be sequential (I
think) since there is one executor.
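A minimal check of both points (assuming a local[4] SparkSession named spark): getNumPartitions shows what you actually got, and repartition forces an explicit split if you want one:

rdd = spark.sparkContext.parallelize(range(1_000_000))
print(rdd.getNumPartitions())                  # partitions the default gave you locally
print(rdd.repartition(4).getNumPartitions())   # explicitly spread the data over 4 partitions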

On Sun, 2 Jan 2022 at 00:20, Bitfox  wrote:

> One more question, for this big filter, given my server has 4 Cores, will
> spark (standalone mode) split the RDD to 4 partitions automatically?
>
> Thanks
>
> On Sun, Jan 2, 2022 at 6:30 AM Mich Talebzadeh 
> wrote:
>
>> Create a list of values that you don't want anf filter oon those
>>
>> >>> DF = spark.range(10)
>> >>> DF
>> DataFrame[id: bigint]
>> >>>
>> >>> array = [1, 2, 3, 8]  # don't want these
>> >>> DF.filter(DF.id.isin(array) == False).show()
>> +---+
>> | id|
>> +---+
>> |  0|
>> |  4|
>> |  5|
>> |  6|
>> |  7|
>> |  9|
>> +---+
>>
>>  or use binary NOT operator:
>>
>>
>> >>> DF.filter(*~*DF.id.isin(array)).show()
>>
>> +---+
>>
>> | id|
>>
>> +---+
>>
>> |  0|
>>
>> |  4|
>>
>> |  5|
>>
>> |  6|
>>
>> |  7|
>>
>> |  9|
>>
>> +---+
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 1 Jan 2022 at 20:59, Bitfox  wrote:
>>
>>> Using the dataframe API I need to implement a batch filter:
>>>
>>> DF. select(..).where(col(..) != ‘a’ and col(..) != ‘b’ and …)
>>>
>>> There are a lot of keywords should be filtered for the same column in
>>> where statement.
>>>
>>> How can I make it more smater? UDF or others?
>>>
>>> Thanks & Happy new Year!
>>> Bitfox
>>>
>>


Re: docker image distribution in Kubernetes cluster

2021-12-08 Thread Khalid Mammadov
Hi Mitch

IMO, it's done to provide maximum flexibility. Some users may want a
limited/restricted version of the image for the driver, or an image with
some additional software that they use on the executors during processing.

So, in your case you only need to provide the first one, since the other two
configs will take their value from it.
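So a submit along these lines should be enough, with the driver and executor images falling back to the container image (a trimmed sketch of your original command; the other flags stay as they were, and the two driver/executor image confs can simply be dropped):

spark-submit --verbose \
   --conf spark.kubernetes.container.image=${IMAGEGCP} \
   ...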

Regards
Khalid

On Wed, 8 Dec 2021, 10:41 Mich Talebzadeh, 
wrote:

> Just a correction that in Spark 3.2 documentation it states
> 
> that
>
> Property Name: spark.kubernetes.container.image
>   Default: (none)
>   Meaning: Container image to use for the Spark application. This is usually
>   of the form example.com/repo/spark:v1.0.0. This configuration is required
>   and must be provided by the user, unless explicit images are provided for
>   each different container type. (Since 2.3.0)
> Property Name: spark.kubernetes.driver.container.image
>   Default: (value of spark.kubernetes.container.image)
>   Meaning: Custom container image to use for the driver. (Since 2.3.0)
> Property Name: spark.kubernetes.executor.container.image
>   Default: (value of spark.kubernetes.container.image)
>   Meaning: Custom container image to use for executors.
>
> So both driver and executor images are mapped to the container image. In
> my opinion, they are redundant and will potentially add confusion so they
> should be removed?
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 8 Dec 2021 at 10:15, Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> We have three conf parameters to distribute the docker image with
>> spark-sumit in Kubernetes cluster.
>>
>> These are
>>
>> spark-submit --verbose \
>>   --conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.container.image=${IMAGEGCP} \
>>
>> when the above is run, it shows
>>
>> (spark.kubernetes.driver.docker.image,
>> eu.gcr.io/axial-glow-224522/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-addedpackages
>> )
>> (spark.kubernetes.executor.docker.image,
>> eu.gcr.io/axial-glow-224522/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-addedpackages
>> )
>> (spark.kubernetes.container.image,
>> eu.gcr.io/axial-glow-224522/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-addedpackages
>> )
>>
>> You notice that I am using the same docker image for driver, executor and
>> container. In Spark 3.2 (actually in recent spark versions), I cannot see
>> reference to driver or executor. Are these depreciated? It appears that
>> Spark still accepts them?
>>
>> Thanks
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>  h
>>
>>
>>
>>
>>
>>


Re: Apache Spark 3.2.0 | Pyspark | Pycharm Setup

2021-11-17 Thread Khalid Mammadov
Hi Anil

You don't need to download and install Spark.
It's enough to add pyspark to PyCharm as a package for your environment and
start developing and testing locally: PySpark includes a local Spark that is
installed as part of the pip install.

When it comes to your particular issue, I believe you are using Windows and
there is a well-known issue with these "hadoop libraries". You will need to
install a few executables to solve this; check the web for that particular
error.
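A minimal sketch of the setup itself: once pyspark is installed into the PyCharm project interpreter (pip install pyspark), a script like the one below should run without a separate Spark download; the NativeCodeLoader line is only a warning unless the Windows "hadoop libraries" issue above bites:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("pycharm-smoke-test")
         .getOrCreate())
spark.range(5).show()
spark.stop()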

Regards
Khalid


On Wed, 17 Nov 2021, 08:35 Gourav Sengupta, 
wrote:

> Hi Anil,
>
> I generally create an anaconda environment, and then install pyspark in
> it, and then configure the interpreter to point to that particular
> environment. Never faced an issue with my approach.
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Nov 17, 2021 at 7:39 AM Anil Kulkarni  wrote:
>
>> Hi Spark community,
>>
>> I am having a hard time setting up my Pycharm to work with pyspark. Can
>> any of you point me to documentation available?
>>
>> Things I have tried till now :
>>
>>1. Download and Install Apache spark
>>2. Add pyspark package in pycharm.
>>3. Add SPARK_HOME. PYTHONPATH, HADOOP_HOME env variables to Run config
>>
>> Error I am getting :
>>
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> Setting default log level to "WARN".
>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>> setLogLevel(newLevel).
>> 21/11/16 23:26:28 WARN NativeCodeLoader: *Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable*
>> Traceback (most recent call last):
>>
>>
>> --
>> Cheers,
>> Anil Kulkarni
>> https://anilkulkarni.com/
>>
>>


Re: Performance of PySpark jobs on the Kubernetes cluster

2021-08-10 Thread Khalid Mammadov
Hi Mich

I think you need to check your code.
If the code does not use the PySpark API effectively you may get this, i.e.
if you use the pure Python/pandas API rather than PySpark's
transform -> transform -> action style, e.g.
df.select(..).withColumn(...)...count()
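A small sketch of what "effectively" means here (assuming an existing SparkSession named spark; the input path and column names are made up): keep the work expressed as PySpark transformations ending in a single action, so it executes on the executors rather than being pulled onto the driver (the launch node) and continued in pandas:

from pyspark.sql import functions as F

df = spark.read.parquet("gs://some-bucket/events")        # hypothetical input
result = (df
          .select("user_id", "amount")
          .withColumn("amount_x2", F.col("amount") * 2)   # illustrative transform
          .groupBy("user_id")
          .agg(F.sum("amount_x2").alias("total")))
result.count()                                            # single action triggers the distributed run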

Hope this helps to put you in the right direction.

Cheers
Khalid




On Mon, 9 Aug 2021, 20:20 Mich Talebzadeh, 
wrote:

> Hi,
>
> I have a basic question to ask.
>
> I am running a Google k8s cluster (AKA GKE) with three nodes each having
> configuration below
>
> e2-standard-2 (2 vCPUs, 8 GB memory)
>
>
> spark-submit is launched from another node (actually a data proc single
> node that I have just upgraded to e2-custom (4 vCPUs, 8 GB mem). We call
> this the launch node
>
> OK I know that the cluster is not much but Google was complaining about
> the launch node hitting 100% cpus. So I added two more cpus to it.
>
> It appears that despite using k8s as the computational cluster, the burden
> falls upon the launch node!
>
> The cpu utilisation for launch node shown below
>
> [image: image.png]
> The dip is when 2 more cpus were added to  it so it had to reboot. so
> around %70 usage
>
> The combined cpu usage for GKE nodes is shown below:
>
> [image: image.png]
>
> Never goes above 20%!
>
> I can see that the drive and executors as below:
>
> k get pods -n spark
> NAME READY   STATUSRESTARTS
>  AGE
> pytest-c958c97b2c52b6ed-driver   1/1 Running   0
> 69s
> randomdatabigquery-e68a8a7b2c52f468-exec-1   1/1 Running   0
> 51s
> randomdatabigquery-e68a8a7b2c52f468-exec-2   1/1 Running   0
> 51s
> randomdatabigquery-e68a8a7b2c52f468-exec-3   0/1 Pending   0
> 51s
>
> It is a PySpark 3.1.1 image using java 8 and pushing random data generated
> into Google BigQuery data warehouse. The last executor (exec-3) seems to be
> just pending. The spark-submit is as below:
>
> spark-submit --verbose \
>--properties-file ${property_file} \
>--master k8s://https://$KUBERNETES_MASTER_IP:443 \
>--deploy-mode cluster \
>--name pytest \
>--conf
> spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>--py-files $CODE_DIRECTORY/DSBQ.zip \
>--conf spark.kubernetes.namespace=$NAMESPACE \
>--conf spark.executor.memory=5000m \
>--conf spark.network.timeout=300 \
>--conf spark.executor.instances=3 \
>--conf spark.kubernetes.driver.limit.cores=1 \
>--conf spark.driver.cores=1 \
>--conf spark.executor.cores=1 \
>--conf spark.executor.memory=2000m \
>--conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>--conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>--conf spark.kubernetes.container.image=${IMAGEGCP} \
>--conf
> spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>--conf
> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>--conf
> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
> \
>--conf spark.sql.execution.arrow.pyspark.enabled="true" \
>$CODE_DIRECTORY/${APPLICATION}
>
> Aren't the driver and executors running on K8s cluster? So why is the
> launch node heavily used but k8s cluster is underutilized?
>
> Thanks
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Installing Distributed apache spark cluster with Cluster mode on Docker

2021-07-25 Thread Khalid Mammadov
Hi Dinakar

If your aim is to run Spark in "distributed mode" then all these cluster modes 
(excluding local) run the cluster in distributed mode anyway.
As I said before, the "deploy-mode = cluster" setting is only about the driver 
application; executors run on worker nodes in parallel (distributed) regardless.
This is how Spark works. So you can only choose where to run the "driver" 
application, which defines what to do and waits for the application to execute, 
while the actual/most work is done on the worker nodes.
With that in mind, you can start (submit) your Python code locally and target a 
cluster started in standalone mode (using Docker, for example) and get your 
distributed execution.

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000


Take a look below link and snippet from there:


https://spark.apache.org/docs/latest/submitting-applications.html



A common deployment strategy is to submit your application from a gateway 
machine that is physically co-located with your worker machines (e.g. Master 
node in a standalone EC2 cluster). In this setup, client mode is appropriate. 
In client mode, the driver is launched directly within the spark-submit process 
which acts as a client to the cluster. The input and output of the application 
is attached to the console. Thus, this mode is especially suitable for 
applications that involve the REPL (e.g. Spark shell).

Alternatively, if your application is submitted from a machine far from the 
worker machines (e.g. locally on your laptop), it is common to use cluster mode 
to minimize network latency between the drivers and the executors. Currently, 
the standalone mode does not support cluster mode for Python applications.


Regarding Mesos and YARN, I can't comment on those as I don't have experience 
with those modes. But I found this relevant link for you: 
https://stackoverflow.com/questions/36461054/i-cant-seem-to-get-py-files-on-spark-to-work

Another suggestion is to keep CCing Spark user group email. So if I can’t 
answer then someone else may have. I am CCing and you can reply all.

Hope these all helps.

Regards
Khalid

Sent from my iPad

> On 25 Jul 2021, at 10:50, Dinakar Chennubotla  
> wrote:
> 
> Hi Khalid Mammadov,
> 
> I am now, reworking from scratch i.e. on How to build Distributed 
> apache-spark cluster, using yarn or apache mesos.
> 
> Sending you, my initial sketch. pictorial representation on the same.
> 
> Could you help me with the below:
> ==
> As per the Diagram,
> 1. I have to write Dockerfiles with Apache Hadoop with yarn and spark 
> (opensource only),
> How can I do, this 
> Your comments :
> 
> 2. To implement Apache Mesos with spark and deployment mode = cluster,
> if have any kind of documentation or weblinks or your knowledge, could you 
> give that to me.
> really I will help me a lot.
> your comments:
> 
> Thanks,
> dinakar
> 
> Thanks,
> Dinakar
> 
> On Sun, Jul 25, 2021 at 12:56 PM Khalid Mammadov  
> wrote:
>> Sorry Dinakat, unfortunately I dont have much availablety, but you cant drop 
>> me your questions and I would be happy to help as much as I can
>> 
>> On Sun, 25 Jul 2021, 04:17 Dinakar Chennubotla,  
>> wrote:
>>> Agenda:
>>> 1. Hoq to implementation of Apache mesos or apache Hadoop yarn, including 
>>> spark service with cluster mode.
>>> 2.  Exploration on dockering the above tools 
>>> 
>>> 
>>> Thanks,
>>> Dinakar
>>> 
>>> On Sun, 25 Jul, 2021, 08:43 Dinakar Chennubotla,  
>>> wrote:
>>>> Hi Khalid Mammadov,.
>>>> 
>>>> With all the mail discussion that we had till now, you got brief knowledge 
>>>> on my issue.
>>>> 
>>>> I would like to request we plan a zoom meeting and can complete this in 
>>>> not exceeding more than one or two sessions.
>>>> 
>>>> Kindly, let me know your availability and comments.
>>>> 
>>>> If not, we will continue our mail discussion.
>>>> 
>>>> Thanks,
>>>> Dinakar
>>>> 
>>>> On Sun, 25 Jul, 2021, 01:12 Khalid Mammadov,  
>>>> wrote:
>>>>> Had another look to your screen shot. It's also about Python, as this a 
>>>>> wrapper for java and cluster runs on java (JVM) it cant run python driver 
>>>>> inside. That means you can only run .jar files on cluster mode.
>>>>> 
>>>>> Hope all these make sense
>>>>> 
>>>>> On Sat, 24 Jul 2021, 19:58 Khalid Mammadov,  
>>>>> wrote:
>>>>>> From that link:
&

Re: Installing Distributed apache spark cluster with Cluster mode on Docker

2021-07-24 Thread Khalid Mammadov
Can you share your Dockerfile (not all but gist of it) and instructions how
you do it and what you actually run to get that message?

I have just pushed my local repo to Github where I have created an example
of Spark on Docker some time ago.
Please take a look and compare what you are doing.

https://github.com/khalidmammadov/spark_docker


On Sat, Jul 24, 2021 at 4:07 PM Dinakar Chennubotla <
chennu.bigd...@gmail.com> wrote:

> Hi Khalid Mammadov,
>
> I tried the which says distributed mode Spark installation. But when run
> below command it says " deployment mode = cluster is not allowed in
> standalone cluster".
>
> Source Url I used is:
>
> https://towardsdatascience.com/diy-apache-spark-docker-bb4f11c10d24?gi=fa52ac767c0b
>
> Kiddly refer this section in the url I mentioned.
> "Docker & Spark — Multiple Machines"
>
> I removed third party things and dockerized my way.
>
> Thanks,
> Dinakar
>
> On Sat, 24 Jul, 2021, 20:28 Khalid Mammadov, 
> wrote:
>
>> Standalone mode already implies you are running on cluster (distributed)
>> mode. i.e. it's one of 4 available cluster manager options. The difference
>> is Standalone uses it's one resource manager rather than using YARN for
>> example.
>> If you are running docker on a single machine then you are limited to
>> that but if you run your docker on a cluster and deploy your Spark
>> containers on it then you will get your distribution and cluster mode.
>> And also If you are referring to scalability then you need to register
>> worker nodes when you need to scale.
>> You do it by registering a VM/container as a worker node as per doc using:
>>
>> ./sbin/start-worker.sh 
>>
>> You can create a new docker container with your base image and run the above 
>> command on the bootstrap and that would register a worker node and scale 
>> your cluster when you want.
>>
>> And if you kill them then you would scale down ( I think this is how 
>> Databricks autoscaling works..). I am not sure k8s TBH, perhaps it's handled 
>> this more gracefully
>>
>>
>> On Sat, Jul 24, 2021 at 3:38 PM Dinakar Chennubotla <
>> chennu.bigd...@gmail.com> wrote:
>>
>>> Hi Khalid Mammadov,
>>>
>>> Thank you for your response,
>>> Yes, I did, I built standalone apache spark cluster on docker containers.
>>>
>>> But I am looking for distributed spark cluster,
>>> Where spark workers are scalable and spark "deployment mode  = cluster".
>>>
>>> Source url I used to built standalone apache spark cluster
>>> https://www.kdnuggets.com/2020/07/apache-spark-cluster-docker.html
>>>
>>> If you have documentation on distributed spark, which I am looking for,
>>> could you please send me.
>>>
>>>
>>> Thanks,
>>> Dinakar
>>>
>>> On Sat, 24 Jul, 2021, 19:32 Khalid Mammadov, 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Have you checked out docs?
>>>> https://spark.apache.org/docs/latest/spark-standalone.html
>>>>
>>>> Thanks,
>>>> Khalid
>>>>
>>>> On Sat, Jul 24, 2021 at 1:45 PM Dinakar Chennubotla <
>>>> chennu.bigd...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am Dinakar, Hadoop admin,
>>>>> could someone help me here,
>>>>>
>>>>> 1. I have a DEV-POC task to do,
>>>>> 2. Need to Installing Distributed apache-spark cluster with Cluster
>>>>> mode on Docker containers.
>>>>> 3. with Scalable spark-worker containers.
>>>>> 4. we have a 9 node cluster with some other services or tools.
>>>>>
>>>>> Thanks,
>>>>> Dinakar
>>>>>
>>>>


Re: Installing Distributed apache spark cluster with Cluster mode on Docker

2021-07-24 Thread Khalid Mammadov
Standalone mode already implies you are running in cluster (distributed)
mode, i.e. it's one of the 4 available cluster manager options. The difference
is that Standalone uses its own resource manager rather than, for example,
YARN.
If you are running Docker on a single machine then you are limited to that,
but if you run your Docker containers on a cluster and deploy your Spark
containers on it, then you get your distribution and cluster mode.
Also, if you are referring to scalability, then you need to register worker
nodes when you need to scale.
You do it by registering a VM/container as a worker node, as per the docs, using:

./sbin/start-worker.sh 

You can create a new docker container with your base image and run the
above command on bootstrap; that would register a worker node and scale
your cluster when you want.

And if you kill them then you would scale down (I think this is how
Databricks autoscaling works). I am not sure about k8s, TBH; perhaps it
handles this more gracefully.


On Sat, Jul 24, 2021 at 3:38 PM Dinakar Chennubotla <
chennu.bigd...@gmail.com> wrote:

> Hi Khalid Mammadov,
>
> Thank you for your response,
> Yes, I did, I built standalone apache spark cluster on docker containers.
>
> But I am looking for distributed spark cluster,
> Where spark workers are scalable and spark "deployment mode  = cluster".
>
> Source url I used to built standalone apache spark cluster
> https://www.kdnuggets.com/2020/07/apache-spark-cluster-docker.html
>
> If you have documentation on distributed spark, which I am looking for,
> could you please send me.
>
>
> Thanks,
> Dinakar
>
> On Sat, 24 Jul, 2021, 19:32 Khalid Mammadov, 
> wrote:
>
>> Hi,
>>
>> Have you checked out docs?
>> https://spark.apache.org/docs/latest/spark-standalone.html
>>
>> Thanks,
>> Khalid
>>
>> On Sat, Jul 24, 2021 at 1:45 PM Dinakar Chennubotla <
>> chennu.bigd...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am Dinakar, Hadoop admin,
>>> could someone help me here,
>>>
>>> 1. I have a DEV-POC task to do,
>>> 2. Need to Installing Distributed apache-spark cluster with Cluster mode
>>> on Docker containers.
>>> 3. with Scalable spark-worker containers.
>>> 4. we have a 9 node cluster with some other services or tools.
>>>
>>> Thanks,
>>> Dinakar
>>>
>>


Re: Installing Distributed apache spark cluster with Cluster mode on Docker

2021-07-24 Thread Khalid Mammadov
Hi,

Have you checked out docs?
https://spark.apache.org/docs/latest/spark-standalone.html

Thanks,
Khalid

On Sat, Jul 24, 2021 at 1:45 PM Dinakar Chennubotla <
chennu.bigd...@gmail.com> wrote:

> Hi All,
>
> I am Dinakar, Hadoop admin,
> could someone help me here,
>
> 1. I have a DEV-POC task to do,
> 2. Need to Installing Distributed apache-spark cluster with Cluster mode
> on Docker containers.
> 3. with Scalable spark-worker containers.
> 4. we have a 9 node cluster with some other services or tools.
>
> Thanks,
> Dinakar
>


Re: Missing stack function from SQL functions API

2021-06-15 Thread Khalid Mammadov
Hi David

If you need an alternative way to do it, you can use the below:
df.select(expr("stack(2, 1,2,3)"))
Or
df.withColumn('stacked', expr("stack(2, 1,2,3)"))

Thanks
Khalid

On Mon, 14 Jun 2021, 10:14 ,  wrote:

> I noticed that the stack SQL function
>  is missing from the 
> functions
> API
> .
> Could we add it?
>


Re: convert java dataframe to pyspark dataframe

2021-03-31 Thread Khalid Mammadov
I think what you want to achieve is what PySpark is actually doing in its 
API under the hood.


So, specifically, you need to look at PySpark's implementation of the 
DataFrame, SparkSession and SparkContext APIs. Under the hood that is what is 
happening: it starts a py4j gateway and delegates all Spark operations and 
object creation to the JVM.


Look for example here 
<https://github.com/apache/spark/blob/48ef9bd2b3a0cb52aaa31a3fada8779b7a7b9132/python/pyspark/context.py#L209>, 
here 
<https://github.com/apache/spark/blob/48ef9bd2b3a0cb52aaa31a3fada8779b7a7b9132/python/pyspark/sql/session.py#L248> 
and here 
<https://github.com/apache/spark/blob/48ef9bd2b3a0cb52aaa31a3fada8779b7a7b9132/python/pyspark/java_gateway.py#L153>, 
where SparkSession/SparkContext (Python) communicates with the JVM and creates 
the SparkSession/SparkContext on the JVM side. The rest of the PySpark code 
then delegates execution to them.


With these objects created by you and a custom Java/Scala application that 
holds the Spark objects, you can then play around with the rest of the 
DataFrame creation and pass data back and forth. But, I must admit, this is 
not part of the official documentation; it is playing around with Spark 
internals, which are subject to change (often).

So, I am not sure what your actual requirement is, but you will need to 
implement your own custom version of the PySpark API to get all the 
functionality you need and control on the JVM side.



On 31/03/2021 06:49, Aditya Singh wrote:

Thanks a lot Khalid for replying.

I have one question though. The approach you showed needs an 
understanding on the Python side beforehand about the data types of the 
dataframe columns. Can we implement a generic approach where this 
info is not required and we just have the Java dataframe as input on the 
Python side?


Also, one more question: in my use case I will be sending a dataframe from 
Java to Python, and then on the Python side there will be some 
transformation done on the dataframe (including using Python UDFs) but 
no actions will be performed here, and then it will be sent back to Java 
where actions will be performed. So I also wanted to ask if this is 
feasible and, if yes, whether we need to send some special jars to the 
executors so that they can execute UDFs on the dataframe.


On Wed, 31 Mar 2021 at 3:37 AM, Khalid Mammadov
<khalidmammad...@gmail.com> wrote:


Hi Aditya,


I think your original question was how to convert a DataFrame
from a Spark session created on Java/Scala to a DataFrame on a Spark
session created from Python (PySpark).

So, as I have answered on your SO question:


There is a missing call to *entry_point* before calling getDf() in
your code

So, try this:

app = gateway.entry_point
j_df = app.getDf()

Additionally, I have created a working copy using Python and Scala (hope
you don't mind) below. It shows how, on the Scala side, a py4j gateway is
started with a Spark session and a sample DataFrame, and how, on the
Python side, I have accessed that DataFrame and converted it to a Python
List[Tuple] before converting it back to a DataFrame for the Spark
session on the Python side:

*Python:*

from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField

if __name__ == '__main__':
    gateway = JavaGateway()
    spark_app = gateway.entry_point
    df = spark_app.df()

    # Note: the "apply" method here comes from Scala's companion object to access elements of an array
    df_to_list_tuple = [(int(i.apply(0)), int(i.apply(1))) for i in df]

    spark = (SparkSession
             .builder
             .appName("My PySpark App")
             .getOrCreate())

    schema = StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True)])

    df = spark.createDataFrame(df_to_list_tuple, schema)

    df.show()

*Scala:*

import java.nio.file.{Path, Paths}
import org.apache.spark.sql.SparkSession
import py4j.GatewayServer

object SparkApp {
  val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")

  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("My app")
    .getOrCreate()

  val df = spark
    .read
    .option("header", "True")
    .csv(myFile.toString)
    .collect()
}

object Py4JServerApp extends App {
  val server = new GatewayServer(SparkApp)
  server.start()

  print("Started and running...")
}


Regards,
Khalid


On 30/03/2021 07:57, Aditya Singh wrote:

Hi Sean,

Thanks a lot for replying and apologies for the late reply(I
somehow missed this mail before) but I am under the impression
that passing the py4j.java_gateway.JavaGateway object lets the
pyspark access the spark context created on the java side.
My use case is exactly what you mentioned in the last email. I
want to access the same spark session across java a

Re: convert java dataframe to pyspark dataframe

2021-03-30 Thread Khalid Mammadov

Hi Aditya,


I think your original question was how to convert a DataFrame from a
Spark session created on Java/Scala to a DataFrame on a Spark session
created from Python (PySpark).


So, as I have answered on your SO question:


There is a missing call to *entry_point* before calling getDf() in your code

So, try this:

app = gateway.entry_point
j_df = app.getDf()

Additionally, I have created a working copy using Python and Scala (hope
you don't mind) below. It shows how, on the Scala side, a py4j gateway is
started with a Spark session and a sample DataFrame, and how, on the
Python side, I have accessed that DataFrame and converted it to a Python
List[Tuple] before converting it back to a DataFrame for the Spark
session on the Python side:


*Python:*

from py4j.java_gateway import JavaGateway
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StructField

if __name__ == '__main__':
    gateway = JavaGateway()
    spark_app = gateway.entry_point
    df = spark_app.df()

    # Note: the "apply" method here comes from Scala's companion object to access elements of an array
    df_to_list_tuple = [(int(i.apply(0)), int(i.apply(1))) for i in df]

    spark = (SparkSession
             .builder
             .appName("My PySpark App")
             .getOrCreate())

    schema = StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True)])

    df = spark.createDataFrame(df_to_list_tuple, schema)

    df.show()


*Scala:*

import java.nio.file.{Path, Paths}
import org.apache.spark.sql.SparkSession
import py4j.GatewayServer

object SparkApp {
  val myFile: Path = Paths.get(System.getProperty("user.home") + "/dev/sample_data/games.csv")

  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("My app")
    .getOrCreate()

  val df = spark
    .read
    .option("header", "True")
    .csv(myFile.toString)
    .collect()
}

object Py4JServerApp extends App {
  val server = new GatewayServer(SparkApp)
  server.start()

  print("Started and running...")
}



Regards,
Khalid


On 30/03/2021 07:57, Aditya Singh wrote:

Hi Sean,

Thanks a lot for replying, and apologies for the late reply (I somehow
missed this mail before), but I am under the impression that passing
the py4j.java_gateway.JavaGateway object lets PySpark access the
Spark context created on the Java side.
My use case is exactly what you mentioned in the last email. I want to
access the same Spark session across Java and PySpark. So how can we
share the Spark context, and in turn the Spark session, across Java and
PySpark?


Regards,
Aditya

On Fri, 26 Mar 2021 at 6:49 PM, Sean Owen wrote:


The problem is that both of these are not sharing a SparkContext
as far as I can see, so there is no way to share the object across
them, let alone languages.

You can of course write the data from Java, read it from Python.

In some hosted Spark products, you can access the same session
from two languages and register the DataFrame as a temp view in
Java, then access it in Pyspark.


On Fri, Mar 26, 2021 at 8:14 AM Aditya Singh
<aditya.singh9...@gmail.com>
wrote:

Hi All,

I am a newbie to Spark and trying to pass a Java dataframe to
PySpark. The following link has details about what I am trying to do:


https://stackoverflow.com/questions/66797382/creating-pysparks-spark-context-py4j-java-gateway-object



Can someone please help me with this?

Thanks,



Re: Spark SQL Dataset and BigDecimal

2021-02-18 Thread Khalid Mammadov
As the Scala book says, value types are mapped/transformed to Java primitive types.
So when you use Integer, for example, it will compile to int. So Integer is
syntactic sugar and makes Scala code more readable than plain int, and
Scala adds extra perks through implicits etc. I think the same goes for
this BigDecimal case. So I personally would go with Scala types and let the
compiler do the rest for you.

Cheers,
Khalid Mammadov

> On 18 Feb 2021, at 09:39, Ivan Petrov  wrote:
> 
> 
> I'm fine with both. So does it make sense to use java.math.BigDecimal 
> everywhere to avoid perf penalty for value conversion? 
> Scala's BigDecimal looks like a wrapper around java.math.BigDecimal though...
> 
> On Thu, 18 Feb 2021 at 00:33, Takeshi Yamamuro wrote:
>> Yea, I think that's because it's needed for interoperability between 
>> scala/java.
>> If it returns a scala decimal, java code cannot handle it.
>> 
>> If you want a scala decimal, you need to convert it by yourself.
>> 
>> Bests,
>> Takeshi
>> 
>>> On Wed, Feb 17, 2021 at 9:48 PM Ivan Petrov  wrote:
>>> Hi, I'm using Spark Scala Dataset API to write spark sql jobs.
>>> I've noticed that Spark dataset accepts scala BigDecimal as the value but 
>>> it always returns java.math.BigDecimal when you read it back.
>>> 
>>> Is it by design?
>>> Should I use java.math.BigDecimal everywhere instead?
>>> Is there any performance penalty for using scala BigDecimal? It's more 
>>> convenient from an API point of view than java.math.BigDecimal.
>> 
>> 
>> -- 
>> ---
>> Takeshi Yamamuro


Re: Spark SQL check timestamp with other table and update a column.

2020-11-21 Thread Khalid Mammadov

Hi,

I am not sure if you were writing pseudo-code or real code, but there were
a few issues in the SQL.


I have reproduced your example in the Spark REPL and it all worked as
expected; the result is the one you need.


Please see the full code below:

## *Spark 3.0.0*

>>> a = spark.read.csv("tab1", sep="|", header=True, schema="col1 
TIMESTAMP, col2 INT, col3 STRING")

>>> a.printSchema
int, col3: string]>


>>> a.show()
+--------------------+----+----+
|                col1|col2|col3|
+--------------------+----+----+
|2020-11-17 20:50:...|   1|null|
+--------------------+----+----+

>>> b = spark.read.csv("tab2", sep="|", header=True, schema="col1 
TIMESTAMP, col2 INT, col3 STRING")

>>> b.printSchema
int, col3: string]>


>>> b.show()
+--------------------+----+----+
|                col1|col2|col3|
+--------------------+----+----+
|2020-11-17 21:19:...|   1| win|
|2020-11-17 20:49:...|   1| win|
|2020-11-17 20:19:...|   1| Win|
+--------------------+----+----+

>>> a.createOrReplaceTempView("table1")
>>> b.createOrReplaceTempView("table2")

>>> res = spark.sql("""
... select a.col1,a.col2, b.col1, b.col2, coalesce(a.col3,b.col3) as col3
... from table1 a *join* table2 b
... on (a.col2=b.col2) and (*a.col1 < b.col1*)
... """)
>>> res.show()
+--------------------+----+--------------------+----+----+
|                col1|col2|                col1|col2|col3|
+--------------------+----+--------------------+----+----+
|2020-11-17T20:50:...|   1|2020-11-17T21:19:...|   1| win|
+--------------------+----+--------------------+----+----+

Regards,

Khalid

On 19/11/2020 05:13, anbutech wrote:

select a.col1,a.col2.coalesce(a.col3,b.col3) as col3
from table1 a left table2 b
on (a.col2=b.col2) and (b.col1 < b.col1)


Re: mission statement : unified

2020-10-25 Thread Khalid Mammadov
Correct. Also, as explained in the book Learning Spark 2.0 by Databricks:

Unified Analytics
While the notion of unification is not unique to Spark, it is a core component 
of its design philosophy and evolution. In November 2016, the Association for 
Computing Machinery (ACM) recognized Apache Spark and conferred upon its 
original creators the prestigious ACM Award for their paper describing Apache 
Spark as a “Unified Engine for Big Data Processing.” The award-winning paper 
notes that Spark replaces all the separate batch processing, graph, stream, and 
query engines like Storm, Impala, Dremel, Pregel, etc. with a unified stack of 
components that addresses diverse workloads under a single distributed fast 
engine.

Khalid

> On 19 Oct 2020, at 07:03, Sonal Goyal  wrote:
> 
> 
> My thought is that Spark supports analytics for structured and unstructured 
> data, batch as well as real time. This was pretty revolutionary when Spark 
> first came out. That's where the unified term came from I think. Even after 
> all these years, Spark remains the trusted framework for enterprise 
> analytics. 
> 
>> On Mon, 19 Oct 2020, 11:24 Gourav Sengupta wrote:
>>
>> Hi,
>> 
>> I think that it is just a marketing statement. But with SPARK 3.x, now that 
>> you are seeing that SPARK is no more than just another distributed data 
>> processing engine, they are trying to join data pre-processing into ML 
>> pipelines directly. I may call that unified. 
>> 
>> But you get the same with several other frameworks as well now so not quite 
>> sure how unified creates a unique brand value.
>> 
>> 
>> Regards,
>> Gourav Sengupta 
>> 
>>> On Sun, Oct 18, 2020 at 6:40 PM Hulio andres  wrote:
>>>  
>>> Apache Spark's mission statement is: "Apache Spark™ is a unified analytics
>>> engine for large-scale data processing."
>>>  
>>> To what is the word "unified" referring?
>>>  
>>>  
>>>  
>>>  
>>>  
>>>  


Re: Let multiple jobs share one rdd?

2020-09-24 Thread Khalid Mammadov

Perhaps you can use Global Temp Views?

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createGlobalTempView
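
A minimal sketch of that approach (assuming the expensive first query is the persisted DataFrame called data in your code; the view name and the count query are just illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shared-df-demo").getOrCreate()

# Stand-in for the expensive first query; persist it as in your code
data = spark.range(10)
data.persist()

# Register it once as a *global* temp view, which is tied to the application,
# not to a single SparkSession
data.createGlobalTempView("temp_table")

# Any other session of the same application can read it via the global_temp database
other_session = spark.newSession()
other_session.sql("SELECT count(*) AS cnt FROM global_temp.temp_table").show()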


On 24/09/2020 14:52, Gang Li wrote:

Hi all,

There are three jobs, and the first RDD is the same in each. Can the first
RDD be calculated once, and the subsequent operations then be
calculated in parallel?



My code is as follows:

sqls = ["""
 INSERT OVERWRITE TABLE `spark_input_test3` PARTITION
(dt='20200917')
 SELECT id, plan_id, status, retry_times, start_time,
schedule_datekey
 FROM temp_table where status=3""",
 """
 INSERT OVERWRITE TABLE `spark_input_test4` PARTITION
(dt='20200917')
 SELECT id, cur_inst_id, status, update_time, schedule_time,
task_name
 FROM temp_table where schedule_time > '2020-09-01 00:00:00'
 """]

def multi_thread():
 sql = """SELECT id, plan_id, status, retry_times, start_time,
schedule_datekey, task_name, update_time, schedule_time, cur_inst_id,
scheduler_id
 FROM table
 where dt < '20200801'"""
 data = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK)
 threads = []
 for i in range(2):
 try:
 t = threading.Thread(target=insert_overwrite_thread,
args=(sqls[i], data, ))
 t.start()
 threads.append(t)
 except Exception as x:
 print x
 for t in threads:
 t.join()

def insert_overwrite_thread(sql, data):
 data.createOrReplaceTempView('temp_table')
 spark.sql(sql)



Since Spark is lazy, the first RDD will still be calculated multiple
times during parallel submission.
I would like to ask if there are other ways. Thanks!

Cheers,
Gang Li



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org