Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
I think this answers your question about what to do if you need more space
on nodes.

https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage

Local Storage
<https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage>

Spark supports using volumes to spill data during shuffles and other
operations. To use a volume as local storage, the volume’s name should
start with spark-local-dir-, for example:

--conf 
spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=
--conf 
spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false

Specifically, you can use persistent volume claims if the jobs require
large shuffle and sorting operations in executors.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=500Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false

To enable the shuffle data recovery feature via the built-in
KubernetesLocalDiskShuffleDataIO plugin, we need the following.
You may want to enable
spark.kubernetes.driver.waitToReusePersistentVolumeClaim additionally.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
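
As a minimal sketch of how these settings could be combined from PySpark (assuming Spark 3.4+ on Kubernetes; the storage class "gp", the 500Gi size and the mount path are just the example values above, not requirements):

from pyspark.sql import SparkSession

# Prefix for the executor PVC settings shown above.
# claimName=OnDemand asks Spark to create one PVC per executor.
pvc = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"

spark = (
    SparkSession.builder
    .appName("shuffle-on-pvc")
    .config(f"{pvc}.options.claimName", "OnDemand")
    .config(f"{pvc}.options.storageClass", "gp")
    .config(f"{pvc}.options.sizeLimit", "500Gi")
    .config(f"{pvc}.mount.path", "/data/spark-x/executor-x")
    .config(f"{pvc}.mount.readOnly", "false")
    # Built-in plugin that lets a replacement executor reuse shuffle files
    # left behind on the volume.
    .config("spark.shuffle.sort.io.plugin.class",
            "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO")
    .getOrCreate()
)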

If no volume is set as local storage, Spark uses temporary scratch space to
spill data to disk during shuffles and other operations. When using
Kubernetes as the resource manager the pods will be created with an emptyDir
<https://kubernetes.io/docs/concepts/storage/volumes/#emptydir> volume
mounted for each directory listed in spark.local.dir or the environment
variable SPARK_LOCAL_DIRS. If no directories are explicitly specified then
a default directory is created and configured appropriately.

emptyDir volumes use the ephemeral storage feature of Kubernetes and do not
persist beyond the life of the pod.
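
For the PVC reuse plus decommissioning approach that comes up further down in this thread, here is a hedged sketch of the relevant settings (property names as documented for Spark on Kubernetes and node decommissioning; treat the combination as an illustration, not a verified recipe):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pvc-reuse-with-decommissioning")
    # Let the driver own executor PVCs and reuse them when executors are replaced.
    .config("spark.kubernetes.driver.ownPersistentVolumeClaim", "true")
    .config("spark.kubernetes.driver.reusePersistentVolumeClaim", "true")
    .config("spark.kubernetes.driver.waitToReusePersistentVolumeClaim", "true")
    # Gracefully decommission executors and migrate their shuffle/RDD blocks.
    .config("spark.decommission.enabled", "true")
    .config("spark.storage.decommission.enabled", "true")
    .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
    .config("spark.storage.decommission.rddBlocks.enabled", "true")
    .getOrCreate()
)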

tor. 11. apr. 2024 kl. 10:29 skrev Bjørn Jørgensen :

> " In the end for my usecase I started using pvcs and pvc aware scheduling
> along with decommissioning. So far performance is good with this choice."
> How did you do this?
>
>
> tor. 11. apr. 2024 kl. 04:13 skrev Arun Ravi :
>
>> Hi Everyone,
>>
>> I explored IBM's and AWS's S3 shuffle plugins some time back, and I also
>> explored AWS FSx for Lustre in a few of my production jobs, which have
>> ~20TB of shuffle operations with 200-300 executors. What I observed is that
>> S3 and FSx behaviour was fine during the write phase; however, I faced IOPS
>> throttling during the read phase (reads taking forever to complete). I think
>> this might be caused by the heavy use of the shuffle index files (I didn't
>> do any extensive research on this), so I believe the shuffle manager
>> logic has to be intelligent enough to reduce the fetching of files from
>> the object store. In the end, for my use case, I started using PVCs and
>> PVC-aware scheduling along with decommissioning. So far performance is good
>> with this choice.
>>
>> Thank you
>>
>> On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh, 
>> wrote:
>>
>>> Hi,
>>>
>>> First thanks everyone for their contributions
>>>
>>> I was going to reply to @Enrico Minack   but
>>> noticed additional info. As I understand it, Apache Uniffle, for example, is
>>> an incubating project aimed at providing a pluggable shuffle service for
>>> Spark. Basically, what all these "external shuffle services" have in common
>>> is offloading shuffle data management to external services, thus reducing
>>> the memory and CPU overhead on Spark executors. That is great. While
>>> Uniffle and others enhance shuffle performance and scalability, it would be
>>> great to integrate them with the Spark UI. This may require additional
>>> development effort. I suppose the interest would be in having these
>>> external metrics incorporated into Spark with one look and feel. This may
>>> require customizing the UI to fetch and display metrics or statistics from
>>> the external shuffle services. Has any project done this?
>>>
>>> Thanks
>>>
>>> Mich Talebzadeh,
>>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>>

Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
>>> "service_account_name",
>>>>> "spark.hadoop.fs.gs.impl":
>>>>> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
>>>>> "spark.hadoop.google.cloud.auth.service.account.enable": "true",
>>>>> "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
>>>>> "/path/to/keyfile.json",
>>>>> }
>>>>>
>>>>> For Amazon S3 similar
>>>>>
>>>>> spark_config_s3 = {
>>>>> "spark.kubernetes.authenticate.driver.serviceAccountName":
>>>>> "service_account_name",
>>>>> "spark.hadoop.fs.s3a.impl":
>>>>> "org.apache.hadoop.fs.s3a.S3AFileSystem",
>>>>> "spark.hadoop.fs.s3a.access.key": "s3_access_key",
>>>>> "spark.hadoop.fs.s3a.secret.key": "secret_key",
>>>>> }
>>>>>
>>>>>
>>>>> To implement these configurations and enable Spark applications to
>>>>> interact with GCS and S3, I guess we can approach it this way
>>>>>
>>>>> 1) Spark Repository Integration: These configurations need to be added
>>>>> to the Spark repository as part of the supported configuration options for
>>>>> k8s deployments.
>>>>>
>>>>> 2) Configuration Settings: Users need to specify these configurations
>>>>> when submitting Spark applications to a Kubernetes cluster. They can
>>>>> include these configurations in the Spark application code or pass them as
>>>>> command-line arguments or environment variables during application
>>>>> submission.
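
As a hedged sketch of option 2, the spark_config_s3 dictionary above could be applied when building a session roughly like this (the bucket path and credential values are placeholders, and the hadoop-aws package is assumed to be on the classpath):

from pyspark.sql import SparkSession

spark_config_s3 = {
    "spark.kubernetes.authenticate.driver.serviceAccountName": "service_account_name",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.access.key": "s3_access_key",
    "spark.hadoop.fs.s3a.secret.key": "secret_key",
}

builder = SparkSession.builder.appName("s3-read-example")
for key, value in spark_config_s3.items():
    builder = builder.config(key, value)

spark = builder.getOrCreate()
# Placeholder path, for illustration only.
df = spark.read.parquet("s3a://example-bucket/path/")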
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>>
>>>>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>>
>>>>> On Sun, 7 Apr 2024 at 13:31, Vakaris Baškirov <
>>>>> vakaris.bashki...@gmail.com> wrote:
>>>>>
>>>>>> There is an IBM shuffle service plugin that supports S3
>>>>>> https://github.com/IBM/spark-s3-shuffle
>>>>>>
>>>>>> Though I would think a feature like this could be a part of the main
>>>>>> Spark repo. Trino already has out-of-box support for s3 exchange 
>>>>>> (shuffle)
>>>>>> and it's very useful.
>>>>>>
>>>>>> Vakaris
>>>>>>
>>>>>> On Sun, Apr 7, 2024 at 12:27 PM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> Thanks for your suggestion that I take it as a workaround. Whilst
>>>>>>> this workaround can potentially address storage allocation issues, I was
>>>>>>> more interested in exploring solutions that offer a more seamless
>>>>>>> integration with large distributed file systems like HDFS, GCS, or S3. 
>>>>>>> This
>>>>>>> would ensure better performance and scalability for handling larger
>>>>>>> datasets efficiently.
>>>>>>>
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>>
>>>>>>>view my Linkedin profile
>

Re: External Spark shuffle service for k8s

2024-04-06 Thread Bjørn Jørgensen
You can make a PVC on K8s and call it 300gb.

Make a folder in your Dockerfile:
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir

Start Spark by adding this:

.config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.options.claimName", "300gb") \
.config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.path", "/opt/spark/work-dir") \
.config("spark.kubernetes.driver.volumes.persistentVolumeClaim.300gb.mount.readOnly", "False") \
.config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.options.claimName", "300gb") \
.config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.path", "/opt/spark/work-dir") \
.config("spark.kubernetes.executor.volumes.persistentVolumeClaim.300gb.mount.readOnly", "False") \
.config("spark.local.dir", "/opt/spark/work-dir")




lør. 6. apr. 2024 kl. 15:45 skrev Mich Talebzadeh :

> I have seen some older references for shuffle service for k8s,
> although it is not clear they are talking about a generic shuffle
> service for k8s.
>
> Anyhow with the advent of genai and the need to allow for a larger
> volume of data, I was wondering if there has been any more work on
> this matter. Specifically, larger and scalable file systems like HDFS,
> GCS, S3 etc. offer significantly larger storage capacity than local
> disks on individual worker nodes in a k8s cluster, thus allowing
> handling of much larger datasets more efficiently. Also, the degree of
> parallelism and fault tolerance with these file systems comes into
> it. I will be interested in hearing more about any progress on this.
>
> Thanks
> .
>
> Mich Talebzadeh,
>
> Technologist | Solutions Architect | Data Engineer  | Generative AI
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von Braun)".
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [External] Re: Issue of spark with antlr version

2024-04-06 Thread Bjørn Jørgensen
[[VOTE] Release Plan for Apache Spark 4.0.0 (June 2024)](
https://lists.apache.org/thread/r0zn6rd8y25yn2dg59ktw3ttrwxzqrfb)

Apache Spark 4.0.0 Release Plan
===

1. After creating `branch-3.5`, set "4.0.0-SNAPSHOT" in master branch.

2. Creating `branch-4.0` on April 1st, 2024.

3. Apache Spark 4.0.0 RC1 on May 1st, 2024.

4. Apache Spark 4.0.0 Release in June, 2024.

tir. 2. apr. 2024 kl. 12:06 skrev Chawla, Parul :

> ++ Ashima
>
> --
> *From:* Chawla, Parul 
> *Sent:* Tuesday, April 2, 2024 9:56 AM
> *To:* Bjørn Jørgensen ; user@spark.apache.org <
> user@spark.apache.org>
> *Cc:* Sahni, Ashima ;
> user@spark.apache.org ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Re: [External] Re: Issue of spark with antlr version
>
> Hi Team,
> Any update on the below query: when will Spark 4.x be released to Maven? On the
> Maven site I could only see spark-core 3.5.1.
>
> Regards,
> Parul
>
> ------
> *From:* Chawla, Parul 
> *Sent:* Monday, April 1, 2024 4:20 PM
> *To:* Bjørn Jørgensen 
> *Cc:* Sahni, Ashima ;
> user@spark.apache.org ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* Re: [External] Re: Issue of spark with antlr version
>
> Hi Team,
>
> Can you let us know when Spark 4.x will be released to Maven.
>
> regards,
> Parul
>
> Get Outlook for iOS <https://aka.ms/o0ukef>
> --
> *From:* Bjørn Jørgensen 
> *Sent:* Wednesday, February 28, 2024 5:06:54 PM
> *To:* Chawla, Parul 
> *Cc:* Sahni, Ashima ;
> user@spark.apache.org ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* Re: [External] Re: Issue of spark with antlr version
>
> [image: image.png]
>
> ons. 28. feb. 2024 kl. 11:28 skrev Chawla, Parul <
> parul.cha...@accenture.com>:
>
>
> Hi ,
> Can we get the Spark version in which this is resolved.
> --
> *From:* Bjørn Jørgensen 
> *Sent:* Tuesday, February 27, 2024 7:05:36 PM
> *To:* Sahni, Ashima 
> *Cc:* Chawla, Parul ; user@spark.apache.org <
> user@spark.apache.org>; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* [External] Re: Issue of spark with antlr version
>
> *CAUTION:* External email. Be cautious with links and attachments.
> [SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1
> <https://urldefense.com/v3/__https://github.com/apache/spark/pull/43075__;!!OrxsNty6D4my!5V_Vn4ayIAu7jPmZwkyA7qS7Pjmrg0Tp6jfeEYeA5p1GnG7Ks5v6nFfsCjZeHULghYge8_xl1to4vzFhly3d_figlCZ9-w$>
>
>
> tir. 27. feb. 2024 kl. 13:25 skrev Sahni, Ashima
> :
>
> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using Spring Framework 5.3.31. To upgrade it
> to 6.x, as per application dependencies we must upgrade the Spark and
> Hibernate jars as well.
>
> With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1, but there's no Spark version available with the
> upgraded Antlr4 jar.
>
> Can you please let us know when we can have an updated version with the
> upgraded Antlr4 version.
>
>
>
>
>
> Regards,
>
> Parul
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our sy

Re: A proposal for creating a Knowledge Sharing Hub for Apache Spark Community

2024-03-18 Thread Bjørn Jørgensen
Something like this: Spark community · GitHub
<https://github.com/Spark-community>


man. 18. mars 2024 kl. 17:26 skrev Parsian, Mahmoud
:

> Good idea. Will be useful
>
>
>
> +1
>
>
>
>
>
>
>
> *From: *ashok34...@yahoo.com.INVALID 
> *Date: *Monday, March 18, 2024 at 6:36 AM
> *To: *user @spark , Spark dev list <
> d...@spark.apache.org>, Mich Talebzadeh 
> *Cc: *Matei Zaharia 
> *Subject: *Re: A proposal for creating a Knowledge Sharing Hub for Apache
> Spark Community
>
> External message, be mindful when clicking links or attachments
>
>
>
> Good idea. Will be useful
>
>
>
> +1
>
>
>
> On Monday, 18 March 2024 at 11:00:40 GMT, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
>
>
>
> Some of you may be aware that Databricks community Home | Databricks
>
> have just launched a knowledge sharing hub. I thought it would be a
>
> good idea for the Apache Spark user group to have the same, especially
>
> for repeat questions on Spark core, Spark SQL, Spark Structured
>
> Streaming, Spark Mlib and so forth.
>
>
>
> Apache Spark user and dev groups have been around for a good while.
>
> They are serving their purpose . We went through creating a slack
>
> community that managed to create more heat than light. This is
>
> what Databricks community came up with and I quote
>
>
>
> "Knowledge Sharing Hub
>
> Dive into a collaborative space where members like YOU can exchange
>
> knowledge, tips, and best practices. Join the conversation today and
>
> unlock a wealth of collective wisdom to enhance your experience and
>
> drive success."
>
>
>
> I don't know the logistics of setting it up, but I am sure that should
>
> not be that difficult. If anyone is supportive of this proposal, let
>
> the usual +1, 0, -1 decide
>
>
>
> HTH
>
>
>
> Mich Talebzadeh,
>
> Dad | Technologist | Solutions Architect | Engineer
>
> London
>
> United Kingdom
>
>
>
>
>
>   view my Linkedin profile
>
>
>
>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
> <https://urldefense.com/v3/__https:/en.everybodywiki.com/Mich_Talebzadeh__;!!HrbR-XT-OQ!Wu9fFP8RFJW2N_YUvwl9yctGHxtM-CFPe6McqOJDrxGBjIaRoF8vRwpjT9WzHojwI2R09Nbg8YE9ggB4FtocU8cQFw$>
>
>
>
>
>
>
>
> Disclaimer: The information provided is correct to the best of my
>
> knowledge but of course cannot be guaranteed . It is essential to note
>
> that, as with any advice, quote "one test result is worth one-thousand
>
> expert opinions (Werner Von Braun)".
>
>
>
> -
>
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [External] Re: Issue of spark with antlr version

2024-02-28 Thread Bjørn Jørgensen
[image: image.png]

ons. 28. feb. 2024 kl. 11:28 skrev Chawla, Parul :

>
> Hi ,
> Can we get the Spark version in which this is resolved.
> --
> *From:* Bjørn Jørgensen 
> *Sent:* Tuesday, February 27, 2024 7:05:36 PM
> *To:* Sahni, Ashima 
> *Cc:* Chawla, Parul ; user@spark.apache.org <
> user@spark.apache.org>; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>; Mekala, Rajesh <
> r.mek...@accenture.com>; Grandhi, Venkatesh <
> venkatesh.a.gran...@accenture.com>; George, Rejish <
> rejish.geo...@accenture.com>; Tayal, Aayushi 
> *Subject:* [External] Re: Issue of spark with antlr version
>
> *CAUTION:* External email. Be cautious with links and attachments.
> [SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1
> <https://urldefense.com/v3/__https://github.com/apache/spark/pull/43075__;!!OrxsNty6D4my!5V_Vn4ayIAu7jPmZwkyA7qS7Pjmrg0Tp6jfeEYeA5p1GnG7Ks5v6nFfsCjZeHULghYge8_xl1to4vzFhly3d_figlCZ9-w$>
>
>
> tir. 27. feb. 2024 kl. 13:25 skrev Sahni, Ashima
> :
>
> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using Spring Framework 5.3.31. To upgrade it
> to 6.x, as per application dependencies we must upgrade the Spark and
> Hibernate jars as well.
>
> With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1, but there's no Spark version available with the
> upgraded Antlr4 jar.
>
> Can you please let us know when we can have an updated version with the
> upgraded Antlr4 version.
>
>
>
>
>
> Regards,
>
> Parul
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our systems for the purposes of information security, AI-powered support
> capabilities, and assessment of internal compliance with Accenture policy.
> Your privacy is important to us. Accenture uses your personal data only in
> compliance with data protection laws. For further information on how
> Accenture processes your personal data, please see our privacy statement at
> https://www.accenture.com/us-en/privacy-policy.
>
> __
>
> www.accenture.com
>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Issue of spark with antlr version

2024-02-27 Thread Bjørn Jørgensen
[SPARK-44366][BUILD] Upgrade antlr4 to 4.13.1
<https://github.com/apache/spark/pull/43075>


tir. 27. feb. 2024 kl. 13:25 skrev Sahni, Ashima
:

> Hi Team,
>
>
>
> Can you please let us know the update on below.
>
>
>
> Thanks,
>
> Ashima
>
>
>
> *From:* Chawla, Parul 
> *Sent:* Sunday, February 25, 2024 11:57 PM
> *To:* user@spark.apache.org
> *Cc:* Sahni, Ashima ; Misra Parashar, Jyoti <
> jyoti.misra.paras...@accenture.com>
> *Subject:* Issue of spark with antlr version
>
>
>
> Hi Spark Team,
>
>
>
>
>
> Our application is currently using Spring Framework 5.3.31. To upgrade it
> to 6.x, as per application dependencies we must upgrade the Spark and
> Hibernate jars as well.
>
> With the Hibernate-compatible upgrade, the dependent Antlr4 jar version has
> been upgraded to 4.10.1, but there's no Spark version available with the
> upgraded Antlr4 jar.
>
> Can you please let us know when we can have an updated version with the
> upgraded Antlr4 version.
>
>
>
>
>
> Regards,
>
> Parul
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our systems for the purposes of information security, AI-powered support
> capabilities, and assessment of internal compliance with Accenture policy.
> Your privacy is important to us. Accenture uses your personal data only in
> compliance with data protection laws. For further information on how
> Accenture processes your personal data, please see our privacy statement at
> https://www.accenture.com/us-en/privacy-policy.
>
> __
>
> www.accenture.com
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-02-13 Thread Bjørn Jørgensen
roBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>>  at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>  at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>  at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>  at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>>  at 
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>  at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>>  at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>>  at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>  at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>>  at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>>
>>
>> Dataset df =
>> spark
>> .readStream()
>> .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
>> .options(appConfig.getKafka().getConf())
>> .load()
>> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, 
>> appConfig))
>> .option("checkpointLocation", appConfig.getChk().getPath())
>> .start()
>> .awaitTermination();
>>
>>
>> Regards,
>> Abhishek Singla
>>
>>
>>
>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Okio Vulnerability in Spark 3.4.1

2024-01-11 Thread Bjørn Jørgensen
[SPARK-46662][K8S][BUILD] Upgrade kubernetes-client to 6.10.0
<https://github.com/apache/spark/pull/44672> a new version of
kubernets-client with okio version 1.17.6 is now merged to master and will
be in the spark 4.0 version.

tir. 14. nov. 2023 kl. 15:21 skrev Bjørn Jørgensen :

> FYI
> I have opened Update okio to version 1.17.6
> <https://github.com/fabric8io/kubernetes-client/pull/5587> for this now.
>
> tor. 31. aug. 2023 kl. 21:18 skrev Sean Owen :
>
>> It's a dependency of some other HTTP library. Use mvn dependency:tree to
>> see where it comes from. It may be more straightforward to upgrade the
>> library that brings it in, assuming a later version brings in a later okio.
>> You can also manage up the version directly with a new entry in
>> 
>>
>> However, does this affect Spark? all else equal it doesn't hurt to
>> upgrade, but wondering if there is even a theory that it needs to be
>> updated.
>>
>>
>> On Thu, Aug 31, 2023 at 7:42 AM Agrawal, Sanket <
>> sankeagra...@deloitte.com> wrote:
>>
>>> I don’t see an entry in pom.xml while building spark. I think it is
>>> being downloaded as part of some other dependency.
>>>
>>>
>>>
>>> *From:* Sean Owen 
>>> *Sent:* Thursday, August 31, 2023 5:10 PM
>>> *To:* Agrawal, Sanket 
>>> *Cc:* user@spark.apache.org
>>> *Subject:* [EXT] Re: Okio Vulnerability in Spark 3.4.1
>>>
>>>
>>>
>>> Does the vulnerability affect Spark?
>>>
>>> In any event, have you tried updating Okio in the Spark build? I don't
>>> believe you could just replace the JAR, as other libraries probably rely on
>>> it and compiled against the current version.
>>>
>>>
>>>
>>> On Thu, Aug 31, 2023 at 6:02 AM Agrawal, Sanket <
>>> sankeagra...@deloitte.com.invalid> wrote:
>>>
>>> Hi All,
>>>
>>>
>>>
>>> Amazon inspector has detected a vulnerability in okio-1.15.0.jar JAR in
>>> Spark 3.4.1. It suggests to upgrade the jar version to 3.4.0. But when we
>>> try this version of jar then the spark application is failing with below
>>> error:
>>>
>>>
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> None.org.apache.spark.api.java.JavaSparkContext.
>>>
>>> : java.lang.NoClassDefFoundError: okio/BufferedSource
>>>
>>> at okhttp3.internal.Util.(Util.java:62)
>>>
>>> at okhttp3.OkHttpClient.(OkHttpClient.java:127)
>>>
>>> at okhttp3.OkHttpClient$Builder.(OkHttpClient.java:475)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newOkHttpClientBuilder(OkHttpClientFactory.java:41)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:56)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:68)
>>>
>>> at
>>> io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory.newBuilder(OkHttpClientFactory.java:30)
>>>
>>> at
>>> io.fabric8.kubernetes.client.KubernetesClientBuilder.getHttpClient(KubernetesClientBuilder.java:88)
>>>
>>> at
>>> io.fabric8.kubernetes.client.KubernetesClientBuilder.build(KubernetesClientBuilder.java:78)
>>>
>>> at
>>> org.apache.spark.deploy.k8s.SparkKubernetesClientFactory$.createKubernetesClient(SparkKubernetesClientFactory.scala:120)
>>>
>>> at
>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:111)
>>>
>>> at
>>> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:3037)
>>>
>>> at org.apache.spark.SparkContext.(SparkContext.scala:568)
>>>
>>> at
>>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
>>>
>>> at
>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>>
>>> at
>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
>>> Source)
>>>
>>> at
>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
>>> Source)
>>

Re: Validate spark sql

2023-12-25 Thread Bjørn Jørgensen
c errors
>>>- If you need more comprehensive validation, consider using a
>>>testing framework and a small dataset.
>>>- For complex queries, using a linter or code analysis tool can help
>>>    identify potential issues.
>>>
>>> HTH
>>>
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 24 Dec 2023 at 07:57, ram manickam  wrote:
>>>
>>>> Hello,
>>>> Is there a way to validate pyspark sql to validate only syntax errors?.
>>>> I cannot connect do actual data set to perform this validation.  Any
>>>> help would be appreciated.
>>>>
>>>>
>>>> Thanks
>>>> Ram
>>>>
>>>
>>>
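
A minimal sketch for syntax-only validation, assuming PySpark and going through the py4j gateway to Catalyst's SQL parser (the _jsparkSession handle is an internal detail, not a public API, so treat this as a workaround rather than a supported interface):

from pyspark.sql import SparkSession
from py4j.protocol import Py4JJavaError

spark = SparkSession.builder.master("local[1]").appName("sql-syntax-check").getOrCreate()

def has_valid_syntax(sql: str) -> bool:
    # parsePlan() only parses the statement; tables and columns are not
    # resolved, so no connection to the actual data set is needed.
    try:
        spark._jsparkSession.sessionState().sqlParser().parsePlan(sql)
        return True
    except Py4JJavaError:
        return False

print(has_valid_syntax("SELECT id, name FROM some_table WHERE id > 10"))  # True
print(has_valid_syntax("SELEC id FROM some_table"))                       # False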

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Validate spark sql

2023-12-25 Thread Bjørn Jørgensen
r code analysis tool can help
>>identify potential issues.
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 24 Dec 2023 at 07:57, ram manickam  wrote:
>>
>>> Hello,
>>> Is there a way to validate pyspark sql to validate only syntax errors?.
>>> I cannot connect do actual data set to perform this validation.  Any
>>> help would be appreciated.
>>>
>>>
>>> Thanks
>>> Ram
>>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Okio Vulnerability in Spark 3.4.1

2023-11-14 Thread Bjørn Jørgensen
ClientServerConnection.java:106)
>>
>> at java.base/java.lang.Thread.run(Unknown Source)
>>
>> Caused by: java.lang.ClassNotFoundException: okio.BufferedSource
>>
>> at
>> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
>>
>> at
>> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
>> Source)
>>
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>
>> ... 26 more
>>
>>
>>
>> Replaced the existing jar with the JAR file at
>> https://repo1.maven.org/maven2/com/squareup/okio/okio/3.4.0/okio-3.4.0.jar
>> <https://secure-web.cisco.com/1bTvNPAJgVtYdy2nfHp1eUSEqLfelqshEI8TO89yzE25dM5y8HHDCwYxrzTLlmcAFi6uIbQLO2OiJht-xgXmI3lFdV8YpP0j3re47gncrBpwO9m6xYQeLhqXUAnUVP2MoxHbdHlZcdSwDqWkjbOKudm7Go1ICzxhw_VBXuK9n8XF3y7__B86mqWNsroDGD3hbH_tTQTHpXK-4tJCeIZTKmwItL1A3zlRL8lBHG_zgTDSiX9W7ufy8rHP2JZEp_FaftGMsnPA56IGHQVQAmOIobPSQDi4MfsiyUj0HsHPH3fZaz8_8TnPu178yfi8pCurkmr7b0X0NmFTdeAuFHKhdoOYooWDPsuBIYxknd3p1wLXrQezp26QrkjEiUMjNH9S18HPLH2BfN627X6zqQD7sVUUo1hzMRvnllVZVQWPL6H7lisyk-7w2pTAX6bm9wZuWTN9U4hZzjoc1-s1YumCiexaMOfiqEbTKppNDB8jOXBPIS9HDdEVDUl8OAIKz-T480x_NePZwHGT4hHtSwUaHCw/https%3A%2F%2Frepo1.maven.org%2Fmaven2%2Fcom%2Fsquareup%2Fokio%2Fokio%2F3.4.0%2Fokio-3.4.0.jar>
>>
>>
>>
>>
>>
>> PFB, the vulnerability details:
>>
>> Link: https://nvd.nist.gov/vuln/detail/CVE-2023-3635
>> <https://secure-web.cisco.com/1KDv1iIbxjIsZCdyvwVzp9hDXe9ClcztVaj_gKzaoEQJ0Qb1BrTG7ivs0bsKiKVJvN8BJ0KvCwQKgWJGRfrWZYTkrgVMl1RfmnIn2fTYgyXd5ATU-4FBIQstOXRlc1dQnRNW9jr8OZCqV_xqbzAuLEP--uh0URczU8BYxyefL4Ly6ntQ2Y0BtKEOq3LZflTianf1d3UH30m_mmQmt3pE_3S7qFc9R9I3NqWJmkxuYVC1gVhnWBpbelMz5P7Q8D4GXo_L7tgj_nPwQyAcwqLjaIUVf-SYPU8T-WsaxeDkW6gp5oNKuYFqDzxXghsRJxzOj7i5noa1bj3-uSj0f0tT8xZ3L42uUTNgHczw65Kt1WnUK2-A_yhTmEhg07yFdwKQha6bQyn2KoicHjcdlQzAWsRmbBgzVjhDKMGdPn9Mrm5V7lw1QgeoFmddSJsreHy6TcNY2dXtqEzhw-OX2ibRtOrCX4M_n1ONE73yhGXAhqarKsd1tl5IgDfp_MlsFe9bkMa9G2AK5pcO0GeI8r7yDXA/https%3A%2F%2Fnvd.nist.gov%2Fvuln%2Fdetail%2FCVE-2023-3635>
>>
>>
>>
>> Any guidance here would be of great help.
>>
>>
>>
>> Thanks,
>>
>> Sanket A.
>>
>> This message (including any attachments) contains confidential
>> information intended for a specific individual and purpose, and is
>> protected by law. If you are not the intended recipient, you should delete
>> this message and any disclosure, copying, or distribution of this message,
>> or the taking of any action based on it, by you is strictly prohibited.
>>
>> Deloitte refers to a Deloitte member firm, one of its related entities,
>> or Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is
>> a separate legal entity and a member of DTTL. DTTL does not provide
>> services to clients. Please see www.deloitte.com/about to learn more.
>>
>> v.E.1
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: jackson-databind version mismatch

2023-11-02 Thread Bjørn Jørgensen
[SPARK-43225][BUILD][SQL] Remove jackson-core-asl and jackson-mapper-asl
from pre-built distribution <https://github.com/apache/spark/pull/40893>

tor. 2. nov. 2023 kl. 09:15 skrev Bjørn Jørgensen :

> In Spark 3.5.0, jackson-core-asl and jackson-mapper-asl were removed; those
> are the ones with groupId org.codehaus.jackson.
>
> The other jackson-* artifacts have groupId com.fasterxml.jackson.core.
>
>
> tor. 2. nov. 2023 kl. 01:43 skrev eab...@163.com :
>
>> Hi,
>> Please check the versions of jar files starting with "jackson-". Make 
>> sure all versions are consistent.
>>  jackson jar list in spark-3.3.0:
>> 
>> 2022/06/10  04:3775,714 jackson-annotations-*2.13.3*.jar
>> 2022/06/10  04:37   374,895 jackson-core-*2.13.3*.jar
>> 2022/06/10  04:37   232,248 jackson-core-asl-1.9.13.jar
>> 2022/06/10  04:37 1,536,542 jackson-databind-*2.13.3*.jar
>> 2022/06/10  04:3752,020 jackson-dataformat-yaml-*2.13.3*.jar
>> 2022/06/10  04:37   121,201 jackson-datatype-jsr310-*2.13.3*.jar
>> 2022/06/10  04:37   780,664 jackson-mapper-asl-1.9.13.jar
>> 2022/06/10  04:37   458,981 jackson-module-scala_2.12-*2.13.3*
>> .jar
>> 
>>
>> Spark 3.3.0 uses Jackson version 2.13.3, while Spark 3.5.0 uses Jackson 
>> version 2.15.2.
>> I think you can remove the lower version of Jackson package to keep the 
>> versions consistent.
>> eabour
>>
>>
>> *From:* moshik.vi...@veeva.com.INVALID
>> *Date:* 2023-11-01 15:03
>> *To:* user@spark.apache.org
>> *CC:* 'Saar Barhoom' 
>> *Subject:* jackson-databind version mismatch
>>
>> Hi Spark team,
>>
>>
>>
>> On upgrading spark version from 3.2.1 to 3.4.1 got the following issue:
>>
>> *java.lang.NoSuchMethodError: 'com.fasterxml.jackson.core.JsonGenerator
>> com.fasterxml.jackson.databind.ObjectMapper.createGenerator(java.io.OutputStream,
>> com.fasterxml.jackson.core.JsonEncoding)'*
>>
>> *at
>> org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:75)*
>>
>> *at
>> org.apache.spark.SparkThrowableHelper$.getMessage(SparkThrowableHelper.scala:74)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:127)*
>>
>> *at scala.Option.map(Option.scala:230)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)*
>>
>> *at
>> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)*
>>
>> *at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)*
>>
>> *at
>> org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)*
>>
>> *at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)*
>>
>> *at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)*
>>
>> *at
>> org.apache.spark.sql.Dataset.takeAsList(Dataset.scala:3405)*
>>
>> *at
>> com.crossix.safemine.cloud.utils.DebugRDDLogger.showDataset(DebugRDDLogger.java:84)*
>>
>> *at
>> com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.getFillRateCountsWithSparkQuery(StatisticsTransformer.java:122)*
>>
>> *at
>> com.crossix.safemine.cloud.components.statistics.spark.StatisticsTransformer.calculateStatistics(StatisticsTransformer.java:61)*
>>
>> *at
>> com.crossix.safemine.cloud.components.statistics.spark.SparkFileStatistics.execute(SparkFileStatistics.java:102)*
>>
>> *at
>> com.crossix.safemine.cloud.StatisticsFlow.calculateAllStatistics(StatisticsFlow.java:146)*
>>
>> *at
>> com.crossix.safemine.cloud.StatisticsFlow.runStatistics(StatisticsFlow.java:119)*
>>
>> *at
>> com.crossix.safemine.cloud.StatisticsFlow.initialFileStatistics(StatisticsFlow.java:77)*
>>
>> *at
>> com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:221)*
>>
>> *at
>> com.crossix.safemine.cl

Re: jackson-databind version mismatch

2023-11-02 Thread Bjørn Jørgensen
n-databind fails with:
>
> *com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.5
> requires Jackson Databind version >= 2.10.0 and < 2.11.0*
>
>
>
> According to spark 3.3.0 release notes: "Upgrade Jackson to 2.13.3" but
> spark package of 3.4.1 contains Jackson of 2.10.5
>
> (https://spark.apache.org/releases/spark-release-3-3-0.html)
>
> What am I missing?
>
>
>
> --
>
> *Moshik Vitas*
>
> Senior Software Developer, Crossix
>
> Veeva Systems
>
> m: +972-54-5326-400
>
> moshik.vi...@veeva.com
>
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Bjørn Jørgensen
Also, remove the space in "rev. scode".

søn. 22. okt. 2023 kl. 19:08 skrev Sadha Chilukoori :

> Hi Meena,
>
> I'm asking to clarify, are the *on *& *and* keywords optional in the join
> conditions?
>
> Please try this snippet, and see if it helps
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id =c.id
> inner join product p
> on rev.sys = p.sys
> and rev.prin = p.prin
> and rev.scode= p.bcode
>
> left join item I
> on rev.sys = I.sys
> and rev.custumer_id = I.custumer_id
> and rev. scode = I.scode;
>
> Thanks,
> Sadha
>
> On Sat, Oct 21, 2023 at 3:21 PM Meena Rajani 
> wrote:
>
>> Hello all:
>>
>> I am using spark sql to join two tables. To my surprise I am
>> getting redundant rows. What could be the cause.
>>
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>> where rev.custumer_id = '123456789'
>>
>> The first part of the code brings one row
>>
>> select rev.* from rev
>> inner join customer c
>> on rev.custumer_id =c.id
>> inner join product p
>> rev.sys = p.sys
>> rev.prin = p.prin
>> rev.scode= p.bcode
>>
>>
>> The  item has two rows which have common attributes  and the* final join
>> should result in 2 rows. But I am seeing 4 rows instead.*
>>
>> left join item I
>> on rev.sys = i.sys
>> rev.custumer_id = I.custumer_id
>> rev. scode = I.scode
>>
>>
>>
>> Regards,
>> Meena
>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Discriptency sample standard deviation pyspark and Excel

2023-09-19 Thread Bjørn Jørgensen
from pyspark.sql import SparkSession
from pyspark.sql.functions import stddev_samp, stddev_pop

spark = SparkSession.builder.getOrCreate()

data = [(52.7,), (45.3,), (60.2,), (53.8,), (49.1,), (44.6,), (58.0,),
(56.5,), (47.9,), (50.3,)]
df = spark.createDataFrame(data, ["value"])

df.select(stddev_samp("value").alias("sample_stddev")).show()

+-----------------+
|    sample_stddev|
+-----------------+
|5.320025062597606|
+-----------------+



In MS Excel 365 Norwegian

[image: image.png]


=STDAVVIKA(B1:B10)

=STDAV.S(B1:B10)

They both print
5,32002506

which is the same value as PySpark gives.
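
For reference, stddev_samp uses the n - 1 denominator (the same formula as Excel's STDEV.S); a quick manual check with the values above:

import math

values = [52.7, 45.3, 60.2, 53.8, 49.1, 44.6, 58.0, 56.5, 47.9, 50.3]

# Sample standard deviation: sqrt(sum((x - mean)^2) / (n - 1))
mean = sum(values) / len(values)
manual = math.sqrt(sum((v - mean) ** 2 for v in values) / (len(values) - 1))

print(manual)  # 5.320025062597606, matching stddev_samp and Excel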





tir. 19. sep. 2023 kl. 14:15 skrev Helene Bøe :

> Hi!
>
>
>
> I am applying the stddev function (so actually stddev_samp), however when
> comparing with the sample standard deviation in Excel the results do not
> match.
>
> I cannot find in your documentation any more specifics on how the sample
> standard deviation is calculated, so I cannot compare the difference toward
> Excel, which uses [image: sample standard deviation formula].
>
> I am trying to avoid using Excel at all costs, but if the stddev_samp
> function is not calculating the standard deviation correctly I have a
> problem.
>
> I hope you can help me resolve this issue.
>
>
>
> Kindest regards,
>
>
>
> *Helene Bøe*
> *Graduate Project Engineer*
> Recycling Process & Support
>
> M: +47 980 00 887
> helene.b...@hydro.com
> <https://intra.hydro.com/EPiServer/CMS/Content/en/%2c%2c9/?epieditmode=False>
>
> Norsk Hydro ASA
> Drammensveien 264
> NO-0283 Oslo, Norway
> www.hydro.com
> <https://intra.hydro.com/EPiServer/CMS/Content/en/%2c%2c9/?epieditmode=False>
>
>
> NOTICE: This e-mail transmission, and any documents, files or previous
> e-mail messages attached to it, may contain confidential or privileged
> information. If you are not the intended recipient, or a person responsible
> for delivering it to the intended recipient, you are hereby notified that
> any disclosure, copying, distribution or use of any of the information
> contained in or attached to this message is STRICTLY PROHIBITED. If you
> have received this transmission in error, please immediately notify the
> sender and delete the e-mail and attached documents. Thank you.
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Filter out 20% of rows

2023-09-16 Thread Bjørn Jørgensen
EDIT:
I don't think the question asker wants only the top 25 percent returned.

lør. 16. sep. 2023 kl. 21:54 skrev Bjørn Jørgensen :

> percentile_approx returns the approximate percentile(s)
> <https://github.com/apache/spark/pull/14868>. The memory consumption is
> bounded: the larger the accuracy parameter we choose, the smaller the error we
> get. The default accuracy value is 10000, to match the Hive default setting.
> Choose a smaller value for a smaller memory footprint.
>
> When I run my code on a single PC where N = 10 millions it takes 22.52
> seconds. Notebook added.
>
> I don't think the question asker wants only the top 20 percent returned.
>
>
> lør. 16. sep. 2023 kl. 17:49 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> Hi Bjorn,
>>
>> I thought that one is better off using percentile_approx as it seems to
>> be the recommended approach for computing percentiles and can simplify the
>> code.
>> I have modified your code to use percentile_approx rather than manually
>> computing it. It would be interesting to hear ideas on this.
>>
>> Here is the code:
>>
>> # Standard library imports
>> import json
>> import multiprocessing
>> import os
>> import re
>> import sys
>> import random
>>
>> # Third-party imports
>> import numpy as np
>> import pandas as pd
>> import pyarrow
>>
>> # Pyspark imports
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession, functions as F, Window
>> from pyspark.sql.functions import (
>> col, concat, concat_ws, expr, lit, trim, udf
>> )
>> from pyspark.sql.types import (
>> IntegerType, StringType, StructField, StructType,
>> DoubleType, TimestampType
>> )
>> from pyspark import pandas as ps
>>
>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  #
>> e.g. 4015976448
>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>> conf.setMaster("local[{}]".format(number_cores))
>> conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>> "spark.sql.repl.eagerEval.enabled", "True"
>> ).set("spark.sql.adaptive.enabled", "True").set(
>> "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>> ).set(
>> "spark.sql.repl.eagerEval.maxNumRows", "1"
>> )
>>
>> return
>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>>
>> spark = get_spark_session("My super app", SparkConf())
>> sc = SparkContext.getOrCreate()
>> sc.setLogLevel("ERROR")
>>
>> def generate_ip():
>> return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>
>> def generate_timestamp():
>> return pd.Timestamp(
>> year=random.randint(2021, 2023),
>> month=random.randint(1, 12),
>> day=random.randint(1, 28),
>> hour=random.randint(0, 23),
>> minute=random.randint(0, 59),
>> second=random.randint(0, 59)
>> )
>>
>> def random_gbps():
>> return random.uniform(0, 10)
>>
>> # Number of rows
>> n = 20
>>
>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(),
>> "date_time": generate_timestamp()} for _ in range(n)]
>> df = spark.createDataFrame(pd.DataFrame(data))
>> df.show()
>>
>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>
>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>
>> # Calculate the 80th percentile value
>> percentile_80 = agg_df.agg(F.expr("percentile_approx(total_gbps,
>> 0.8)").alias("percentile_80")).collect()[0]["percentile_80"]
>>
>> # Filter the DataFrame based on the condition
>> filtered_df = df.filter(df["gbps"] >= percentile_80)
>>
>> # Show the filtered DataFrame
>> print(f"Filtered DataFrame")
>> filtered_df.show()
>>
>> print(f"Total rows in data frame = {df.count()}"

Re: Filter out 20% of rows

2023-09-16 Thread Bjørn Jørgensen
.135.194|0.3510752223686242|2022-01-24 04:13:53|
> | 147.118.171.59|6.4071750880652765|2023-10-08 16:49:10|
> |  75.41.101.165|2.1484984272041685|2022-07-13 21:02:58|
> |  163.26.238.22|   9.8999646499433|2023-01-12 17:54:44|
> | 184.145.98.231|1.8875849709728088|2022-09-18 19:53:58|
> | 125.77.236.177|  1.17126350326476|2021-08-19 18:48:42|
> |  34.103.211.39|  9.51081430594299|2023-02-05 18:39:23|
> |   117.37.42.91| 1.122437784309721|2021-03-23 17:27:27|
> | 108.115.42.171| 8.165187506266607|2023-07-26 03:57:50|
> | 98.105.153.129| 9.284242190156004|2023-10-10 22:36:47|
> | 145.35.252.142| 9.787384042283957|2022-08-26 00:53:27|
> |  18.76.138.108| 6.939770760444909|2022-04-01 01:18:27|
> |31.33.71.26| 4.820947188427366|2021-06-10 22:02:51|
> |135.22.8.38| 9.587849542001745|2021-09-21 15:11:59|
> |104.231.110.207| 9.045897927807715|2023-06-28 06:01:00|
> +---+--+---+
>
> Filtered DataFrame
> +--------------+-----------------+-------------------+
> |  incoming_ips|             gbps|          date_time|
> +--------------+-----------------+-------------------+
> | 163.26.238.22|  9.8999646499433|2023-01-12 17:54:44|
> | 34.103.211.39| 9.51081430594299|2023-02-05 18:39:23|
> |98.105.153.129|9.284242190156004|2023-10-10 22:36:47|
> |145.35.252.142|9.787384042283957|2022-08-26 00:53:27|
> |   135.22.8.38|9.587849542001745|2021-09-21 15:11:59|
> +--------------+-----------------+-------------------+
>
> Total rows in data frame = 20
> Result satisfying 80% percentile = 5
>
> Cheers
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 16 Sept 2023 at 11:46, Mich Talebzadeh 
> wrote:
>
>> Happy Saturday coding 
>>
>>
>> Mich Talebzadeh,
>> Distinguished Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 16 Sept 2023 at 11:30, Bjørn Jørgensen 
>> wrote:
>>
>>> Ah, yes, that's right.
>>> I did have to spend some time on this one and I was having some issues
>>> with the code.
>>> I restarted the notebook kernel now and reran it and I get the same
>>> result.
>>>
>>> lør. 16. sep. 2023 kl. 11:41 skrev Mich Talebzadeh <
>>> mich.talebza...@gmail.com>:
>>>
>>>> Splendid code. A minor error glancing at your code.
>>>>
>>>> print(df.count())
>>>> print(result_df.count())
>>>>
>>>>
>>>> You have not defined result_df. I gather you meant "result"?
>>>>
>>>>
>>>> print(result.count())
>>>>
>>>>
>>>> That should fix it.
>>>>
>>>> HTH
>>>>
>>>>
>>>> Mich Talebzadeh,
>>>> Distinguished Technologist, Solutions Architect & Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The aut

Re: Filter out 20% of rows

2023-09-16 Thread Bjørn Jørgensen
Ah, yes, that's right.
I did have to spend some time on this one and I was having some issues with
the code.
I restarted the notebook kernel now and reran it and I get the same result.

lør. 16. sep. 2023 kl. 11:41 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> Splendid code. A minor error glancing at your code.
>
> print(df.count())
> print(result_df.count())
>
>
> You have not defined result_df. I gather you meant "result"?
>
>
> print(result.count())
>
>
> That should fix it.
>
> HTH
>
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 16 Sept 2023 at 06:00, Bjørn Jørgensen 
> wrote:
>
>> Something like this?
>>
>>
>> # Standard library imports
>> import json
>> import multiprocessing
>> import os
>> import re
>> import sys
>> import random
>>
>> # Third-party imports
>> import numpy as np
>> import pandas as pd
>> import pyarrow
>>
>> # Pyspark imports
>> from pyspark import SparkConf, SparkContext
>> from pyspark.sql import SparkSession, functions as F, Window
>> from pyspark.sql.functions import (
>> col, concat, concat_ws, expr, lit, trim, udf
>> )
>> from pyspark.sql.types import (
>> IntegerType, StringType, StructField, StructType,
>> DoubleType, TimestampType
>> )
>> from pyspark import pandas as ps
>>
>> os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 
>> 4015976448
>> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>> conf.setMaster("local[{}]".format(number_cores))
>> conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>> "spark.sql.repl.eagerEval.enabled", "True"
>> ).set("spark.sql.adaptive.enabled", "True").set(
>> "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>> ).set(
>> "spark.sql.repl.eagerEval.maxNumRows", "1"
>> ).set(
>> "sc.setLogLevel", "ERROR"
>> )
>>
>> return 
>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>>
>> spark = get_spark_session("My super app", SparkConf())
>> spark.sparkContext.setLogLevel("ERROR")
>>
>>
>>
>> def generate_ip():
>> return ".".join(str(random.randint(0, 255)) for _ in range(4))
>>
>> def generate_timestamp():
>> return pd.Timestamp(
>> year=random.randint(2021, 2023),
>> month=random.randint(1, 12),
>> day=random.randint(1, 28),
>> hour=random.randint(0, 23),
>> minute=random.randint(0, 59),
>> second=random.randint(0, 59)
>> )
>>
>> def random_gbps():
>> return random.uniform(0, 10)
>>
>> # Number of rows
>> n = 20
>>
>> data = [{"incoming_ips": generate_ip(), "gbps": random_gbps(), "date_time": 
>> generate_timestamp()} for _ in range(n)]
>> df = spark.createDataFrame(pd.DataFrame(data))
>> df.show()
>>
>> agg_df = df.groupBy("incoming_ips").agg(F.sum("gbps").alias("total_gbps"))
>>
>> windowRank = Window.orderBy(F.col("total_gbps").desc())
>> agg_df = agg_df.withColumn("rank", F.percent_rank().over(windowRank))
>>
>> top_80_ips = agg_df.filter(F.col("rank") <= 0.80)
>> result = df.join(top_80_ips, on="incoming_ips", 
>> how="inner").select("incoming_ips", "gbps", "date_time")
>> result.show()
>>
>> print(df.count())
>> print(result_df.count())

Re: Spark stand-alone mode

2023-09-15 Thread Bjørn Jørgensen
You need to set up SSH without a password; use a key instead. See How to connect
without password using SSH (passwordless)
<https://levelup.gitconnected.com/how-to-connect-without-password-using-ssh-passwordless-9b8963c828e8>

fre. 15. sep. 2023 kl. 20:55 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> Hi,
>
> Can these 4 nodes talk to each other through ssh as trusted hosts (on top
> of the network that Sean already mentioned)? Otherwise you need to set it
> up. You can install a LAN if you have another free port at the back of your
> HPC nodes. They should
>
> You ought to try to set up a Hadoop cluster pretty easily. Check this old
> article of mine for Hadoop set-up.
>
>
> https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D
>
> Hadoop will provide you with a common storage layer (HDFS) that these
> nodes will be able to share and talk. Yarn is your best bet as the resource
> manager with reasonably powerful hosts you have. However, for now the Stand
> Alone mode will do. Make sure that the Metastore you choose, (by default it
> will use the Hive Metastore called Derby :( ) is something respectable like
> Postgres DB that can handle multiple concurrent spark jobs
>
> HTH
>
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:
>
>>
>> Hi all,
>>
>> We have 4 HPC nodes and installed spark individually in all nodes.
>>
>> Spark is used in local mode (each driver/executor will have 8 cores and 65
>> GB) in sparklyr/PySpark using RStudio/Posit Workbench. Slurm is used as the
>> scheduler.
>>
>> As this is local mode, we are facing performance issue(as only one
>> executor) when it comes dealing with large datasets.
>>
>> Can I convert this 4 nodes into spark standalone cluster. We dont have
>> hadoop so yarn mode is out of scope.
>>
>> Shall I follow the official documentation for setting up a standalone
>> cluster? Will it work? Do I need to be aware of anything else?
>> Can you please share your thoughts?
>>
>> Thanks,
>> Elango
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Filter out 20% of rows

2023-09-15 Thread Bjørn Jørgensen
8|
| 13.236.170.177|  2.41361938344535|2021-08-11 02:19:06|
|  26.140.88.127|  7.51335778127692|2023-06-02 14:13:30|
|  7.118.207.252| 6.450499049816286|2022-12-11 06:36:20|
|    11.8.10.136| 8.750329246667354|2023-02-03 05:33:16|
|  232.140.56.86| 4.289740988237201|2023-02-22 20:10:09|
|   68.117.9.255| 5.384340363304169|2022-12-03 09:55:26|
+---------------+------------------+-------------------+

20
16



fre. 15. sep. 2023 kl. 20:14 skrev ashok34...@yahoo.com.INVALID
:

> Hi team,
>
> I am using PySpark 3.4
>
> I have a table of million rows that has few columns. among them incoming
> ips  and what is known as gbps (Gigabytes per second) and date and time
> of  incoming ip.
>
> I want to filter out 20% of low active ips and work on the remainder of
> data. How can I do thiis in PySpark?
>
> Thanks
>
>
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Okio Vulnerability in Spark 3.4.1

2023-08-31 Thread Bjørn Jørgensen
https://repo1.maven.org/maven2/com/squareup/okio/okio/3.4.0/okio-3.4.0.jar
>
>
>
>
>
> PFB, the vulnerability details:
>
> Link: https://nvd.nist.gov/vuln/detail/CVE-2023-3635
>
>
>
> Any guidance here would be of great help.
>
>
>
> Thanks,
>
> Sanket A.
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
>
> Deloitte refers to a Deloitte member firm, one of its related entities, or
> Deloitte Touche Tohmatsu Limited ("DTTL"). Each Deloitte member firm is a
> separate legal entity and a member of DTTL. DTTL does not provide services
> to clients. Please see www.deloitte.com/about to learn more.
>
> v.E.1
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Problem with spark 3.4.1 not finding spark java classes

2023-08-21 Thread Bjørn Jørgensen
In yours file  /home/spark/real-estate/pullhttp/pull_apartments.py

replace import org.apache.spark.SparkContext with from pyspark import
SparkContext
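A minimal sketch of what the top of the script could look like after that change; the app name and the printout are illustrative, not taken from the original pull_apartments.py:

from pyspark import SparkContext          # Python class, not a Java-style package path
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pull_apartments").getOrCreate()
sc = spark.sparkContext                   # the underlying SparkContext, if needed directly

print(sc.version)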

man. 21. aug. 2023 kl. 15:13 skrev Kal Stevens :

> I am getting a class not found error
> import org.apache.spark.SparkContext
>
> It sounds like this is because pyspark is not installed, but as far as I
> can tell it is.
> Pyspark is installed in the correct python verison
>
>
> root@namenode:/home/spark/# pip3.10 install pyspark
> Requirement already satisfied: pyspark in
> /usr/local/lib/python3.10/dist-packages (3.4.1)
> Requirement already satisfied: py4j==0.10.9.7 in
> /usr/local/lib/python3.10/dist-packages (from pyspark) (0.10.9.7)
>
>
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.4.1
>   /_/
>
> Using Python version 3.10.12 (main, Jun 11 2023 05:26:28)
> Spark context Web UI available at http://namenode:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1692452853354_0008).
> SparkSession available as 'spark'.
> Traceback (most recent call last):
>   File "/home/spark/real-estate/pullhttp/pull_apartments.py", line 11, in
> 
> import org.apache.spark.SparkContext
> ModuleNotFoundError: No module named 'org.apache.spark.SparkContext'
> 2023-08-20T19:45:19,242 INFO  [Thread-5] spark.SparkContext: SparkContext
> is stopping with exitCode 0.
> 2023-08-20T19:45:19,246 INFO  [Thread-5] server.AbstractConnector: Stopped
> Spark@467be156{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
> 2023-08-20T19:45:19,247 INFO  [Thread-5] ui.SparkUI: Stopped Spark web UI
> at http://namenode:4040
> 2023-08-20T19:45:19,251 INFO  [YARN application state monitor]
> cluster.YarnClientSchedulerBackend: Interrupting monitor thread
> 2023-08-20T19:45:19,260 INFO  [Thread-5]
> cluster.YarnClientSchedulerBackend: Shutting down all executors
> 2023-08-20T19:45:19,260 INFO  [dispatcher-CoarseGrainedScheduler]
> cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to
> shut down
> 2023-08-20T19:45:19,263 INFO  [Thread-5]
> cluster.YarnClientSchedulerBackend: YARN client scheduler backend Stopped
> 2023-08-20T19:45:19,267 INFO  [dispatcher-event-loop-29]
> spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint
> stopped!
> 2023-08-20T19:45:19,271 INFO  [Thread-5] memory.MemoryStore: MemoryStore
> cleared
> 2023-08-20T19:45:19,271 INFO  [Thread-5] storage.BlockManager:
> BlockManager stopped
> 2023-08-20T19:45:19,275 INFO  [Thread-5] storage.BlockManagerMaster:
> BlockManagerMaster stopped
> 2023-08-20T19:45:19,276 INFO  [dispatcher-event-loop-8]
> scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
> OutputCommitCoordinator stopped!
> 2023-08-20T19:45:19,279 INFO  [Thread-5] spark.SparkContext: Successfully
> stopped SparkContext
> 2023-08-20T19:45:19,687 INFO  [shutdown-hook-0] util.ShutdownHookManager:
> Shutdown hook called
> 2023-08-20T19:45:19,688 INFO  [shutdown-hook-0] util.ShutdownHookManager:
> Deleting directory
> /tmp/spark-9375452d-1989-4df5-9d85-950f751ce034/pyspark-2fcfbc8e-fd40-41f5-bf8d-e4c460332895
> 2023-08-20T19:45:19,689 INFO  [shutdown-hook-0] util.ShutdownHookManager:
> Deleting directory /tmp/spark-bf6cbc46-ad8b-429a-9d7a-7d98b7d7912e
> 2023-08-20T19:45:19,690 INFO  [shutdown-hook-0] util.ShutdownHookManager:
> Deleting directory /tmp/spark-9375452d-1989-4df5-9d85-950f751ce034
> 2023-08-20T19:45:19,691 INFO  [shutdown-hook-0] util.ShutdownHookManager:
> Deleting directory /tmp/localPyFiles-6c113b2b-9ac3-45e3-9032-d1c83419aa64
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark Vulnerabilities

2023-08-14 Thread Bjørn Jørgensen
I have added links to the github PR. Or comment for those that I have not
seen before.

Apache Spark has very many dependencies, some can easily be upgraded while
others are very hard to fix.

Please feel free to open a PR if you wanna help.

man. 14. aug. 2023 kl. 14:06 skrev Sankavi Nagalingam
:

> Hi Team,
>
>
>
> We could see there are many dependent vulnerabilities present in the
> latest spark-core:3.4.1.jar. PFA
>
> Could you please let us know when will be the fix version available for
> the users.
>
>
>
> Thanks,
>
> Sankavi
>
>
>
> The information in this e-mail and any attachments is confidential and may
> be legally privileged. It is intended solely for the addressee or
> addressees. Any use or disclosure of the contents of this
> e-mail/attachments by a not intended recipient is unauthorized and may be
> unlawful. If you have received this e-mail in error please notify the
> sender. Please note that any views or opinions presented in this e-mail are
> solely those of the author and do not necessarily represent those of
> TEMENOS. We recommend that you check this e-mail and any attachments
> against viruses. TEMENOS accepts no liability for any damage caused by any
> malicious code or virus transmitted by this e-mail.
>
> ---------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Spark-3.4.1-Vulnerablities.xlsx
Description: MS-Excel 2007 spreadsheet

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: [PySpark][UDF][PickleException]

2023-08-10 Thread Bjørn Jørgensen
out what is going on. The UDF essentially pads a 2D array to
> a certain fixed length. When the code uses NumPy, it fails with a
> PickleException. When I rewrite it using plain Python, it works like a charm:
>
> This does not work:
>
>
> @udf("array<array<float>>")
> def pad(arr: List[List[float]], n: int) -> List[List[float]]:
>     return np.pad(arr, [(n, 0), (0, 0)], "constant",
>                   constant_values=0.0).tolist()
>
> But this works:
> @udf("array<array<float>>")
> def pad(arr, n):
>     padded_arr = []
>     for i in range(n):
>         padded_arr.append([0.0] * len(arr[0]))
>     padded_arr.extend(arr)
>     return padded_arr
>
> The code for calling them remains exactly the same:
> df.withColumn("test", pad(col("array_col"), expected_length -
> actual_length))
>
> What am I missing?
>
> The arrays do not have any NaNs or Nulls.
> Any thoughts or suggestions or tips for troubleshooting would be
> appreciated.
>
> Best regards,
> Sanket
>
>
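One thing worth ruling out, offered as a guess rather than a confirmed diagnosis: NumPy scalar types leaking into the returned rows, since Spark's pickler only understands plain Python types. A sketch that keeps the np.pad call but forces native floats on the way out (the return type string is assumed):

import numpy as np
from pyspark.sql.functions import udf

@udf("array<array<float>>")
def pad(arr, n):
    padded = np.pad(arr, [(n, 0), (0, 0)], "constant", constant_values=0.0)
    # cast every element back to a plain Python float before handing it to Spark
    return [[float(x) for x in row] for row in padded]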

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Rename columns without manually setting them all

2023-06-21 Thread Bjørn Jørgensen
data = {
"Employee ID": [12345, 12346, 12347, 12348, 12349],
"Name": ["Dummy x", "Dummy y", "Dummy z", "Dummy a", "Dummy b"],
"Client": ["Dummy a", "Dummy b", "Dummy c", "Dummy d", "Dummy e"],
"Project": ["abc", "def", "ghi", "jkl", "mno"],
"Team": ["team a", "team b", "team c", "team d", "team e"],
"01/01/2022": ["OFF", "WO", "WH", "WH", "OFF"],
"02/01/2022": ["WO", "WO", "OFF", "WH", "WH"],
"03/01/2022": ["WH", "WH", "WH", "OFF", "WO"],
"04/01/2022": ["WH", "WO", "WO", "WH", "OFF"],
"05/01/2022": ["WH", "WH", "OFF", "WO", "WO"],
}

df = ps.DataFrame(data)

# Define dates columns
dates_columns = df.columns[5:]

# Melt the dataframe and count the occurrences
df_melt = df.melt(id_vars=df.columns[:5], value_vars=dates_columns,
var_name="Date", value_name="Status")
df_counts = df_melt.groupby(["Date", "Status"]).size().unstack()
df_counts.sort_index(inplace=True)
df_counts

[image: image.png]
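As an aside, Spark 3.4 added melt/unpivot to plain PySpark DataFrames as well, so the same reshaping can be done without the pandas API; a sketch under that assumption, reusing the column names above:

# plain PySpark (3.4+) equivalent of the melt-and-count above
sdf = df.to_spark()
long_sdf = sdf.melt(
    ids=["Employee ID", "Name", "Client", "Project", "Team"],
    values=None,                  # None means: unpivot all remaining (date) columns
    variableColumnName="Date",
    valueColumnName="Status",
)
counts = long_sdf.groupBy("Date", "Status").count()
counts.show()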

ons. 21. juni 2023 kl. 14:39 skrev Farshid Ashouri <
farsheed.asho...@gmail.com>:

> You can use selectExpr and stack to achieve the same effect in PySpark:
>
>
>
> df = spark.read.csv("your_file.csv", header=True, inferSchema=True)
>
> date_columns = [col for col in df.columns if '/' in col]
>
> df = df.selectExpr(["`Employee ID`", "`Name`", "`Client`", "`Project`",
> "`Team`”]
> + [f"stack({len(date_columns)}, {', '.join([f'`{col}`, `{col}` as
> `Status`' for col in date_columns])}) as (`Date`, `Status`)”])
>
> result = df.groupby("Date", "Status").count()
>
>
>
>
> On 21 Jun 2023, at 11:45, John Paul Jayme 
> wrote:
>
> Hi,
>
> This is currently my column definition :
> Employee ID Name Client Project Team 01/01/2022 02/01/2022 03/01/2022
> 04/01/2022 05/01/2022
> 12345 Dummy x Dummy a abc team a OFF WO  WH WH WH
> As you can see, the outer columns are just daily attendance dates. My goal
> is to count the employees who were OFF / WO / WH on said dates. I need to
> transpose them so it would look like this :
>
> 
>
> I am still new to pandas. Can you guide me on how to produce this? I am
> reading about melt() and set_index() but I am not sure if they are the
> correct functions to use.
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How to read excel file in PySpark

2023-06-20 Thread Bjørn Jørgensen
yes, p_df = DF.toPandas() is THE pandas, the one you know.

change p_df = DF.toPandas() to
p_df = DF.pandas_on_spark()
or
p_df = DF.to_pandas_on_spark()
or
p_df = DF.pandas_api()
or
p_df = DF.to_koalas()


https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html

Then you will have your PySpark df as a pandas-on-Spark df.
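A small sketch of the round trip, assuming Spark 3.3+ where pandas_api() is available; the column name is made up:

p_df = DF.pandas_api()                           # pandas-on-Spark frame, stays distributed
p_df["value_rounded"] = p_df["value"].round(2)   # hypothetical pandas-style operation
DF2 = p_df.to_spark()                            # back to a regular PySpark DataFrame
DF2.show()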

tir. 20. juni 2023 kl. 22:16 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> OK thanks
>
> So the issue seems to be creating  a Panda DF from Spark DF (I do it for
> plotting with something like
>
> import matplotlib.pyplot as plt
> p_df = DF.toPandas()
> p_df.plt()
>
> I guess that stays in the driver.
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Jun 2023 at 20:46, Sean Owen  wrote:
>
>> No, a pandas on Spark DF is distributed.
>>
>> On Tue, Jun 20, 2023, 1:45 PM Mich Talebzadeh 
>> wrote:
>>
>>> Thanks but if you create a Spark DF from Pandas DF that Spark DF is not
>>> distributed and remains on the driver. I recall a while back we had this
>>> conversation. I don't think anything has changed.
>>>
>>> Happy to be corrected
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 20 Jun 2023 at 20:09, Bjørn Jørgensen 
>>> wrote:
>>>
>>>> Pandas API on spark is an API so that users can use spark as they use
>>>> pandas. This was known as koalas.
>>>>
>>>> Is this limitation still valid for Pandas?
>>>> For pandas, yes. But what I did show was pandas API on Spark, so it's
>>>> Spark.
>>>>
>>>>  Additionally when we convert from Panda DF to Spark DF, what process
>>>> is involved under the bonnet?
>>>> I guess pyarrow, and dropping the index column.
>>>>
>>>> Have a look at
>>>> https://github.com/apache/spark/tree/master/python/pyspark/pandas
>>>>
>>>> tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
>>>> mich.talebza...@gmail.com>:
>>>>
>>>>> Whenever someone mentions Pandas I automatically think of it as an
>>>>> excel sheet for Python.
>>>>>
>>>>> OK my point below needs some qualification
>>>>>
>>>>> Why Spark here. Generally, parallel architecture comes into play when
>>>>> the data size is significantly large which cannot be handled on a single
>>>>> machine, hence, the use of Spark becomes meaningful. In cases where (the
>>>>> generated) data size is going to be very large (which is often norm rather
>>>>> than the exception these days), the data cannot be processed and stored in
>>>>> Pandas data frames as these data frames store data in RAM. Then, the whole
>>>>> dataset from a storage like HDFS or cloud storage cannot be collected,
>>>>> because it will take significant time and space and probably won't fit in 
>>>>> a
>>>>> single machine RAM. (in this the driver memory)
>>>>>
>>>>> Is this limitation still valid for Pandas? Additionally when we
>>>>> convert from Panda DF to Spark DF, what process is involved un

Re: How to read excel file in PySpark

2023-06-20 Thread Bjørn Jørgensen
Pandas API on spark is an API so that users can use spark as they use
pandas. This was known as koalas.

Is this limitation still valid for Pandas?
For pandas, yes. But what I did show was pandas API on Spark, so it's Spark.

 Additionally when we convert from Panda DF to Spark DF, what process is
involved under the bonnet?
I guess pyarrow, and dropping the index column.

Have a look at
https://github.com/apache/spark/tree/master/python/pyspark/pandas

tir. 20. juni 2023 kl. 19:05 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> Whenever someone mentions Pandas I automatically think of it as an excel
> sheet for Python.
>
> OK my point below needs some qualification
>
> Why Spark here. Generally, parallel architecture comes into play when the
> data size is significantly large which cannot be handled on a single
> machine, hence, the use of Spark becomes meaningful. In cases where (the
> generated) data size is going to be very large (which is often norm rather
> than the exception these days), the data cannot be processed and stored in
> Pandas data frames as these data frames store data in RAM. Then, the whole
> dataset from a storage like HDFS or cloud storage cannot be collected,
> because it will take significant time and space and probably won't fit in a
> single machine RAM. (in this the driver memory)
>
> Is this limitation still valid for Pandas? Additionally when we convert
> from Panda DF to Spark DF, what process is involved under the bonnet?
>
> Thanks
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Jun 2023 at 13:07, Bjørn Jørgensen 
> wrote:
>
>> This is pandas API on spark
>>
>> from pyspark import pandas as ps
>> df = ps.read_excel("testexcel.xlsx")
>> [image: image.png]
>> this will convert it to pyspark
>> [image: image.png]
>>
>> tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
>> :
>>
>>> Good day,
>>>
>>>
>>>
>>> I have a task to read excel files in databricks but I cannot seem to
>>> proceed. I am referencing the API documents -  read_excel
>>> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_excel.html>
>>> , but there is an error sparksession object has no attribute
>>> 'read_excel'. Can you advise?
>>>
>>>
>>>
>>> *JOHN PAUL JAYME*
>>> Data Engineer
>>>
>>> m. +639055716384  w. www.tdcx.com
>>>
>>>
>>>
>>> *Winner of over 350 Industry Awards*
>>>
>>> [image: Linkedin] <https://www.linkedin.com/company/tdcxgroup/> [image:
>>> Facebook] <https://www.facebook.com/tdcxgroup/> [image: Twitter]
>>> <https://twitter.com/tdcxgroup/> [image: Youtube]
>>> <https://www.youtube.com/c/TDCXgroup> [image: Instagram]
>>> <https://www.instagram.com/tdcxgroup/>
>>>
>>>
>>>
>>> This is a confidential email that may be privileged or legally
>>> protected. You are not authorized to copy or disclose the contents of this
>>> email. If you are not the intended addressee, please inform the sender and
>>> delete this email.
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How to read excel file in PySpark

2023-06-20 Thread Bjørn Jørgensen
This is pandas API on spark

from pyspark import pandas as ps
df = ps.read_excel("testexcel.xlsx")
[image: image.png]
this will convert it to pyspark
[image: image.png]
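The screenshots do not come through in the archive; the steps they show are presumably along these lines (openpyxl is assumed to be installed for the Excel reader):

from pyspark import pandas as ps

psdf = ps.read_excel("testexcel.xlsx")   # pandas-on-Spark DataFrame
sdf = psdf.to_spark()                    # plain PySpark DataFrame
sdf.printSchema()
sdf.show()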

tir. 20. juni 2023 kl. 13:42 skrev John Paul Jayme
:

> Good day,
>
>
>
> I have a task to read excel files in databricks but I cannot seem to
> proceed. I am referencing the API documents -  read_excel
> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_excel.html>
> , but there is an error sparksession object has no attribute
> 'read_excel'. Can you advise?
>
>
>
> *JOHN PAUL JAYME*
> Data Engineer
>
> m. +639055716384  w. www.tdcx.com
>
>
>
> *Winner of over 350 Industry Awards*
>
> [image: Linkedin] <https://www.linkedin.com/company/tdcxgroup/> [image:
> Facebook] <https://www.facebook.com/tdcxgroup/> [image: Twitter]
> <https://twitter.com/tdcxgroup/> [image: Youtube]
> <https://www.youtube.com/c/TDCXgroup> [image: Instagram]
> <https://www.instagram.com/tdcxgroup/>
>
>
>
> This is a confidential email that may be privileged or legally protected.
> You are not authorized to copy or disclose the contents of this email. If
> you are not the intended addressee, please inform the sender and delete
> this email.
>
>
>
>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Re: maven with Spark 3.4.0 fails compilation

2023-05-29 Thread Bjørn Jørgensen
mit.scala:1020)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> Part of pom.xml is here
>
>
> https://maven.apache.org/POM/4.0.0; xmlns:xsi="
> https://www.w3.org/2001/XMLSchema-instance;
> xsi:schemaLocation="https://maven.apache.org/POM/4.0.0
> https://maven.apache.org/maven-v4_0_0.xsd;>
> 4.0.0
> spark
> 3.0
> ReduceByKey
> ${project.artifactId}
>
> 
> 11.0.1
> 11.0.1
> UTF-8
> 2.13.8 
> 2.15.2
> 
>
> 
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>scala-library</artifactId>
>     <version>2.13.8</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.13</artifactId>
>     <version>3.4.0</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql_2.13</artifactId>
>     <version>3.4.0</version>
>     <scope>provided</scope>
>   </dependency>
>
>
> Thanks
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 29 May 2023 at 13:44, Bjørn Jørgensen 
> wrote:
>
>> Change
>>
>> <dependency>
>>   <groupId>org.scala-lang</groupId>
>>   <artifactId>scala-library</artifactId>
>>   <version>2.13.11-M2</version>
>> </dependency>
>>
>> to
>>
>> <dependency>
>>   <groupId>org.scala-lang</groupId>
>>   <artifactId>scala-library</artifactId>
>>   <version>${scala.version}</version>
>> </dependency>
>>
>> man. 29. mai 2023 kl. 13:20 skrev Lingzhe Sun :
>>
>>> Hi Mich,
>>>
>>> Spark 3.4.0 prebuilt with scala 2.13 is built with version 2.13.8
>>> <https://github.com/apache/spark/blob/88f69d6f92860823b1a90bc162ebca2b7c8132fc/pom.xml#L170>.
>>> Since you are using spark-core_2.13 and spark-sql_2.13, you should stick to
>>> the major (13) and the minor version (8). Not using these may cause
>>> unexpected behaviour (though Scala claims compatibility among minor version
>>> changes, I've encountered problems using a Scala package with the same
>>> major version and a different minor version. That may be due to bug fixes and
>>> upgrades of Scala itself.).
>>> And although I have not encountered such a problem, this
>>> <https://stackoverflow.com/a/26411339/19476830> can be a pitfall for
>>> you.
>>>
>>> --
>>> Best Regards!
>>>
>>> ...
>>> Lingzhe Sun
>>> Hirain Technology
>>>
>>>
>>> *From:* Mich Talebzadeh 
>>> *Date:* 2023-05-29 17:55
>>> *To:* Bjørn Jørgensen 
>>> *CC:* user @spark 
>>> *Subject:* Re: maven with Spark 3.4.0 fails compilation
>>> Thanks for your helpful comments Bjorn.
>>>
>>> I managed to compile the code with maven but when it run it fails with
>>>
>>>   Application is ReduceByKey
>>>
>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>> scala.package$.Seq()Lscala/collection/immutable/Seq$;
>>> at ReduceByKey$.main(ReduceByKey.scala:23)
>>> at ReduceByKey.main(ReduceByKey.scala)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at
>>> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>>> at org.apache.spark.deploy.SparkSubmit.org
>>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
>>> at
>>> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
>>> at
>>> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
>>> at
>>> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
>>>  

Re: Re: maven with Spark 3.4.0 fails compilation

2023-05-29 Thread Bjørn Jørgensen
Change

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.13.11-M2</version>
</dependency>

to

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
</dependency>

man. 29. mai 2023 kl. 13:20 skrev Lingzhe Sun :

> Hi Mich,
>
> Spark 3.4.0 prebuilt with scala 2.13 is built with version 2.13.8
> <https://github.com/apache/spark/blob/88f69d6f92860823b1a90bc162ebca2b7c8132fc/pom.xml#L170>.
> Since you are using spark-core_2.13 and spark-sql_2.13, you should stick to
> the major(13) and the minor version(8). Not using any of these may cause
> unexpected behaviour(though scala claims compatibility among minor version
> changes, I've encountered problem using the scala package with the same
> major version and different minor version. That may due to bug fixes and
> upgrade of scala itself.).
> And although I did not encountered such problem, this
> <https://stackoverflow.com/a/26411339/19476830>can be a a pitfall for you.
>
> --
> Best Regards!
> ...
> Lingzhe Sun
> Hirain Technology
>
>
> *From:* Mich Talebzadeh 
> *Date:* 2023-05-29 17:55
> *To:* Bjørn Jørgensen 
> *CC:* user @spark 
> *Subject:* Re: maven with Spark 3.4.0 fails compilation
> Thanks for your helpful comments Bjorn.
>
> I managed to compile the code with maven but when it run it fails with
>
>   Application is ReduceByKey
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.package$.Seq()Lscala/collection/immutable/Seq$;
> at ReduceByKey$.main(ReduceByKey.scala:23)
> at ReduceByKey.main(ReduceByKey.scala)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> I attach the pom.xml and the sample scala code is self contained and
> basic. Again it runs with SBT with no issues.
>
> FYI, my scala version on host is
>
>  scala -version
> Scala code runner version 2.13.6 -- Copyright 2002-2021, LAMP/EPFL and
> Lightbend, Inc.
>
> I think I have a scala  incompatible somewhere again
>
> Cheers
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 28 May 2023 at 20:29, Bjørn Jørgensen 
> wrote:
>
>> From chatgpt4
>>
>>
>> The problem appears to be that there is a mismatch between the version of
>> Scala used by the Scala Maven plugin and the version of the Scala library
>> defined as a dependency in your POM. You've defined your Scala version in
>> your properties as `2.12.17` but you're pulling in `scala-library` version
>> `2.13.6` as a dependency.
>>
>> The Scala Maven plugin will be using the Scala version defined in the
>> `scala.version` property for compilation, but then it tries to load classes
>> from a different Scala version, hence the error.
>>
>> To resolve this issue, make sure the `scala.version` property matches the
>> version of `scala-library` defined in your dependencies. In your case, you
>> may want to change `scala.version` to `2.13.6`.
>>
>> Here's the corrected part of your POM:
>>
>> ```xml
>> 
>

Re: maven with Spark 3.4.0 fails compilation

2023-05-28 Thread Bjørn Jørgensen
; Java version: 11.0.1, vendor: Oracle Corporation, runtime: /opt/jdk-11.0.1
>
> This from the pom.xml file
>
> https://maven.apache.org/POM/4.0.0; xmlns:xsi="
> https://www.w3.org/2001/XMLSchema-instance;
> xsi:schemaLocation="https://maven.apache.org/POM/4.0.0
> https://maven.apache.org/maven-v4_0_0.xsd;>
> 4.0.0
> spark
> 3.0
> ${project.artifactId}
>
> 
> 1.7
> 1.7
> UTF-8
> 2.12.17
> 2.15.2
> 
> 
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>scala-library</artifactId>
>     <version>2.13.6</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.13</artifactId>
>     <version>3.4.0</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql_2.13</artifactId>
>     <version>3.4.0</version>
>     <scope>provided</scope>
>   </dependency>
>
> The pom file is attached. These are the errors I am getting
>
> [ERROR] error: error while loading package, class file
> '/home/hduser/.m2/repository/org/scala-lang/scala-library/2.13.6/scala-library-2.13.6.jar(scala/reflect/package.class)'
> is broken
> [ERROR] (class java.lang.RuntimeException/error reading Scala signature of
> package.class: Scala signature package has wrong version
> [ERROR] error: error while loading package, class file
> '/home/hduser/.m2/repository/org/scala-lang/scala-library/2.13.6/scala-library-2.13.6.jar(scala/package.class)'
> is broken
> [ERROR] (class java.lang.RuntimeException/error reading Scala signature of
> package.class: Scala signature package has wrong version
> [ERROR] error: error while loading package, class file
> '/home/hduser/.m2/repository/org/scala-lang/scala-library/2.13.6/scala-library-2.13.6.jar(scala/collection/package.class)'
> is broken
> [ERROR] (class java.lang.RuntimeException/error reading Scala signature of
> package.class: Scala signature package has wrong version
> [ERROR] three errors found
> [ERROR] Failed to execute goal
> org.scala-tools:maven-scala-plugin:2.15.2:compile (default) on project
> scala: wrap: org.apache.commons.exec.ExecuteException: Process exited with
> an error: 1(Exit value: 1) -> [Help 1]
>
> Any ideas will be appreciated.
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Change column values using several when conditions

2023-05-01 Thread Bjørn Jørgensen
you can check if the value exists by using distinct before you loop over
the dataset.
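Another option, sketched here in PySpark rather than Java just to show the shape of it, is to turn the mapping list into a small DataFrame once and do a single broadcast join instead of one withColumn per value; the values and column name below are made up:

from pyspark.sql import functions as F

# hypothetical mapping for column "colName": original value -> new value
pairs = [m.split(";", 1) for m in ["old1;new1", "old2;new2"]]
map_df = spark.createDataFrame(pairs, ["colName", "colName_new"])

dataset = (
    dataset.join(F.broadcast(map_df), on="colName", how="left")
           .withColumn("colName", F.coalesce("colName_new", "colName"))
           .drop("colName_new")
)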

man. 1. mai 2023 kl. 10:38 skrev marc nicole :

> Hello
>
> I want to change values of a column in a dataset according to a mapping
> list that maps original values of that column to other new values. Each
> element of the list (colMappingValues) is a string that separates the
> original values from the new values using a ";".
>
> So for a given column (in the following example colName), I do the
> following processing to alter the column values as described:
>
> for (int i = 0; i < colMappingValues.length; i++) {
>     // below list contains all distinct values of a column
>     // (colMappingValues[i]) and their target values
>     allValuesChanges = colMappingValues[i].toString().split(";", 2);
>
>     dataset = dataset.withColumn(colName,
>         when(dataset.col(colName).equalTo(allValuesChanges[0]), allValuesChanges[1]).otherwise(dataset.col(colName)));
>
> }
>
> which is working but I want it to be efficient to avoid unnecessary
> iterations. Meaning that I want when the column doesn't contain the value
> from the list, the call to withColumn() gets ignored.
> How to do exactly that in a more efficient way using Spark in Java?
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Non string type partitions

2023-04-15 Thread Bjørn Jørgensen
I guess that it has to do with indexing and partitioning data to nodes.
Have a look at data partitioning system design concept
<https://www.enjoyalgorithms.com/blog/data-partitioning-system-design-concept>
 and key range partitions
<https://martinfowler.com/articles/patterns-of-distributed-systems/key-range-partitions.html>


You can work around this by creating a temp view where date is casted to
string.

Note I did have to test this sometimes so I'm using .mode("overwrite") on
the file.


from pyspark.sql import SparkSession, Row
from datetime import date

spark = SparkSession.builder.getOrCreate()

mock_data = [
Row(id=1, name="John", partition_col=date(2023, 4, 11)),
Row(id=2, name="Jane", partition_col=date(2023, 4, 11)),
Row(id=3, name="Alice", partition_col=date(2023, 4, 12)),
Row(id=4, name="Bob", partition_col=date(2023, 4, 12)),
]

mock_df = spark.createDataFrame(mock_data)

parquet_data_path = "test_date"
mock_df.write.partitionBy("partition_col").mode("overwrite").parquet(parquet_data_path)

create_table_sql = f"""
CREATE TABLE IF NOT EXISTS my_table (
id INT,
name STRING)
USING parquet
PARTITIONED BY (partition_col DATE)
OPTIONS ('path' = '{parquet_data_path}')
"""

spark.sql(create_table_sql)

# temp view with the string partition column
create_view_sql = f"""
CREATE OR REPLACE TEMPORARY VIEW my_table_with_string_partition AS
SELECT *, CAST(partition_col AS STRING) AS partition_col_str FROM my_table;
"""
spark.sql(create_view_sql)

query = f"SELECT * FROM my_table_with_string_partition WHERE
partition_col_str = '2023-04-11';"
result = spark.sql(query)

result.show()


+---+----+-------------+-----------------+
| id|name|partition_col|partition_col_str|
+---+----+-------------+-----------------+
|  1|John|   2023-04-11|       2023-04-11|
|  2|Jane|   2023-04-11|       2023-04-11|
+---+----+-------------+-----------------+



lør. 15. apr. 2023 kl. 21:41 skrev Charles vinodh :

>
> bumping this up again for suggestions?.. Is the official recommendation to
> not have *int* or *date* typed partition columns?
>
> On Wed, 12 Apr 2023 at 10:44, Charles vinodh 
> wrote:
>
>> There are  other distributed execution engines (like hive, trino) that do
>> support non-string data types for partition columns such as date and
>> integer.
>> Any idea why this restriction exists in Spark? ..
>>
>>
>> On Tue, 11 Apr 2023 at 20:34, Chitral Verma 
>> wrote:
>>
>>> Because the name of the directory cannot be an object, it has to be a
>>> string to create partitioned dirs like "date=2023-04-10"
>>>
>>> On Tue, 11 Apr, 2023, 8:27 pm Charles vinodh, 
>>> wrote:
>>>
>>>>
>>>> Hi Team,
>>>>
>>>> We are running into the below error when we are trying to run a simple
>>>> query a partitioned table in Spark.
>>>>
>>>> *MetaException(message:Filtering is supported only on partition keys of 
>>>> type string)
>>>> *
>>>>
>>>>
>>>> Our the partition column has been to type *date *instead of string and
>>>> query is a very simple SQL as shown below.
>>>>
>>>> *SELECT * FROM my_table WHERE partition_col = date '2023-04-11'*
>>>>
>>>> Any idea why spark mandates partition columns to be of type string?. Is
>>>> there a recommended work around for this issue?
>>>>
>>>>
>>>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Accessing python runner file in AWS EKS kubernetes cluster as in local://

2023-04-12 Thread Bjørn Jørgensen
Yes, it looks inside the Docker container's folders. It will work if you are
using s3 or gs.

ons. 12. apr. 2023, 18:02 skrev Mich Talebzadeh :

> Hi,
>
> In my spark-submit to eks cluster, I use the standard code to submit to
> the cluster as below:
>
> spark-submit --verbose \
>--master k8s://$KUBERNETES_MASTER_IP:443 \
>--deploy-mode cluster \
>--name sparkOnEks \
>--py-files local://$CODE_DIRECTORY/spark_on_eks.zip \
>   local:///home/hduser/dba/bin/python/spark_on_eks/src/RandomDataBigQuery.py
>
> In Google Kubernetes Engine (GKE) I simply load them from gs:// storage
> bucket.and it works fine.
>
> I am getting the following error in driver pod
>
>  + CMD=("$SPARK_HOME/bin/spark-submit" --conf 
> "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client 
> "$@")
> + exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf 
> spark.driver.bindAddress=192.168.39.251 --deploy-mode client 
> --properties-file /opt/spark/conf/spark.properties --class 
> org.apache.spark.deploy.PythonRunner 
> local:///home/hduser/dba/bin/python/spark_on_eks/src/RandomDataBigQuery.py
> 23/04/11 23:07:23 WARN NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> /usr/bin/python3: can't open file 
> '/home/hduser/dba/bin/python/spark_on_eks/src/RandomDataBigQuery.py': [Errno 
> 2] No such file or directory
> log4j:WARN No appenders could be found for logger 
> (org.apache.spark.util.ShutdownHookManager).
> It says  can't open file 
> '/home/hduser/dba/bin/python/spark_on_eks/src/RandomDataBigQuery.py':
>
>
> [Errno 2] No such file or directory but it is there!
>
> ls -l /home/hduser/dba/bin/python/spark_on_eks/src/RandomDataBigQuery.py
> -rw-rw-rw- 1 hduser hadoop 5060 Mar 18 14:16 
> /home/hduser/dba/bin/python/spark_on_eks/src/RandomDataBigQuery.py
> So not sure what is going on. I have suspicion that it is looking inside the 
> docker itself for this file?
>
>
> Is that a correct assumption?
>
>
> Thanks
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Slack for PySpark users

2023-04-04 Thread Bjørn Jørgensen
way-so-hard-to-define
>>>>>>>>>
>>>>>>>>> It's unavoidable if "users" prefer to use an alternative
>>>>>>>>> communication mechanism rather than the user mailing list. Before 
>>>>>>>>> Stack
>>>>>>>>> Overflow days, there had been a meaningful number of questions around 
>>>>>>>>> user@.
>>>>>>>>> It's just impossible to let them go back and post to the user mailing 
>>>>>>>>> list.
>>>>>>>>>
>>>>>>>>> We just need to make sure it is not the purpose of employing Slack
>>>>>>>>> to move all discussions about developments, direction of the project, 
>>>>>>>>> etc
>>>>>>>>> which must happen in dev@/private@. The purpose of Slack thread
>>>>>>>>> here does not seem to aim to serve the purpose.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 31, 2023 at 7:00 AM Mich Talebzadeh <
>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Good discussions and proposals.all around.
>>>>>>>>>>
>>>>>>>>>> I have used slack in anger on a customer site before. For small
>>>>>>>>>> and medium size groups it is good and affordable. Alternatives have 
>>>>>>>>>> been
>>>>>>>>>> suggested as well so those who like investigative search can agree 
>>>>>>>>>> and come
>>>>>>>>>> up with a freebie one.
>>>>>>>>>> I am inclined to agree with Bjorn that this slack has more social
>>>>>>>>>> dimensions than the mailing list. It is akin to a sports club using
>>>>>>>>>> WhatsApp groups for communication. Remember we were originally 
>>>>>>>>>> looking for
>>>>>>>>>> space for webinars, including Spark on Linkedin that Denney Lee 
>>>>>>>>>> suggested.
>>>>>>>>>> I think Slack and mailing groups can coexist happily. On a more 
>>>>>>>>>> serious
>>>>>>>>>> note, when I joined the user group back in 2015-2016, there was a 
>>>>>>>>>> lot of
>>>>>>>>>> traffic. Currently we hardly get many mails daily <> less than 5. So 
>>>>>>>>>> having
>>>>>>>>>> a slack type medium may improve members participation.
>>>>>>>>>>
>>>>>>>>>> so +1 for me as well.
>>>>>>>>>>
>>>>>>>>>> Mich Talebzadeh,
>>>>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>>>>> Palantir Technologies Limited
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>view my Linkedin profile
>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>> responsibility for any loss, damage or destruction of data or any 
>>>>>>>>>> other
>>>>>>>>>> property which may arise from relying on this email's technical 
>>>>>>>>>> content is
>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, 30 Mar 2023 at 22:19, Denny Lee 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> +1.

Re: Looping through a series of telephone numbers

2023-04-02 Thread Bjørn Jørgensen
damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 2 Apr 2023 at 09:17, Philippe de Rochambeau 
> wrote:
>
>> Many thanks, Mich.
>> Is « foreach » the best construct to look up items in a dataset such as
>> the below «  telephonedirectory » data set?
>>
>> val telrdd = spark.sparkContext.parallelize(Seq(«  tel1 » , «  tel2 » , «  
>> tel3 » …)) // the telephone sequence
>>
>> // was read for a CSV file
>>
>> val ds = spark.read.parquet(«  /path/to/telephonedirectory » )
>>
>>   rdd .foreach(tel => {
>> longAcc.select(«  * » ).rlike(«  + »  + tel)
>>   })
>>
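A side note on the loop sketched above: running one like()/rlike() scan per number means 40,000 passes over the data. Joining the directory against the number list in a single pass usually scales much better; a rough PySpark sketch in which the paths and the tel_number column name are placeholders:

from pyspark.sql import functions as F

tels = spark.read.csv("/path/to/telephones.csv").toDF("tel")
directory = spark.read.parquet("/path/to/telephonedirectory")

# one broadcast nested-loop join instead of 40,000 separate scans;
# endswith() tolerates prefixes such as "+" on the stored numbers
matches = directory.join(
    F.broadcast(tels),
    F.col("tel_number").endswith(F.col("tel")),
    "inner",
)
matches.show()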
>>
>>
>>
>> Le 1 avr. 2023 à 22:36, Mich Talebzadeh  a
>> écrit :
>>
>> This may help
>>
>> Spark rlike() Working with Regex Matching Example
>> <https://sparkbyexamples.com/spark/spark-rlike-regex-matching-examples/>s
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 1 Apr 2023 at 19:32, Philippe de Rochambeau 
>> wrote:
>>
>>> Hello,
>>> I’m looking for an efficient way in Spark to search for a series of
>>> telephone numbers, contained in a CSV file, in a data set column.
>>>
>>> In pseudo code,
>>>
>>> for tel in [tel1, tel2, …. tel40,000]
>>> search for tel in dataset using .like(« %tel% »)
>>> end for
>>>
>>> I’m using the like function because the telephone numbers in the data
>>> set main contain prefixes, such as « + « ; e.g., « +331222 ».
>>>
>>> Any suggestions would be welcome.
>>>
>>> Many thanks.
>>>
>>> Philippe
>>>
>>>
>>>
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Slack for PySpark users

2023-03-30 Thread Bjørn Jørgensen
>> On Wed, Mar 29, 2023 at 11:32 PM Xiao Li  wrote:
>>>>
>>>>> +1
>>>>>
>>>>> + @d...@spark.apache.org 
>>>>>
>>>>> This is a good idea. The other Apache projects (e.g., Pinot, Druid,
>>>>> Flink) have created their own dedicated Slack workspaces for faster
>>>>> communication. We can do the same in Apache Spark. The Slack workspace 
>>>>> will
>>>>> be maintained by the Apache Spark PMC. I propose to initiate a vote for 
>>>>> the
>>>>> creation of a new Apache Spark Slack workspace. Does that sound good?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Xiao
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Mich Talebzadeh  于2023年3月28日周二 07:07写道:
>>>>>
>>>>>> I created one at slack called pyspark
>>>>>>
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>> Palantir Technologies Limited
>>>>>>
>>>>>>
>>>>>>view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, 28 Mar 2023 at 03:52, asma zgolli 
>>>>>> wrote:
>>>>>>
>>>>>>> +1 good idea, I d like to join as well.
>>>>>>>
>>>>>>> Le mar. 28 mars 2023 à 04:09, Winston Lai  a
>>>>>>> écrit :
>>>>>>>
>>>>>>>> Please let us know when the channel is created. I'd like to join :)
>>>>>>>>
>>>>>>>> Thank You & Best Regards
>>>>>>>> Winston Lai
>>>>>>>> --
>>>>>>>> *From:* Denny Lee 
>>>>>>>> *Sent:* Tuesday, March 28, 2023 9:43:08 AM
>>>>>>>> *To:* Hyukjin Kwon 
>>>>>>>> *Cc:* keen ; user@spark.apache.org <
>>>>>>>> user@spark.apache.org>
>>>>>>>> *Subject:* Re: Slack for PySpark users
>>>>>>>>
>>>>>>>> +1 I think this is a great idea!
>>>>>>>>
>>>>>>>> On Mon, Mar 27, 2023 at 6:24 PM Hyukjin Kwon 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Yeah, actually I think we should better have a slack channel so we
>>>>>>>> can easily discuss with users and developers.
>>>>>>>>
>>>>>>>> On Tue, 28 Mar 2023 at 03:08, keen  wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>> I really like *Slack *as communication channel for a tech
>>>>>>>> community.
>>>>>>>> There is a Slack workspace for *delta lake users* (
>>>>>>>> https://go.delta.io/slack) that I enjoy a lot.
>>>>>>>> I was wondering if there is something similar for PySpark users.
>>>>>>>>
>>>>>>>> If not, would there be anything wrong with creating a new
>>>>>>>> Slack workspace for PySpark users? (when explicitly mentioning that 
>>>>>>>> this is
>>>>>>>> *not* officially part of Apache Spark)?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Martin
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Asma ZGOLLI
>>>>>>>
>>>>>>> Ph.D. in Big Data - Applied Machine Learning
>>>>>>>
>>>>>>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Topics for Spark online classes & webinars

2023-03-28 Thread Bjørn Jørgensen
;>>>>
>>>>>
>>>>>
>>>>> On Tue, 14 Mar 2023 at 15:09, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>> Hi Denny,
>>>>>
>>>>> That Apache Spark Linkedin page
>>>>> https://www.linkedin.com/company/apachespark/ looks fine. It also
>>>>> allows a wider audience to benefit from it.
>>>>>
>>>>> +1 for me
>>>>>
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 14 Mar 2023 at 14:23, Denny Lee  wrote:
>>>>>
>>>>> In the past, we've been using the Apache Spark LinkedIn page
>>>>> <https://www.linkedin.com/company/apachespark/> and group to
>>>>> broadcast these type of events - if you're cool with this?  Or we could go
>>>>> through the process of submitting and updating the current
>>>>> https://spark.apache.org or request to leverage the original Spark
>>>>> confluence page <https://cwiki.apache.org/confluence/display/SPARK>.
>>>>>WDYT?
>>>>>
>>>>> On Mon, Mar 13, 2023 at 9:34 AM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>> Well that needs to be created first for this purpose. The appropriate
>>>>> name etc. to be decided. Maybe @Denny Lee 
>>>>> can facilitate this as he offered his help.
>>>>>
>>>>>
>>>>> cheers
>>>>>
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 13 Mar 2023 at 16:29, asma zgolli 
>>>>> wrote:
>>>>>
>>>>> Hello Mich,
>>>>>
>>>>> Can you please provide the link for the confluence page?
>>>>>
>>>>> Many thanks
>>>>> Asma
>>>>> Ph.D. in Big Data - Applied Machine Learning
>>>>>
>>>>> Le lun. 13 mars 2023 à 17:21, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> a écrit :
>>>>>
>>>>> Apologies I missed the list.
>>>>>
>>>>> To move forward I selected these topics from the thread "Online
>>>>> classes for spark topics".
>>>>>
>>>>> To take this further I propose a confluence page to be seup.
>>>>>
>>>>>
>>>>>1. Spark UI
>>>>>2. Dynamic allocation
>>>>>3. Tuning of jobs
>>>>>4. Collecting spark metrics for monitoring and alerting
>>>>>5.  For those who prefer to use Pandas API on Spark since the
>>>>>release of Spark 3.2, What are some important notes for those users? 
>>>>> For
>>>>>example, what are the additional factors affecting the Spark 
>>>>> performance
>>>>>using Pandas API on Spark? How to tune them in addition to the 
>>>>> conventional
>>>>>Spark tuning methods applied to Spark SQL users.
>>>>>6. Spark internals and/or comparing spark 3 and 2
>>>>>7. Spark Streaming & Spark Structured Streaming
>>>>>8. Spark on notebooks
>>>>>9. Spark on serverless (for example Spark on Google Cloud)
>>>>>10. Spark on k8s
>>>>>
>>>>> Opinions and how to is welcome
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 13 Mar 2023 at 16:16, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>> Hi guys
>>>>>
>>>>> To move forward I selected these topics from the thread "Online
>>>>> classes for spark topics".
>>>>>
>>>>> To take this further I propose a confluence page to be seup.
>>>>>
>>>>> Opinions and how to is welcome
>>>>>
>>>>> Cheers
>>>>>
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>
>> --
>> Asma ZGOLLI
>>
>> PhD in Big Data - Applied Machine Learning
>> Email : zgollia...@gmail.com
>> Tel : (+49) 015777685768
>> Skype : asma_zgolli
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Topics for Spark online classes & webinars

2023-03-15 Thread Bjørn Jørgensen
t;>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 13 Mar 2023 at 16:29, asma zgolli 
>>>>> wrote:
>>>>>
>>>>>> Hello Mich,
>>>>>>
>>>>>> Can you please provide the link for the confluence page?
>>>>>>
>>>>>> Many thanks
>>>>>> Asma
>>>>>> Ph.D. in Big Data - Applied Machine Learning
>>>>>>
>>>>>> Le lun. 13 mars 2023 à 17:21, Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> a écrit :
>>>>>>
>>>>>>> Apologies I missed the list.
>>>>>>>
>>>>>>> To move forward I selected these topics from the thread "Online
>>>>>>> classes for spark topics".
>>>>>>>
>>>>>>> To take this further I propose a confluence page to be seup.
>>>>>>>
>>>>>>>
>>>>>>>1. Spark UI
>>>>>>>2. Dynamic allocation
>>>>>>>3. Tuning of jobs
>>>>>>>4. Collecting spark metrics for monitoring and alerting
>>>>>>>5.  For those who prefer to use Pandas API on Spark since the
>>>>>>>release of Spark 3.2, What are some important notes for those users? 
>>>>>>> For
>>>>>>>example, what are the additional factors affecting the Spark 
>>>>>>> performance
>>>>>>>using Pandas API on Spark? How to tune them in addition to the 
>>>>>>> conventional
>>>>>>>Spark tuning methods applied to Spark SQL users.
>>>>>>>6. Spark internals and/or comparing spark 3 and 2
>>>>>>>7. Spark Streaming & Spark Structured Streaming
>>>>>>>8. Spark on notebooks
>>>>>>>9. Spark on serverless (for example Spark on Google Cloud)
>>>>>>>10. Spark on k8s
>>>>>>>
>>>>>>> Opinions and how to is welcome
>>>>>>>
>>>>>>>
>>>>>>>view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 13 Mar 2023 at 16:16, Mich Talebzadeh <
>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi guys
>>>>>>>>
>>>>>>>> To move forward I selected these topics from the thread "Online
>>>>>>>> classes for spark topics".
>>>>>>>>
>>>>>>>> To take this further I propose a confluence page to be seup.
>>>>>>>>
>>>>>>>> Opinions and how to is welcome
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>view my Linkedin profile
>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>
>>>>>>>>
>>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other property 
>>>>>>>> which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>>> damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [PySpark SQL] New column with the maximum of multiple terms?

2023-02-23 Thread Bjørn Jørgensen
>
>>>> ```
>>>> ValueError: Cannot convert column into bool: please use '&' for 'and',
>>>> '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
>>>> ```
>>>>
>>>>   How can I do this? Thanks!
>>>>
>>>>  Best, Oliver
>>>>
>>>> --
>>>> Oliver Ruebenacker, Ph.D. (he)
>>>> Senior Software Engineer, Knowledge Portal Network <http://kp4cd.org/>,
>>>> Flannick Lab <http://www.flannicklab.org/>, Broad Institute
>>>> <http://www.broadinstitute.org/>
>>>>
>>>
>>
>> --
>> Oliver Ruebenacker, Ph.D. (he)
>> Senior Software Engineer, Knowledge Portal Network <http://kp4cd.org/>, 
>> Flannick
>> Lab <http://www.flannicklab.org/>, Broad Institute
>> <http://www.broadinstitute.org/>
>>
>
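
A minimal sketch of one way to do this, assuming the goal is a new column
holding the row-wise maximum of several existing columns (column names here
are illustrative). F.greatest works on Columns directly and avoids Python's
and/or, which is one common trigger of the ValueError quoted above:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Illustrative data: three numeric term columns.
df = spark.createDataFrame([(1.0, 2.5, 0.3), (4.0, 1.2, 9.9)], ["a", "b", "c"])

# greatest() is evaluated per row; no Python boolean logic is involved.
df = df.withColumn("max_term", F.greatest("a", "b", "c"))
df.show()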

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Graceful shutdown SPARK Structured Streaming

2023-02-19 Thread Bjørn Jørgensen
y 2 seconds.
>>>>>
>>>>> In your writestream add the following line to be able to identify
>>>>> topic name
>>>>>
>>>>> trigger(processingTime='30 seconds'). \
>>>>> *queryName('md'). *\
>>>>>
>>>>> Next the controlling topic (called newtopic)  has the following
>>>>>
>>>>> foreachBatch(*sendToControl*). \
>>>>> trigger(processingTime='2 seconds'). \
>>>>> queryName('newtopic'). \
>>>>>
>>>>> That method sendToControl does what is needed
>>>>>
>>>>> def sendToControl(dfnewtopic, batchId):
>>>>>     if len(dfnewtopic.take(1)) > 0:
>>>>>         #print(f"""newtopic batchId is {batchId}""")
>>>>>         #dfnewtopic.show(10,False)
>>>>>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>>>>>         status = dfnewtopic.select(col("status")).collect()[0][0]
>>>>>
>>>>>         if (queue == 'md') & (status == 'false'):
>>>>>             spark_session = s.spark_session(config['common']['appName'])
>>>>>             active = spark_session.streams.active
>>>>>             for e in active:
>>>>>                 #print(e)
>>>>>                 name = e.name
>>>>>                 if name == 'md':
>>>>>                     print(f"""Terminating streaming process {name}""")
>>>>>                     e.stop()
>>>>>     else:
>>>>>         print("DataFrame newtopic is empty")
>>>>>
>>>>> This seems to work, as I checked it to ensure that in this case the data
>>>>> was written and saved to the target sink (a BigQuery table). It will wait
>>>>> until the data is written completely, meaning the current streaming message
>>>>> is processed; there is some latency there (waiting for graceful
>>>>> completion).
>>>>>
>>>>> This is the output
>>>>>
>>>>> Terminating streaming process md
>>>>> wrote to DB  ## this is the flag I added to ensure the current
>>>>> micro-batch was completed
>>>>> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md
>>>>> [id = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
>>>>> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>>>>>
>>>>> The various termination processes are described in
>>>>>
>>>>> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
>>>>> (apache.org)
>>>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries>
>>>>>
>>>>> This is the idea I came up with, which allows ending the streaming
>>>>> process with the least cost.
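
A consolidated sketch of the wiring described above, for reference (broker,
topic names, checkpoint paths and the parquet stand-in sink are illustrative
assumptions; sendToControl is the function shown earlier in this thread):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("graceful-shutdown-sketch").getOrCreate()

# Data stream (Kafka source assumed).
streaming_df = (spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")
                .option("subscribe", "md")
                .load())

# Control stream: JSON messages such as {"queue": "md", "status": "false"}.
control_schema = StructType([StructField("queue", StringType()),
                             StructField("status", StringType())])
control_df = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "newtopic")
              .load()
              .select(from_json(col("value").cast("string"), control_schema).alias("c"))
              .select("c.*"))

def sendToSink(df, batch_id):
    # Stand-in for the real sink (the original writes to BigQuery).
    df.write.mode("append").parquet("/tmp/out/md")

md_query = (streaming_df.writeStream
            .foreachBatch(sendToSink)
            .trigger(processingTime="30 seconds")
            .queryName("md")
            .option("checkpointLocation", "/tmp/checkpoints/md")
            .start())

control_query = (control_df.writeStream
                 .foreachBatch(sendToControl)      # stops the 'md' query on request
                 .trigger(processingTime="2 seconds")
                 .queryName("newtopic")
                 .option("checkpointLocation", "/tmp/checkpoints/newtopic")
                 .start())

spark.streams.awaitAnyTermination()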
>>>>>
>>>>> HTH
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 5 May 2021 at 17:30, Gourav Sengupta <
>>>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> just thought of reaching out once again and seeking out your kind
>>>>>> help to find out what is the best way to stop SPARK streaming gracefully.
>>>>>> Do we still use the methods of creating a file as in SPARK 2.4.x which is
>>>>>> several years old method or do we have a better approach in SPARK 3.1?
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> -- Forwarded message -
>>>>>> From: Gourav Sengupta 
>>>>>> Date: Wed, Apr 21, 2021 at 10:06 AM
>>>>>> Subject: Graceful shutdown SPARK Structured Streaming
>>>>>> To: 
>>>>>>
>>>>>>
>>>>>> Dear friends,
>>>>>>
>>>>>> is there any documentation available for gracefully stopping SPARK
>>>>>> Structured Streaming in 3.1.x?
>>>>>>
>>>>>> I am referring to articles which are 4 to 5 years old and was
>>>>>> wondering whether there is a better way available today to gracefully
>>>>>> shutdown a SPARK streaming job.
>>>>>>
>>>>>> Thanks a ton in advance for all your kind help.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Bjørn Jørgensen
Use explode_outer() when rows have null values.
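
For reference, a small PySpark sketch of the zip-then-explode idea discussed
below (column names follow the example in the thread; in recent Spark versions
the struct fields produced by arrays_zip carry the source column names):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(["A", "B", None], ["C", "D", None], ["E", None, None])],
    ["col1", "col2", "col3"],
)

# arrays_zip pairs the i-th elements of each array into one struct per position;
# explode_outer turns each struct into its own row and keeps null entries.
exploded = (
    df.select(F.explode_outer(F.arrays_zip("col1", "col2", "col3")).alias("z"))
      .select(F.col("z.col1").alias("col1"),
              F.col("z.col2").alias("col2"),
              F.col("z.col3").alias("col3"))
)
exploded.show()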

tor. 16. feb. 2023 kl. 16:48 skrev Navneet :

> I am not an expert; maybe try if this works:
> In order to achieve the desired output using the explode() method in
> Java, you can create a User-Defined Function (UDF) that zips the lists
> in each row and returns the resulting list. Here's an example
> implementation:
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.api.java.UDF1;
> import org.apache.spark.sql.types.DataTypes;
>
> public class ZipRows implements UDF1<Row, Row> {
> @Override
> public Row call(Row row) {
> List<String> list1 = row.getList(0);
> List<String> list2 = row.getList(1);
> List<String> list3 = row.getList(2);
> List<List<String>> zipped = new ArrayList<>();
> for (int i = 0; i < list1.size(); i++) {
> List<String> sublist = new ArrayList<>();
> sublist.add(list1.get(i));
> sublist.add(list2.get(i));
> sublist.add(list3.get(i));
> zipped.add(sublist);
> }
> return RowFactory.create(zipped);
> }
> }
> This UDF takes a Row as input, which contains the three lists in each
> row of the original DataFrame. It then zips these lists using a loop
> that creates a new sublist for each element in the lists. Finally, it
> returns a new Row that contains the zipped list.
>
> You can then use this UDF in combination with explode() to achieve the
> desired output:
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import static org.apache.spark.sql.functions.*;
>
> // assuming you have a Dataset called "df"
> df.withColumn("zipped", callUDF(new ZipRows(),
>
> DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.StringType))),
> "col1", "col2", "col3")
> .selectExpr("explode(zipped) as zipped")
> .selectExpr("zipped[0] as col1", "zipped[1] as col2", "zipped[2] as col3")
> .show();
> This code first adds a new column called "zipped" to the DataFrame
> using the callUDF() function, which applies the ZipRows UDF to the
> "col1", "col2", and "col3" columns. It then uses explode() to explode
> the "zipped" column, and finally selects the three sub-elements of the
> zipped list as separate columns using selectExpr(). The output should
> be the desired DataFrame.
>
>
>
> Regards,
> Navneet Kr
>
>
> On Thu, 16 Feb 2023 at 00:07, Enrico Minack 
> wrote:
> >
> > You have to take each row and zip the lists, each element of the result
> becomes one new row.
> >
> > So write a method that turns
> >   Row(List("A","B","null"), List("C","D","null"),
> List("E","null","null"))
> > into
> >   List(List("A","C","E"), List("B","D","null"),
> List("null","null","null"))
> > and use flatmap with that method.
> >
> > In Scala, this would read:
> >
> > df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1),
> row.getSeq[String](2)).zipped.toIterable }.show()
> >
> > Enrico
> >
> >
> > Am 14.02.23 um 22:54 schrieb sam smith:
> >
> > Hello guys,
> >
> > I have the following dataframe:
> >
> > col1                 col2                 col3
> > ["A","B","null"]     ["C","D","null"]     ["E","null","null"]
> >
> >
> > I want to explode it to the following dataframe:
> >
> > col1      col2      col3
> > "A"       "C"       "E"
> > "B"       "D"       "null"
> > "null"    "null"    "null"
> >
> >
> > How to do that (preferably in Java) using the explode() method ? knowing
> that something like the following won't yield correct output:
> >
> > for (String colName: dataset.columns())
> > dataset=dataset.withColumn(colName,explode(dataset.col(colName)));
> >
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark SQL question

2023-01-28 Thread Bjørn Jørgensen
Hi Mich.
This is a Spark user group mailing list where people can ask *any*
questions about spark.
You know SQL and streaming, but I don't think it's necessary to start a
reply with "*LOL*" to the question that's being asked.
No questions are too stupid to be asked.


lør. 28. jan. 2023 kl. 09:22 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

> LOL
>
> First one
>
> spark-sql> select 1 as `data.group` from abc group by data.group;
> 1
> Time taken: 0.198 seconds, Fetched 1 row(s)
>
> means that you are assigning the alias data.group in the select and you are using
> that alias -> data.group in your group by statement
>
>
> This is equivalent to
>
>
> spark-sql> select 1 as `data.group` from abc group by 1;
>
> 1
>
> With regard to your second sql
>
>
> select 1 as *`data.group`* from tbl group by `*data.group`;*
>
>
> *will throw an error *
>
>
> *spark-sql> select 1 as `data.group` from abc group by `data.group`;*
>
> *Error in query: cannot resolve '`data.group`' given input columns:
> [spark_catalog.elayer.abc.keyword, spark_catalog.elayer.abc.occurence];
> line 1 pos 43;*
>
> *'Aggregate ['`data.group`], [1 AS data.group#225]*
>
> *+- SubqueryAlias spark_catalog.elayer.abc*
>
> *   +- HiveTableRelation [`elayer`.`abc`,
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols:
> [keyword#226, occurence#227L], Partition Cols: []]*
>
> `data.group` with quotes is neither the name of the column nor its alias
>
>
> *HTH*
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 27 Jan 2023 at 23:36, Kohki Nishio  wrote:
>
>> this SQL works
>>
>> select 1 as *`data.group`* from tbl group by *data.group*
>>
>>
>> Since there's no such field as *data,* I thought the SQL has to look
>> like this
>>
>> select 1 as *`data.group`* from tbl group by `*data.group`*
>>
>>
>>  But that gives an error (cannot resolve '`data.group`') ... I'm no
>> expert in SQL, but it feels like strange behavior... does anybody have a
>> good explanation for it?
>>
>> Thanks
>>
>> --
>> Kohki Nishio
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [PySpark] Error using SciPy: ValueError: numpy.ndarray size changed, may indicate binary incompatibility. Expected 88 from C header, got 80 from PyObject

2023-01-06 Thread Bjørn Jørgensen
fc37.x86_64

  python3-pip-22.2.2-3.fc37.noarch

  python3-setuptools-62.6.0-2.fc37.noarch


Complete!
Removing intermediate container a7c648ae7014
 ---> 593ffbdb0c08
Step 3/3 : RUN sudo pip3 install -U Cython && sudo pip3 install -U pybind11
&& sudo pip3 install -U pythran && sudo pip3 install -U numpy && sudo pip3
install -U scipy
 ---> Running in 97aa856851b4
Collecting Cython
  Downloading
Cython-0.29.33-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl
(1.9 MB)
  1.9/1.9 MB 21.0 MB/s eta
0:00:00
Installing collected packages: Cython
Successfully installed Cython-0.29.33
WARNING: Running pip as the 'root' user can result in broken permissions
and conflicting behaviour with the system package manager. It is
recommended to use a virtual environment instead:
https://pip.pypa.io/warnings/venv
Collecting pybind11
  Downloading pybind11-2.10.3-py3-none-any.whl (222 kB)
 ━━━ 222.4/222.4 kB 4.4 MB/s eta
0:00:00
Installing collected packages: pybind11
Successfully installed pybind11-2.10.3
WARNING: Running pip as the 'root' user can result in broken permissions
and conflicting behaviour with the system package manager. It is
recommended to use a virtual environment instead:
https://pip.pypa.io/warnings/venv
Collecting pythran
  Downloading pythran-0.12.0-py3-none-any.whl (4.2 MB)
  4.2/4.2 MB 26.8 MB/s eta
0:00:00
Collecting ply>=3.4
  Downloading ply-3.11-py2.py3-none-any.whl (49 kB)
  49.6/49.6 kB 7.1 MB/s eta
0:00:00
Collecting gast~=0.5.0
  Downloading gast-0.5.3-py3-none-any.whl (19 kB)
Collecting numpy
  Downloading
numpy-1.24.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
(17.3 MB)
  17.3/17.3 MB 34.8 MB/s eta
0:00:00
Collecting beniget~=0.4.0
  Downloading beniget-0.4.1-py3-none-any.whl (9.4 kB)
Installing collected packages: ply, numpy, gast, beniget, pythran
Successfully installed beniget-0.4.1 gast-0.5.3 numpy-1.24.1 ply-3.11
pythran-0.12.0
WARNING: Running pip as the 'root' user can result in broken permissions
and conflicting behaviour with the system package manager. It is
recommended to use a virtual environment instead:
https://pip.pypa.io/warnings/venv
Requirement already satisfied: numpy in
/usr/local/lib64/python3.11/site-packages (1.24.1)
WARNING: Running pip as the 'root' user can result in broken permissions
and conflicting behaviour with the system package manager. It is
recommended to use a virtual environment instead:
https://pip.pypa.io/warnings/venv
Collecting scipy
  Downloading
scipy-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
(34.1 MB)
  34.1/34.1 MB 30.8 MB/s eta
0:00:00
Requirement already satisfied: numpy<1.27.0,>=1.19.5 in
/usr/local/lib64/python3.11/site-packages (from scipy) (1.24.1)
Installing collected packages: scipy
Successfully installed scipy-1.10.0
WARNING: Running pip as the 'root' user can result in broken permissions
and conflicting behaviour with the system package manager. It is
recommended to use a virtual environment instead:
https://pip.pypa.io/warnings/venv
Removing intermediate container 97aa856851b4
 ---> 9b2a51c94efe
Successfully built 9b2a51c94efe
Successfully tagged fedoratest:latest




[root@afebdea35f7c /]# python
bash: python: command not found
[root@afebdea35f7c /]# python3
Python 3.11.1 (main, Dec  7 2022, 00:00:00) [GCC 12.2.1 20221121 (Red Hat
12.2.1-4)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from scipy.stats import norm
>>>



fre. 6. jan. 2023 kl. 18:12 skrev Oliver Ruebenacker <
oliv...@broadinstitute.org>:

> Thank you for the link. I already tried most of what was suggested there,
> but without success.
>
> On Fri, Jan 6, 2023 at 11:35 AM Bjørn Jørgensen 
> wrote:
>
>>
>>
>>
>> https://stackoverflow.com/questions/66060487/valueerror-numpy-ndarray-size-changed-may-indicate-binary-incompatibility-exp
>>
>>
>>
>>
>> fre. 6. jan. 2023, 16:01 skrev Oliver Ruebenacker <
>> oliv...@broadinstitute.org>:
>>
>>>
>>>  Hello,
>>>
>>>   I'm trying to install SciPy using a bootstrap script and then use it
>>> to calculate a new field in a dataframe, running on AWS EMR.
>>>
>>>   Although the SciPy website states that only NumPy is needed, when I
>>> tried to install SciPy using pip, pip kept failing, complaining about
>>> missing software, until I ended up with this bootstrap script:
>>>
>>>
>>>
>>>
>>>
>>>
>>> *sudo yum install -y python3-develsudo pi

Re: [PySpark] Error using SciPy: ValueError: numpy.ndarray size changed, may indicate binary incompatibility. Expected 88 from C header, got 80 from PyObject

2023-01-06 Thread Bjørn Jørgensen
https://stackoverflow.com/questions/66060487/valueerror-numpy-ndarray-size-changed-may-indicate-binary-incompatibility-exp




fre. 6. jan. 2023, 16:01 skrev Oliver Ruebenacker <
oliv...@broadinstitute.org>:

>
>  Hello,
>
>   I'm trying to install SciPy using a bootstrap script and then use it to
> calculate a new field in a dataframe, running on AWS EMR.
>
>   Although the SciPy website states that only NumPy is needed, when I
> tried to install SciPy using pip, pip kept failing, complaining about
> missing software, until I ended up with this bootstrap script:
>
>
>
>
>
>
> sudo yum install -y python3-devel
> sudo pip3 install -U Cython
> sudo pip3 install -U pybind11
> sudo pip3 install -U pythran
> sudo pip3 install -U numpy
> sudo pip3 install -U scipy
>
>   At this point, the bootstrap seems to be successful, but then at this
> line:
>
> *from scipy.stats import norm*
>
>   I get the following error:
>
> *ValueError: numpy.ndarray size changed, may indicate binary
> incompatibility. Expected 88 from C header, got 80 from PyObject*
>
>   Any advice on how to proceed? Thanks!
>
>  Best, Oliver
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Bjørn Jørgensen
https://github.com/apache/spark/pull/39134

tir. 20. des. 2022, 22:42 skrev Oliver Ruebenacker <
oliv...@broadinstitute.org>:

> Thank you for the suggestion. This would, however, involve converting my
> Dataframe to an RDD (and back later), which involves additional costs.
>
> On Tue, Dec 20, 2022 at 7:30 AM Raghavendra Ganesh <
> raghavendr...@gmail.com> wrote:
>
>> you can groupBy(country) and use the mapPartitions method, in which you can
>> iterate over all rows keeping 2 variables for maxPopulationSoFar and the
>> corresponding city, then return the city with max population.
>> I think, as others suggested, it may be possible to use bucketing; it
>> would give a more friendly SQL'ish way of doing it but not be the best in
>> performance as it needs to order/sort.
>> --
>> Raghavendra
>>
>>
>> On Mon, Dec 19, 2022 at 8:57 PM Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   How can I retain from each group only the row for which one value is
>>> the maximum of the group? For example, imagine a DataFrame containing all
>>> major cities in the world, with three columns: (1) City name (2) Country
>>> (3) population. How would I get a DataFrame that only contains the largest
>>> city in each country? Thanks!
>>>
>>>  Best, Oliver
>>>
>>> --
>>> Oliver Ruebenacker, Ph.D. (he)
>>> Senior Software Engineer, Knowledge Portal Network , 
>>> Flannick
>>> Lab , Broad Institute
>>> 
>>>
>>
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Bjørn Jørgensen
Post an example dataframe and how you will have the result.

man. 19. des. 2022 kl. 20:36 skrev Oliver Ruebenacker <
oliv...@broadinstitute.org>:

> Thank you, that is an interesting idea. Instead of finding the maximum
> population, we are finding the maximum (population, city name) tuple.
>
> On Mon, Dec 19, 2022 at 2:10 PM Bjørn Jørgensen 
> wrote:
>
>> We have pandas API on spark
>> <https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html>
>> which is very good.
>>
>> from pyspark import pandas as ps
>>
>> You can use pdf = df.pandas_api()
>> Where df is your pyspark dataframe.
>>
>>
>> [image: image.png]
>>
>> Does this help you?
>>
>> df.groupby(['Country'])[['Population', 'City']].max()
>>
>> man. 19. des. 2022 kl. 18:22 skrev Patrick Tucci > >:
>>
>>> Window functions don't work like traditional GROUP BYs. They allow you
>>> to partition data and pull any relevant column, whether it's used in the
>>> partition or not.
>>>
>>> I'm not sure what the syntax is for PySpark, but the standard SQL would
>>> be something like this:
>>>
>>> WITH InputData AS
>>> (
>>>   SELECT 'USA' Country, 'New York' City, 900 Population
>>>   UNION
>>>   SELECT 'USA' Country, 'Miami', 620 Population
>>>   UNION
>>>   SELECT 'Ukraine' Country, 'Kyiv', 300 Population
>>>   UNION
>>>   SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
>>> )
>>>
>>>  SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population
>>> DESC) PopulationRank
>>>  FROM InputData;
>>>
>>> Results would be something like this:
>>>
>>> Country    City        Population    PopulationRank
>>> Ukraine    Kyiv        300           1
>>> Ukraine    Kharkiv     140           2
>>> USA        New York    900           1
>>> USA        Miami       620           2
>>>
>>> Which you could further filter in another CTE or subquery where
>>> PopulationRank = 1.
>>>
>>> As I mentioned, I'm not sure how this translates into PySpark, but
>>> that's the general concept in SQL.
>>>
>>> On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
>>> oliv...@broadinstitute.org> wrote:
>>>
>>>> If we only wanted to know the biggest population, max function would
>>>> suffice. The problem is I also want the name of the city with the biggest
>>>> population.
>>>>
>>>> On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:
>>>>
>>>>> As Mich says, isn't this just max by population partitioned by country
>>>>> in a window function?
>>>>>
>>>>> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
>>>>> oliv...@broadinstitute.org> wrote:
>>>>>
>>>>>>
>>>>>>  Hello,
>>>>>>
>>>>>>   Thank you for the response!
>>>>>>
>>>>>>   I can think of two ways to get the largest city by country, but
>>>>>> both seem to be inefficient:
>>>>>>
>>>>>>   (1) I could group by country, sort each group by population, add
>>>>>> the row number within each group, and then retain only cities with a row
>>>>>> number equal to 1. But it seems wasteful to sort everything when I only
>>>>>> want the largest of each country
>>>>>>
>>>>>>   (2) I could group by country, get the maximum city population for
>>>>>> each country, join that with the original data frame, and then retain 
>>>>>> only
>>>>>> cities with population equal to the maximum population in the country. 
>>>>>> But
>>>>>> that seems also expensive because I need to join.
>>>>>>
>>>>>>   Am I missing something?
>>>>>>
>>>>>>   Thanks!
>>>>>>
>>>>>>  Best, Oliver
>>>>>>
>>>>>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> In spark you can use windowing function
>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>>>> achieve this
>>>>>

Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Bjørn Jørgensen
We have pandas API on spark
<https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html>
which is very good.

from pyspark import pandas as ps

You can use pdf = df.pandas_api()
Where df is your pyspark dataframe.


[image: image.png]

Does this help you?

df.groupby(['Country'])[['Population', 'City']].max()
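
A plain PySpark alternative, equivalent to the ROW_NUMBER window approach
quoted below (a sketch; column names follow the example data):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("USA", "New York", 900), ("USA", "Miami", 620),
     ("Ukraine", "Kyiv", 300), ("Ukraine", "Kharkiv", 140)],
    ["Country", "City", "Population"],
)

# Rank cities within each country by population and keep the top one.
w = Window.partitionBy("Country").orderBy(F.col("Population").desc())
largest = (df.withColumn("rn", F.row_number().over(w))
             .filter(F.col("rn") == 1)
             .drop("rn"))
largest.show()

Spark 3 SQL also has a max_by aggregate (SELECT Country, max_by(City, Population)
... GROUP BY Country) when only the city name is needed.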

man. 19. des. 2022 kl. 18:22 skrev Patrick Tucci :

> Window functions don't work like traditional GROUP BYs. They allow you to
> partition data and pull any relevant column, whether it's used in the
> partition or not.
>
> I'm not sure what the syntax is for PySpark, but the standard SQL would be
> something like this:
>
> WITH InputData AS
> (
>   SELECT 'USA' Country, 'New York' City, 900 Population
>   UNION
>   SELECT 'USA' Country, 'Miami', 620 Population
>   UNION
>   SELECT 'Ukraine' Country, 'Kyiv', 300 Population
>   UNION
>   SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
> )
>
>  SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population
> DESC) PopulationRank
>  FROM InputData;
>
> Results would be something like this:
>
> Country    City        Population    PopulationRank
> Ukraine    Kyiv        300           1
> Ukraine    Kharkiv     140           2
> USA        New York    900           1
> USA        Miami       620           2
>
> Which you could further filter in another CTE or subquery where
> PopulationRank = 1.
>
> As I mentioned, I'm not sure how this translates into PySpark, but that's
> the general concept in SQL.
>
> On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>> If we only wanted to know the biggest population, max function would
>> suffice. The problem is I also want the name of the city with the biggest
>> population.
>>
>> On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:
>>
>>> As Mich says, isn't this just max by population partitioned by country
>>> in a window function?
>>>
>>> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
>>> oliv...@broadinstitute.org> wrote:
>>>
>>>>
>>>>  Hello,
>>>>
>>>>   Thank you for the response!
>>>>
>>>>   I can think of two ways to get the largest city by country, but both
>>>> seem to be inefficient:
>>>>
>>>>   (1) I could group by country, sort each group by population, add the
>>>> row number within each group, and then retain only cities with a row number
>>>> equal to 1. But it seems wasteful to sort everything when I only want the
>>>> largest of each country
>>>>
>>>>   (2) I could group by country, get the maximum city population for
>>>> each country, join that with the original data frame, and then retain only
>>>> cities with population equal to the maximum population in the country. But
>>>> that seems also expensive because I need to join.
>>>>
>>>>   Am I missing something?
>>>>
>>>>   Thanks!
>>>>
>>>>  Best, Oliver
>>>>
>>>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> In spark you can use windowing function
>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>> achieve this
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>>>>> oliv...@broadinstitute.org> wrote:
>>>>>
>>>>>>
>>>>>>  Hello,
>>>>>>
>>>>>>   How can I retain from each group only the row for which one value
>>>>>> is t

Re: Unable to run Spark Job(3.3.2 SNAPSHOT) with Volcano scheduler in Kubernetes

2022-12-16 Thread Bjørn Jørgensen
cala:28)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.api.Trees$Transformer.transform(Trees.scala:2563)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.TypingTransformers$TypingTransformer.transform(TypingTransformers.scala:57)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:275)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:133)
>>>>>>> at scala.reflect.internal.Trees.itransform(Trees.scala:1411)
>>>>>>> at scala.reflect.internal.Trees.itransform$(Trees.scala:1400)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.api.Trees$Transformer.transform(Trees.scala:2563)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.TypingTransformers$TypingTransformer.transform(TypingTransformers.scala:57)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:275)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:133)
>>>>>>> at scala.reflect.internal.Trees.itransform(Trees.scala:1430)
>>>>>>> at scala.reflect.internal.Trees.itransform$(Trees.scala:1400)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.api.Trees$Transformer.transform(Trees.scala:2563)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.TypingTransformers$TypingTransformer.transform(TypingTransformers.scala:57)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:275)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:133)
>>>>>>> at scala.reflect.internal.Trees.itransform(Trees.scala:1409)
>>>>>>> at scala.reflect.internal.Trees.itransform$(Trees.scala:1400)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.api.Trees$Transformer.transform(Trees.scala:2563)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.TypingTransformers$TypingTransformer.transform(TypingTransformers.scala:57)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:275)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:133)
>>>>>>> at scala.reflect.internal.Trees.itransform(Trees.scala:1436)
>>>>>>> at scala.reflect.internal.Trees.itransform$(Trees.scala:1400)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:28)
>>>>>>> at
>>>>>>> scala.reflect.api.Trees$Transformer.transform(Trees.scala:2563)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.TypingTransformers$TypingTransformer.transform(TypingTransformers.scala:57)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:275)
>>>>>>> at
>>>>>>> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:133)
>>>>>>> at scala.reflect.internal.Trees.itransform(Trees.scala:1411)
>>>>>>>
>>>>>>>
>>>
>>> --
>>> Thanks
>>> Gnana
>>>
>>
>
> --
> Thanks
> Gnana
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How can I use backticks in column names?

2022-12-05 Thread Bjørn Jørgensen
df = spark.createDataFrame(
    [("china", "asia"), ("colombia", "south america`")],
    ["country", "continent`"]
)
df.show()


+--------+--------------+
| country|    continent`|
+--------+--------------+
|   china|          asia|
|colombia|south america`|
+--------+--------------+



df.select("continent`").show(1)

(...)

AnalysisException: Syntax error in attribute name: continent`.



clean_df = df.toDF(*(c.replace('`', '_') for c in df.columns))
clean_df.show()


+--------+--------------+
| country|    continent_|
+--------+--------------+
|   china|          asia|
|colombia|south america`|
+--------+--------------+


clean_df.select("continent_").show(2)


+--------------+
|    continent_|
+--------------+
|          asia|
|south america`|
+--------------+


Examples are from MungingData Avoiding Dots / Periods in PySpark
Column Names <https://mungingdata.com/pyspark/avoid-dots-periods-column-names/>


man. 5. des. 2022 kl. 06:56 skrev 한승후 :

> Spark throws an exception if there are backticks in the column name.
>
> Please help me.
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Creating a Spark 3 Connector

2022-11-23 Thread Bjørn Jørgensen
This is from the vote for Spark Connect. Is this what you are looking for?

The goal of the SPIP is to introduce a Dataframe based client/server API
for Spark

Please also refer to:

- Previous discussion in dev mailing list: [DISCUSS] SPIP: Spark Connect -
A client and server interface for Apache Spark.
<https://lists.apache.org/thread/3fd2n34hlyg872nr55rylbv5cg8m1556>
- Design doc: Spark Connect - A client and server interface for Apache
Spark.
<https://docs.google.com/document/d/1Mnl6jmGszixLW4KcJU5j9IgpG9-UabS0dcM6PM2XGDc/edit?usp=sharing>
- JIRA: SPARK-39375 <https://issues.apache.org/jira/browse/SPARK-39375>

ons. 23. nov. 2022 kl. 17:36 skrev Mitch Shepherd <
mitch.sheph...@marklogic.com>:

> Hello,
>
>
>
> I’m wondering if anyone can point me in the right direction for a Spark
> connector developer guide.
>
>
>
> I’m looking for information on writing a new connector for Spark to move
> data between Apache Spark and other systems.
>
>
>
> Any information would be helpful. I found a similar thing for Kafka
> <https://docs.confluent.io/platform/current/connect/devguide.html> but
> haven’t been able to track down documentation for Spark.
>
>
>
> Best,
>
> Mitch
>
> This message and any attached documents contain information of MarkLogic
> and/or its customers that may be confidential and/or privileged. If you are
> not the intended recipient, you may not read, copy, distribute, or use this
> information. If you have received this transmission in error, please notify
> the sender immediately by reply e-mail and then delete this message. This
> email may contain pricing or other suggested contract terms related to
> MarkLogic software or services. Any such terms are not binding on MarkLogic
> unless and until they are included in a definitive agreement executed by
> MarkLogic.
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: spark - local question

2022-11-05 Thread Bjørn Jørgensen
I am using jupyter docker stack with spark.
So I started a new notebook and this code.

import multiprocessing
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

import time

t1 = time.time()
number_cores = int(multiprocessing.cpu_count())
memory_gb = 4


def get_spark_session(app_name: str, conf: SparkConf):
conf.setMaster("local[{}]".format(number_cores))
conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
"spark.sql.adaptive.enabled", "True"
).set(
"spark.serializer", "org.apache.spark.serializer.KryoSerializer"
).set(
"spark.sql.repl.eagerEval.maxNumRows", "100"
).set(
"sc.setLogLevel", "ERROR"
).set(
"spark.executor.memory", "8g")

return
SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My_app", SparkConf())

Gives me this in http://HOSTIP:4040/environment/

[image: image.png]

So it works.

lør. 5. nov. 2022 kl. 19:21 skrev 张健BJ :

> OK, thank you very much :) I also have two questions:
> does spark.read.format("jdbc") read all the data from the database
> at once, and does it require a limit? My test result is that with the
> increase of data, I observe that the local memory usage has not changed
> significantly. Why?
>
> In addition, I tried to set "spark.driver.memory" and "spark.executor.memory"
> to 4g in local mode, but I observed that the setting did not take effect,
> and the memory usage was always about 1g. The
> code is as follows:
>
> import multiprocessing
> from pyspark.sql import SparkSession
> from pyspark import SparkConf, SparkContext
>
> import time
>
> t1 = time.time()
> number_cores = int(multiprocessing.cpu_count())
> memory_gb = 4
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
> conf.setMaster("local[{}]".format(number_cores))
> conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
> "spark.sql.adaptive.enabled", "True"
> ).set(
> "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
> ).set(
> "spark.sql.repl.eagerEval.maxNumRows", "100"
> ).set(
> "sc.setLogLevel", "ERROR"
> ).set(
> "spark.executor.memory", "4g")
>
> return 
> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("My_app", SparkConf())
>
> df = spark.read.format("jdbc").options(
> url='jdbc:mysql://127.0.0.1:63306/recommend?useSSL=false',
> driver='com.mysql.jdbc.Driver',
> dbtable="item_info",
> user="root",
> password="root"
> ).load()
> my_url = 'jdbc:mysql://127.0.0.1:63306/etl?useSSL=false'
> auth_mysql = {'user': 'root', 'password': 'root'}
> df = df.withColumnRenamed("id", "itemid").withColumnRenamed("category", 
> "cateid") \
> .withColumnRenamed('weight', 'score').withColumnRenamed('tag', 
> 'item_tags') \
> .withColumnRenamed('modify_time', 
> 'item_modify_time').withColumnRenamed('start_time', 'dg_start_time') \
> .withColumnRenamed('end_time', 'dg_end_time')
> df = df.select(
> ['itemid', 'cateid', 'title', 'score', 'item_tags', 'item_modify_time', 
> 'dg_start_time', 'dg_end_time']).limit(20)
> df.write.jdbc(my_url, 'item_info', mode='append', properties=auth_mysql)
> print(time.time() - t1)
>
> --
> From: Bjørn Jørgensen 
> Sent: Saturday, 5 November 2022 04:51
> To: Sean Owen 
> Cc: 张健BJ ; user 
> Subject: Re: spark - local question
>
> Yes, Spark in local mode works :)
> One tip
> If you just start it, then the default settings is one core and 1 GB.
>
> I'm using this func to start spark in local mode to get all cors and max
> RAM
>
> import multiprocessing
> import os
> from pyspark.sql import SparkSession
> from pyspark import SparkConf, SparkContext
>
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  #
> e.g. 4015976448
> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
> conf.setMaster("local[{}]".format(number_cores))
> conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
> "spark.sql.adaptive.enabled", &quo

Re: spark - local question

2022-11-04 Thread Bjørn Jørgensen
Yes, Spark in local mode works :)
One tip:
If you just start it, the default settings are one core and 1 GB.

I'm using this function to start Spark in local mode to get all cores and max RAM

import multiprocessing
import os
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext


number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  #
e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
conf.setMaster("local[{}]".format(number_cores))
conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
"spark.sql.adaptive.enabled", "True"
).set(
"spark.serializer", "org.apache.spark.serializer.KryoSerializer"
).set(
"spark.sql.repl.eagerEval.maxNumRows", "100"
).set(
"sc.setLogLevel", "ERROR"
)

return
SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My_app", SparkConf())



Now when you type spark you will see something like this.


SparkSession - in-memory

SparkContext

Spark UI

Version v3.4.0-SNAPSHOT
Master  local[16]
AppName My_app


man. 31. okt. 2022 kl. 14:50 skrev Sean Owen :

> Sure, as stable and available as your machine is. If you don't need fault
> tolerance or scale beyond one machine, sure.
>
> On Mon, Oct 31, 2022 at 8:43 AM 张健BJ  wrote:
>
>> Dear developers:
>> I have a question about the PySpark local
>> mode. Can it be used in production, and will it cause unexpected problems?
>> The scenario is as follows:
>>
>> Our team wants to develop an ETL component based on Python. Data
>> can be transferred between various data sources.
>>
>> If there is no YARN environment, can we read data from Database A and write
>> it to Database B in local mode? Will this function be guaranteed to be stable
>> and available?
>>
>>
>>
>> Thanks,
>> Look forward to your reply
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [Spark Core][Release]Can we consider add SPARK-39725 into 3.3.1 or 3.3.2 release?

2022-10-04 Thread Bjørn Jørgensen
I have made a PR <https://github.com/apache/spark/pull/38098> for this now.

tir. 4. okt. 2022 kl. 19:02 skrev Sean Owen :

> I think it's fine to backport that to 3.3.x, regardless of whether it
> clearly affects Spark or not.
>
> On Tue, Oct 4, 2022 at 11:31 AM phoebe chen 
> wrote:
>
>> Hi:
>> (Not sure if this mailing group is good to use for such question, but
>> just try my luck here, thanks)
>>
>> SPARK-39725 <https://issues.apache.org/jira/browse/SPARK-39725> has a
>> fix for the security issues CVE-2022-2047 and CVE-2022-2048 (High), which was
>> set to the 3.4.0 release, but that will happen in Feb 2023. Is it possible to have
>> it in any earlier release such as 3.3.1 or 3.3.2?
>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Issue with SparkContext

2022-09-20 Thread Bjørn Jørgensen
Hi, we have a user group at user@spark.apache.org

You must install a Java runtime (JRE).

If you are on Ubuntu you can type
apt-get install openjdk-17-jre-headless

tir. 20. sep. 2022 kl. 06:15 skrev yogita bhardwaj <
yogita.bhard...@iktara.ai>:

>
>
> I am getting the py4j.protocol.Py4JJavaError while running SparkContext.
> Can you please help me to resolve this issue.
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Re: [how to]RDD using JDBC data source in PySpark

2022-09-20 Thread Bjørn Jørgensen
There is a PR for this now. [SPARK-40491][SQL] Expose a jdbcRDD function in
SparkContext <https://github.com/apache/spark/pull/37937>
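
Until something like that lands, a common workaround (a sketch; URL, driver,
table and credentials are placeholders) is to read through the DataFrame jdbc
source and then drop down to the underlying RDD of Rows:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("dbtable", "my_table")
      .option("user", "user")
      .option("password", "secret")
      .load())

rdd = df.rdd            # pyspark.RDD of Row objects
print(rdd.take(5))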

man. 19. sep. 2022 kl. 12:47 skrev javaca...@163.com :

> Thank you Bjørn Jørgensen, and thanks also to Sean Owen.
>
> DataFrame and .format("jdbc") is a good way to resolve it.
> But for some reasons I can't use the DataFrame API; I can only use the RDD API in
> PySpark.
> ...T_T...
>
> Thanks for all your help, but I still need a new idea to resolve it. XD
>
>
>
>
>
> ------
> javaca...@163.com
>
>
> *From:* Bjørn Jørgensen 
> *Sent:* 2022-09-19 18:34
> *To:* javaca...@163.com
> *Cc:* Xiao, Alton ; user@spark.apache.org
> *Subject:* Re: 答复: [how to]RDD using JDBC data source in PySpark
> https://www.projectpro.io/recipes/save-dataframe-mysql-pyspark
> and
> https://towardsdatascience.com/pyspark-mysql-tutorial-fa3f7c26dc7
>
> man. 19. sep. 2022 kl. 12:29 skrev javaca...@163.com :
>
>> Thank you answer alton.
>>
>> But i see that is use scala to implement it.
>> I know java/scala can get data from mysql using JDBCRDD farily well.
>> But i want to get same way in Python Spark.
>>
>> Would you to give me more advice, very thanks to you.
>>
>>
>> --
>> javaca...@163.com
>>
>>
>> *From:* Xiao, Alton 
>> *Sent:* 2022-09-19 18:04
>> *To:* javaca...@163.com; user@spark.apache.org
>> *Subject:* 答复: [how to]RDD using JDBC data source in PySpark
>>
>> Hi javacaoyu:
>>
>> https://hevodata.com/learn/spark-mysql/#Spark-MySQL-Integration
>>
>> I think spark have already integrated mysql
>>
>>
>>
>> *From:* javaca...@163.com 
>> *Date:* Monday, 19 September 2022 17:53
>> *To:* user@spark.apache.org 
>> *Subject:* [how to]RDD using JDBC data source in PySpark
>>
>>
>> Hi guys:
>>
>>
>>
>> Does have some way to let rdd can using jdbc data source in pyspark?
>>
>>
>>
>> i want to get data from mysql, but in PySpark, there is not supported
>> JDBCRDD like java/scala.
>>
>> and i search docs from web site, no answer.
>>
>>
>>
>>
>>
>> So i need your guys help,  Thank you very much.
>>
>>
>> --
>>
>> javaca...@163.com
>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: 答复: [how to]RDD using JDBC data source in PySpark

2022-09-19 Thread Bjørn Jørgensen
https://www.projectpro.io/recipes/save-dataframe-mysql-pyspark
and
https://towardsdatascience.com/pyspark-mysql-tutorial-fa3f7c26dc7

man. 19. sep. 2022 kl. 12:29 skrev javaca...@163.com :

> Thank you for the answer, Alton.
>
> But I see that it uses Scala to implement it.
> I know Java/Scala can get data from MySQL using JdbcRDD fairly well.
> But I want the same way in Python Spark.
>
> Would you give me some more advice? Many thanks to you.
>
>
> --
> javaca...@163.com
>
>
> *From:* Xiao, Alton 
> *Sent:* 2022-09-19 18:04
> *To:* javaca...@163.com; user@spark.apache.org
> *Subject:* 答复: [how to]RDD using JDBC data source in PySpark
>
> Hi javacaoyu:
>
> https://hevodata.com/learn/spark-mysql/#Spark-MySQL-Integration
>
> I think Spark has already integrated MySQL
>
>
>
> *From:* javaca...@163.com 
> *Date:* Monday, 19 September 2022 17:53
> *To:* user@spark.apache.org 
> *Subject:* [how to]RDD using JDBC data source in PySpark
>
>
> Hi guys:
>
>
>
> Is there some way to let an RDD use a JDBC data source in PySpark?
>
>
>
> I want to get data from MySQL, but in PySpark there is no JDBCRDD supported
> like in Java/Scala.
>
> And I searched the docs on the web site, with no answer.
>
>
>
>
>
> So I need your help. Thank you very much.
>
>
> --
>
> javaca...@163.com
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Jupyter notebook on Dataproc versus GKE

2022-09-14 Thread Bjørn Jørgensen
Mitch: Why I'm switching from Jupyter Notebooks to JupyterLab...Such a
better experience! DegreeTutors.com <https://youtu.be/djupMug3qUc>

tir. 6. sep. 2022 kl. 20:28 skrev Holden Karau :

> I’ve used Argo for K8s scheduling, for awhile it’s also what Kubeflow used
> underneath for scheduling.
>
> On Tue, Sep 6, 2022 at 10:01 AM Mich Talebzadeh 
> wrote:
>
>> Thank you all.
>>
>> Has anyone used Argo for k8s scheduler by any chance?
>>
>> On Tue, 6 Sep 2022 at 13:41, Bjørn Jørgensen 
>> wrote:
>>
>>> "*JupyterLab is the next-generation user interface for Project Jupyter
>>> offering all the familiar building blocks of the classic Jupyter Notebook
>>> (notebook, terminal, text editor, file browser, rich outputs, etc.) in a
>>> flexible and powerful user interface.*"
>>> https://github.com/jupyterlab/jupyterlab
>>>
>>> You will find them both at https://jupyter.org
>>>
>>> man. 5. sep. 2022 kl. 23:40 skrev Mich Talebzadeh <
>>> mich.talebza...@gmail.com>:
>>>
>>>> Thanks Bjorn,
>>>>
>>>> What are the differences and the functionality Jupyerlab brings in on
>>>> top of Jupyter notebook?
>>>>
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 5 Sept 2022 at 20:58, Bjørn Jørgensen 
>>>> wrote:
>>>>
>>>>> Jupyter notebook is replaced with jupyterlab :)
>>>>>
>>>>> man. 5. sep. 2022 kl. 21:10 skrev Holden Karau :
>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 5, 2022 at 9:00 AM Mich Talebzadeh <
>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for that.
>>>>>>>
>>>>>>> How do you rate the performance of Jupyter W/Spark on K8s compared
>>>>>>> to the same on  a cluster of VMs (example Dataproc).
>>>>>>>
>>>>>>> Also somehow a related question (may be naive as well). For example,
>>>>>>> Google offers a lot of standard ML libraries for example built into a 
>>>>>>> data
>>>>>>> warehouse like BigQuery. What does the Jupyter notebook offer that 
>>>>>>> others
>>>>>>> don't?
>>>>>>>
>>>>>> Jupyter notebook doesn’t offer any particular set of libraries,
>>>>>> although you can add your own to the container etc.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 5 Sept 2022 at 12:47, Holden Karau 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I’ve run Jupyter w/Spark on K8s, haven’t tried it with Dataproc
>>>>>>>> personally.
>>>>>>>>
>>>>>>>>

Re: Jupyter notebook on Dataproc versus GKE

2022-09-06 Thread Bjørn Jørgensen
"*JupyterLab is the next-generation user interface for Project Jupyter
offering all the familiar building blocks of the classic Jupyter Notebook
(notebook, terminal, text editor, file browser, rich outputs, etc.) in a
flexible and powerful user interface.*"
https://github.com/jupyterlab/jupyterlab

You will find them both at https://jupyter.org

man. 5. sep. 2022 kl. 23:40 skrev Mich Talebzadeh :

> Thanks Bjorn,
>
> What are the differences and the functionality JupyterLab brings in on top
> of Jupyter notebook?
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 5 Sept 2022 at 20:58, Bjørn Jørgensen 
> wrote:
>
>> Jupyter notebook is replaced with jupyterlab :)
>>
>> man. 5. sep. 2022 kl. 21:10 skrev Holden Karau :
>>
>>>
>>>
>>> On Mon, Sep 5, 2022 at 9:00 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Thanks for that.
>>>>
>>>> How do you rate the performance of Jupyter W/Spark on K8s compared to
>>>> the same on  a cluster of VMs (example Dataproc).
>>>>
>>>> Also somehow a related question (may be naive as well). For example,
>>>> Google offers a lot of standard ML libraries for example built into a data
>>>> warehouse like BigQuery. What does the Jupyter notebook offer that others
>>>> don't?
>>>>
>>> Jupyter notebook doesn’t offer any particular set of libraries, although
>>> you can add your own to the container etc.
>>>
>>>>
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 5 Sept 2022 at 12:47, Holden Karau 
>>>> wrote:
>>>>
>>>>> I’ve run Jupyter w/Spark on K8s, haven’t tried it with Dataproc
>>>>> personally.
>>>>>
>>>>> The Spark K8s pod scheduler is now more pluggable for Yunikorn and
>>>>> Volcano can be used with less effort.
>>>>>
>>>>> On Mon, Sep 5, 2022 at 7:44 AM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>> Has anyone got experience of running Jupyter on dataproc versus
>>>>>> Jupyter notebook on GKE (k8).
>>>>>>
>>>>>>
>>>>>> I have not looked at this for a while but my understanding is that
>>>>>> Spark on GKE/k8 is not yet performed. This is classic Spark with
>>>>>> Python/Pyspark.
>>>>>>
>>>>>>
>>>>>> Also I would like to know the state of spark with Volcano. Has
>>>>>> progress made on that front.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>>
>>>>>> Mich
>>>>>>
>>>>>>
>>>>>>view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>> Books (Learning Spark, High Performance Spark, etc.):
>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>
>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Jupyter notebook on Dataproc versus GKE

2022-09-05 Thread Bjørn Jørgensen
Jupyter notebook is replaced with jupyterlab :)

man. 5. sep. 2022 kl. 21:10 skrev Holden Karau :

>
>
> On Mon, Sep 5, 2022 at 9:00 AM Mich Talebzadeh 
> wrote:
>
>> Thanks for that.
>>
>> How do you rate the performance of Jupyter W/Spark on K8s compared to the
>> same on  a cluster of VMs (example Dataproc).
>>
>> Also somehow a related question (may be naive as well). For example,
>> Google offers a lot of standard ML libraries for example built into a data
>> warehouse like BigQuery. What does the Jupyter notebook offer that others
>> don't?
>>
> Jupyter notebook doesn’t offer any particular set of libraries, although
> you can add your own to the container etc.
>
>>
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 5 Sept 2022 at 12:47, Holden Karau  wrote:
>>
>>> I’ve run Jupyter w/Spark on K8s, haven’t tried it with Dataproc
>>> personally.
>>>
>>> The Spark K8s pod scheduler is now more pluggable for Yunikorn and
>>> Volcano can be used with less effort.
>>>
>>> On Mon, Sep 5, 2022 at 7:44 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>>
>>>> Has anyone got experience of running Jupyter on dataproc versus Jupyter
>>>> notebook on GKE (k8).
>>>>
>>>>
>>>> I have not looked at this for a while but my understanding is that
>>>> Spark on GKE/k8 is not yet performed. This is classic Spark with
>>>> Python/Pyspark.
>>>>
>>>>
>>>> Also I would like to know the state of spark with Volcano. Has progress
>>>> made on that front.
>>>>
>>>>
>>>> Regards,
>>>>
>>>>
>>>> Mich
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>> Books (Learning Spark, High Performance Spark, etc.):
>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Pyspark and multiprocessing

2022-07-21 Thread Bjørn Jørgensen
Thank you.
The reason for using Spark local is to test the code and, as in this case,
find the bottlenecks and fix them before I spin up a K8S cluster.

I did test it now with
16 cores and 10 files

import time

tic = time.perf_counter()
json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
'/home/jovyan/notebooks/falk/test/test.json')
toc = time.perf_counter()
print(f"Func run in {toc - tic:0.4f} seconds")

Func run in 30.3695 seconds


then I stopped Spark and restarted it with setMaster('local[1]')

and now

Func run in 30.8168 seconds


Which means it doesn't matter whether I run this code on one core or on a K8S
cluster with 100 cores.

So I tested the same with

from multiprocessing.pool import ThreadPool
import multiprocessing as mp


if __name__ == "__main__":
tic = time.perf_counter()
pool = ThreadPool(mp.cpu_count())
opt =
pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
'/home/jovyan/notebooks/falk/test/test.json'))
toc = time.perf_counter()
print(f"Func run in {toc - tic:0.4f} seconds")

I get the same files and they are ok.
But I also get this error

TypeError                                 Traceback (most recent call last)
Input In [33], in <module>()
      6 tic = time.perf_counter()
      7 pool = ThreadPool(mp.cpu_count())
----> 8 opt = pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
                       '/home/jovyan/notebooks/falk/test/test.json'))
      9 toc = time.perf_counter()
     10 print(f"Func run in {toc - tic:0.4f} seconds")


TypeError: Pool.map() missing 1 required positional argument: 'iterable'

So any hints on what to change? :)
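
(For anyone hitting the same TypeError: Pool.map expects a function object plus an
iterable, while the call above invokes the function first and passes its return value.
A minimal sketch of one way to restructure it, using a hypothetical per-file helper and
the paths from this thread; spark, cleanDataFrame and flatten_test are the objects
defined earlier in the thread.)

import os
from functools import partial
from multiprocessing.pool import ThreadPool
import multiprocessing as mp

# Hypothetical per-file variant of json_to_norm_with_null: it takes ONE input
# file so that pool.map can hand it the iterable of file names.
def process_one(filename, path_to_save):
    df = spark.read.json(filename)
    df = cleanDataFrame(df)
    df = flatten_test(df, sep=":")
    (df.write.mode("append")
       .option("compression", "snappy")
       .option("ignoreNullFields", "false")
       .json(path_to_save))

dir_path = "/home/jovyan/notebooks/falk/test"
out_path = "/home/jovyan/notebooks/falk/test/test.json"
files = [os.path.join(dir_path, f) for f in os.listdir(dir_path)
         if f.endswith("._stript_list.json")]

with ThreadPool(mp.cpu_count()) as pool:
    # map() takes the function object (no parentheses) and the iterable of arguments.
    pool.map(partial(process_one, path_to_save=out_path), files)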

Spark has the pandas API on Spark, and that is really great. I prefer the pandas
API on Spark and PySpark over plain pandas.

tor. 21. jul. 2022 kl. 09:18 skrev Khalid Mammadov <
khalidmammad...@gmail.com>:

> One quick observation is that you allocate all your local CPUs to Spark
> then execute that app with 10 Threads i.e 10 spark apps and so you will
> need 160cores in total as each will need 16CPUs IMHO. Wouldn't that create
> CPU bottleneck?
>
> Also on the side note, why you need Spark if you use that on local only?
> Sparks power can only be (mainly) observed in a cluster env.
> I have achieved great parallelism using pandas and pools on a local
> machine in the past.
>
>
> On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, 
> wrote:
>
>> I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
>> They don`t have the same schema, so I have to loop over them one at a
>> time.
>>
>> This works, but is`s very slow. This process takes 5 days!
>>
>> So now I have tried to run this functions in a ThreadPool. But it don`t
>> seems to work.
>>
>>
>> *Start local spark. The system have 16 cores and 64 GB.*
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
>> e.g. 4015976448
>> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>>
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>> conf.setMaster('local[{}]'.format(number_cores))
>> conf \
>>   .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>>   .set("spark.sql.repl.eagerEval.enabled", "True") \
>>   .set("spark.sql.adaptive.enabled", "True") \
>>   .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer") \
>>   .set("spark.sql.repl.eagerEval.maxNumRows", "1")
>>
>> return
>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>> spark = get_spark_session("Falk", SparkConf())
>>
>>
>> *Function to rename columns with \\ *
>>
>> # We take a dataframe and return a new one with required changes
>> def cleanDataFrame(df: DataFrame) -> DataFrame:
>> # Returns a new sanitized field name (this function can be anything
>> really)
>> def sanitizeFieldName(s: str) -> str:
>> return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
>> .replace("[", "_").replace("]", "_").replace(".", "_")
>>
>> # We call this on all fields to create a copy and to perform any
>> changes we might
>> # want to do to the field.
>> def sanitizeField(field: StructField) -> StructField:
>> field = copy(field)
>> field.name = sanitizeFieldName(field.name)
>> # We recursively call cleanSchema on all types
>> fi

Fwd: Pyspark and multiprocessing

2022-07-20 Thread Bjørn Jørgensen
So now I have tried to run this function in a ThreadPool. But it doesn't
seem to work.

[image: image.png]

-- Forwarded message -
Fra: Sean Owen 
Date: ons. 20. jul. 2022 kl. 22:43
Subject: Re: Pyspark and multiprocessing
To: Bjørn Jørgensen 


I don't think you ever say what doesn't work?

On Wed, Jul 20, 2022 at 3:40 PM Bjørn Jørgensen 
wrote:

> I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
> They don`t have the same schema, so I have to loop over them one at a
> time.
>
> This works, but is`s very slow. This process takes 5 days!
>
> So now I have tried to run this functions in a ThreadPool. But it don`t
> seems to work.
>
>
> *Start local spark. The system have 16 cores and 64 GB.*
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
> e.g. 4015976448
> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
> conf.setMaster('local[{}]'.format(number_cores))
> conf \
>   .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>   .set("spark.sql.repl.eagerEval.enabled", "True") \
>   .set("spark.sql.adaptive.enabled", "True") \
>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer") \
>   .set("spark.sql.repl.eagerEval.maxNumRows", "1")
>
> return
> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
> spark = get_spark_session("Falk", SparkConf())
>
>
> *Function to rename columns with \\ *
>
> # We take a dataframe and return a new one with required changes
> def cleanDataFrame(df: DataFrame) -> DataFrame:
> # Returns a new sanitized field name (this function can be anything
> really)
> def sanitizeFieldName(s: str) -> str:
> return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
> .replace("[", "_").replace("]", "_").replace(".", "_")
>
> # We call this on all fields to create a copy and to perform any
> changes we might
> # want to do to the field.
> def sanitizeField(field: StructField) -> StructField:
> field = copy(field)
> field.name = sanitizeFieldName(field.name)
> # We recursively call cleanSchema on all types
> field.dataType = cleanSchema(field.dataType)
> return field
>
> def cleanSchema(dataType: [DataType]) -> [DateType]:
> dataType = copy(dataType)
> # If the type is a StructType we need to recurse otherwise we can
> return since
> # we've reached the leaf node
> if isinstance(dataType, StructType):
> # We call our sanitizer for all top level fields
> dataType.fields = [sanitizeField(f) for f in dataType.fields]
> elif isinstance(dataType, ArrayType):
> dataType.elementType = cleanSchema(dataType.elementType)
> return dataType
>
> # Now since we have the new schema we can create a new DataFrame by
> using the old Frame's RDD as data and the new schema as the schema for the
> data
> return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>
>
>
> *Function to flatten out a nested dataframe.*
>
>
> from pyspark.sql.types import *
> from pyspark.sql.functions import *
>
>
> def flatten_test(df, sep="_"):
> """Returns a flattened dataframe.
> .. versionadded:: x.X.X
>
> Parameters
> --
> sep : str
> Delimiter for flatted columns. Default `_`
>
> Notes
> -
> Don`t use `.` as `sep`
> It won't work on nested data frames with more than one level.
> And you will have to use `columns.name`.
>
> Flattening Map Types will have to find every key in the column.
> This can be slow.
>
> Examples
> 
>
> data_mixed = [
> {
> "state": "Florida",
> "shortname": "FL",
> "info": {"governor": "Rick Scott"},
> "counties": [
> {"name": "Dade", "population": 12345},
> {"name": "Broward", "population": 4},
> {"name": "Palm Beach", "population":

Pyspark and multiprocessing

2022-07-20 Thread Bjørn Jørgensen
ollect()))
key_cols = list(map(lambda f: col(col_name).getItem(f)
.alias(str(col_name + sep + f)), keys))
drop_column_list = [col_name]
df = df.select([col_name for col_name in df.columns
if col_name not in drop_column_list] + key_cols)

# recompute remaining Complex Fields in Schema
complex_fields = dict([(field.name, field.dataType)
for field in df.schema.fields
    if type(field.dataType) == ArrayType
or type(field.dataType) == StructType
or type(field.dataType) == MapType])

return df


*Function to read each file, and apply the functions and save each file as
JSON.*

def json_to_norm_with_null(dir_path, path_to_save):
path = dir_path

for filename in os.listdir(path):
if not filename.endswith('._stript_list.json'):
continue


fullname = os.path.join(path, filename)
with open(fullname) as json_file:
jsonstr = json.load(json_file)

df = spark.read.json(fullname)
df = cleanDataFrame(df)
df = flatten_test(df, sep=":")
df.write.mode('append').option('compression',
'snappy').option("ignoreNullFields", "false").json(path_to_save)


*Function to start everything of. With hopefully 10 processes.*

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=10)

tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
'/home/jovyan/notebooks/falk/F02.json'))


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How use pattern matching in spark

2022-07-14 Thread Bjørn Jørgensen
Use

from datetime import date

today = date.today()
day = today.strftime("%d/%m/%Y")
print(day)

to get today's date. Cast it to a string with testday = str(day).

Compare with ==

day == df_date

True or False

Use loc to get the row text

test_str = test.loc[1][0]

A string behaves like a list in Python, so

test_str[2]

'1'
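
To do the same header/tail check directly in Spark, a minimal sketch (the file path and
the exact header/tail layout are assumptions based on the sample in the thread):

import re
from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the raw .dat file as plain text: one line per row in the "value" column.
# collect() is fine here because control/metadata files like this are small.
lines = [r[0] for r in spark.read.text("data/sample.dat").collect()]

header, tail = lines[0], lines[-1]
detail_count = len(lines) - 2              # rows between header and tail

# The header is assumed to start with a dd/MM/yyyy date; compare it with today.
header_ok = header[:10] == date.today().strftime("%d/%m/%Y")

# The tail is assumed to carry the expected record count as its first digit group.
m = re.search(r"\d+", tail)
tail_ok = m is not None and int(m.group()) == detail_count

print(header_ok, tail_ok)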




ons. 13. jul. 2022 kl. 08:25 skrev Sid :

> Hi Team,
>
> I have a dataset like the below one in .dat file:
>
> 13/07/2022abc
> PWJ   PWJABC 513213217ABC GM20 05. 6/20/39
> #01000count
>
> Now I want to extract the header and tail records which I was able to do
> it. Now, from the header, I need to extract the date and match it with the
> current system date. Also, for the tail records, I need to match the number
> of actual rows i.e 1 in my case with the values mentioned in the last row.
> That is a kind of pattern matching so that I can find '1' in the last row
> and say that the actual records and the value in the tail record matches
> with each other.
>
> How can I do this? Any links would be helpful. I think regex pattern
> matching should help.
>
> Also, I will be getting 3 formats for now i.e CSV, .DAT file and .TXT
> file.
>
> So, as per me I could do validation for all these 3 file formats using
> spark.read.text().rdd and performing intended operations on Rdds. Just the
> validation part.
>
> Therefore, wanted to understand is there any better way to achieve this?
>
> Thanks,
> Sid
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: How reading works?

2022-07-05 Thread Bjørn Jørgensen
Ehh.. What is "*duplicate column*" ? I don't think Spark supports that.

duplicate column = duplicate rows


tir. 5. jul. 2022 kl. 22:13 skrev Bjørn Jørgensen :

> "*but I am getting the issue of the duplicate column which was present in
> the old dataset.*"
>
> So you have answered your question!
>
> spark.read.option("multiline","true").json("path").filter(
> col("edl_timestamp")>last_saved_timestamp) As you have figured out, spark
> read all the json files in "path" then filter.
>
> There are some file formats that can have filters before reading files.
> The one that I know about is Parquet. Like this link explains Spark:
> Understand the Basic of Pushed Filter and Partition Filter Using Parquet
> File
> <https://medium.com/@songkunjump/spark-understand-the-basic-of-pushed-filter-and-partition-filter-using-parquet-file-3e5789e260bd>
>
>
>
>
>
> tir. 5. jul. 2022 kl. 21:21 skrev Sid :
>
>> Hi Team,
>>
>> I still need help in understanding how reading works exactly?
>>
>> Thanks,
>> Sid
>>
>> On Mon, Jun 20, 2022 at 2:23 PM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> Can somebody help?
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Sun, Jun 19, 2022 at 3:51 PM Sid  wrote:
>>>
>>>> Hi,
>>>>
>>>> I already have a partitioned JSON dataset in s3 like the below:
>>>>
>>>> edl_timestamp=2022090800
>>>>
>>>> Now, the problem is, in the earlier 10 days of data collection there
>>>> was a duplicate columns issue due to which we couldn't read the data.
>>>>
>>>> Now the latest 10 days of data are proper. So, I am trying to do
>>>> something like the below:
>>>>
>>>>
>>>> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>>>>
>>>> but I am getting the issue of the duplicate column which was present in
>>>> the old dataset. So, I am trying to understand how the spark reads the
>>>> data. Does it full dataset and filter on the basis of the last saved
>>>> timestamp or does it filter only what is required? If the second case is
>>>> true, then it should have read the data since the latest data is correct.
>>>>
>>>> So just trying to understand. Could anyone help here?
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>>
>>>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How reading works?

2022-07-05 Thread Bjørn Jørgensen
"*but I am getting the issue of the duplicate column which was present in
the old dataset.*"

So you have answered your question!

spark.read.option("multiline","true").json("path").filter(
col("edl_timestamp")>last_saved_timestamp) As you have figured out, spark
read all the json files in "path" then filter.

There are some file formats that can apply filters before reading the files. The
one that I know about is Parquet, as this link explains: Spark: Understand
the Basic of Pushed Filter and Partition Filter Using Parquet File
<https://medium.com/@songkunjump/spark-understand-the-basic-of-pushed-filter-and-partition-filter-using-parquet-file-3e5789e260bd>
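
A minimal sketch of the difference, assuming the data were rewritten as Parquet
partitioned by edl_timestamp (the paths are made up; last_saved_timestamp is the
variable from the thread):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# JSON: Spark has to list and parse every file under the path and only then
# applies the filter, so the old, broken files still get read.
json_df = (spark.read.option("multiline", "true").json("s3://bucket/events_json")
           .filter(F.col("edl_timestamp") > last_saved_timestamp))

# Parquet partitioned by edl_timestamp: the filter becomes a partition filter,
# so the old partitions are never opened.
parquet_df = (spark.read.parquet("s3://bucket/events_parquet")
              .filter(F.col("edl_timestamp") > last_saved_timestamp))
parquet_df.explain()  # look for PartitionFilters / PushedFilters in the plan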





tir. 5. jul. 2022 kl. 21:21 skrev Sid :

> Hi Team,
>
> I still need help in understanding how reading works exactly?
>
> Thanks,
> Sid
>
> On Mon, Jun 20, 2022 at 2:23 PM Sid  wrote:
>
>> Hi Team,
>>
>> Can somebody help?
>>
>> Thanks,
>> Sid
>>
>> On Sun, Jun 19, 2022 at 3:51 PM Sid  wrote:
>>
>>> Hi,
>>>
>>> I already have a partitioned JSON dataset in s3 like the below:
>>>
>>> edl_timestamp=2022090800
>>>
>>> Now, the problem is, in the earlier 10 days of data collection there was
>>> a duplicate columns issue due to which we couldn't read the data.
>>>
>>> Now the latest 10 days of data are proper. So, I am trying to do
>>> something like the below:
>>>
>>>
>>> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>>>
>>> but I am getting the issue of the duplicate column which was present in
>>> the old dataset. So, I am trying to understand how the spark reads the
>>> data. Does it full dataset and filter on the basis of the last saved
>>> timestamp or does it filter only what is required? If the second case is
>>> true, then it should have read the data since the latest data is correct.
>>>
>>> So just trying to understand. Could anyone help here?
>>>
>>> Thanks,
>>> Sid
>>>
>>>
>>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Glue is serverless? how?

2022-06-26 Thread Bjørn Jørgensen
https://en.m.wikipedia.org/wiki/Serverless_computing



søn. 26. jun. 2022, 10:26 skrev Sid :

> Hi Team,
>
> I am developing a spark job in glue and have read that glue is serverless.
> I know that using glue studio we can autoscale the workers. However, I want
> to understand how it is serverless?
>
> We specify the number of workers in the configuration for our job. Then
> what is that which is managed by Glue and we don't have to worry about the
> underlying infrastructure?
>
> Please help me to understand in layman's terms.
>
> Thanks,
> Sid
>


Re: to find Difference of locations in Spark Dataframe rows

2022-06-09 Thread Bjørn Jørgensen
If KM means kilometres, note that val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
already returns kilometres (2 × the Earth radius of 6371 km); equivalently you can write
val distance = atan2(sqrt(a), sqrt(-a + 1)) * 12742 and drop the separate * 2.

Have a look at this gnist Spherical distance calcualtion based on latitude
and longitude with Apache Spark
<https://gist.github.com/pavlov99/bd265be244f8a84e291e96c5656ceb5c>
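
A minimal PySpark sketch of the one-to-many part: cross join every city with every other
city, compute the haversine distance as a column expression, and keep the pairs within
10 km (the toy data is made up):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy data: one row per city with latitude/longitude in degrees.
cities = spark.createDataFrame(
    [("Oslo", 59.91, 10.75), ("Bergen", 60.39, 5.32), ("Asker", 59.83, 10.43)],
    ["city", "lat", "lon"])

a, b = cities.alias("a"), cities.alias("b")

dlat = F.radians(F.col("b.lat") - F.col("a.lat"))
dlon = F.radians(F.col("b.lon") - F.col("a.lon"))
h = (F.pow(F.sin(dlat / 2), 2)
     + F.cos(F.radians(F.col("a.lat"))) * F.cos(F.radians(F.col("b.lat")))
     * F.pow(F.sin(dlon / 2), 2))
dist_km = F.atan2(F.sqrt(h), F.sqrt(-h + 1)) * 2 * 6371   # haversine, kilometres

pairs = (a.crossJoin(b)
          .where(F.col("a.city") != F.col("b.city"))
          .withColumn("distance_km", dist_km)
          .where(F.col("distance_km") <= 10))
pairs.show()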

tir. 7. jun. 2022 kl. 19:39 skrev Chetan Khatri :

> Hi Dear Spark Users,
>
> It has been many years that I have worked on Spark, Please help me. Thanks
> much
>
> I have different cities and their co-ordinates in DataFrame[Row], I want
> to find distance in KMs and then show only those records /cities which are
> 10 KMs far.
>
> I have a function created that can find the distance in KMs given two
> co-coordinates. But I don't know how to apply it to rows, like one to many
> and calculate the distance.
>
> Some code that I wrote, Sorry for the basic code.
>
> lass HouseMatching {
>   def main(args: Array[String]): Unit = {
>
> val search_property_id = args(0)
>
> // list of columns where the condition should be exact match
> val groupOneCriteria = List(
>   "occupied_by_tenant",
>   "water_index",
>   "electricity_index",
>   "elevator_index",
>   "heating_index",
>   "nb_bathtubs",
>   "nb_showers",
>   "nb_wc",
>   "nb_rooms",
>   "nb_kitchens"
> )
> // list of columns where the condition should be matching 80%
> val groupTwoCriteria = List(
>   "area",
>   "home_condition",
>   "building_age"
> )
> // list of columns where the condition should be found using Euclidean 
> distance
> val groupThreeCriteria = List(
>   "postal_code"
> )
>
> val region_or_city = "region"
>
> def haversineDistance(destination_latitude: Column, 
> destination_longitude: Column, origin_latitude: Column,
>   origin_longitude: Column): Column = {
>   val a = pow(sin(radians(destination_latitude - origin_latitude) / 2), 
> 2) +
> cos(radians(origin_latitude)) * cos(radians(destination_latitude)) *
>   pow(sin(radians(destination_longitude - origin_longitude) / 2), 2)
>   val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
>   distance
> }
>
> val spark = SparkSession.builder().appName("real-estate-property-matcher")
>   .getOrCreate()
>
> val housingDataDF = 
> spark.read.csv("~/Downloads/real-estate-sample-data.csv")
>
> // searching for the property by `ref_id`
> val searchPropertyDF = housingDataDF.filter(col("ref_id") === 
> search_property_id)
>
> // Similar house in the same city (same postal code) and group one 
> condition
> val similarHouseAndSameCity = housingDataDF.join(searchPropertyDF, 
> groupThreeCriteria ++ groupOneCriteria,
>   "inner")
>
> // Similar house not in the same city but 10km range
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Complexity with the data

2022-05-26 Thread Bjørn Jørgensen
Yes, but how do you read it with Spark?

tor. 26. mai 2022, 18:30 skrev Sid :

> I am not reading it through pandas. I am using Spark because when I tried
> to use pandas which comes under import pyspark.pandas, it gives me an
> error.
>
> On Thu, May 26, 2022 at 9:52 PM Bjørn Jørgensen 
> wrote:
>
>> ok, but how do you read it now?
>>
>>
>> https://github.com/apache/spark/blob/8f610d1b4ce532705c528f3c085b0289b2b17a94/python/pyspark/pandas/namespace.py#L216
>> probably have to be updated with the default options. This is so that
>> pandas API on spark will be like pandas.
>>
>> tor. 26. mai 2022 kl. 17:38 skrev Sid :
>>
>>> I was passing the wrong escape characters due to which I was facing the
>>> issue. I have updated the user's answer on my post. Now I am able to load
>>> the dataset.
>>>
>>> Thank you everyone for your time and help!
>>>
>>> Much appreciated.
>>>
>>> I have more datasets like this. I hope that would be resolved using this
>>> approach :) Fingers crossed.
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Thu, May 26, 2022 at 8:43 PM Apostolos N. Papadopoulos <
>>> papad...@csd.auth.gr> wrote:
>>>
>>>> Since you cannot create the DF directly, you may try to first create an
>>>> RDD of tuples from the file
>>>>
>>>> and then convert the RDD to a DF by using the toDF() transformation.
>>>>
>>>> Perhaps you may bypass the issue with this.
>>>>
>>>> Another thing that I have seen in the example is that you are using ""
>>>> as an escape character.
>>>>
>>>> Can you check if this may cause any issues?
>>>>
>>>> Regards,
>>>>
>>>> Apostolos
>>>>
>>>>
>>>>
>>>> On 26/5/22 16:31, Sid wrote:
>>>>
>>>> Thanks for opening the issue, Bjorn. However, could you help me to
>>>> address the problem for now with some kind of alternative?
>>>>
>>>> I am actually stuck in this since yesterday.
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Thu, 26 May 2022, 18:48 Bjørn Jørgensen, 
>>>> wrote:
>>>>
>>>>> Yes, it looks like a bug that we also have in pandas API on spark.
>>>>>
>>>>> So I have opened a JIRA
>>>>> <https://issues.apache.org/jira/browse/SPARK-39304> for this.
>>>>>
>>>>> tor. 26. mai 2022 kl. 11:09 skrev Sid :
>>>>>
>>>>>> Hello Everyone,
>>>>>>
>>>>>> I have posted a question finally with the dataset and the column
>>>>>> names.
>>>>>>
>>>>>> PFB link:
>>>>>>
>>>>>>
>>>>>> https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark
>>>>>>
>>>>>> Thanks,
>>>>>> Sid
>>>>>>
>>>>>> On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen <
>>>>>> bjornjorgen...@gmail.com> wrote:
>>>>>>
>>>>>>> Sid, dump one of yours files.
>>>>>>>
>>>>>>>
>>>>>>> https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ons. 25. mai 2022, 23:04 skrev Sid :
>>>>>>>
>>>>>>>> I have 10 columns with me but in the dataset, I observed that some
>>>>>>>> records have 11 columns of data(for the additional column it is marked 
>>>>>>>> as
>>>>>>>> null). But, how do I handle this?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sid
>>>>>>>>
>>>>>>>> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>>>>>>>>
>>>>>>>>> How can I do that? Any examples or links, please. So, this works
>>>>>>>>> well with pandas I suppose. It's just that I need to convert back to 
>>>>>>>>> the
>>>>>>>>> spark data frame by providing a schema but since we are using a lower 
>>>>>>>>> spark
>>>>>>>>> version and pandas won't work in a

Re: Complexity with the data

2022-05-26 Thread Bjørn Jørgensen
OK, but how do you read it now?

https://github.com/apache/spark/blob/8f610d1b4ce532705c528f3c085b0289b2b17a94/python/pyspark/pandas/namespace.py#L216
probably has to be updated with the default options, so that the
pandas API on Spark behaves like pandas.
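
For reference, a minimal sketch of reading a CSV that has commas and newlines inside
quoted fields, which is what this thread ended up needing (the path is an assumption):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .option("header", "true")
      .option("multiLine", "true")   # allow newlines inside quoted fields
      .option("quote", '"')          # fields with commas/newlines are wrapped in "
      .option("escape", '"')         # embedded quotes are doubled: "" -> "
      .option("inferSchema", "true")
      .csv("data/complex.csv"))
df.show(truncate=False)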

tor. 26. mai 2022 kl. 17:38 skrev Sid :

> I was passing the wrong escape characters due to which I was facing the
> issue. I have updated the user's answer on my post. Now I am able to load
> the dataset.
>
> Thank you everyone for your time and help!
>
> Much appreciated.
>
> I have more datasets like this. I hope that would be resolved using this
> approach :) Fingers crossed.
>
> Thanks,
> Sid
>
> On Thu, May 26, 2022 at 8:43 PM Apostolos N. Papadopoulos <
> papad...@csd.auth.gr> wrote:
>
>> Since you cannot create the DF directly, you may try to first create an
>> RDD of tuples from the file
>>
>> and then convert the RDD to a DF by using the toDF() transformation.
>>
>> Perhaps you may bypass the issue with this.
>>
>> Another thing that I have seen in the example is that you are using "" as
>> an escape character.
>>
>> Can you check if this may cause any issues?
>>
>> Regards,
>>
>> Apostolos
>>
>>
>>
>> On 26/5/22 16:31, Sid wrote:
>>
>> Thanks for opening the issue, Bjorn. However, could you help me to
>> address the problem for now with some kind of alternative?
>>
>> I am actually stuck in this since yesterday.
>>
>> Thanks,
>> Sid
>>
>> On Thu, 26 May 2022, 18:48 Bjørn Jørgensen, 
>> wrote:
>>
>>> Yes, it looks like a bug that we also have in pandas API on spark.
>>>
>>> So I have opened a JIRA
>>> <https://issues.apache.org/jira/browse/SPARK-39304> for this.
>>>
>>> tor. 26. mai 2022 kl. 11:09 skrev Sid :
>>>
>>>> Hello Everyone,
>>>>
>>>> I have posted a question finally with the dataset and the column names.
>>>>
>>>> PFB link:
>>>>
>>>>
>>>> https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen <
>>>> bjornjorgen...@gmail.com> wrote:
>>>>
>>>>> Sid, dump one of yours files.
>>>>>
>>>>>
>>>>> https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
>>>>>
>>>>>
>>>>>
>>>>> ons. 25. mai 2022, 23:04 skrev Sid :
>>>>>
>>>>>> I have 10 columns with me but in the dataset, I observed that some
>>>>>> records have 11 columns of data(for the additional column it is marked as
>>>>>> null). But, how do I handle this?
>>>>>>
>>>>>> Thanks,
>>>>>> Sid
>>>>>>
>>>>>> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>>>>>>
>>>>>>> How can I do that? Any examples or links, please. So, this works
>>>>>>> well with pandas I suppose. It's just that I need to convert back to the
>>>>>>> spark data frame by providing a schema but since we are using a lower 
>>>>>>> spark
>>>>>>> version and pandas won't work in a distributed way in the lower 
>>>>>>> versions,
>>>>>>> therefore, was wondering if spark could handle this in a much better 
>>>>>>> way.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sid
>>>>>>>
>>>>>>> On Thu, May 26, 2022 at 2:19 AM Gavin Ray 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Forgot to reply-all last message, whoops. Not very good at email.
>>>>>>>>
>>>>>>>> You need to normalize the CSV with a parser that can escape commas
>>>>>>>> inside of strings
>>>>>>>> Not sure if Spark has an option for this?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>>>>>>>>
>>>>>>>>> Thank you so much for your time.
>>>>>>>>>
>>>>>>>>> I have data like below which I tried to load by setting multiple
>>>>>>>>&

Re: Complexity with the data

2022-05-26 Thread Bjørn Jørgensen
Yes, it looks like a bug that we also have in pandas API on spark.

So I have opened a JIRA <https://issues.apache.org/jira/browse/SPARK-39304> for
this.

tor. 26. mai 2022 kl. 11:09 skrev Sid :

> Hello Everyone,
>
> I have posted a question finally with the dataset and the column names.
>
> PFB link:
>
>
> https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark
>
> Thanks,
> Sid
>
> On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen 
> wrote:
>
>> Sid, dump one of yours files.
>>
>> https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
>>
>>
>>
>> ons. 25. mai 2022, 23:04 skrev Sid :
>>
>>> I have 10 columns with me but in the dataset, I observed that some
>>> records have 11 columns of data(for the additional column it is marked as
>>> null). But, how do I handle this?
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>>>
>>>> How can I do that? Any examples or links, please. So, this works well
>>>> with pandas I suppose. It's just that I need to convert back to the spark
>>>> data frame by providing a schema but since we are using a lower spark
>>>> version and pandas won't work in a distributed way in the lower versions,
>>>> therefore, was wondering if spark could handle this in a much better way.
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Thu, May 26, 2022 at 2:19 AM Gavin Ray 
>>>> wrote:
>>>>
>>>>> Forgot to reply-all last message, whoops. Not very good at email.
>>>>>
>>>>> You need to normalize the CSV with a parser that can escape commas
>>>>> inside of strings
>>>>> Not sure if Spark has an option for this?
>>>>>
>>>>>
>>>>> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>>>>>
>>>>>> Thank you so much for your time.
>>>>>>
>>>>>> I have data like below which I tried to load by setting multiple
>>>>>> options while reading the file but however, but I am not able to
>>>>>> consolidate the 9th column data within itself.
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> I tried the below code:
>>>>>>
>>>>>> df = spark.read.option("header", "true").option("multiline",
>>>>>> "true").option("inferSchema", "true").option("quote",
>>>>>>
>>>>>> '"').option(
>>>>>> "delimiter", ",").csv("path")
>>>>>>
>>>>>> What else I can do?
>>>>>>
>>>>>> Thanks,
>>>>>> Sid
>>>>>>
>>>>>>
>>>>>> On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos <
>>>>>> papad...@csd.auth.gr> wrote:
>>>>>>
>>>>>>> Dear Sid,
>>>>>>>
>>>>>>> can you please give us more info? Is it true that every line may
>>>>>>> have a
>>>>>>> different number of columns? Is there any rule followed by
>>>>>>>
>>>>>>> every line of the file? From the information you have sent I cannot
>>>>>>> fully understand the "schema" of your data.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Apostolos
>>>>>>>
>>>>>>>
>>>>>>> On 25/5/22 23:06, Sid wrote:
>>>>>>> > Hi Experts,
>>>>>>> >
>>>>>>> > I have below CSV data that is getting generated automatically. I
>>>>>>> can't
>>>>>>> > change the data manually.
>>>>>>> >
>>>>>>> > The data looks like below:
>>>>>>> >
>>>>>>> > 2020-12-12,abc,2000,,INR,
>>>>>>> > 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
>>>>>>> > 2020-12-09,fgh,,software_developer,I only manage the development
>>>>>>> part.
>>>>>>> >
>>>>>>> > Since I don't have much experience with the other domains.
>>>>>>> >
>>>>>>> > It is handled by the other people.,INR
>>>>>>> > 2020-12-12,abc,2000,,USD,
>>>>>>> >
>>>>>>> > The third record is a problem. Since the value is separated by the
>>>>>>> new
>>>>>>> > line by the user while filling up the form. So, how do I
>>>>>>> handle this?
>>>>>>> >
>>>>>>> > There are 6 columns and 4 records in total. These are the sample
>>>>>>> records.
>>>>>>> >
>>>>>>> > Should I load it as RDD and then may be using a regex should
>>>>>>> eliminate
>>>>>>> > the new lines? Or how it should be? with ". /n" ?
>>>>>>> >
>>>>>>> > Any suggestions?
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Sid
>>>>>>>
>>>>>>> --
>>>>>>> Apostolos N. Papadopoulos, Associate Professor
>>>>>>> Department of Informatics
>>>>>>> Aristotle University of Thessaloniki
>>>>>>> Thessaloniki, GREECE
>>>>>>> tel: ++0030312310991918
>>>>>>> email: papad...@csd.auth.gr
>>>>>>> twitter: @papadopoulos_ap
>>>>>>> web: http://datalab.csd.auth.gr/~apostol
>>>>>>>
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>
>>>>>>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Complexity with the data

2022-05-25 Thread Bjørn Jørgensen
Sid, dump one of your files.

https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/



ons. 25. mai 2022, 23:04 skrev Sid :

> I have 10 columns with me but in the dataset, I observed that some records
> have 11 columns of data(for the additional column it is marked as null).
> But, how do I handle this?
>
> Thanks,
> Sid
>
> On Thu, May 26, 2022 at 2:22 AM Sid  wrote:
>
>> How can I do that? Any examples or links, please. So, this works well
>> with pandas I suppose. It's just that I need to convert back to the spark
>> data frame by providing a schema but since we are using a lower spark
>> version and pandas won't work in a distributed way in the lower versions,
>> therefore, was wondering if spark could handle this in a much better way.
>>
>> Thanks,
>> Sid
>>
>> On Thu, May 26, 2022 at 2:19 AM Gavin Ray  wrote:
>>
>>> Forgot to reply-all last message, whoops. Not very good at email.
>>>
>>> You need to normalize the CSV with a parser that can escape commas
>>> inside of strings
>>> Not sure if Spark has an option for this?
>>>
>>>
>>> On Wed, May 25, 2022 at 4:37 PM Sid  wrote:
>>>
 Thank you so much for your time.

 I have data like below which I tried to load by setting multiple
 options while reading the file but however, but I am not able to
 consolidate the 9th column data within itself.

 [image: image.png]

 I tried the below code:

 df = spark.read.option("header", "true").option("multiline",
 "true").option("inferSchema", "true").option("quote",

   '"').option(
 "delimiter", ",").csv("path")

 What else I can do?

 Thanks,
 Sid


 On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos <
 papad...@csd.auth.gr> wrote:

> Dear Sid,
>
> can you please give us more info? Is it true that every line may have
> a
> different number of columns? Is there any rule followed by
>
> every line of the file? From the information you have sent I cannot
> fully understand the "schema" of your data.
>
> Regards,
>
> Apostolos
>
>
> On 25/5/22 23:06, Sid wrote:
> > Hi Experts,
> >
> > I have below CSV data that is getting generated automatically. I
> can't
> > change the data manually.
> >
> > The data looks like below:
> >
> > 2020-12-12,abc,2000,,INR,
> > 2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
> > 2020-12-09,fgh,,software_developer,I only manage the development
> part.
> >
> > Since I don't have much experience with the other domains.
> >
> > It is handled by the other people.,INR
> > 2020-12-12,abc,2000,,USD,
> >
> > The third record is a problem. Since the value is separated by the
> new
> > line by the user while filling up the form. So, how do I handle this?
> >
> > There are 6 columns and 4 records in total. These are the sample
> records.
> >
> > Should I load it as RDD and then may be using a regex should
> eliminate
> > the new lines? Or how it should be? with ". /n" ?
> >
> > Any suggestions?
> >
> > Thanks,
> > Sid
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Count() action leading to errors | Pyspark

2022-05-07 Thread Bjørn Jørgensen
Try it without the CTE.

A SQL CTE is temporary, so you are probably working on two different datasets.
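
One way to keep the write and the count consistent even when the underlying S3 files
change is to materialize the result once before both actions. A minimal sketch;
cte_query, jdbc_url, jdbc_props and the table name are placeholders, and the caching
itself is an assumption, not something from this thread:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

modifiedData = spark.sql(cte_query)      # the CTE-based query from the job

modifiedData.cache()                     # or persist(StorageLevel.DISK_ONLY)
modifiedData.count()                     # materialize once, before any writes

modifiedData.write.jdbc(url=jdbc_url, table="target_table",
                        mode="append", properties=jdbc_props)
print(modifiedData.count())              # served from the cache, consistent with the write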

fre. 6. mai 2022 kl. 10:32 skrev Sid :

> Hi Team,
>
> I am trying to display the counts of the DF which is created by running
> one Spark SQL query with a CTE pattern.
>
> Everything is working as expected. I was able to write the DF to Postgres
> RDS. However, when I am trying to display the counts using a simple count()
> action it leads to the below error:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o321.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 1301 in stage 35.0 failed 4 times, most recent failure: Lost task 1301.3 in
> stage 35.0 (TID 7889, 10.100.6.148, executor 1):
> java.io.FileNotFoundException: File not present on S3
> It is possible the underlying files have been updated. You can explicitly
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command
> in SQL or by recreating the Dataset/DataFrame involved.
>
>
> So, I tried something like the below:
>
>
> print(modifiedData.repartition(modifiedData.rdd.getNumPartitions()).count())
>
> So, there are 80 partitions being formed for this DF, and the count
> written in Table is 92,665. However, it didn't match with the count
> displayed post repartitioning which was 91,183
>
> Not sure why is this gap?
>
> Why the counts are not matching? Also what could be the possible reason
> for that simple count error?
>
> Environment:
> AWS GLUE 1.X
> 10 workers
> Spark 2.4.3
>
> Thanks,
> Sid
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark error with jupyter

2022-05-03 Thread Bjørn Jørgensen
I use jupyterlab and spark and I have not seen this before.

Jupyter has a Docker stack with PySpark
<https://jupyter-docker-stacks.readthedocs.io/en/latest/using/selecting.html#jupyter-pyspark-notebook>
that you can try.

tor. 21. apr. 2022 kl. 11:07 skrev Wassim Yaich :

> Hi Folks,
> I am working on spark in jupyter but I have a small error for each running
> .
> anyone have the same error or have a solution , please tell me .
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Dealing with large number of small files

2022-04-26 Thread Bjørn Jørgensen
What is it that you have? Are they txt files or JSON files?
Or do you have txt files with JSON inside?



tir. 26. apr. 2022 kl. 20:41 skrev Sid :

> Thanks for your time, everyone :)
>
> Much appreciated.
>
> I solved it using jq utility since I was dealing with JSON. I have solved
> it using below script:
>
> find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt
>
>
> Thanks,
>
> Sid
>
>
> On Tue, Apr 26, 2022 at 9:37 PM Bjørn Jørgensen 
> wrote:
>
>> and the bash script seems to read txt files not json
>>
>> for f in Agent/*.txt; do cat ${f} >> merged.json;done;
>>
>>
>>
>> tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
>> gourav.sengu...@gmail.com>:
>>
>>> Hi,
>>>
>>> what is the version of spark are you using? And where is the data stored.
>>>
>>> I am not quite sure that just using a bash script will help because
>>> concatenating all the files into a single file creates a valid JSON.
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>>>
>>>> Hello,
>>>>
>>>> Can somebody help me with the below problem?
>>>>
>>>>
>>>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>>>
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Dealing with large number of small files

2022-04-26 Thread Bjørn Jørgensen
and the bash script seems to read txt files not json

for f in Agent/*.txt; do cat ${f} >> merged.json;done;



tir. 26. apr. 2022 kl. 18:03 skrev Gourav Sengupta <
gourav.sengu...@gmail.com>:

> Hi,
>
> what is the version of spark are you using? And where is the data stored.
>
> I am not quite sure that just using a bash script will help because
> concatenating all the files into a single file creates a valid JSON.
>
> Regards,
> Gourav
>
> On Tue, Apr 26, 2022 at 3:44 PM Sid  wrote:
>
>> Hello,
>>
>> Can somebody help me with the below problem?
>>
>>
>> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>>
>>
>> Thanks,
>> Sid
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Dealing with large number of small files

2022-04-26 Thread Bjørn Jørgensen
df = spark.read.json("/*.json")

use the *.json
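
A minimal sketch of reading all the small JSON files in one pass with a glob and
compacting them (the paths, the multiline option and the target file count are
assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One read for all the small files instead of looping over them one by one.
df = spark.read.option("multiline", "true").json("s3://bucket/agent/*.json")

# Compact into a handful of larger files to avoid the small-files problem downstream.
df.coalesce(16).write.mode("overwrite").parquet("s3://bucket/agent_compacted/")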


tir. 26. apr. 2022 kl. 16:44 skrev Sid :

> Hello,
>
> Can somebody help me with the below problem?
>
>
> https://stackoverflow.com/questions/72015557/dealing-with-large-number-of-small-json-files-using-pyspark
>
>
> Thanks,
> Sid
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Vulnerabilities in htrace-core4-4.1.0-incubating.jar jar used in spark.

2022-04-26 Thread Bjørn Jørgensen
What version of spark is it that you have scanned?



tir. 26. apr. 2022 kl. 12:48 skrev HARSH TAKKAR :

> Hello,
>
> Please let me know if there is a fix available for following
> vulnerabilities in htrace jar used in spark jars folder.
>
> LIBRARY: com.fasterxml.jackson.core:jackson-databind
>
> VULNERABILITY IDs :
>
>
>
>
>
>   CVE-2020-9548
>
>
>   CVE-2020-9547
>
>
>   CVE-2020-8840
>
>
>   CVE-2020-36179
>
>
>   CVE-2020-35491
>
>
>   CVE-2020-35490
>
>
>   CVE-2020-25649
>
>
>   CVE-2020-24750
>
>
>   CVE-2020-24616
>
>
>   CVE-2020-10673
>
>
>   CVE-2019-20330
>
>
>   CVE-2019-17531
>
>
>   CVE-2019-17267
>
>
>   CVE-2019-16943
>
>
>   CVE-2019-16942
>
>
>   CVE-2019-16335
>
>
>   CVE-2019-14893
>
>
>   CVE-2019-14892
>
>
>   CVE-2019-14540
>
>
>   CVE-2019-14439
>
>
>   CVE-2019-14379
>
>
>   CVE-2019-12086
>
>
>   CVE-2018-7489
>
>
>   CVE-2018-5968
>
>
>   CVE-2018-14719
>
>
>   CVE-2018-14718
>
>
>   CVE-2018-12022
>
>
>   CVE-2018-11307
>
>
>   CVE-2017-7525
>
>
>   CVE-2017-17485
>
>
>
>
>   CVE-2017-15095
>
>
> Kind Regards
>
> Harsh Takkar
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Vulnerabilities in htrace-core4-4.1.0-incubating.jar jar used in spark.

2022-04-26 Thread Bjørn Jørgensen
Spark version 3.3 will have this fixed; see Spark GitHub PR 35981 <https://github.com/apache/spark/pull/35981>.



tir. 26. apr. 2022 kl. 12:48 skrev HARSH TAKKAR :

> Hello,
>
> Please let me know if there is a fix available for following
> vulnerabilities in htrace jar used in spark jars folder.
>
> LIBRARY: com.fasterxml.jackson.core:jackson-databind
>
> VULNERABILITY IDs :
>
>
>
>
>
>   CVE-2020-9548
>
>
>   CVE-2020-9547
>
>
>   CVE-2020-8840
>
>
>   CVE-2020-36179
>
>
>   CVE-2020-35491
>
>
>   CVE-2020-35490
>
>
>   CVE-2020-25649
>
>
>   CVE-2020-24750
>
>
>   CVE-2020-24616
>
>
>   CVE-2020-10673
>
>
>   CVE-2019-20330
>
>
>   CVE-2019-17531
>
>
>   CVE-2019-17267
>
>
>   CVE-2019-16943
>
>
>   CVE-2019-16942
>
>
>   CVE-2019-16335
>
>
>   CVE-2019-14893
>
>
>   CVE-2019-14892
>
>
>   CVE-2019-14540
>
>
>   CVE-2019-14439
>
>
>   CVE-2019-14379
>
>
>   CVE-2019-12086
>
>
>   CVE-2018-7489
>
>
>   CVE-2018-5968
>
>
>   CVE-2018-14719
>
>
>   CVE-2018-14718
>
>
>   CVE-2018-12022
>
>
>   CVE-2018-11307
>
>
>   CVE-2017-7525
>
>
>   CVE-2017-17485
>
>
>
>
>   CVE-2017-15095
>
>
> Kind Regards
>
> Harsh Takkar
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Bjørn Jørgensen
ed that spark has a union function(). It implements row bind. Any
>> idea how it is implemented? Is it just map reduce under the covers?
>>
>>
>>
>> My thought was
>>
>> 1.  load each col vector
>>
>> 2.  maybe I need to replace the really long row id strings with
>> integers
>>
>> 3.  convert column vectors into row vectors using piviot (Ie matrix
>> transpose.)
>>
>> 4.  union all the row vectors into a single table
>>
>> 5.  piviot the table back so I have the correct column vectors
>>
>>
>>
>> I could replace the row ids and column name with integers if needed, and
>> restore them later
>>
>>
>>
>> Maybe I would be better off using many small machines? I assume memory is
>> the limiting resource not cpu. I notice that memory usage will reach 100%.
>> I added several TB’s of local ssd. I am not convinced that spark is using
>> the local disk
>>
>>
>>
>>
>>
>> will this perform better than join?
>>
>>
>>
>> · The rows  before the final pivot will be very very wide (over 5
>> million columns)
>>
>> · There will only be 10114 rows before the pivot
>>
>>
>>
>> I assume the pivots will shuffle all the data. I assume the Colum vectors
>> are trivial. The file table pivot will be expensive however will only need
>> to be done once
>>
>>
>>
>>
>>
>>
>>
>> Comments and suggestions appreciated
>>
>>
>>
>> Andy
>>
>>
>>
>>
>>
>>
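
For what it is worth, a minimal sketch of one way to column-bind two DataFrames by
joining on a generated row index. zipWithIndex numbers rows in their current order,
which is only meaningful if both DataFrames were produced in the same order; the column
names and data are made up:

from pyspark.sql import SparkSession
from pyspark.sql.types import LongType

spark = SparkSession.builder.getOrCreate()

def with_row_index(df):
    # Attach a 0..n-1 row index by zipping the underlying RDD.
    schema = df.schema.add("row_idx", LongType())
    rdd = df.rdd.zipWithIndex().map(lambda pair: (*pair[0], pair[1]))
    return spark.createDataFrame(rdd, schema)

left = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["col_a"])
right = spark.createDataFrame([(10.0,), (20.0,), (30.0,)], ["col_b"])

# "cbind": pair rows by index, join, then drop the helper column.
bound = (with_row_index(left)
         .join(with_row_index(right), on="row_idx", how="inner")
         .drop("row_idx"))
bound.show()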

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-20 Thread Bjørn Jørgensen
df_struct = spark.createDataFrame(data = data_struct, schema = schema)

df_struct.printSchema()

root
|-- name: struct (nullable = true)
||-- firstname: string (nullable = true)
||-- middlename: string (nullable = true)
||-- lastname: string (nullable = true)
|-- state: string (nullable = true)
|-- gender: string (nullable = true)

df_struct_flat = flatten_test(df_struct, sep=":")

df_struct_flat.printSchema()

root
|-- state: string (nullable = true)
|-- gender: string (nullable = true)
|-- name:firstname: string (nullable = true)
|-- name:middlename: string (nullable = true)
|-- name:lastname: string (nullable = true)
"""
# compute Complex Fields (Arrays, Structs and Map Types) in Schema
complex_fields = dict([(field.name, field.dataType)
for field in df.schema.fields
if type(field.dataType) == ArrayType
or type(field.dataType) == StructType
or type(field.dataType) == MapType])

while len(complex_fields) !=0:
col_name = list(complex_fields.keys())[0]
#print ("Processing :"+col_name+" Type :
"+str(type(complex_fields[col_name])))

# if StructType then convert all sub elements to columns.
# i.e. flatten structs
if (type(complex_fields[col_name]) == StructType):
expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
for k in [n.name for n in complex_fields[col_name]]]
df = df.select("*", *expanded).drop(col_name)

# if ArrayType then add the Array Elements as Rows using the
explode function
# i.e. explode Arrays
elif (type(complex_fields[col_name]) == ArrayType):
df = df.withColumn(col_name, explode_outer(col_name))

# if MapType then convert all sub elements to columns.
# i.e. flatten
elif (type(complex_fields[col_name]) == MapType):
keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
keys = list(map(lambda row: row[0], keys_df.collect()))
key_cols = list(map(lambda f: col(col_name).getItem(f)
.alias(str(col_name + sep + f)), keys))
drop_column_list = [col_name]
df = df.select([col_name for col_name in df.columns
if col_name not in drop_column_list] + key_cols)

# recompute remaining Complex Fields in Schema
complex_fields = dict([(field.name, field.dataType)
for field in df.schema.fields
if type(field.dataType) == ArrayType
or type(field.dataType) == StructType
or type(field.dataType) == MapType])

return df



Please use "answer to all" on your emails so that the user group can see
the emails. :)


ons. 20. apr. 2022 kl. 14:05 skrev Xavier Gervilla <
xavier.gervi...@datapta.com>:

> Changing this worked!
> *spark = sparknlp.start(spark32=True)*
>
> I'm adapting the rest of the code now, understanding the new schema and
> debugging and I've found this page
> <https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/3.SparkNLP_Pretrained_Models.ipynb>
> with many examples that has helped get a clearer idea of how to implement
> the pipeline.
>
> This is the schema resulting after applying the pipeline and selecting
> only the sentiment and the entities obtained:
>
> * |-- sentiment: array (nullable = true)*
>
> * ||-- element: struct (containsNull = true)*
>
> * |||-- annotatorType: string (nullable = true)*
>
> * |||-- begin: integer (nullable = false)*
>
> * |||-- end: integer (nullable = false)*
>
> * |||-- result: string (nullable = true)*
>
> * |||-- metadata: map (nullable = true)*
>
> * ||||-- key: string*
>
> * ||||-- value: string (valueContainsNull = true)*
>
> * |||-- embeddings: array (nullable = true)*
>
> * ||||-- element: float (containsNull = false)*
>
> * |-- entities: array (nullable = true)*
>
> * ||-- element: struct (containsNull = true)*
>
> * |||-- annotatorType: string (nullable = true)*
>
> * |||-- begin: integer (nullable = false)*
>
> * |||-- end: integer (nullable = false)*
>
> * |||-- result: string (nullable = true)*
>
> * |    ||-- metadata: map (nullable = true)*
>
> * ||||-- key: string*
>
> * ||||-- value: string (valueContainsNull = true)*
>
> * |||-- embeddings: array (nullable = true)*
> * ||||-- element: float (containsNull = false)*

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen
https://github.com/JohnSnowLabs/spark-nlp#packages-cheatsheet

Change spark = sparknlp.start()
to
spark = sparknlp.start(spark32=True)
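
Putting that change into context, a minimal sketch (the pipeline name comes from the
messages below; annotate on a single sentence is just a quick way to check that the
pretrained pipeline downloads and runs):

import sparknlp
from sparknlp.pretrained import PretrainedPipeline

# spark32=True tells spark-nlp that the session runs on Spark 3.2.x
spark = sparknlp.start(spark32=True)

pipeline = PretrainedPipeline("analyze_sentiment", "en")

result = pipeline.annotate("Spark NLP pipelines are easy to test on one sentence.")
print(result)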


tir. 19. apr. 2022 kl. 21:10 skrev Bjørn Jørgensen :

> Yes, there are some that have that issue.
>
> Please open a new issue at
> https://github.com/JohnSnowLabs/spark-nlp/issues and they will help you.
>
>
>
>
> tir. 19. apr. 2022 kl. 20:33 skrev Xavier Gervilla <
> xavier.gervi...@datapta.com>:
>
>> Thank you for your advice, I had small knowledge of Spark NLP and I
>> thought it was only possible to use with models that required training and
>> therefore my project wasn’t the case. I'm trying now to build the project
>> again with SparkNLP but when I try to load a pretrained model from
>> JohnSnowLabs I get an error (*py4j.protocol.Py4JJavaError: An error
>> occurred while calling
>> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.*
>> ).
>>
>> This is the new basic code to develop the project again:
>>
>>
>> *spark = sparknlp.start()*
>>
>> *pipelineName = 'analyze_sentiment'*
>>
>>
>> *pipeline = PretrainedPipeline(pipelineName, 'en') #this is the line that
>> generates the error*
>>
>> *rawTweets = spark.readStream.format('socket').option('host',
>> 'localhost').option('port',9008).load()*
>>
>> *allTweets = rawTweets.selectExpr('CAST(value AS
>> STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')*
>>
>>
>> *sentPred = pipeline.transform(allTweets)*
>>
>> *query =
>> sentPred.writeStream.outputMode('complete').format('console').start()*
>> *query.awaitTermination()*
>>
>> Spark version is 3.2.1 and SparkNLP version is 3.4.3, while Java version
>> is 8. I've tried with a different model but the error is still the same, so
>> what could be causing it?
>>
>> If this error is solved I think SparkNLP will be the solution I was
>> looking for to reduce memory consumption so thank you again for suggesting
>> it.
>>
>>
>>
>> El 18 abr 2022, a las 21:07, Bjørn Jørgensen 
>> escribió:
>>
>> When did SpaCy have support for Spark?
>>
>> Try Spark NLP <https://nlp.johnsnowlabs.com/> it`s made for spark. They
>> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
>> they public user guides at
>> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>>
>>
>>
>>
>> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>>
>> It looks good, are you sure it even starts? the problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure if that resolves it but would be a good
>> practice.
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>
>> Hi Team,
>> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>>
>> I'm developing a project that retrieves tweets on a 'host' app, streams
>> them to Spark and with different operations with DataFrames obtains the
>> Sentiment of the tweets and their entities applying a Sentiment model and a
>> NER model respectively.
>>
>> The problem I've come across is that when applying the NER model, the RAM
>> consumption increases until the program stops with a memory error because
>> there's no memory left to execute. In addition, on SparkUI I've seen that
>> there's only one executor running, the executor driver, but using htop on
>> the terminal I see that the 8 cores of the instance are executing at 100%.
>>
>> The SparkSession is only configured to receive the tweets from the socket
>> that connects with the second program that sends the tweets. The DataFrame
>> goes through some processing to obtain other properties of the tweet like
>> its sentiment (which causes no error even with less than 8GB of RAM) and
>> then the NER is applied.
>>
>> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
>> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
>> **"localhost"**).option(**"port"**,**9008**).load()
>> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>>
>> **#prior processing of the tweets**
>> sentDF = other_processing(tweets)
>>
>> **#obtaining the column that contains the list of entities from a tw

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Bjørn Jørgensen
Yes, there are some that have that issue.

Please open a new issue at https://github.com/JohnSnowLabs/spark-nlp/issues
and they will help you.




tir. 19. apr. 2022 kl. 20:33 skrev Xavier Gervilla <
xavier.gervi...@datapta.com>:

> Thank you for your advice, I had small knowledge of Spark NLP and I
> thought it was only possible to use with models that required training and
> therefore my project wasn’t the case. I'm trying now to build the project
> again with SparkNLP but when I try to load a pretrained model from
> JohnSnowLabs I get an error (*py4j.protocol.Py4JJavaError: An error
> occurred while calling
> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.*
> ).
>
> This is the new basic code to develop the project again:
>
>
> *spark = sparknlp.start()*
>
> *pipelineName = 'analyze_sentiment'*
>
>
> *pipeline = PretrainedPipeline(pipelineName, 'en') #this is the line that
> generates the error*
>
> *rawTweets = spark.readStream.format('socket').option('host',
> 'localhost').option('port',9008).load()*
>
> *allTweets = rawTweets.selectExpr('CAST(value AS
> STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')*
>
>
> *sentPred = pipeline.transform(allTweets)*
>
> *query =
> sentPred.writeStream.outputMode('complete').format('console').start()*
> *query.awaitTermination()*
>
> Spark version is 3.2.1 and SparkNLP version is 3.4.3, while Java version
> is 8. I've tried with a different model but the error is still the same, so
> what could be causing it?
>
> If this error is solved I think SparkNLP will be the solution I was
> looking for to reduce memory consumption so thank you again for suggesting
> it.
>
>
>
> El 18 abr 2022, a las 21:07, Bjørn Jørgensen 
> escribió:
>
> When did SpaCy have support for Spark?
>
> Try Spark NLP <https://nlp.johnsnowlabs.com/> it`s made for spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they public user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen :
>
> It looks good, are you sure it even starts? the problem I see is that you
> send a copy of the model from the driver for every task. Try broadcasting
> the model instead. I'm not sure if that resolves it but would be a good
> practice.
>
> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>
> Hi Team,
> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
> **"localhost"**).option(**"port"**,**9008**).load()
> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>
> **#prior processing of the tweets**
> sentDF = other_processing(tweets)
>
> **#obtaining the column that contains the list of entities from a tweet**
> nerDF = ner_classification(sentDF)*
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> *nerModel = spacy.load(**"en_core_web_sm"**)
>
> **#main call, applies the UDF function to every tweet from the "tweet" 
> column**def* *ner_classification**(**words**):
> ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
> words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
> **return** words
>
> **#udf function**def* *obtain_ner

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Bjørn Jørgensen
print(entities, sentiments, counts)
>>
>>
>> At first I tried with other NER models from Flair they have the same
>> effect, after printing the first batch memory use starts increasing until
>> it fails and stops the execution because of the memory error. When applying
>> a "simple" function instead of the NER model, such as *return
>> words.split()* on the UDF there's no such error so the data ingested
>> should not be what's causing the overload but the model.
>>
>> Is there a way to prevent the excessive RAM consumption? Why is there
>> only the driver executor and no other executors are generated? How could I
>> prevent it from collapsing when applying the NER model?
>>
>> Thanks in advance!
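
One hedged guess about the "only the driver executor" observation: that is what the Spark UI shows when the session runs with a local master, where all the cores belong to the single driver JVM. A sketch with hypothetical values:

from pyspark.sql import SparkSession

# In local mode everything runs inside the one driver process, so only the
# "driver" executor appears in the Spark UI while htop shows all cores busy.
spark = (
    SparkSession.builder
    .appName("TwitterStreamApp")
    .master("local[8]")                        # 8 local cores, one JVM
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)
print(spark.sparkContext.master)               # confirms which master is in use

# Note: driver memory for local mode must be set at launch time, e.g.
#   spark-submit --driver-memory 8g your_app.py
# setting spark.driver.memory on an already running session has no effect.
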
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark Write BinaryType Column as continues file to S3

2022-04-09 Thread Bjørn Jørgensen
Hi Philipp.

I found this SIDELOADING – INGESTION OF LARGE POINT CLOUDS INTO THE APACHE
SPARK BIG DATA ENGINE
<https://pdfs.semanticscholar.org/efdd/4c6c50cf31c28581fcd7de5eab318c3cd174.pdf>
 paper.
Geotrellis <https://geotrellis.io/> uses pdal <https://pdal.io/> in
geotrellis-pointcloud
<https://github.com/geotrellis/geotrellis-pointcloud>, and
pdal has a Java writer for LAS files
<https://pdal.io/stages/writers.las.html>.
spark-iqmulus <https://github.com/IGNF/spark-iqmulus> is a Spark package to
read and write PLY, LAS and XYZ lidar point clouds using Spark SQL.


fre. 8. apr. 2022 kl. 18:20 skrev Philipp Kraus <
philipp.kraus.flashp...@gmail.com>:

> Hello,
>
> > Am 08.04.2022 um 17:34 schrieb Lalwani, Jayesh :
> >
> > What format are you writing the file to? Are you planning on your own
> custom format, or are you planning to use standard formats like parquet?
>
> I’m dealing with geo-spatial data (Apache Sedona), so I have got a data
> frame with such information and would like to export it to LAS format (see
> https://en.wikipedia.org/wiki/LAS_file_format )
>
> >
> > Note that Spark can write numeric data in most standard formats. If you
> use a custom format instead, whoever consumes the data needs to parse your
> data. This adds complexity to your and your consumer's code. You will also
> need to worry about backward compatibility.
> >
> > I would suggest that you explore standard formats first before you write
> custom code. If you do have to write data in a custom format, udf is a good
> way to serialize the data into your format
>
> The numerical data must be converted into a binary representation per the LAS
> format specification; see
> http://www.asprs.org/wp-content/uploads/2019/07/LAS_1_4_r15.pdf section
> 2.6, Table 7.
>
> Thank
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Spark Write BinaryType Column as continues file to S3

2022-04-08 Thread Bjørn Jørgensen
In the new Spark 3.3 there will be an SQL function:
https://github.com/apache/spark/commit/25dd4254fed71923731fd59838875c0dd1ff665a
Hope this can help you.

fre. 8. apr. 2022, 17:14 skrev Philipp Kraus <
philipp.kraus.flashp...@gmail.com>:

> Hello,
>
> I have got a data frame with numerical data in Spark 3.1.1 (Java) which
> should be converted to a binary file.
> My idea is that I create a udf function that generates a byte array based
> on the numerical values, so I can apply this function on each row of the
> data frame and then get a new column with row-wise binary byte data.
> If this is done, I would like to write this column as a continuous byte
> stream to a file which is stored in an S3 bucket.
>
> So my question is, is the idea with the udf function a good idea, and is it
> possible to write this continuous byte stream directly to S3 / is there any
> built-in functionality?
> What is a good strategy to do this?
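
The question is about the Java API, but a rough PySpark sketch of the byte-array UDF idea could look like this (the column names x, y, z and the three-double layout are invented for illustration; a real LAS point record needs the exact field layout from the spec):

import struct
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

@udf(returnType=BinaryType())
def pack_point(x, y, z):
    # "<ddd" = three little-endian doubles; swap in the real record layout
    return bytearray(struct.pack("<ddd", float(x), float(y), float(z)))

# df stands for the numeric data frame from the question
binary_df = df.withColumn("point_bytes", pack_point("x", "y", "z"))

As far as I know the DataFrame writers always produce partitioned part files, so getting one continuous byte stream would still need a downstream concatenation step or a custom data source.
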
>
> Thanks for help
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Bjørn Jørgensen
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>> <https://en.everybodywiki.com/Mich_Talebzadeh>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen 
>> wrote:
>>
>>>
>>>1. Where does S3 come into this
>>>
>>> He is processing data for each day at a time. So to dump each day to a
>>> fast storage he can use parquet files and write it to S3.
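
Roughly like this, as a sketch (bucket, path and the df name are placeholders, and the s3a connector has to be on the classpath):

# df10 stands for the day's final frame from the pseudocode in this thread
(df10.write
     .mode("overwrite")
     .parquet("s3a://some-bucket/daily/day=2022-04-06"))
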
>>>
>>> ons. 6. apr. 2022 kl. 22:27 skrev Mich Talebzadeh <
>>> mich.talebza...@gmail.com>:
>>>
>>>>
>>>> Your statement below:
>>>>
>>>> I believe I have found the issue: the job writes data to hbase which is
>>>> on the same cluster.
>>>> When I keep on processing data and writing with spark to hbase,
>>>> eventually the garbage collection can not keep up anymore for hbase, and
>>>> the hbase memory consumption increases. As the cluster hosts both hbase
>>>> and spark, this leads to an overall increase and at some point you hit the
>>>> limit of the available memory on each worker.
>>>> I don't think the spark memory is increasing over time.
>>>>
>>>>
>>>>1. Where is your cluster on Prem? Do you Have a Hadoop cluster
>>>>with spark using the same nodes as HDFS?
>>>>2. Is your Hbase clustered or standalone and has been created on
>>>>HDFS nodes
>>>>3. Are you writing to Hbase through phoenix or straight to HBase
>>>>4. Where does S3 come into this
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>> <https://en.everybodywiki.com/Mich_Talebzadeh>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 6 Apr 2022 at 16:41, Joris Billen <
>>>> joris.bil...@bigindustries.be> wrote:
>>>>
>>>>> HI,
>>>>> thanks for your reply.
>>>>>
>>>>>
>>>>> I believe I have found the issue: the job writes data to hbase which
>>>>> is on the same cluster.
>>>>> When I keep on processing data and writing with spark to hbase ,
>>>>> eventually the garbage coll

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Bjørn Jørgensen
"But it will be faster to use S3 (or GCS) through some network and it will
be faster than writing to the local SSD. I don't understand the point
here."
Minio is an S3 mock, so you run Minio locally.
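
If MinIO is the local S3 stand-in, pointing the s3a connector at it is roughly like this (endpoint, credentials and bucket are placeholders; needs the hadoop-aws jars):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("minio-sketch")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")   # placeholder
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")            # placeholder
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")            # placeholder
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)
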

tor. 7. apr. 2022 kl. 09:27 skrev Mich Talebzadeh :

> Ok so that is your assumption. The whole thing is based on-premise on JBOD
> (including a hadoop cluster which has Spark binaries on each node), as I
> understand. But it will be faster to use S3 (or GCS)
> through some network and it will be faster than writing to the local SSD. I
> don't understand the point here.
>
> Also it appears the thread owner is talking about having HBase on Hadoop
> cluster on some node eating memory.  This can be easily sorted by moving
> HBase to its own cluster, which will ease up Hadoop, Spark and HBase
> competing for resources. It is possible that the issue is with HBase setup
> as well.
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen 
> wrote:
>
>>
>>1. Where does S3 come into this
>>
>> He is processing data for each day at a time. So to dump each day to a
>> fast storage he can use parquet files and write it to S3.
>>
>> ons. 6. apr. 2022 kl. 22:27 skrev Mich Talebzadeh <
>> mich.talebza...@gmail.com>:
>>
>>>
>>> Your statement below:
>>>
>>>
>>> I believe I have found the issue: the job writes data to hbase which is
>>> on the same cluster.
>>> When I keep on processing data and writing with spark to hbase,
>>> eventually the garbage collection can not keep up anymore for hbase, and
>>> the hbase memory consumption increases. As the cluster hosts both hbase
>>> and spark, this leads to an overall increase and at some point you hit the
>>> limit of the available memory on each worker.
>>> I don't think the spark memory is increasing over time.
>>>
>>>
>>>1. Where is your cluster on Prem? Do you Have a Hadoop cluster
>>>with spark using the same nodes as HDFS?
>>>2. Is your Hbase clustered or standalone and has been created on
>>>HDFS nodes
>>>3. Are you writing to Hbase through phoenix or straight to HBase
>>>4. Where does S3 come into this
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 6 Apr 2022 at 16:41, Joris Billen 
>>> wrote:
>>>
>>>> HI,
>>>> thanks for your reply.
>>>>
>>>>
>>>> I believe I have found the issue: the job writes data to hbase which is
>>>> on the same cluster.
>>>> When I keep on processing data and writing with spark to hbase,
>>>> eventually the garbage collection can not keep up anymore for hbase, and
>>>> the hbase memory consumption increases. As the cluster hosts both hbase
>>>> and spark, this leads to an overall increase and at some point you hit the
>>>> limit of the available memory on each worker.
>>>> I don't think the spark memory is increasing over time.
>>>>
>>>>
>>>>
>>>> Here more details:
>>>>
>>>> **Spark: 2.4
>>>> **operation: many spark sql statements followed by writing data to a
>>>> nosql db from spark
>>>> like this:
>>>> df=read(fromhdfs)
>>>> df2=spark.sql(using df 1)
>>>> ..df10=spark.sql(using df9)
>>>> spark.sql(CACHE TABLE df10)

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Bjørn Jørgensen
ays,
>>> and submit always the same spark job just for other input.
>>>
>>>
>>> Thanks!
>>>
>>> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>>>
>>> This feels like premature optimization, and not clear it's optimizing,
>>> but maybe.
>>> Caching things that are used once is worse than not caching. It looks
>>> like a straight-line through to the write, so I doubt caching helps
>>> anything here.
>>>
>>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
>>>> Hi,
>>>> as said, thanks for the little discussion over mail.
>>>> I understand that the action is triggered in the end at the write and
>>>> then all of a sudden everything is executed at once. But I don't really need
>>>> to trigger an action before. I am caching somewhere a df that will be
>>>> reused several times (slightly updated pseudocode below).
>>>>
>>>> Question: is it then better practice to already trigger some actions on
>>>> intermediate data frames (like df4 and df8), and cache them? So that these
>>>> actions will not be that expensive yet, and the actions to write at the end
>>>> will require less resources, which would allow to process more days in one
>>>> go? Like what is added in red in the improvement section in the pseudo
>>>> code below?
>>>>
>>>>
>>>>
>>>> pseudocode:
>>>>
>>>>
>>>> loop over all days:
>>>>     spark submit 1 day
>>>>
>>>>
>>>>
>>>> with spark submit (overly simplified) =
>>>>
>>>>
>>>>   df=spark.read(hdfs://somepath)
>>>>   …
>>>>   ##IMPROVEMENT START
>>>>   df4=spark.sql(some stuff with df3)
>>>>   spark.sql(CACHE TABLE df4)
>>>>   …
>>>>   df8=spark.sql(some stuff with df7)
>>>>   spark.sql(CACHE TABLE df8)
>>>>   ##IMPROVEMENT END
>>>>   ...
>>>>   df12=df11.spark.sql(complex stuff)
>>>>   spark.sql(CACHE TABLE df10)
>>>>   ...
>>>>   df13=spark.sql(complex stuff with df12)
>>>>   df13.write
>>>>   df14=spark.sql(some other complex stuff with df12)
>>>>   df14.write
>>>>   df15=spark.sql(some completely other complex stuff with df12)
>>>>   df15.write
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> THanks!
>>>>
>>>>
>>>>
>>>> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>>>>
>>>> If that is your loop unrolled, then you are not doing parts of work at
>>>> a time. That will execute all operations in one go when the write finally
>>>> happens. That's OK, but may be part of the problem. For example if you are
>>>> filtering for a subset, processing, and unioning, then that is just a
>>>> harder and slower way of applying the transformation to all data at once.
>>>>
>>>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>>>> joris.bil...@bigindustries.be> wrote:
>>>>
>>>>> Thanks for reply :-)
>>>>>
>>>>> I am using pyspark. Basically my code (simplified) is:
>>>>>
>>>>> df=spark.read.csv(hdfs://somehdfslocation)
>>>>> df1=spark.sql (complex statement using df)
>>>>> ...
>>>>> dfx=spark.sql(complex statement using df x-1)
>>>>> ...
>>>>> dfx15.write()
>>>>>
>>>>>
>>>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>>>> cached dataframes at the end and stopping the spark context explicitly:
>>>>> sc.stop()?
>>>>>
>>>>>
>>>>> For processing many years at once versus a chunk in a loop: I see that
>>>>> if I go up to a certain number of days, one iteration will start to have
>>>>> tasks that fail. So I only take a limited number of days, and do this
>>>>> process several times. Isn't this normal as you are always somehow limited
>>>>> in terms of resources (I have 9 nodes with 32GB). Or is it like this that
>>>>> in theory you could process any volume, in case you wait long enough? I
>>>>> guess spark 

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-06 Thread Bjørn Jørgensen
and
>>>> then all of a sudden everything is executed at once. But I don't really need
>>>> to trigger an action before. I am caching somewhere a df that will be
>>>> reused several times (slightly updated pseudocode below).
>>>>
>>>> Question: is it then better practice to already trigger some actions on
>>>> intermediate data frames (like df4 and df8), and cache them? So that these
>>>> actions will not be that expensive yet, and the actions to write at the end
>>>> will require less resources, which would allow to process more days in one
>>>> go? Like what is added in red in the improvement section in the pseudo
>>>> code below?
>>>>
>>>>
>>>>
>>>> pseudocode:
>>>>
>>>>
>>>> loop over all days:
>>>>     spark submit 1 day
>>>>
>>>>
>>>>
>>>> with spark submit (overly simplified) =
>>>>
>>>>
>>>>   df=spark.read(hdfs://somepath)
>>>>   …
>>>>   ##IMPROVEMENT START
>>>>   df4=spark.sql(some stuff with df3)
>>>>   spark.sql(CACHE TABLE df4)
>>>>   …
>>>>   df8=spark.sql(some stuff with df7)
>>>>   spark.sql(CACHE TABLE df8)
>>>>   ##IMPROVEMENT END
>>>>   ...
>>>>   df12=df11.spark.sql(complex stuff)
>>>>   spark.sql(CACHE TABLE df10)
>>>>   ...
>>>>   df13=spark.sql(complex stuff with df12)
>>>>   df13.write
>>>>   df14=spark.sql(some other complex stuff with df12)
>>>>   df14.write
>>>>   df15=spark.sql(some completely other complex stuff with df12)
>>>>   df15.write
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> THanks!
>>>>
>>>>
>>>>
>>>> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>>>>
>>>> If that is your loop unrolled, then you are not doing parts of work at
>>>> a time. That will execute all operations in one go when the write finally
>>>> happens. That's OK, but may be part of the problem. For example if you are
>>>> filtering for a subset, processing, and unioning, then that is just a
>>>> harder and slower way of applying the transformation to all data at once.
>>>>
>>>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>>>> joris.bil...@bigindustries.be> wrote:
>>>>
>>>>> Thanks for reply :-)
>>>>>
>>>>> I am using pyspark. Basically my code (simplified) is:
>>>>>
>>>>> df=spark.read.csv(hdfs://somehdfslocation)
>>>>> df1=spark.sql (complex statement using df)
>>>>> ...
>>>>> dfx=spark.sql(complex statement using df x-1)
>>>>> ...
>>>>> dfx15.write()
>>>>>
>>>>>
>>>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>>>> cached dataframes at the end and stopping the spark context explicitly:
>>>>> sc.stop()?
>>>>>
>>>>>
>>>>> For processing many years at once versus a chunk in a loop: I see that
>>>>> if I go up to a certain number of days, one iteration will start to have
>>>>> tasks that fail. So I only take a limited number of days, and do this
>>>>> process several times. Isn't this normal as you are always somehow limited
>>>>> in terms of resources (I have 9 nodes with 32GB). Or is it like this that
>>>>> in theory you could process any volume, in case you wait long enough? I
>>>>> guess spark can only break down the tasks up to a certain level (based on
>>>>> the datasets' and the intermediate results’ partitions) and at some moment
>>>>> you hit the limit where your resources are not sufficient anymore to
>>>>> process such one task? Maybe you can tweak it a bit, but in the end you’ll
>>>>> hit a limit?
>>>>>
>>>>>
>>>>>
>>>>> Concretely, the following topics would be interesting to find out more
>>>>> about (links):
>>>>> -where to see what you are still consuming after a spark job ended if
>>>>> you didn't close resources
>>>>> -memory leaks for pyspark
>>>>> -good article about closing resources (you find tons of snippets on
>>>>>

Re: how to change data type for columns of dataframe

2022-04-02 Thread Bjørn Jørgensen
https://sparkbyexamples.com/pyspark/pyspark-cast-column-type/
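
A minimal sketch of the cast (the column name "age" is made up):

from pyspark.sql.functions import col

# replaces the column in place with a float version of it
df = df.withColumn("age", col("age").cast("float"))
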


lør. 2. apr. 2022 kl. 04:10 skrev ayan guha :

> Please use cast. Also I would strongly recommend going through the Spark docs;
> they're pretty good.
>
> On Sat, 2 Apr 2022 at 12:43 pm,  wrote:
>
>> Hi
>>
>> I got a dataframe object from another application, which means this object is
>> not generated by me.
>> How can I change the data types for some columns in this dataframe?
>>
>> For example, change the column type from Int to Float.
>>
>> Thanks.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Best Regards,
> Ayan Guha
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Bjørn Jørgensen
It's quite impossible for anyone to answer your question about what is
eating your memory without even knowing what language you are using.

If you are using C then it's always pointers; that's the mem issue.
If you are using Python, one common cause is not using a context manager (see
With Context Managers and Python's with Statement
<https://realpython.com/python-with-statement/>).
And another can be not closing resources after use.
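
A small sketch of that point in Python (paths and the app name are hypothetical):

from pyspark.sql import SparkSession

# ordinary resources: let the context manager close them
with open("/tmp/some_input.txt") as f:
    header = f.readline()

# Spark resources: clear caches and stop the session when the job is done
spark = SparkSession.builder.appName("daily-job").getOrCreate()
try:
    pass  # ... the day's processing ...
finally:
    spark.catalog.clearCache()   # drops anything cached with CACHE TABLE / .cache()
    spark.stop()
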

In my experience you can process 3 years or more of data, IF you are
closing opened resources.
I use the web GUI http://spark:4040 to follow what spark is doing.




ons. 30. mar. 2022 kl. 17:41 skrev Joris Billen <
joris.bil...@bigindustries.be>:

> Thanks for the answer, much appreciated! This forum is very useful :-)
>
> I didn't know the sparkcontext stays alive. I guess this is eating up
> memory.  The eviction means that it knows that it should clear some of the
> old cached memory to be able to store new data. In case anyone has good
> articles about memory leaks I would be interested to read them.
> I will try to add the following lines at the end of my job (as I cached the
> table in spark sql):
>
>
> sqlContext.sql("UNCACHE TABLE mytableofinterest")
> spark.stop()
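
For the loop itself, a hedged sketch of releasing the cache every iteration (the day list, helper and output path are made up):

for day in days_to_process:                      # hypothetical list of days
    df10 = build_day(spark, day).cache()         # hypothetical helper
    df10.count()                                 # materialize the cache once
    df10.write.mode("overwrite").parquet(f"hdfs:///out/day={day}")
    df10.unpersist()                             # free executor storage memory

spark.catalog.clearCache()                       # safety net at the very end
spark.stop()
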
>
>
> Wrt looping: if I want to process 3 years of data, my modest cluster will
> never do it in one go, I would expect? I have to break it down into smaller
> pieces and run that in a loop (1 day is already lots of data).
>
>
>
> Thanks!
>
>
>
>
> On 30 Mar 2022, at 17:25, Sean Owen  wrote:
>
> The Spark context does not stop when a job does. It stops when you stop
> it. There could be many ways mem can leak. Caching maybe - but it will
> evict. You should be clearing caches when no longer needed.
>
> I would guess it is something else your program holds on to in its logic.
>
> Also consider not looping; there is probably a faster way to do it in one
> go.
>
> On Wed, Mar 30, 2022, 10:16 AM Joris Billen 
> wrote:
>
>> Hi,
>> I have a pyspark job submitted through spark-submit that does some heavy
>> processing for 1 day of data. It runs with no errors. I have to loop over
>> many days, so I run this spark job in a loop. I notice after couple
>> executions the memory is increasing on all worker nodes and eventually this
>> leads to faillures. My job does some caching, but I understand that when
>> the job ends successfully, then the sparkcontext is destroyed and the cache
>> should be cleared. However it seems that something keeps on filling the
>> memory a bit more and more after each run. THis is the memory behaviour
>> over time, which in the end will start leading to failures :
>>
>> (what we see is: green=physical memory used, green-blue=physical memory
>> cached, grey=memory capacity =straight line around 31GB )
>> This runs on a healthy spark 2.4 and was optimized already to come to a
>> stable job in terms of spark-submit resources parameters like
>> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>> Any clue how to “really” clear the memory in between jobs? So basically
>> currently I can loop 10x and then need to restart my cluster so all memory
>> is cleared completely.
>>
>>
>> Thanks for any info!
>>
>> 
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: Question for so many SQL tools

2022-03-25 Thread Bjørn Jørgensen
No, they are not doing the same thing.
But everyone knows SQL. SQL has been around since the early 1970s.

Apache Drill is for NoSQL.
Spark is for everything you will do with data.

All of them have their pros and cons. You just have to find what's best for
your task.


fre. 25. mar. 2022 kl. 22:32 skrev Bitfox :

> Just a question why there are so many SQL based tools existing for data
> jobs?
>
> The ones I know,
>
> Spark
> Flink
> Ignite
> Impala
> Drill
> Hive
> …
>
> They are doing the similar jobs IMO.
> Thanks
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: GraphX Support

2022-03-25 Thread Bjørn Jørgensen
Yes, MLlib <https://spark.apache.org/mllib/> is actively developed. You can
have a look at GitHub and filter on closed PRs with the ML label
<https://github.com/apache/spark/pulls?q=is%3Apr+is%3Aclosed+label%3AML>



fre. 25. mar. 2022 kl. 22:15 skrev Bitfox :

> BTW , is MLlib still in active development?
>
> Thanks
>
> On Tue, Mar 22, 2022 at 07:11 Sean Owen  wrote:
>
>> GraphX is not active, though still there and does continue to build and
>> test with each Spark release. GraphFrames kind of superseded it, but is
>> also not super active FWIW.
>>
>> On Mon, Mar 21, 2022 at 6:03 PM Jacob Marquez
>>  wrote:
>>
>>> Hello!
>>>
>>>
>>>
>>> My team and I are evaluating GraphX as a possible solution. Would
>>> someone be able to speak to the support of this Spark feature? Is there
>>> active development or is GraphX in maintenance mode (e.g. updated to ensure
>>> functionality with new Spark releases)?
>>>
>>>
>>>
>>> Thanks in advance for your help!
>>>
>>>
>>>
>>> --
>>>
>>> Jacob H. Marquez
>>>
>>> He/Him
>>>
>>> Data & Applied Scientist
>>>
>>> Microsoft Cloud Data Sciences
>>>
>>>
>>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: [EXTERNAL] Re: GraphX Support

2022-03-25 Thread Bjørn Jørgensen
One alternative can be to use Spark and ArangoDB <https://www.arangodb.com>

Introducing the new ArangoDB Datasource for Apache Spark
<https://www.arangodb.com/2022/03/introducing-the-new-arangodb-datasource-for-apache-spark/>


ArangoDB is an open source graph DB with a lot of good graph utils and
documentation <https://www.arangodb.com/docs/stable/graphs.html>

tir. 22. mar. 2022 kl. 00:49 skrev Jacob Marquez
:

> Awesome, thank you!
>
>
>
> *From:* Sean Owen 
> *Sent:* Monday, March 21, 2022 4:11 PM
> *To:* Jacob Marquez 
> *Cc:* user@spark.apache.org
> *Subject:* [EXTERNAL] Re: GraphX Support
>
>
>
>
> GraphX is not active, though still there and does continue to build and
> test with each Spark release. GraphFrames kind of superseded it, but is
> also not super active FWIW.
>
>
>
> On Mon, Mar 21, 2022 at 6:03 PM Jacob Marquez <
> jac...@microsoft.com.invalid> wrote:
>
> Hello!
>
>
>
> My team and I are evaluating GraphX as a possible solution. Would someone
> be able to speak to the support of this Spark feature? Is there active
> development or is GraphX in maintenance mode (e.g. updated to ensure
> functionality with new Spark releases)?
>
>
>
> Thanks in advance for your help!
>
>
>
> --
>
> Jacob H. Marquez
>
> He/Him
>
> Data & Applied Scientist
>
> Microsoft Cloud Data Sciences
>
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
Column bind in R is concatenate in pandas
https://www.datasciencemadesimple.com/append-concatenate-columns-python-pandas-column-bind/
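
With the pandas API on Spark, the same column bind looks roughly like this (the frames are made up; combining frames built separately may also need the ops_on_diff_frames option):

from pyspark import pandas as ps

ps.set_option("compute.ops_on_diff_frames", True)   # may be needed for separately built frames

df1 = ps.DataFrame({"a": [1, 2, 3]})
df2 = ps.DataFrame({"b": [4, 5, 6]})
cbound = ps.concat([df1, df2], axis=1)               # the equivalent of R's cbind
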


Please start a now thread for each questions.

tir. 15. mar. 2022, 22:59 skrev Andrew Davidson :

> Many many thanks!
>
>
>
> I have been looking for a pyspark data frame column_bind() solution for
> several months. Hopefully pyspark.pandas works. The only other solution I
> was aware of was to use spark.dataframe.join(). This does not scale for
> obvious reasons.
>
>
>
> Andy
>
>
>
>
>
> *From: *Bjørn Jørgensen 
> *Date: *Tuesday, March 15, 2022 at 2:19 PM
> *To: *Andrew Davidson 
> *Cc: *Mich Talebzadeh , "user @spark" <
> user@spark.apache.org>
> *Subject: *Re: pivoting panda dataframe
>
>
>
> Hi Andrew. Mitch asked, and I answered transpose()
> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
> .
>
>
>
> And now you are asking in the same thread about pandas API on spark and
> the transform().
>
>
>
> Apache Spark have pandas API on Spark.
>
>
>
> Which means that spark has an API call for pandas functions, and when you
> use pandas API on spark it is spark you are using then.
>
>
>
> Add this line in yours import
>
>
>
> from pyspark import pandas as ps
>
>
>
>
>
> Now you can pass yours dataframe back and forward to pandas API on spark
> by using
>
>
>
> pf01 = f01.to_pandas_on_spark()
>
>
> f01 = pf01.to_spark()
>
>
>
>
>
> Note that I have changed pd to ps here.
>
>
>
> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>
>
>
> df.transform(lambda x: x + 1)
>
>
>
> You will now see that all numbers are +1
>
>
>
> You can find more information about pandas API on spark transform
> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
>
> or in yours notbook
>
> df.transform?
>
>
>
> Signature:
>
> df.transform(
>
> func: Callable[..., ForwardRef('Series')],
>
> axis: Union[int, str] = 0,
>
> *args: Any,
>
> **kwargs: Any,
>
> ) -> 'DataFrame'
>
> Docstring:
>
> Call ``func`` on self producing a Series with transformed values
>
> and that has the same length as its input.
>
>
>
> See also `Transform and apply a function
>
> <https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.
>
>
>
> .. note:: this API executes the function once to infer the type which is
>
>  potentially expensive, for instance, when the dataset is created after
>
>  aggregations or sorting.
>
>
>
>  To avoid this, specify return type in ``func``, for instance, as below:
>
>
>
>  >>> def square(x) -> ps.Series[np.int32]:
>
>  ... return x ** 2
>
>
>
>  pandas-on-Spark uses return type hint and does not try to infer the type.
>
>
>
> .. note:: the series within ``func`` is actually multiple pandas series as the
>
> segments of the whole pandas-on-Spark series; therefore, the length of 
> each series
>
> is not guaranteed. As an example, an aggregation against each series
>
> does work as a global aggregation but an aggregation of each segment. See
>
> below:
>
>
>
> >>> def func(x) -> ps.Series[np.int32]:
>
> ... return x + sum(x)
>
>
>
> Parameters
>
> --
>
> func : function
>
> Function to use for transforming the data. It must work when pandas Series
>
> is passed.
>
> axis : int, default 0 or 'index'
>
> Can only be set to 0 at the moment.
>
> *args
>
> Positional arguments to pass to func.
>
> **kwargs
>
> Keyword arguments to pass to func.
>
>
>
> Returns
>
> ---
>
> DataFrame
>
> A DataFrame that must have the same length as self.
>
>
>
> Raises
>
> --
>
> Exception : If the returned DataFrame has a different length than self.
>
>
>
> See Also
>
> 
>
> DataFrame.aggregate : Only perform aggregating type operations.
>
> DataFrame.apply : Invoke function on DataFrame.
>
> Series.transform : The equivalent function for Series.
>
>
>
> Examples
>
> 
>
> >>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 'B'])
>
> >>> df
>
>A  B
>
> 0  0  1
>
> 1  1  2

Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
You have a pyspark dataframe and you want to convert it to pandas?

Convert it first to pandas api on spark


pf01 = f01.to_pandas_on_spark()


Then convert it to pandas


pdf01 = pf01.to_pandas()

Or?

tir. 15. mar. 2022, 22:56 skrev Mich Talebzadeh :

> Thanks everyone.
>
> I want to do the following in pandas and numpy without using spark.
>
> This is what I do in spark to generate some random data using class
> UsedFunctions (not important).
>
> class UsedFunctions:
>   def randomString(self, length):
>     letters = string.ascii_letters
>     result_str = ''.join(random.choice(letters) for i in range(length))
>     return result_str
>   def clustered(self, x, numRows):
>     return math.floor(x - 1) / numRows
>   def scattered(self, x, numRows):
>     return abs((x - 1 % numRows)) * 1.0
>   def randomised(self, seed, numRows):
>     random.seed(seed)
>     return abs(random.randint(0, numRows) % numRows) * 1.0
>   def padString(self, x, chars, length):
>     n = int(math.log10(x) + 1)
>     result_str = ''.join(random.choice(chars) for i in range(length - n)) + str(x)
>     return result_str
>   def padSingleChar(self, chars, length):
>     result_str = ''.join(chars for i in range(length))
>     return result_str
>   def println(self, lst):
>     for ll in lst:
>       print(ll[0])
>
>
> usedFunctions = UsedFunctions()
>
> start = 1
> end = start + 9
> print ("starting at ID = ",start, ",ending on = ",end)
> Range = range(start, end)
> rdd = sc.parallelize(Range). \
>  map(lambda x: (x, usedFunctions.clustered(x,numRows), \
>usedFunctions.scattered(x,numRows), \
>usedFunctions.randomised(x,numRows), \
>usedFunctions.randomString(50), \
>usedFunctions.padString(x," ",50), \
>usedFunctions.padSingleChar("x",4000)))
> df = rdd.toDF()
>
> OK how can I create a panda DataFrame df without using Spark?
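
For the "without Spark" part, a rough pandas/NumPy sketch that reuses the UsedFunctions instance quoted above (numRows and the column names are assumptions):

import numpy as np
import pandas as pd

numRows = 10                                   # assumption for this sketch
ids = np.arange(start, end)                    # 1..9, as in the snippet above
pdf = pd.DataFrame({
    "id": ids,
    "clustered": [usedFunctions.clustered(x, numRows) for x in ids],
    "scattered": [usedFunctions.scattered(x, numRows) for x in ids],
    "randomised": [usedFunctions.randomised(int(x), numRows) for x in ids],
    "random_string": [usedFunctions.randomString(50) for _ in ids],
    "padded": [usedFunctions.padString(int(x), " ", 50) for x in ids],
    "padded_char": [usedFunctions.padSingleChar("x", 4000) for _ in ids],
})
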
>
> Thanks
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 15 Mar 2022 at 21:19, Bjørn Jørgensen 
> wrote:
>
>> Hi Andrew. Mitch asked, and I answered transpose()
>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
>> .
>>
>> And now you are asking in the same thread about pandas API on spark and
>> the transform().
>>
>> Apache Spark have pandas API on Spark.
>>
>> Which means that spark has an API call for pandas functions, and when you
>> use pandas API on spark it is spark you are using then.
>>
>> Add this line in yours import
>>
>> from pyspark import pandas as ps
>>
>>
>> Now you can pass yours dataframe back and forward to pandas API on spark
>> by using
>>
>> pf01 = f01.to_pandas_on_spark()
>>
>>
>> f01 = pf01.to_spark()
>>
>>
>> Note that I have changed pd to ps here.
>>
>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})
>>
>> df.transform(lambda x: x + 1)
>>
>> You will now see that all numbers are +1
>>
>> You can find more information about pandas API on spark transform
>> https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
>> or in yours notbook
>> df.transform?
>>
>> Signature:
>> df.transform(
>> func: Callable[..., ForwardRef('Series')],
>> axis: Union[int, str] = 0,
>> *args: Any,
>> **kwargs: Any,) -> 'DataFrame'Docstring:
>> Call ``func`` on self producing a Series with transformed values
>> and that has the same length as its input.
>>
>> See also `Transform and apply a function
>> <https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.
>>
>> .. note:: this API executes the function once to infer the type which is
>>  potentially expensive, for instance, when the dataset is created after
>>  aggregations or sorting.
>>
>>  To avoid this, 

Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
Hi Andrew. Mitch asked, and I answered transpose()
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
.

And now you are asking in the same thread about pandas API on spark and the
transform().

Apache Spark have pandas API on Spark.

Which means that spark has an API call for pandas functions, and when you
use pandas API on spark it is spark you are using then.

Add this line in yours import

from pyspark import pandas as ps


Now you can pass yours dataframe back and forward to pandas API on spark by
using

pf01 = f01.to_pandas_on_spark()


f01 = pf01.to_spark()


Note that I have changed pd to ps here.

df = ps.DataFrame({'A': range(3), 'B': range(1, 4)})

df.transform(lambda x: x + 1)

You will now see that all numbers are +1

You can find more information about pandas API on spark transform
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html?highlight=pyspark%20pandas%20dataframe%20transform#pyspark.pandas.DataFrame.transform
or in yours notbook
df.transform?

Signature:
df.transform(
func: Callable[..., ForwardRef('Series')],
axis: Union[int, str] = 0,
*args: Any,
**kwargs: Any,) -> 'DataFrame'Docstring:
Call ``func`` on self producing a Series with transformed values
and that has the same length as its input.

See also `Transform and apply a function
<https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.

.. note:: this API executes the function once to infer the type which is
 potentially expensive, for instance, when the dataset is created after
 aggregations or sorting.

 To avoid this, specify return type in ``func``, for instance, as below:

 >>> def square(x) -> ps.Series[np.int32]:
 ... return x ** 2

 pandas-on-Spark uses return type hint and does not try to infer the type.

.. note:: the series within ``func`` is actually multiple pandas series as the
segments of the whole pandas-on-Spark series; therefore, the
length of each series
is not guaranteed. As an example, an aggregation against each series
does work as a global aggregation but an aggregation of each segment. See
below:

>>> def func(x) -> ps.Series[np.int32]:
... return x + sum(x)

Parameters
--
func : function
Function to use for transforming the data. It must work when pandas Series
is passed.
axis : int, default 0 or 'index'
Can only be set to 0 at the moment.
*args
Positional arguments to pass to func.
**kwargs
Keyword arguments to pass to func.

Returns
---
DataFrame
A DataFrame that must have the same length as self.

Raises
--
Exception : If the returned DataFrame has a different length than self.

See Also

DataFrame.aggregate : Only perform aggregating type operations.
DataFrame.apply : Invoke function on DataFrame.
Series.transform : The equivalent function for Series.

Examples

>>> df = ps.DataFrame({'A': range(3), 'B': range(1, 4)}, columns=['A', 'B'])
>>> df
   A  B
0  0  1
1  1  2
2  2  3

>>> def square(x) -> ps.Series[np.int32]:
... return x ** 2
>>> df.transform(square)
   A  B
0  0  1
1  1  4
2  4  9

You can omit the type hint and let pandas-on-Spark infer its type.

>>> df.transform(lambda x: x ** 2)
   A  B
0  0  1
1  1  4
2  4  9

For multi-index columns:

>>> df.columns = [('X', 'A'), ('X', 'B')]
>>> df.transform(square)  # doctest: +NORMALIZE_WHITESPACE
   X
   A  B
0  0  1
1  1  4
2  4  9

>>> (df * -1).transform(abs)  # doctest: +NORMALIZE_WHITESPACE
   X
   A  B
0  0  1
1  1  2
2  2  3

You can also specify extra arguments.

>>> def calculation(x, y, z) -> ps.Series[int]:
... return x ** y + z
>>> df.transform(calculation, y=10, z=20)  # doctest: +NORMALIZE_WHITESPACE
  X
  A  B
020 21
121   1044
2  1044  59069File:
/opt/spark/python/pyspark/pandas/frame.pyType:  method





tir. 15. mar. 2022 kl. 19:33 skrev Andrew Davidson :

> Hi Bjorn
>
>
>
> I have been looking for spark transform for a while. Can you send me a
> link to the pyspark function?
>
>
>
> I assume pandas transform is not really an option. I think it will try to
> pull the entire dataframe into the drivers memory.
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> p.s. My real problem is that spark does not allow you to bind columns. You
> can use union() to bind rows. I could get the equivalent of cbind() using
> union().transform()
>
>
>
> *From: *Bjørn Jørgensen 
> *Date: *Tuesday, March 15, 2022 at 10:37 AM
> *To: *Mich Talebzadeh 
> *Cc: *"user @spark" 
> *Subject: *Re: pivoting panda dataframe
>
>
>
>
> https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.tr

Re: pivoting panda dataframe

2022-03-15 Thread Bjørn Jørgensen
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.transpose.html we
have that transpose in pandas api for spark to.

You also have stack() and multilevel
https://pandas.pydata.org/pandas-docs/stable/user_guide/reshaping.html



tir. 15. mar. 2022 kl. 17:50 skrev Mich Talebzadeh <
mich.talebza...@gmail.com>:

>
> hi,
>
>
> Is it possible to pivot a panda dataframe by making the row column
> heading?
>
>
> thanks
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

