Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-30 Thread Subhasis Mukherjee
Regarding the part about making the Spark writer faster: if you are (or can be) on Databricks, 
check this out. It is fresh out of the oven at Databricks.

https://www.databricks.com/blog/announcing-general-availability-liquid-clustering
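
For readers who have not seen liquid clustering yet, a minimal sketch of what it looks like on a 
Databricks runtime that supports it (run where spark is the ambient session; the table and column 
names below are made up, so treat this as an illustration rather than anything from this thread):

# Illustrative only: a Delta table using liquid clustering instead of
# hive-style partitioning; table/column names are hypothetical.
spark.sql("""
    CREATE TABLE IF NOT EXISTS events_clustered (
        id BIGINT,
        event_ts TIMESTAMP,
        payload STRING
    )
    CLUSTER BY (event_ts)
""")

# Clustering is applied and maintained as part of OPTIMIZE.
spark.sql("OPTIMIZE events_clustered")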




From: Gera Shegalov 
Sent: Wednesday, May 29, 2024 7:57:56 am
To: Prem Sahoo 
Cc: eab...@163.com ; Vibhor Gupta ; 
user @spark 
Subject: Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

I agree with the previous answers that (if requirements allow it) it is much 
easier to just orchestrate a copy either in the same app or sync externally.

A long time ago, and not for a Spark app, we were solving a similar use case via 
https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/ViewFs.html#Multi-Filesystem_I.2F0_with_Nfly_Mount_Points
 . It may work with Spark because it sits underneath the FileSystem API ...
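
A rough idea of what an Nfly mount wired through Spark's Hadoop configuration might look like, 
loosely following the example on the linked Hadoop page; the mount-table name, paths, and target 
URIs are illustrative assumptions, and the exact property spelling should be checked against that page:

from pyspark.sql import SparkSession

# Sketch only: an Nfly link replicates writes under viewfs://global/data to
# every filesystem listed in the value (per the linked ViewFs docs).
# The mount-table name, paths, and URIs are made up for illustration.
spark = (
    SparkSession.builder
    .appName("nfly-dual-write-sketch")
    .config(
        "spark.hadoop.fs.viewfs.mounttable.global.linkNfly../data",
        "hdfs://namenode:8020/data,s3a://minio-bucket/data",
    )
    .getOrCreate()
)

df = spark.read.parquet("hdfs:///data/input")                       # placeholder input
df.write.mode("overwrite").parquet("viewfs://global/data/output")   # fans out to both targets

The appeal is that the dual write happens below the FileSystem API, so the Spark job itself stays a 
single ordinary write.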



On Tue, May 21, 2024 at 10:03 PM Prem Sahoo <prem.re...@gmail.com> wrote:
I am looking for a writer/committer optimization which can make the Spark write 
faster.
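
On the committer side specifically, the lever most often cited outside Databricks is the S3A 
committers from the hadoop-aws / spark-hadoop-cloud modules, which avoid the rename-based commit 
of the classic FileOutputCommitter. A hedged sketch of what that can look like against MinIO; the 
endpoint and bucket are placeholders, credentials are omitted, it assumes the spark-hadoop-cloud 
jar is on the classpath, and the property set should be double-checked against the Spark 
cloud-integration docs:

from pyspark.sql import SparkSession

# Sketch, not a verified recipe: route S3A (MinIO) writes through the "magic"
# S3A committer instead of the rename-based default. Endpoint and bucket are
# placeholders; credentials are omitted.
spark = (
    SparkSession.builder
    .appName("minio-committer-sketch")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.example.com:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config(
        "spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    )
    .config(
        "spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    )
    .getOrCreate()
)

df = spark.read.parquet("hdfs:///data/input")               # placeholder input
df.write.mode("overwrite").parquet("s3a://bucket/output")   # placeholder output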

On Tue, May 21, 2024 at 9:15 PM eab...@163.com <eab...@163.com> wrote:
Hi,
I think you should write to HDFS and then copy the files (Parquet or ORC) from HDFS 
to MinIO.


eabour

From: Prem Sahoo <prem.re...@gmail.com>
Date: 2024-05-22 00:38
To: Vibhor Gupta <vibhor.gu...@walmart.com>; 
user <user@spark.apache.org>
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo <prem.re...@gmail.com> wrote:
Hello Vibhor,
Thanks for the suggestion.
I am looking for other alternatives where the same dataframe can be written to two 
destinations without re-execution and without cache or persist.

Can someone help me with scenario 2?
How can we make the Spark write to MinIO faster?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta <vibhor.gu...@walmart.com> wrote:


Hi Prem,

You can try to write to HDFS then read from HDFS and write to MinIO.

This will prevent duplicate transformation.

You can also try persisting the dataframe using the DISK_ONLY level.

Regards,
Vibhor
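
To make the two suggestions above concrete (write to HDFS and read it back, or persist with 
DISK_ONLY), a minimal PySpark sketch; the paths and the filter are placeholders, not code from 
this thread:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dual-write-sketch").getOrCreate()

# Placeholder transformations: compute once, write twice.
df = (
    spark.read.parquet("hdfs:///data/input")
    .filter("some_column IS NOT NULL")
)

# Option A: materialize once with DISK_ONLY, then write to both destinations.
df.persist(StorageLevel.DISK_ONLY)
df.write.mode("overwrite").parquet("hdfs:///data/output")
df.write.mode("overwrite").parquet("s3a://minio-bucket/data/output")
df.unpersist()

# Option B: write to HDFS first, then read the written files back and copy
# them to MinIO, so the transformations are never re-executed.
spark.read.parquet("hdfs:///data/output") \
    .write.mode("overwrite").parquet("s3a://minio-bucket/data/output")

Either way the transformations run once; the trade-off is local disk for the persisted data 
(Option A) versus an extra HDFS read (Option B).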
From: Prem Sahoo <prem.re...@gmail.com>
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list <d...@spark.apache.org>
Subject: EXT: Dual Write to HDFS and MinIO in faster way
EXTERNAL: Report suspicious emails to Email Abuse.
Hello Team,
I am planning to write to two datasources at the same time.

Scenario 1:-

Writing the same dataframe to HDFS and MinIO without re-executing the 
transformations and without cache(). How can we make it faster?

Read the Parquet file, do a few transformations, and write to HDFS and MinIO.

Here, for both writes Spark needs to execute the transformations again. Do we know how 
we can avoid re-executing the transformations without cache()/persist()?

Scenario 2:-
I am writing 3.2 GB of data to HDFS and MinIO, which takes ~6 minutes.
Do we have any way to make this write faster?

I don't want to repartition and write, as repartition has the overhead of 
shuffling.

Please provide some inputs.





Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-28 Thread Gera Shegalov
I agree with the previous answers that (if requirements allow it) it is
much easier to just orchestrate a copy either in the same app or sync
externally.

A long time ago, and not for a Spark app, we were solving a similar use case
via
https://hadoop.apache.org/docs/r3.2.3/hadoop-project-dist/hadoop-hdfs/ViewFs.html#Multi-Filesystem_I.2F0_with_Nfly_Mount_Points
. It may work with Spark because it sits underneath the FileSystem API ...



On Tue, May 21, 2024 at 10:03 PM Prem Sahoo  wrote:

> I am looking for writer/comitter optimization which can make the spark
> write faster.
>
> On Tue, May 21, 2024 at 9:15 PM eab...@163.com  wrote:
>
>> Hi,
>> I think you should write to HDFS then copy file (parquet or orc)
>> from HDFS to MinIO.
>>
>> --
>> eabour
>>
>>
>> *From:* Prem Sahoo 
>> *Date:* 2024-05-22 00:38
>> *To:* Vibhor Gupta ; user
>> 
>> *Subject:* Re: EXT: Dual Write to HDFS and MinIO in faster way
>>
>>
>> On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
>>
>>> Hello Vibhor,
>>> Thanks for the suggestion .
>>> I am looking for some other alternatives where I can use the same
>>> dataframe can be written to two destinations without re execution and cache
>>> or persist .
>>>
>>> Can some one help me in scenario 2 ?
>>> How to make spark write to MinIO faster ?
>>> Sent from my iPhone
>>>
>>> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
>>> wrote:
>>>
>>> 
>>>
>>> Hi Prem,
>>>
>>>
>>>
>>> You can try to write to HDFS then read from HDFS and write to MinIO.
>>>
>>>
>>>
>>> This will prevent duplicate transformation.
>>>
>>>
>>>
>>> You can also try persisting the dataframe using the DISK_ONLY level.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Vibhor
>>>
>>> *From: *Prem Sahoo 
>>> *Date: *Tuesday, 21 May 2024 at 8:16 AM
>>> *To: *Spark dev list 
>>> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>>>
>>> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>>>
>>> Hello Team,
>>>
>>> I am planning to write to two datasource at the same time .
>>>
>>>
>>>
>>> Scenario:-
>>>
>>>
>>>
>>> Writing the same dataframe to HDFS and MinIO without re-executing the
>>> transformations and no cache(). Then how can we make it faster ?
>>>
>>>
>>>
>>> Read the parquet file and do a few transformations and write to HDFS and
>>> MinIO.
>>>
>>>
>>>
>>> here in both write spark needs execute the transformation again. Do we
>>> know how we can avoid re-execution of transformation  without
>>> cache()/persist ?
>>>
>>>
>>>
>>> Scenario2 :-
>>>
>>> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>>>
>>> Do we have any way to make writing this faster ?
>>>
>>>
>>>
>>> I don't want to do repartition and write as repartition will have
>>> overhead of shuffling .
>>>
>>>
>>>
>>> Please provide some inputs.
>>>
>>>
>>>
>>>
>>>
>>>


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
I am looking for a writer/committer optimization which can make the Spark
write faster.

On Tue, May 21, 2024 at 9:15 PM eab...@163.com  wrote:

> Hi,
> I think you should write to HDFS then copy file (parquet or orc) from
> HDFS to MinIO.
>
> --
> eabour
>
>
> *From:* Prem Sahoo 
> *Date:* 2024-05-22 00:38
> *To:* Vibhor Gupta ; user
> 
> *Subject:* Re: EXT: Dual Write to HDFS and MinIO in faster way
>
>
> On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
>
>> Hello Vibhor,
>> Thanks for the suggestion .
>> I am looking for some other alternatives where I can use the same
>> dataframe can be written to two destinations without re execution and cache
>> or persist .
>>
>> Can some one help me in scenario 2 ?
>> How to make spark write to MinIO faster ?
>> Sent from my iPhone
>>
>> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
>> wrote:
>>
>> 
>>
>> Hi Prem,
>>
>>
>>
>> You can try to write to HDFS then read from HDFS and write to MinIO.
>>
>>
>>
>> This will prevent duplicate transformation.
>>
>>
>>
>> You can also try persisting the dataframe using the DISK_ONLY level.
>>
>>
>>
>> Regards,
>>
>> Vibhor
>>
>> *From: *Prem Sahoo 
>> *Date: *Tuesday, 21 May 2024 at 8:16 AM
>> *To: *Spark dev list 
>> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>>
>> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>>
>> Hello Team,
>>
>> I am planning to write to two datasource at the same time .
>>
>>
>>
>> Scenario:-
>>
>>
>>
>> Writing the same dataframe to HDFS and MinIO without re-executing the
>> transformations and no cache(). Then how can we make it faster ?
>>
>>
>>
>> Read the parquet file and do a few transformations and write to HDFS and
>> MinIO.
>>
>>
>>
>> here in both write spark needs execute the transformation again. Do we
>> know how we can avoid re-execution of transformation  without
>> cache()/persist ?
>>
>>
>>
>> Scenario2 :-
>>
>> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>>
>> Do we have any way to make writing this faster ?
>>
>>
>>
>> I don't want to do repartition and write as repartition will have
>> overhead of shuffling .
>>
>>
>>
>> Please provide some inputs.
>>
>>
>>
>>
>>
>>


Re: Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread eab...@163.com
Hi,
I think you should write to HDFS and then copy the files (Parquet or ORC) from HDFS 
to MinIO.



eabour
 
From: Prem Sahoo
Date: 2024-05-22 00:38
To: Vibhor Gupta; user
Subject: Re: EXT: Dual Write to HDFS and MinIO in faster way


On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:
Hello Vibhor,
Thanks for the suggestion .
I am looking for some other alternatives where I can use the same dataframe can 
be written to two destinations without re execution and cache or persist .

Can some one help me in scenario 2 ?
How to make spark write to MinIO faster ?
Sent from my iPhone

On May 21, 2024, at 1:18 AM, Vibhor Gupta  wrote:

 
Hi Prem,
 
You can try to write to HDFS then read from HDFS and write to MinIO.
 
This will prevent duplicate transformation.
 
You can also try persisting the dataframe using the DISK_ONLY level.
 
Regards,
Vibhor
From: Prem Sahoo 
Date: Tuesday, 21 May 2024 at 8:16 AM
To: Spark dev list 
Subject: EXT: Dual Write to HDFS and MinIO in faster way
EXTERNAL: Report suspicious emails to Email Abuse.
Hello Team,
I am planning to write to two datasource at the same time . 
 
Scenario:-
 
Writing the same dataframe to HDFS and MinIO without re-executing the 
transformations and no cache(). Then how can we make it faster ?
 
Read the parquet file and do a few transformations and write to HDFS and MinIO.
 
here in both write spark needs execute the transformation again. Do we know how 
we can avoid re-execution of transformation  without cache()/persist ?
 
Scenario2 :-
I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
Do we have any way to make writing this faster ?
 
I don't want to do repartition and write as repartition will have overhead of 
shuffling .
 
Please provide some inputs. 
 
 


Re: EXT: Dual Write to HDFS and MinIO in faster way

2024-05-21 Thread Prem Sahoo
On Tue, May 21, 2024 at 6:58 AM Prem Sahoo  wrote:

> Hello Vibhor,
> Thanks for the suggestion .
> I am looking for some other alternatives where I can use the same
> dataframe can be written to two destinations without re execution and cache
> or persist .
>
> Can some one help me in scenario 2 ?
> How to make spark write to MinIO faster ?
> Sent from my iPhone
>
> On May 21, 2024, at 1:18 AM, Vibhor Gupta 
> wrote:
>
> 
>
> Hi Prem,
>
>
>
> You can try to write to HDFS then read from HDFS and write to MinIO.
>
>
>
> This will prevent duplicate transformation.
>
>
>
> You can also try persisting the dataframe using the DISK_ONLY level.
>
>
>
> Regards,
>
> Vibhor
>
> *From: *Prem Sahoo 
> *Date: *Tuesday, 21 May 2024 at 8:16 AM
> *To: *Spark dev list 
> *Subject: *EXT: Dual Write to HDFS and MinIO in faster way
>
> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>
> Hello Team,
>
> I am planning to write to two datasource at the same time .
>
>
>
> Scenario:-
>
>
>
> Writing the same dataframe to HDFS and MinIO without re-executing the
> transformations and no cache(). Then how can we make it faster ?
>
>
>
> Read the parquet file and do a few transformations and write to HDFS and
> MinIO.
>
>
>
> here in both write spark needs execute the transformation again. Do we
> know how we can avoid re-execution of transformation  without
> cache()/persist ?
>
>
>
> Scenario2 :-
>
> I am writing 3.2G data to HDFS and MinIO which takes ~6mins.
>
> Do we have any way to make writing this faster ?
>
>
>
> I don't want to do repartition and write as repartition will have overhead
> of shuffling .
>
>
>
> Please provide some inputs.
>
>
>
>
>
>


Re: [EXTERNAL] Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-12-11 Thread Eugene Miretsky
Hey Mich,

Thanks for the detailed response. I get most of these options.

However, what we are trying to do is avoid having to upload the source
configs and pyspark.zip files to the cluster every time we execute the job
using spark-submit. Here is the code that does it:
https://github.com/apache/spark/blob/bacdb3b5fec9783f4604276480eb2a0f5702/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L813

Wondering if there is a way to skip uploading the configs. Uploading the
pyspark.zip file can be skipped by setting
PYSPARK_ARCHIVES_PATH=local://
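
For reference, a hedged sketch of what skipping the pyspark.zip upload can look like when driving
spark-submit from Python; the local:// paths and the application file are placeholders, and the
behaviour relied on is the one described above rather than anything verified here:

import os
import subprocess

# Sketch only: point PYSPARK_ARCHIVES_PATH at archives already present on
# every node (local:// scheme) so the YARN client does not re-upload
# pyspark.zip. Paths and the application file are placeholders.
env = dict(os.environ)
env["PYSPARK_ARCHIVES_PATH"] = (
    "local:///opt/spark/python/lib/pyspark.zip,"
    "local:///opt/spark/python/lib/py4j-0.10.9.7-src.zip"
)

subprocess.run(
    [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "hdfs:///apps/foo.py",
    ],
    env=env,
    check=True,
)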

On Mon, Dec 11, 2023 at 5:15 AM Mich Talebzadeh 
wrote:

> Hi Eugene,
>
> With regard to your points
>
> What are the PYTHONPATH and SPARK_HOME env variables in your script?
>
> OK let us look at a typical of my Spark project structure
>
> - project_root
>   |-- README.md
>   |-- __init__.py
>   |-- conf
>   |   |-- (configuration files for Spark)
>   |-- deployment
>   |   |-- deployment.yaml
>   |-- design
>   |   |-- (design-related files or documentation)
>   |-- othermisc
>   |   |-- (other miscellaneous files)
>   |-- sparkutils
>   |   |-- (utility modules or scripts specific to Spark)
>   |-- src
>   |-- (main source code for your Spark application)
>
> If you want Spark to recognize modules from the sparkutils directory or
> any other directories within your project, you can include those
> directories in the PYTHONPATH.
>
> For example, if you want to include the sparkutils directory:
>
> export PYTHONPATH=/path/to/project_root/sparkutils:$PYTHONPATH
> to recap, the ${PYTHONPATH} variable is primarily used to specify
> additional directories where Python should look for modules and packages.
> In the context of Spark, it is typically used to include directories
> containing custom Python code or modules that your Spark application
> depends on.
>
> With regard to
>
> The --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" configuration
> option in Spark is used when submitting a Spark application to run on YARN
>
>-
>
>--conf: This is used to specify Spark configuration properties when
>submitting a Spark application.
>-
>
>spark.yarn.appMasterEnv.SPARK_HOME: This is a Spark configuration
>property that defines the value of the SPARK_HOME environment variable
>for the Spark application's Application Master (the process responsible for
>managing the execution of tasks on a YARN cluster).
>-
>
>$SPARK_HOME: This holds the path to the Spark installation directory.
>
> This configuration is setting the SPARK_HOME environment variable for the
> Spark Application Master when the application is running on YARN. This is
> important because the Spark Application Master needs to know the location
> of the Spark installation directory (SPARK_HOME) to configure and manage
> the Spark application's execution on the YARN cluster. HTH
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 11 Dec 2023 at 01:43, Eugene Miretsky  wrote:
>
>> Setting PYSPARK_ARCHIVES_PATH to hfds:// did the tricky. But don't
>> understand a few things
>>
>> 1) The default behaviour is if PYSPARK_ARCHIVES_PATH is empty,
>> pyspark.zip is uploaded from the local SPARK_HOME. If it is set to
>> "local://" the upload is skipped. I would expect the latter to be the
>> default. What's the use case for uploading the local pyspark.zip every
>> time?
>> 2) It seems like the localConfigs are meant to be copied every time (code)
>> what's the use case for that? Why not just use the cluster config?
>>
>>
>>
>> On Sun, Dec 10, 2023 at 1:15 PM Eugene Miretsky  wrote:
>>
>>> Thanks Mich,
>>>
>>> Tried this and still getting
>>> INF Client: "Uploading resource
>>> file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip ->
>>> hdfs:/". It is also doing it for (py4j.-0.10.9.7-src.zip and
>>> __spark_conf__.zip). It is working now because I enabled direct
>>> access to HDFS to allow copying the files. But ideall

Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-12-11 Thread Mich Talebzadeh
Hi Eugene,

With regard to your points

What are the PYTHONPATH and SPARK_HOME env variables in your script?

OK, let us look at a typical example of my Spark project structure:

- project_root
  |-- README.md
  |-- __init__.py
  |-- conf
  |   |-- (configuration files for Spark)
  |-- deployment
  |   |-- deployment.yaml
  |-- design
  |   |-- (design-related files or documentation)
  |-- othermisc
  |   |-- (other miscellaneous files)
  |-- sparkutils
  |   |-- (utility modules or scripts specific to Spark)
  |-- src
  |-- (main source code for your Spark application)

If you want Spark to recognize modules from the sparkutils directory or any
other directories within your project, you can include those directories in
the PYTHONPATH.

For example, if you want to include the sparkutils directory:

export PYTHONPATH=/path/to/project_root/sparkutils:$PYTHONPATH

To recap, the ${PYTHONPATH} variable is primarily used to specify
additional directories where Python should look for modules and packages.
In the context of Spark, it is typically used to include directories
containing custom Python code or modules that your Spark application
depends on.

With regard to

The --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" configuration
option in Spark is used when submitting a Spark application to run on YARN

   -

   --conf: This is used to specify Spark configuration properties when
   submitting a Spark application.
   -

   spark.yarn.appMasterEnv.SPARK_HOME: This is a Spark configuration
   property that defines the value of the SPARK_HOME environment variable
   for the Spark application's Application Master (the process responsible for
   managing the execution of tasks on a YARN cluster).
   -

   $SPARK_HOME: This holds the path to the Spark installation directory.

This configuration is setting the SPARK_HOME environment variable for the
Spark Application Master when the application is running on YARN. This is
important because the Spark Application Master needs to know the location
of the Spark installation directory (SPARK_HOME) to configure and manage
the Spark application's execution on the YARN cluster.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 11 Dec 2023 at 01:43, Eugene Miretsky  wrote:

> Setting PYSPARK_ARCHIVES_PATH to hfds:// did the tricky. But don't
> understand a few things
>
> 1) The default behaviour is if PYSPARK_ARCHIVES_PATH is empty,
> pyspark.zip is uploaded from the local SPARK_HOME. If it is set to
> "local://" the upload is skipped. I would expect the latter to be the
> default. What's the use case for uploading the local pyspark.zip every
> time?
> 2) It seems like the localConfigs are meant to be copied every time (code)
> what's the use case for that? Why not just use the cluster config?
>
>
>
> On Sun, Dec 10, 2023 at 1:15 PM Eugene Miretsky  wrote:
>
>> Thanks Mich,
>>
>> Tried this and still getting
>> INF Client: "Uploading resource
>> file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip ->
>> hdfs:/". It is also doing it for (py4j.-0.10.9.7-src.zip and
>> __spark_conf__.zip). It is working now because I enabled direct
>> access to HDFS to allow copying the files. But ideally I would like to not
>> have to copy any files directly to HDFS.
>>
>> 1) We would expect pyspark as well as the relevant configs to already be
>> available on the cluster - why are they being copied over? (we can always
>> provide the extra libraries needed using py-files the way you did)
>> 2) If we wanted users to be able to use custom pyspark, we would rather
>> just copy the file HDFS/GCS in other ways, and let users reference it in
>> their job
>> 3) What are the PYTHONPATH and SPARK_HOME env variables in your script?
>> Are they local paths, or paths on the spark cluster?
>>
>> On Fri, Nov 17, 2023 at 8:57 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> How are you submitting your spark job from your client?
>>>
>>> Your files can either be on HDFS or HCFS such as gs, s3 etc.
>>>
>>> With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I
>>> assume you want your
>>>
>>> spark-submit

Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-12-10 Thread Eugene Miretsky
Setting PYSPARK_ARCHIVES_PATH to hdfs:// did the trick. But I don't
understand a few things:

1) The default behaviour is that if PYSPARK_ARCHIVES_PATH is empty, pyspark.zip
is uploaded from the local SPARK_HOME. If it is set to "local://" the
upload is skipped. I would expect the latter to be the default. What's the
use case for uploading the local pyspark.zip every time?
2) It seems like the localConfigs are meant to be copied every time (code:
copyFileToRemote(destDir, localConfArchive, replication, symlinkCache,
force = true, ...)); what's the use case for that? Why not just
use the cluster config?



On Sun, Dec 10, 2023 at 1:15 PM Eugene Miretsky  wrote:

> Thanks Mich,
>
> Tried this and still getting
> INF Client: "Uploading resource
> file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip ->
> hdfs:/". It is also doing it for (py4j.-0.10.9.7-src.zip and
> __spark_conf__.zip). It is working now because I enabled direct
> access to HDFS to allow copying the files. But ideally I would like to not
> have to copy any files directly to HDFS.
>
> 1) We would expect pyspark as well as the relevant configs to already be
> available on the cluster - why are they being copied over? (we can always
> provide the extra libraries needed using py-files the way you did)
> 2) If we wanted users to be able to use custom pyspark, we would rather
> just copy the file HDFS/GCS in other ways, and let users reference it in
> their job
> 3) What are the PYTHONPATH and SPARK_HOME env variables in your script?
> Are they local paths, or paths on the spark cluster?
>
> On Fri, Nov 17, 2023 at 8:57 AM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> How are you submitting your spark job from your client?
>>
>> Your files can either be on HDFS or HCFS such as gs, s3 etc.
>>
>> With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I
>> assume you want your
>>
>> spark-submit --verbose \
>>--deploy-mode cluster \
>>--conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
>>--conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
>>--conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
>>--py-files $CODE_DIRECTORY_CLOUD/dataproc_on_gke.zip \
>>--conf "spark.driver.memory"=4G \
>>--conf "spark.executor.memory"=4G \
>>--conf "spark.num.executors"=4 \
>>--conf "spark.executor.cores"=2 \
>>$CODE_DIRECTORY_CLOUD/${APPLICATION}
>>
>> in my case I define $CODE_DIRECTORY_CLOUD as below on google cloud storage
>>
>> CODE_DIRECTORY="/home/hduser/dba/bin/python/"
>> CODE_DIRECTORY_CLOUD="gs://,${PROJECT}-spark-on-k8s/codes"
>> cd $CODE_DIRECTORY
>> [ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
>> echo `date` ", ===> creating source zip directory from  ${source_code}"
>> # zip needs to be done at root directory of code
>> zip -rq ${source_code}.zip ${source_code}
>> gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
>> gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION}
>> $CODE_DIRECTORY_CLOUD
>>
>> So in summary I create a zip  file of my project and copy it across to
>> the cloud storage and then put the application (py file) there as well and
>> use them in spark-submit
>>
>> I trust this answers your question.
>>
>> HTH
>>
>>
>>
>> Mich Talebzadeh,
>> Technologist, Solutions Architect & Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 15 Nov 2023 at 21:33, Eugene Miretsky 
>> wrote:
>>
>>> Hey All,
>>>
>>> We are running Pyspark spark-submit from a client outside the cluster.
>>> The client has network connectivity only to the Yarn Master, not the HDFS
>>> Datanodes. How can we submit the jobs? The idea would be to preload all the
>>> dependencies (job code, libraries, etc) to HDF

Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-12-10 Thread Eugene Miretsky
Thanks Mich,

Tried this and I am still getting
INFO Client: "Uploading resource
file:/opt/spark/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip ->
hdfs:/". It is also doing it for py4j-0.10.9.7-src.zip and
__spark_conf__.zip. It is working now because I enabled direct
access to HDFS to allow copying the files. But ideally I would like to not
have to copy any files directly to HDFS.

1) We would expect pyspark as well as the relevant configs to already be
available on the cluster - why are they being copied over? (We can always
provide the extra libraries needed using --py-files the way you did.)
2) If we wanted users to be able to use a custom pyspark, we would rather
just copy the file to HDFS/GCS in other ways, and let users reference it in
their job.
3) What are the PYTHONPATH and SPARK_HOME env variables in your script? Are
they local paths, or paths on the Spark cluster?

On Fri, Nov 17, 2023 at 8:57 AM Mich Talebzadeh 
wrote:

> Hi,
>
> How are you submitting your spark job from your client?
>
> Your files can either be on HDFS or HCFS such as gs, s3 etc.
>
> With reference to --py-files hdfs://yarn-master-url hdfs://foo.py', I
> assume you want your
>
> spark-submit --verbose \
>--deploy-mode cluster \
>--conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
>--conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
>--conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
>--py-files $CODE_DIRECTORY_CLOUD/dataproc_on_gke.zip \
>--conf "spark.driver.memory"=4G \
>--conf "spark.executor.memory"=4G \
>--conf "spark.num.executors"=4 \
>--conf "spark.executor.cores"=2 \
>$CODE_DIRECTORY_CLOUD/${APPLICATION}
>
> in my case I define $CODE_DIRECTORY_CLOUD as below on google cloud storage
>
> CODE_DIRECTORY="/home/hduser/dba/bin/python/"
> CODE_DIRECTORY_CLOUD="gs://,${PROJECT}-spark-on-k8s/codes"
> cd $CODE_DIRECTORY
> [ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
> echo `date` ", ===> creating source zip directory from  ${source_code}"
> # zip needs to be done at root directory of code
> zip -rq ${source_code}.zip ${source_code}
> gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
> gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION}
> $CODE_DIRECTORY_CLOUD
>
> So in summary I create a zip  file of my project and copy it across to the
> cloud storage and then put the application (py file) there as well and use
> them in spark-submit
>
> I trust this answers your question.
>
> HTH
>
>
>
> Mich Talebzadeh,
> Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 15 Nov 2023 at 21:33, Eugene Miretsky 
> wrote:
>
>> Hey All,
>>
>> We are running Pyspark spark-submit from a client outside the cluster.
>> The client has network connectivity only to the Yarn Master, not the HDFS
>> Datanodes. How can we submit the jobs? The idea would be to preload all the
>> dependencies (job code, libraries, etc) to HDFS, and just submit the job
>> from the client.
>>
>> We tried something like this
>> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master
>> yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'
>>
>> The error we are getting is
>> "
>>
>> org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
>> waiting for channel to be ready for connect. ch :
>> java.nio.channels.SocketChannel[connection-pending remote=/
>> 10.117.110.19:9866]
>>
>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
>> /user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip*
>> could only be written to 0 of the 1 minReplication nodes. There are 2
>> datanode(s) running and 2 node(s) are excluded in this operation.
>> "
>>
>> A few question
>> 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
>> files? Why would the client send them to the cl

Re: Spark-submit without access to HDFS

2023-11-17 Thread Mich Talebzadeh
Hi,

How are you submitting your spark job from your client?

Your files can either be on HDFS or HCFS such as gs, s3 etc.

With reference to --py-files hdfs://yarn-master-url hdfs://foo.py, I
assume you want your spark-submit to look something like this:

spark-submit --verbose \
   --deploy-mode cluster \
   --conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
   --conf "spark.yarn.appMasterEnv.PYTHONPATH=${PYTHONPATH}" \
   --conf "spark.executorEnv.PYTHONPATH=${PYTHONPATH}" \
   --py-files $CODE_DIRECTORY_CLOUD/dataproc_on_gke.zip \
   --conf "spark.driver.memory"=4G \
   --conf "spark.executor.memory"=4G \
   --conf "spark.num.executors"=4 \
   --conf "spark.executor.cores"=2 \
   $CODE_DIRECTORY_CLOUD/${APPLICATION}

In my case, I define $CODE_DIRECTORY_CLOUD as below, on Google Cloud Storage:

CODE_DIRECTORY="/home/hduser/dba/bin/python/"
CODE_DIRECTORY_CLOUD="gs://,${PROJECT}-spark-on-k8s/codes"
cd $CODE_DIRECTORY
[ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
echo `date` ", ===> creating source zip directory from  ${source_code}"
# zip needs to be done at root directory of code
zip -rq ${source_code}.zip ${source_code}
gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION}
$CODE_DIRECTORY_CLOUD

So, in summary: I create a zip file of my project, copy it across to the
cloud storage, put the application (py file) there as well, and use them in
spark-submit.

I trust this answers your question.

HTH



Mich Talebzadeh,
Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 15 Nov 2023 at 21:33, Eugene Miretsky 
wrote:

> Hey All,
>
> We are running Pyspark spark-submit from a client outside the cluster. The
> client has network connectivity only to the Yarn Master, not the HDFS
> Datanodes. How can we submit the jobs? The idea would be to preload all the
> dependencies (job code, libraries, etc) to HDFS, and just submit the job
> from the client.
>
> We tried something like this
> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master
> yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'
>
> The error we are getting is
> "
>
> org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
> waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending remote=/
> 10.117.110.19:9866]
>
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip*
> could only be written to 0 of the 1 minReplication nodes. There are 2
> datanode(s) running and 2 node(s) are excluded in this operation.
> "
>
> A few question
> 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
> files? Why would the client send them to the cluster? (the cluster already
> has all that info - this would make sense in client mode, but not cluster
> mode )
> 2) Is it possible to use spark-submit without HDFS access?
> 3) How would we fix this?
>
> Cheers,
> Eugene
>
> --
>
> *Eugene Miretsky*
> Managing Partner |  Badal.io | Book a meeting /w me!
> <http://calendly.com/eugene-badal>
> mobile:  416-568-9245
> email: eug...@badal.io 
>


Re: Spark-submit without access to HDFS

2023-11-16 Thread Jörn Franke
I am not 100% sure, but I do not think this works - the driver would need
access to HDFS.

What you could try (have not tested it though in your scenario):
- use Spark Connect:
  https://spark.apache.org/docs/latest/spark-connect-overview.html
- host the zip file on an https server and use that url (I would recommend
  against it though for various reasons, such as reliability)

On 15.11.2023 at 22:33, Eugene Miretsky wrote:

Hey All,

We are running Pyspark spark-submit from a client outside the cluster. The
client has network connectivity only to the Yarn Master, not the HDFS
Datanodes. How can we submit the jobs? The idea would be to preload all the
dependencies (job code, libraries, etc) to HDFS, and just submit the job from
the client.

We tried something like this
'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master yarn
--deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'

The error we are getting is
"
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
waiting for channel to be ready for connect. ch :
java.nio.channels.SocketChannel[connection-pending remote=/10.117.110.19:9866]
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/user/users/.sparkStaging/application_1698216436656_0104/spark_conf.zip could
only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s)
running and 2 node(s) are excluded in this operation.
"

A few questions
1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
files? Why would the client send them to the cluster? (the cluster already has
all that info - this would make sense in client mode, but not cluster mode)
2) Is it possible to use spark-submit without HDFS access?
3) How would we fix this?

Cheers,
Eugene

--

Eugene Miretsky
Managing Partner |  Badal.io | Book a meeting /w me!
mobile:  416-568-9245
email: eug...@badal.io
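
On the Spark Connect suggestion above: with Spark 3.4+ a client can submit work over the Connect
protocol without needing direct HDFS access from the client side. A minimal, hedged sketch; the
server URL is a placeholder and it assumes a Connect server is already running on the cluster:

from pyspark.sql import SparkSession

# Sketch: talk to a remote Spark Connect endpoint instead of running
# spark-submit from the client. The URL below is a placeholder; 15002 is the
# usual default port.
spark = (
    SparkSession.builder
    .remote("sc://spark-connect.example.com:15002")
    .getOrCreate()
)

df = spark.read.parquet("hdfs:///data/input")   # resolved on the cluster, not the client
print(df.count())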


Re: Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-11-15 Thread eab...@163.com
Hi Eugene,
  As the logs indicate, when executing spark-submit, Spark will package and 
upload spark/conf to HDFS, along with uploading spark/jars. These files are 
uploaded to HDFS unless you specify that they be uploaded to another object 
store (OSS). To do so, you'll need to modify the configuration in 
hdfs-site.xml, for instance fs.oss.impl, etc.



eabour
 
From: Eugene Miretsky
Date: 2023-11-16 09:58
To: eab...@163.com
CC: Eugene Miretsky; user @spark
Subject: Re: [EXTERNAL] Re: Spark-submit without access to HDFS
Hey! 

Thanks for the response. 

We are getting the error because there is no network connectivity to the data 
nodes - that's expected. 

What I am trying to find out is WHY we need access to the data nodes, and if 
there is a way to submit a job without it. 

Cheers,
Eugene

On Wed, Nov 15, 2023 at 7:32 PM eab...@163.com  wrote:
Hi Eugene,
I think you should Check if the HDFS service is running properly.  From the 
logs, it appears that there are two datanodes in HDFS,  but none of them are 
healthy.  Please investigate the reasons why the datanodes are not functioning 
properly.  It seems that the issue might be due to insufficient disk space.



eabour
 
From: Eugene Miretsky
Date: 2023-11-16 05:31
To: user
Subject: Spark-submit without access to HDFS
Hey All, 

We are running Pyspark spark-submit from a client outside the cluster. The 
client has network connectivity only to the Yarn Master, not the HDFS 
Datanodes. How can we submit the jobs? The idea would be to preload all the 
dependencies (job code, libraries, etc) to HDFS, and just submit the job from 
the client. 

We tried something like this
'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master yarn 
--deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'

The error we are getting is 
"
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.117.110.19:9866]
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/users/.sparkStaging/application_1698216436656_0104/spark_conf.zip could 
only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) 
running and 2 node(s) are excluded in this operation.
" 

A few question 
1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf files? 
Why would the client send them to the cluster? (the cluster already has all 
that info - this would make sense in client mode, but not cluster mode )
2) Is it possible to use spark-submit without HDFS access? 
3) How would we fix this?  

Cheers,
Eugene

-- 

Eugene Miretsky
Managing Partner |  Badal.io | Book a meeting /w me! 
mobile:  416-568-9245
email: eug...@badal.io


-- 

Eugene Miretsky
Managing Partner |  Badal.io | Book a meeting /w me! 
mobile:  416-568-9245
email: eug...@badal.io


Re: [EXTERNAL] Re: Spark-submit without access to HDFS

2023-11-15 Thread Eugene Miretsky
Hey!

Thanks for the response.

We are getting the error because there is no network connectivity to the
data nodes - that's expected.

What I am trying to find out is WHY we need access to the data nodes, and
if there is a way to submit a job without it.

Cheers,
Eugene

On Wed, Nov 15, 2023 at 7:32 PM eab...@163.com  wrote:

> Hi Eugene,
> I think you should Check if the HDFS service is running properly.  From 
> the logs, it appears that there are two datanodes in HDFS,
>  but none of them are healthy.
> Please investigate the reasons why the datanodes are not functioning properly.
> It seems that the issue might be due to insufficient disk space.
>
> --
> eabour
>
>
> *From:* Eugene Miretsky 
> *Date:* 2023-11-16 05:31
> *To:* user 
> *Subject:* Spark-submit without access to HDFS
> Hey All,
>
> We are running Pyspark spark-submit from a client outside the cluster. The
> client has network connectivity only to the Yarn Master, not the HDFS
> Datanodes. How can we submit the jobs? The idea would be to preload all the
> dependencies (job code, libraries, etc) to HDFS, and just submit the job
> from the client.
>
> We tried something like this
> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master
> yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'
>
> The error we are getting is
> "
>
> org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
> waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending remote=/
> 10.117.110.19:9866]
>
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip*
> could only be written to 0 of the 1 minReplication nodes. There are 2
> datanode(s) running and 2 node(s) are excluded in this operation.
> "
>
> A few question
> 1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
> files? Why would the client send them to the cluster? (the cluster already
> has all that info - this would make sense in client mode, but not cluster
> mode )
> 2) Is it possible to use spark-submit without HDFS access?
> 3) How would we fix this?
>
> Cheers,
> Eugene
>
> --
>
> *Eugene Miretsky*
> Managing Partner |  Badal.io | Book a meeting /w me!
> <http://calendly.com/eugene-badal>
> mobile:  416-568-9245
> email: eug...@badal.io 
>
>

-- 

*Eugene Miretsky*
Managing Partner |  Badal.io | Book a meeting /w me!
<http://calendly.com/eugene-badal>
mobile:  416-568-9245
email: eug...@badal.io 


Re: Spark-submit without access to HDFS

2023-11-15 Thread eab...@163.com
Hi Eugene,
I think you should check whether the HDFS service is running properly. From the 
logs, it appears that there are two datanodes in HDFS, but none of them are 
healthy. Please investigate the reasons why the datanodes are not functioning 
properly. It seems that the issue might be due to insufficient disk space.



eabour
 
From: Eugene Miretsky
Date: 2023-11-16 05:31
To: user
Subject: Spark-submit without access to HDFS
Hey All, 

We are running Pyspark spark-submit from a client outside the cluster. The 
client has network connectivity only to the Yarn Master, not the HDFS 
Datanodes. How can we submit the jobs? The idea would be to preload all the 
dependencies (job code, libraries, etc) to HDFS, and just submit the job from 
the client. 

We tried something like this
'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master yarn 
--deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'

The error we are getting is 
"
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.117.110.19:9866]
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/users/.sparkStaging/application_1698216436656_0104/spark_conf.zip could 
only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) 
running and 2 node(s) are excluded in this operation.
" 

A few question 
1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf files? 
Why would the client send them to the cluster? (the cluster already has all 
that info - this would make sense in client mode, but not cluster mode )
2) Is it possible to use spark-submit without HDFS access? 
3) How would we fix this?  

Cheers,
Eugene

-- 

Eugene Miretsky
Managing Partner |  Badal.io | Book a meeting /w me! 
mobile:  416-568-9245
email: eug...@badal.io


Spark-submit without access to HDFS

2023-11-15 Thread Eugene Miretsky
Hey All,

We are running Pyspark spark-submit from a client outside the cluster. The
client has network connectivity only to the Yarn Master, not the HDFS
Datanodes. How can we submit the jobs? The idea would be to preload all the
dependencies (job code, libraries, etc) to HDFS, and just submit the job
from the client.

We tried something like this
'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master
yarn --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'

The error we are getting is
"

org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
waiting for channel to be ready for connect. ch :
java.nio.channels.SocketChannel[connection-pending remote=/
10.117.110.19:9866]

org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/user/users/.sparkStaging/application_1698216436656_0104/*spark_conf.zip*
could only be written to 0 of the 1 minReplication nodes. There are 2
datanode(s) running and 2 node(s) are excluded in this operation.
"

A few questions
1) What are the spark_conf.zip files. Is it the hive-site/yarn-site conf
files? Why would the client send them to the cluster? (the cluster already
has all that info - this would make sense in client mode, but not cluster
mode )
2) Is it possible to use spark-submit without HDFS access?
3) How would we fix this?

Cheers,
Eugene

-- 

*Eugene Miretsky*
Managing Partner |  Badal.io | Book a meeting /w me!
<http://calendly.com/eugene-badal>
mobile:  416-568-9245
email: eug...@badal.io 


Re: [External Email] Re: About /mnt/hdfs/current/BP directories

2023-09-08 Thread Nebi Aydin
Usually the job never reaches that point; it fails during shuffle. And the storage
memory and executor memory when it failed are usually low.
On Fri, Sep 8, 2023 at 16:49 Jack Wells  wrote:

> Assuming you’re not writing to HDFS in your code, Spark can spill to HDFS
> if it runs out of memory on a per-executor basis. This could happen when
> evaluating a cache operation like you have below or during shuffle
> operations in joins, etc. You might try to increase executor memory, tune
> shuffle operations, avoid caching, or reduce the size of your dataframe(s).
>
> Jack
>
> On Sep 8, 2023 at 12:43:07, Nebi Aydin 
> wrote:
>
>>
>> Sure
>> df = spark.read.option("basePath",
>> some_path).parquet(*list_of_s3_file_paths())
>> (
>> df
>> .where(SOME FILTER)
>> .repartition(6)
>> .cache()
>> )
>>
>> On Fri, Sep 8, 2023 at 14:56 Jack Wells  wrote:
>>
>>> Hi Nebi, can you share the code you’re using to read and write from S3?
>>>
>>> On Sep 8, 2023 at 10:59:59, Nebi Aydin 
>>> wrote:
>>>
>>>> Hi all,
>>>> I am using spark on EMR to process data. Basically i read data from AWS
>>>> S3 and do the transformation and post transformation i am loading/writing
>>>> data to s3.
>>>>
>>>> Recently we have found that hdfs(/mnt/hdfs) utilization is going too
>>>> high.
>>>>
>>>> I disabled `yarn.log-aggregation-enable` by setting it to False.
>>>>
>>>> I am not writing any data to hdfs(/mnt/hdfs) however is that spark is
>>>> creating blocks and writing data into it. We are going all the operations
>>>> in memory.
>>>>
>>>> Any specific operation writing data to datanode(HDFS)?
>>>>
>>>> Here is the hdfs dirs created.
>>>>
>>>> ```
>>>>
>>>> *15.4G
>>>> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized/subdir1
>>>>
>>>> 129G
>>>> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized
>>>>
>>>> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current
>>>>
>>>> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812
>>>>
>>>> 129G /mnt/hdfs/current 129G /mnt/hdfs*
>>>>
>>>> ```
>>>>
>>>>
>>>>
>>>


Re: [External Email] Re: About /mnt/hdfs/current/BP directories

2023-09-08 Thread Jack Wells
 Assuming you’re not writing to HDFS in your code, Spark can spill to HDFS
if it runs out of memory on a per-executor basis. This could happen when
evaluating a cache operation like you have below or during shuffle
operations in joins, etc. You might try to increase executor memory, tune
shuffle operations, avoid caching, or reduce the size of your dataframe(s).

Jack
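
To illustrate the kind of tuning meant above, a hedged sketch; memory sizes, paths, and the filter
are placeholders, not a prescription for this particular job:

from pyspark.sql import SparkSession

# Sketch: give executors more headroom and skip cache() so less data has to
# spill to local disk. All values and paths are placeholders.
spark = (
    SparkSession.builder
    .appName("emr-tuning-sketch")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .getOrCreate()
)

df = (
    spark.read.option("basePath", "s3a://bucket/base/")
    .parquet("s3a://bucket/base/part=1/")
    .where("some_column IS NOT NULL")   # filter early instead of caching
)
df.write.mode("overwrite").parquet("s3a://bucket/output/")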

On Sep 8, 2023 at 12:43:07, Nebi Aydin 
wrote:

>
> Sure
> df = spark.read.option("basePath",
> some_path).parquet(*list_of_s3_file_paths())
> (
> df
> .where(SOME FILTER)
> .repartition(6)
> .cache()
> )
>
> On Fri, Sep 8, 2023 at 14:56 Jack Wells  wrote:
>
>> Hi Nebi, can you share the code you’re using to read and write from S3?
>>
>> On Sep 8, 2023 at 10:59:59, Nebi Aydin 
>> wrote:
>>
>>> Hi all,
>>> I am using spark on EMR to process data. Basically i read data from AWS
>>> S3 and do the transformation and post transformation i am loading/writing
>>> data to s3.
>>>
>>> Recently we have found that hdfs(/mnt/hdfs) utilization is going too
>>> high.
>>>
>>> I disabled `yarn.log-aggregation-enable` by setting it to False.
>>>
>>> I am not writing any data to hdfs(/mnt/hdfs) however is that spark is
>>> creating blocks and writing data into it. We are going all the operations
>>> in memory.
>>>
>>> Any specific operation writing data to datanode(HDFS)?
>>>
>>> Here is the hdfs dirs created.
>>>
>>> ```
>>>
>>> *15.4G
>>> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized/subdir1
>>>
>>> 129G
>>> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized
>>>
>>> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current
>>>
>>> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812
>>>
>>> 129G /mnt/hdfs/current 129G /mnt/hdfs*
>>>
>>> ```
>>>
>>>
>>>
>>


Re: [External Email] Re: About /mnt/hdfs/current/BP directories

2023-09-08 Thread Nebi Aydin
Sure
df = spark.read.option("basePath",
some_path).parquet(*list_of_s3_file_paths())
(
df
.where(SOME FILTER)
.repartition(6)
.cache()
)

On Fri, Sep 8, 2023 at 14:56 Jack Wells  wrote:

> Hi Nebi, can you share the code you’re using to read and write from S3?
>
> On Sep 8, 2023 at 10:59:59, Nebi Aydin 
> wrote:
>
>> Hi all,
>> I am using spark on EMR to process data. Basically i read data from AWS
>> S3 and do the transformation and post transformation i am loading/writing
>> data to s3.
>>
>> Recently we have found that hdfs(/mnt/hdfs) utilization is going too high.
>>
>> I disabled `yarn.log-aggregation-enable` by setting it to False.
>>
>> I am not writing any data to hdfs(/mnt/hdfs) however is that spark is
>> creating blocks and writing data into it. We are going all the operations
>> in memory.
>>
>> Any specific operation writing data to datanode(HDFS)?
>>
>> Here is the hdfs dirs created.
>>
>> ```
>>
>> *15.4G
>> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized/subdir1
>>
>> 129G
>> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized
>>
>> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current
>>
>> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812
>>
>> 129G /mnt/hdfs/current 129G /mnt/hdfs*
>>
>> ```
>>
>>
>>
>


Re: About /mnt/hdfs/current/BP directories

2023-09-08 Thread Jack Wells
 Hi Nebi, can you share the code you’re using to read and write from S3?

On Sep 8, 2023 at 10:59:59, Nebi Aydin 
wrote:

> Hi all,
> I am using spark on EMR to process data. Basically i read data from AWS S3
> and do the transformation and post transformation i am loading/writing data
> to s3.
>
> Recently we have found that hdfs(/mnt/hdfs) utilization is going too high.
>
> I disabled `yarn.log-aggregation-enable` by setting it to False.
>
> I am not writing any data to hdfs(/mnt/hdfs) however is that spark is
> creating blocks and writing data into it. We are going all the operations
> in memory.
>
> Any specific operation writing data to datanode(HDFS)?
>
> Here is the hdfs dirs created.
>
> ```
>
> *15.4G
> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized/subdir1
>
> 129G
> /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized
>
> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current
>
> 129G /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812
>
> 129G /mnt/hdfs/current 129G /mnt/hdfs*
>
> ```
>
>
>


About /mnt/hdfs/current/BP directories

2023-09-08 Thread Nebi Aydin
Hi all,
I am using Spark on EMR to process data. Basically I read data from AWS S3,
do the transformations, and post-transformation I am loading/writing the data
back to S3.

Recently we have found that HDFS (/mnt/hdfs) utilization is going too high.

I disabled `yarn.log-aggregation-enable` by setting it to False.

I am not writing any data to HDFS (/mnt/hdfs); however, it seems that Spark is
creating blocks and writing data into it. We are doing all the operations
in memory.

Is there any specific operation writing data to the datanode (HDFS)?

Here is the hdfs dirs created.

```

15.4G  /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized/subdir1
129G   /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current/finalized
129G   /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812/current
129G   /mnt/hdfs/current/BP-6706123673-10.xx.xx.xxx-1588026945812
129G   /mnt/hdfs/current
129G   /mnt/hdfs

```




Spark Thrift Server issue with external HDFS table

2023-02-01 Thread Kalhara Gurugamage
Hello Team,


We are using Spark version 3.3.0.

We've created external HDFS tables using beeline with the Spark Thrift Server.

Here we have multiple Parquet files in one partition that need to be attached to
the external HDFS table. Note that the HDFS data is stored in a distributed setup.


   - There is a scenario where there are multiple Parquet files in the same
   HDFS partition. Once that particular partition is added to the external
   table (which was created using beeline) and a query is run against the
   external HDFS table, the query seems to halt.

Then we open another beeline session and run another query (it does not need
to be against the same table). The previously halted query dispatched against
the external HDFS table will then return results.


   - In the other scenario, we add those Parquet files one by one and
   refresh the table for that same partition. No issue occurs when
   querying the external HDFS table.
   - Also, if only one Parquet file exists in that HDFS partition, queries
   against the external HDFS table return without the above-mentioned
   behavior.

Is there any solution to avoid the above abnormal behavior?


Thank you and regards


Kalhara Gurugamage
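
For reference, the add-partition-and-refresh workaround from the second scenario looks roughly
like the sketch below; it is shown through spark.sql for compactness, but the same statements can
be issued from beeline against the Thrift Server. The table, partition, and path names are
placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Placeholder names: register the new partition on the external table and
# refresh Spark's cached file listing before querying it.
spark.sql("""
    ALTER TABLE events_ext ADD IF NOT EXISTS
    PARTITION (dt = '2023-02-01')
    LOCATION 'hdfs:///warehouse/events/dt=2023-02-01'
""")
spark.sql("REFRESH TABLE events_ext")
spark.sql("SELECT COUNT(*) AS cnt FROM events_ext WHERE dt = '2023-02-01'").show()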


Fwd: [Spark Standalone Mode] How to read from kerberised HDFS in spark standalone mode

2023-01-31 Thread Wei Yan
Glad to hear that!
And I hope it can help anyone else facing the same problem.

-- Forwarded message --
From: Bansal, Jaimita 
Date: Wed, Feb 1, 2023, 03:15
Subject: RE: [Spark Standalone Mode] How to read from kerberised HDFS in
spark standalone mode
To: Wei Yan 
Cc: Chittajallu, Rajiv ,
abner.espin...@ny.email.gs.com 


Hey Wei,



This worked!  Thank you so much.



Thanks,

Jaimita



*From:* Wei Yan 
*Sent:* Thursday, January 19, 2023 7:08 PM
*To:* Bansal, Jaimita [Engineering] 
*Subject:* Re: [Spark Standalone Mode] How to read from kerberised HDFS in
spark standalone mode



Hi!

You can use the Delegation Token.

In Spark standalone mode, the simple way to use the Delegation Token is to
set an environment variable on every node, including master nodes and worker
nodes; the content of this environment variable is the path of the
Delegation Token file.

You should renew this file at a fixed time interval:

hdfs fetchdt -renewer hive /opt/spark/conf/delegation.token
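
For context, the environment variable that Hadoop's UserGroupInformation conventionally reads for
a pre-fetched delegation token is HADOOP_TOKEN_FILE_LOCATION; the thread does not name the
variable, so treat that as an assumption. A hedged sketch of wiring it through a standalone-mode
job, with placeholder paths:

import os
from pyspark.sql import SparkSession

# Sketch based on the advice above: every node keeps a periodically renewed
# delegation token file (e.g. produced by "hdfs fetchdt"), and the driver and
# executors point HADOOP_TOKEN_FILE_LOCATION (assumed variable name) at it.
token_path = "/opt/spark/conf/delegation.token"         # path from the example above

os.environ["HADOOP_TOKEN_FILE_LOCATION"] = token_path   # driver side (client mode)

spark = (
    SparkSession.builder
    .appName("standalone-kerberized-hdfs-sketch")
    .config("spark.executorEnv.HADOOP_TOKEN_FILE_LOCATION", token_path)  # executor side
    .getOrCreate()
)

spark.read.parquet("hdfs:///secure/data").show()        # placeholder HDFS path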



Bansal, Jaimita  于2023年1月20日周五 07:46写道:

Hi Spark Team,



We are facing an issue when trying to read from HDFS via spark running in
standalone cluster.  The issue comes from the executor node not able to
authenticate. It is using auth:SIMPLE when actually we have setup auth as
Kerberos.  Could you please help in resolving this?



Caused by: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN, KERBEROS]

at
org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:778)
~[hadoop-common-3.1.1.7.1.7.1000-141.jar:na]





18:57:44.726 [main] DEBUG o.a.spark.deploy.SparkHadoopUtil - creating UGI
for user: 

18:57:45.045 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop login

18:57:45.046 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop
login commit

18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - using
kerberos user: @GS.COM

18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - Using user:
"@GS.COM" with name @GS.COM

18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - User entry:
" @GS.COM"

18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - UGI
loginUser:@GS.COM (auth:KERBEROS)

18:57:45.056 [main] DEBUG o.a.h.security.UserGroupInformation -
PrivilegedAction as: (auth:SIMPLE)
from:org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)

18:57:45.078 [TGT Renewer for @GS.COM] DEBUG
o.a.h.security.UserGroupInformation - Current time is 1674068265078

18:57:45.079 [TGT Renewer for @GS.COM] DEBUG
o.a.h.security.UserGroupInformation - Next refresh is 1674136785000

18:57:45.092 [main] INFO  org.apache.spark.SecurityManager - Changing view
acls to: root,

18:57:45.092 [main] INFO  org.apache.spark.SecurityManager - Changing
modify acls to: root,

18:57:45.093 [main] INFO  org.apache.spark.SecurityManager - Changing view
acls groups to:

18:57:45.093 [main] INFO  org.apache.spark.SecurityManager - Changing
modify acls groups to:



Thanks,

Jaimita



*Vice President, Data Lake Engineering*

*Goldman Sachs*




--


Your Personal Data: We may collect and process information about you that
may be subject to data protection laws. For more information about how we
use and disclose your personal data, how we protect your information, our
legal basis to use your information, your rights and who you can contact,
please refer to: www.gs.com/privacy-notices




[Spark Standalone Mode] How to read from kerberised HDFS in spark standalone mode

2023-01-19 Thread Bansal, Jaimita
Hi Spark Team,

We are facing an issue when trying to read from HDFS via Spark running in a
standalone cluster. The issue comes from the executor node not being able to
authenticate: it is using auth:SIMPLE when we have actually set up auth as
Kerberos. Could you please help in resolving this?

Caused by: java.io.IOException: 
org.apache.hadoop.security.AccessControlException: Client cannot authenticate 
via:[TOKEN, KERBEROS]
at 
org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:778) 
~[hadoop-common-3.1.1.7.1.7.1000-141.jar:na]


18:57:44.726 [main] DEBUG o.a.spark.deploy.SparkHadoopUtil - creating UGI for 
user: 
18:57:45.045 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop login
18:57:45.046 [main] DEBUG o.a.h.security.UserGroupInformation - hadoop login 
commit
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - using kerberos 
user: @GS.COM
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - Using user: 
"@GS.COM" with name @GS.COM
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - User entry: 
" @GS.COM"
18:57:45.047 [main] DEBUG o.a.h.security.UserGroupInformation - UGI 
loginUser:@GS.COM (auth:KERBEROS)
18:57:45.056 [main] DEBUG o.a.h.security.UserGroupInformation - 
PrivilegedAction as: (auth:SIMPLE) 
from:org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
18:57:45.078 [TGT Renewer for @GS.COM] DEBUG 
o.a.h.security.UserGroupInformation - Current time is 1674068265078
18:57:45.079 [TGT Renewer for @GS.COM] DEBUG 
o.a.h.security.UserGroupInformation - Next refresh is 1674136785000
18:57:45.092 [main] INFO  org.apache.spark.SecurityManager - Changing view acls 
to: root,
18:57:45.092 [main] INFO  org.apache.spark.SecurityManager - Changing modify 
acls to: root,
18:57:45.093 [main] INFO  org.apache.spark.SecurityManager - Changing view acls 
groups to:
18:57:45.093 [main] INFO  org.apache.spark.SecurityManager - Changing modify 
acls groups to:

Thanks,
Jaimita

Vice President, Data Lake Engineering
Goldman Sachs




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>


Re: Spark equivalent to hdfs groups

2022-09-07 Thread phiroc
Many thanks, Sean.

- Mail original -
De: "Sean Owen" 
À: phi...@free.fr
Cc: "User" 
Envoyé: Mercredi 7 Septembre 2022 17:05:55
Objet: Re: Spark equivalent to hdfs groups


No, because this is a storage concept, and Spark is not a storage system. You 
would appeal to tools and interfaces that the storage system provides, like 
hdfs. Where or how the hdfs binary is available depends on how you deploy Spark 
where; it would be available on a Hadoop cluster. It's just not a Spark 
question. 


On Wed, Sep 7, 2022 at 9:51 AM < phi...@free.fr > wrote: 


Hi Sean, 
I'm talking about HDFS Groups. 
On Linux, you can type "hdfs groups " to get the list of the groups 
user1 belongs to. 
In Zeppelin/Spark, the hdfs executable is not accessible. 
As a result, I wondered if there was a class in Spark (eg. Security or ACL) 
which would let you access a particular user's groups. 



- Mail original - 
De: "Sean Owen" < sro...@gmail.com > 
À: phi...@free.fr 
Cc: "User" < user@spark.apache.org > 
Envoyé: Mercredi 7 Septembre 2022 16:41:01 
Objet: Re: Spark equivalent to hdfs groups 


Spark isn't a storage system or user management system; no there is no notion 
of groups (groups for what?) 


On Wed, Sep 7, 2022 at 8:36 AM < phi...@free.fr > wrote: 


Hello, 
is there a Spark equivalent to "hdfs groups "? 
Many thanks. 
Philippe 

- 
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark equivalent to hdfs groups

2022-09-07 Thread Sean Owen
No, because this is a storage concept, and Spark is not a storage system.
You would appeal to tools and interfaces that the storage system provides,
like hdfs. Where or how the hdfs binary is available depends on how you
deploy Spark where; it would be available on a Hadoop cluster. It's just
not a Spark question.
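
(For what it's worth, one possible client-side workaround is to call Hadoop's
group-mapping API directly from the driver, since the Hadoop client libraries
ship with Spark. This is only a sketch: it resolves groups using the mapping
configured in the client's core-site.xml, which may differ from what the
NameNode reports via "hdfs groups"; "user1" is a placeholder user name.)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Groups
import scala.collection.JavaConverters._

// Client-side group lookup via Hadoop's configured group-mapping service.
val conf = new Configuration()
val groups = Groups.getUserToGroupsMappingService(conf).getGroups("user1").asScala
println(groups.mkString(", "))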

On Wed, Sep 7, 2022 at 9:51 AM  wrote:

> Hi Sean,
> I'm talking about HDFS Groups.
> On Linux, you can type "hdfs groups " to get the list of the groups
> user1 belongs to.
> In Zeppelin/Spark, the hdfs executable is not accessible.
> As a result, I wondered if there was a class in Spark (eg. Security or
> ACL) which would let you access a particular user's groups.
>
>
>
> - Mail original -
> De: "Sean Owen" 
> À: phi...@free.fr
> Cc: "User" 
> Envoyé: Mercredi 7 Septembre 2022 16:41:01
> Objet: Re: Spark equivalent to hdfs groups
>
>
> Spark isn't a storage system or user management system; no there is no
> notion of groups (groups for what?)
>
>
> On Wed, Sep 7, 2022 at 8:36 AM < phi...@free.fr > wrote:
>
>
> Hello,
> is there a Spark equivalent to "hdfs groups "?
> Many thanks.
> Philippe
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark equivalent to hdfs groups

2022-09-07 Thread phiroc
Hi Sean,
I'm talking about HDFS Groups.
On Linux, you can type "hdfs groups " to get the list of the groups 
user1 belongs to.
In Zeppelin/Spark, the hdfs executable is not accessible.
As a result, I wondered if there was a class in Spark (eg. Security or ACL) 
which would let you access a particular user's groups.



- Mail original -
De: "Sean Owen" 
À: phi...@free.fr
Cc: "User" 
Envoyé: Mercredi 7 Septembre 2022 16:41:01
Objet: Re: Spark equivalent to hdfs groups


Spark isn't a storage system or user management system; no there is no notion 
of groups (groups for what?) 


On Wed, Sep 7, 2022 at 8:36 AM < phi...@free.fr > wrote: 


Hello, 
is there a Spark equivalent to "hdfs groups "? 
Many thanks. 
Philippe 

- 
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark equivalent to hdfs groups

2022-09-07 Thread Sean Owen
Spark isn't a storage system or user management system; no there is no
notion of groups (groups for what?)

On Wed, Sep 7, 2022 at 8:36 AM  wrote:

> Hello,
> is there a Spark equivalent to "hdfs groups "?
> Many thanks.
> Philippe
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark equivalent to hdfs groups

2022-09-07 Thread phiroc
Hello,
is there a Spark equivalent to "hdfs groups "?
Many thanks.
Philippe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Choosing architecture for on-premise Spark & HDFS on Kubernetes cluster

2021-11-25 Thread JHI Star
Thanks, I'll have a closer look at GKE and compare it with what some other
sites running setups similar to ours have used (OpenStack).

Well, no, I don't envisage any public cloud integration. There is no plan
to use Hive, just PySpark with HDFS!

On Wed, Nov 24, 2021 at 10:31 AM Mich Talebzadeh 
wrote:

> Just to clarify it should say  The current Spark Kubernetes model ...
>
>
> You will also need to build or get the Spark docker image that you are
> going to use in k8s clusters based on spark version, java version, scala
> version, OS and so forth. Are you going to use Hive as your main storage?
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 23 Nov 2021 at 19:39, Mich Talebzadeh 
> wrote:
>
>> OK  to your point below
>>
>> "... We are going to deploy 20 physical Linux servers for use as an
>> on-premise Spark & HDFS on Kubernetes cluster..
>>
>>  Kubernetes is really a cloud-native technology. However, the
>> cloud-native concept does not exclude the use of on-premises infrastructure
>> in cases where it makes sense. So the question is are you going to use a
>> mesh structure to integrate these microservices together, including
>> on-premise and in cloud?
>> Now you have 20 tin boxes on-prem that you want to deploy for
>> building your Spark & HDFS stack on top of them. You will gain benefit from
>> Kubernetes and your microservices by simplifying the deployment by
>> decoupling the dependencies and abstracting your infra-structure away with
>> the ability to port these infrastructures. As you have your hardware
>> (your Linux servers),running k8s on bare metal will give you native
>> hardware performance. However, with 20 linux servers, you may limit your
>> scalability (your number of k8s nodes). If you go this way, you will need
>> to invest in a bare metal automation platform such as platform9
>> <https://platform9.com/bare-metal/> . The likelihood is that  you may
>> decide to move to the public cloud at some point or integrate with the
>> public cloud. My advice would be to look at something like GKE on-prem
>> <https://cloud.google.com/anthos/clusters/docs/on-prem/1.3/overview>
>>
>>
>> Back to Spark, The current Kubernetes model works on the basis of the 
>> "one-container-per-Pod"
>> model  <https://kubernetes.io/docs/concepts/workloads/pods/> meaning
>> that for each node of the cluster you will have one node running the driver
>> and each remaining node running one executor each. My question would be
>> will you be integrating with public cloud (AWS, GCP etc) at some point? In
>> that case you should look at mesh technologies like Istio
>> <https://cloud.google.com/learn/what-is-istio>
>>
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 23 Nov 2021 at 14:09, JHI Star  wrote:
>>
>>> We are going to deploy 20 physical Linux servers for use as an
>>> on-premise Spark & HDFS on Kubernetes cluster. My question is: within this
>>> architecture, is it best to have the pods run directly on bare metal or
>>> under VMs or system containers like LXC and/or under an on-premise instance
>>> of something like OpenStack - or something else altogether ?
>>>
>>> I am looking to garner any experience around this question relating
>>> directly to the specific use case of Spark & HDFS on Kuberenetes - I know
>>> there are also general points to consider regardless of the use case.
>>>
>>


Re: Choosing architecture for on-premise Spark & HDFS on Kubernetes cluster

2021-11-24 Thread Mich Talebzadeh
Just to clarify, it should say "The current Spark Kubernetes model ..."


You will also need to build or get the Spark docker image that you are
going to use in k8s clusters based on spark version, java version, scala
version, OS and so forth. Are you going to use Hive as your main storage?


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 23 Nov 2021 at 19:39, Mich Talebzadeh 
wrote:

> OK  to your point below
>
> "... We are going to deploy 20 physical Linux servers for use as an
> on-premise Spark & HDFS on Kubernetes cluster..
>
>  Kubernetes is really a cloud-native technology. However, the
> cloud-native concept does not exclude the use of on-premises infrastructure
> in cases where it makes sense. So the question is are you going to use a
> mesh structure to integrate these microservices together, including
> on-premise and in cloud?
> Now you have 20 tin boxes on-prem that you want to deploy for
> building your Spark & HDFS stack on top of them. You will gain benefit from
> Kubernetes and your microservices by simplifying the deployment by
> decoupling the dependencies and abstracting your infra-structure away with
> the ability to port these infrastructures. As you have your hardware
> (your Linux servers),running k8s on bare metal will give you native
> hardware performance. However, with 20 linux servers, you may limit your
> scalability (your number of k8s nodes). If you go this way, you will need
> to invest in a bare metal automation platform such as platform9
> <https://platform9.com/bare-metal/> . The likelihood is that  you may
> decide to move to the public cloud at some point or integrate with the
> public cloud. My advice would be to look at something like GKE on-prem
> <https://cloud.google.com/anthos/clusters/docs/on-prem/1.3/overview>
>
>
> Back to Spark, The current Kubernetes model works on the basis of the 
> "one-container-per-Pod"
> model  <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that
> for each node of the cluster you will have one node running the driver and
> each remaining node running one executor each. My question would be will
> you be integrating with public cloud (AWS, GCP etc) at some point? In that
> case you should look at mesh technologies like Istio
> <https://cloud.google.com/learn/what-is-istio>
>
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 23 Nov 2021 at 14:09, JHI Star  wrote:
>
>> We are going to deploy 20 physical Linux servers for use as an on-premise
>> Spark & HDFS on Kubernetes cluster. My question is: within this
>> architecture, is it best to have the pods run directly on bare metal or
>> under VMs or system containers like LXC and/or under an on-premise instance
>> of something like OpenStack - or something else altogether ?
>>
>> I am looking to garner any experience around this question relating
>> directly to the specific use case of Spark & HDFS on Kuberenetes - I know
>> there are also general points to consider regardless of the use case.
>>
>


Re: Choosing architecture for on-premise Spark & HDFS on Kubernetes cluster

2021-11-23 Thread Mich Talebzadeh
OK  to your point below

"... We are going to deploy 20 physical Linux servers for use as an
on-premise Spark & HDFS on Kubernetes cluster..

 Kubernetes is really a cloud-native technology. However, the cloud-native
concept does not exclude the use of on-premises infrastructure in cases
where it makes sense. So the question is are you going to use a mesh
structure to integrate these microservices together, including on-premise
and in cloud?
Now you have 20 tin boxes on-prem that you want to deploy for building your
Spark & HDFS stack on top of them. You will gain benefit from Kubernetes
and your microservices by simplifying the deployment by decoupling the
dependencies and abstracting your infra-structure away with the ability to
port these infrastructures. As you have your hardware (your Linux
servers),running k8s on bare metal will give you native hardware
performance. However, with 20 linux servers, you may limit your scalability
(your number of k8s nodes). If you go this way, you will need to invest in
a bare metal automation platform such as platform9
<https://platform9.com/bare-metal/> . The likelihood is that  you may
decide to move to the public cloud at some point or integrate with the
public cloud. My advice would be to look at something like GKE on-prem
<https://cloud.google.com/anthos/clusters/docs/on-prem/1.3/overview>


Back to Spark, The current Kubernetes model works on the basis of the
"one-container-per-Pod"
model  <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that
for each node of the cluster you will have one node running the driver and
each remaining node running one executor each. My question would be will
you be integrating with public cloud (AWS, GCP etc) at some point? In that
case you should look at mesh technologies like Istio
<https://cloud.google.com/learn/what-is-istio>
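
For reference, a minimal Spark-on-Kubernetes session that follows the
one-executor-per-pod model above might look like the sketch below; the master
URL, namespace, image name and executor count are placeholders, not a
recommendation for any particular setup.

import org.apache.spark.sql.SparkSession

// Sketch only: each executor gets its own pod; the driver here runs in client mode.
val spark = SparkSession.builder()
  .master("k8s://https://k8s-apiserver:6443")
  .appName("spark-on-k8s")
  .config("spark.kubernetes.namespace", "spark")
  .config("spark.kubernetes.container.image", "myrepo/spark:3.2.0")
  .config("spark.executor.instances", "19")   // e.g. one executor pod per remaining node
  .getOrCreate()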


HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 23 Nov 2021 at 14:09, JHI Star  wrote:

> We are going to deploy 20 physical Linux servers for use as an on-premise
> Spark & HDFS on Kubernetes cluster. My question is: within this
> architecture, is it best to have the pods run directly on bare metal or
> under VMs or system containers like LXC and/or under an on-premise instance
> of something like OpenStack - or something else altogether ?
>
> I am looking to garner any experience around this question relating
> directly to the specific use case of Spark & HDFS on Kuberenetes - I know
> there are also general points to consider regardless of the use case.
>


Choosing architecture for on-premise Spark & HDFS on Kubernetes cluster

2021-11-23 Thread JHI Star
We are going to deploy 20 physical Linux servers for use as an on-premise
Spark & HDFS on Kubernetes cluster. My question is: within this
architecture, is it best to have the pods run directly on bare metal or
under VMs or system containers like LXC and/or under an on-premise instance
of something like OpenStack - or something else altogether ?

I am looking to garner any experience around this question relating
directly to the specific use case of Spark & HDFS on Kuberenetes - I know
there are also general points to consider regardless of the use case.


Accessing a kerberized HDFS using Spark on Openshift

2021-10-13 Thread Gal Shinder
Hi,

I have a pod on openshift 4.6 running a jupyter notebook with spark 3.1.1 and 
python 3.7 (based on open data hub, tweaked the dockerfile because I wanted 
this specific python version).

I'm trying to run spark in client mode using the image of google's spark 
operator (gcr.io/spark-operator/spark-py:v3.1.1), spark runs fine but I'm 
unable to connect to a kerberized cloudera hdfs, I've tried the examples 
outlined in the security documentation 
(https://github.com/apache/spark/blob/master/docs/security.md#secure-interaction-with-kubernetes)
 and numerous other combinations but nothing seems to work.

I managed to authenticate with kerberos by passing additional java parameters 
to the driver and executors (-Djava.security.krb5.conf), and passing the 
kerberos config to the executors using the configmap auto generated from the 
folder which SPARK_CONF points to on the driver, I'll try to pass the hadoop 
configuration files like that as well and set the hadoop home just to test the 
connection.
 
I don't want to use that solution in prod, 
`spark.kubernetes.kerberos.krb5.configMapName` and 
`spark.kubernetes.hadoop.configMapName` don't seem to do anything, the pod spec 
of the executors doesn't have those volumes, I'm using 
`spark.kubernetes.authenticate.oauthToken` to authenticate with k8s and I'm 
using a user who is a clusteradmin.

I also don't want to get a delegation token, figured I can just use the keytab 
even though the examples in the security documentation don't mention using a 
keytab with the configmaps.

The configuration I'm trying to use:
spark.kubernetes.authenticate.oauthToken with the oauth token of a cluster 
admin.

spark.kubernetes.hadoop.configMapName pointing to a configmap containing the 
core-site.xml and hdfs-site.xml I got from the cloudera manager

spark.kubernetes.kerberos.krb5.configMapName pointing to a configmap containing 
a krb5.conf

spark.kerberos.keytab 

spark.kerberos.principal


Thanks, 
Gal

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to read multiple HDFS directories

2021-05-05 Thread Kapil Garg
Hi Lalwani,
But I need to augment the directory-specific data onto every record of that
directory.
Once I have read the data, there is no link back to the directory in the
data which I can use to augment the additional information.
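
One way to keep that link without reading each directory separately is to do a
single multi-path read and recover the source directory per record from
input_file_name(). A sketch only, assuming Parquet input and hypothetical paths
and column names:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

val spark = SparkSession.builder().appName("multi-dir-read").getOrCreate()
import spark.implicits._

val dirs = Seq("hdfs:///data/dir1", "hdfs:///data/dir2", "hdfs:///data/dir3")

val df = spark.read.parquet(dirs: _*)                      // one read call over all directories
  .withColumn("source_file", input_file_name())            // full path of the file behind each row
  .withColumn("source_dir", regexp_extract($"source_file", "^(.*)/[^/]+$", 1))

// directory-specific metadata can now be joined in on source_dir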

On Wed, May 5, 2021 at 10:41 PM Lalwani, Jayesh 
wrote:

> You don’t have to union multiple RDDs.  You can read files from multiple
> directories in a single read call. Spark will manage partitioning of the
> data across directories.
>
>
>
> *From: *Kapil Garg 
> *Date: *Wednesday, May 5, 2021 at 10:45 AM
> *To: *spark users 
> *Subject: *[EXTERNAL] How to read multiple HDFS directories
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hi,
>
> I am facing issues while reading multiple HDFS directories. Please read
> the problem statement and current approach below
>
>
>
> *Problem Statement*
>
> There are N HDFS directories each having K files. We want to read data
> from all directories such that when we read data from directory D, we map
> all the data and augment it with additional information specific to that
> directory.
>
>
>
> *Current Approach*
>
> In current approach, we are iterating over the directories, reading it in
> RDD, mapping the RDD and the putting the RDD into a list.
>
> After all N directories have been read, we have a list of N RDDs
>
> We call spark Union on the list to merge them together.
>
>
>
> This approach is causing data skewness because there is 1 directory of
> size 12 GBs whereas other RDDs are less than 1 GB. So when the large RDD's
> turn comes, spark submits its task on available executors causing the RDD
> to present on few executors instead of spreading on all.
>
>
>
> Is there a way to avoid this data skewness ? I couldn't find any RDD API,
> spark config which could enforce the data reading tasks evenly on all
> executors.
>
>
> --
>
> Regards
> Kapil Garg
>
>
>
>
>
>

-- 
Regards
Kapil Garg


Re: How to read multiple HDFS directories

2021-05-05 Thread Lalwani, Jayesh
You don’t have to union multiple RDDs.  You can read files from multiple 
directories in a single read call. Spark will manage partitioning of the data 
across directories.

From: Kapil Garg 
Date: Wednesday, May 5, 2021 at 10:45 AM
To: spark users 
Subject: [EXTERNAL] How to read multiple HDFS directories


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Hi,
I am facing issues while reading multiple HDFS directories. Please read the 
problem statement and current approach below

Problem Statement
There are N HDFS directories each having K files. We want to read data from all 
directories such that when we read data from directory D, we map all the data 
and augment it with additional information specific to that directory.

Current Approach
In current approach, we are iterating over the directories, reading it in RDD, 
mapping the RDD and the putting the RDD into a list.
After all N directories have been read, we have a list of N RDDs
We call spark Union on the list to merge them together.

This approach is causing data skewness because there is 1 directory of size 12 
GBs whereas other RDDs are less than 1 GB. So when the large RDD's turn comes, 
spark submits its task on available executors causing the RDD to present on few 
executors instead of spreading on all.

Is there a way to avoid this data skewness ? I couldn't find any RDD API, spark 
config which could enforce the data reading tasks evenly on all executors.

--
Regards
Kapil Garg





Re: How to read multiple HDFS directories

2021-05-05 Thread Kapil Garg
Hi Mich,
The number of directories can be 1000+, so doing 1000+ reduceByKey and union
operations might be costly.

On Wed, May 5, 2021 at 10:22 PM Mich Talebzadeh 
wrote:

> This is my take
>
>
>1. read the current snapshot (provide empty if it doesn't exist yet)
>2. Loop over N directories
>   1. read unprocessed new data from HDFS
>   2. union them and do a `reduceByKey` operation
>   3. output a new version of the snapshot
>
> HTH
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 May 2021 at 17:03, Kapil Garg  wrote:
>
>> Sorry but I didn't get the question. It is possible that 1 record is
>> present in multiple directories. That's why we do a reduceByKey after the
>> union step.
>>
>> On Wed, May 5, 2021 at 9:20 PM Mich Talebzadeh 
>> wrote:
>>
>>> When you are doing union on these RDDs, (each RDD has one to one
>>> correspondence with an HDFS directory), do you have a common key across all?
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 5 May 2021 at 16:23, Kapil Garg  wrote:
>>>
>>>> Hi Mich,
>>>> I went through the thread and it doesn't relate to the problem
>>>> statement I shared above.
>>>>
>>>> In my problem statement, there is a simple ETL job which doesn't use
>>>> any external library (such as pandas)
>>>> This is the flow
>>>>
>>>> *hdfsDirs := List(); //contains N directories*
>>>>
>>>> *rddList := List();*
>>>> *for each directory in hdfsDirs:*
>>>> *rdd = spark.read(directory)*
>>>> *rdd.map() //augment the data with additional directory related
>>>> data*
>>>> *rddList.add(rdd)*
>>>>
>>>> *finalRdd = spark.union(rddList) // number of tasks = N*K // Files are
>>>> distributed unevenly on executors here*
>>>>
>>>> *finalRdd.partionBy(hashpartitioner); // here tasks take uneven time*
>>>>
>>>> Is it possible to make the union step read the directory evenly on each
>>>> executor, that way, each executor will have roughly the same amount of data
>>>>
>>>>
>>>>
>>>> On Wed, May 5, 2021 at 8:35 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Have a look at this thread called
>>>>>
>>>>> Tasks are skewed to one executor
>>>>>
>>>>> and see if it helps and we can take it from there.
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 5 May 2021 at 15:46, Kapil Garg 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am facing issues while reading multiple HDFS directories. Please
>>>>>> read the problem statement and current approach bel

Re: How to read multiple HDFS directories

2021-05-05 Thread Mich Talebzadeh
This is my take


   1. read the current snapshot (provide empty if it doesn't exist yet)
   2. Loop over N directories
  1. read unprocessed new data from HDFS
  2. union them and do a `reduceByKey` operation
  3. output a new version of the snapshot
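
A minimal sketch of those steps, assuming line-oriented key/value files with a
tab separator and a commutative merge; all paths and the merge function are
placeholders:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("snapshot-merge").getOrCreate()
val sc = spark.sparkContext

def readKV(path: String): RDD[(String, Long)] =
  sc.textFile(path).map { line =>
    val Array(k, v) = line.split("\t", 2)
    (k, v.toLong)
  }

val snapshotPath = "hdfs:///warehouse/snapshot"                  // placeholder
val newDirs = Seq("hdfs:///incoming/d1", "hdfs:///incoming/d2")  // placeholder

// 1. current snapshot (use sc.emptyRDD[(String, Long)] if it does not exist yet)
val snapshot = readKV(snapshotPath)

// 2. union the snapshot with the unprocessed directories and merge per key
val merged = sc.union(snapshot +: newDirs.map(readKV)).reduceByKey(_ + _)

// 3. write a new version of the snapshot
merged.map { case (k, v) => s"$k\t$v" }.saveAsTextFile(snapshotPath + ".v2")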

HTH

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 5 May 2021 at 17:03, Kapil Garg  wrote:

> Sorry but I didn't get the question. It is possible that 1 record is
> present in multiple directories. That's why we do a reduceByKey after the
> union step.
>
> On Wed, May 5, 2021 at 9:20 PM Mich Talebzadeh 
> wrote:
>
>> When you are doing union on these RDDs, (each RDD has one to one
>> correspondence with an HDFS directory), do you have a common key across all?
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 5 May 2021 at 16:23, Kapil Garg  wrote:
>>
>>> Hi Mich,
>>> I went through the thread and it doesn't relate to the problem statement
>>> I shared above.
>>>
>>> In my problem statement, there is a simple ETL job which doesn't use any
>>> external library (such as pandas)
>>> This is the flow
>>>
>>> *hdfsDirs := List(); //contains N directories*
>>>
>>> *rddList := List();*
>>> *for each directory in hdfsDirs:*
>>> *rdd = spark.read(directory)*
>>> *rdd.map() //augment the data with additional directory related data*
>>> *rddList.add(rdd)*
>>>
>>> *finalRdd = spark.union(rddList) // number of tasks = N*K // Files are
>>> distributed unevenly on executors here*
>>>
>>> *finalRdd.partionBy(hashpartitioner); // here tasks take uneven time*
>>>
>>> Is it possible to make the union step read the directory evenly on each
>>> executor, that way, each executor will have roughly the same amount of data
>>>
>>>
>>>
>>> On Wed, May 5, 2021 at 8:35 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Have a look at this thread called
>>>>
>>>> Tasks are skewed to one executor
>>>>
>>>> and see if it helps and we can take it from there.
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 5 May 2021 at 15:46, Kapil Garg 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am facing issues while reading multiple HDFS directories. Please
>>>>> read the problem statement and current approach below
>>>>>
>>>>> *Problem Statement*
>>>>> There are N HDFS directories each having K files. We want to read data
>>>>> from all directories such that when we read data from directory D, we map
>>>>> all the data and augment it with additional information specific to that
>>>>> directory.
>>>>>
>>>>> *Current Approach*
>>>>> In current approach, we are iterating over the directories, reading it
>>>>> in RDD, mapping the RDD and the putting the RDD into a list.
>>>>> After all N directories have been read, 

Re: How to read multiple HDFS directories

2021-05-05 Thread Kapil Garg
Sorry but I didn't get the question. It is possible that 1 record is
present in multiple directories. That's why we do a reduceByKey after the
union step.

On Wed, May 5, 2021 at 9:20 PM Mich Talebzadeh 
wrote:

> When you are doing union on these RDDs, (each RDD has one to one
> correspondence with an HDFS directory), do you have a common key across all?
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 May 2021 at 16:23, Kapil Garg  wrote:
>
>> Hi Mich,
>> I went through the thread and it doesn't relate to the problem statement
>> I shared above.
>>
>> In my problem statement, there is a simple ETL job which doesn't use any
>> external library (such as pandas)
>> This is the flow
>>
>> *hdfsDirs := List(); //contains N directories*
>>
>> *rddList := List();*
>> *for each directory in hdfsDirs:*
>> *rdd = spark.read(directory)*
>> *rdd.map() //augment the data with additional directory related data*
>> *rddList.add(rdd)*
>>
>> *finalRdd = spark.union(rddList) // number of tasks = N*K // Files are
>> distributed unevenly on executors here*
>>
>> *finalRdd.partionBy(hashpartitioner); // here tasks take uneven time*
>>
>> Is it possible to make the union step read the directory evenly on each
>> executor, that way, each executor will have roughly the same amount of data
>>
>>
>>
>> On Wed, May 5, 2021 at 8:35 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> Have a look at this thread called
>>>
>>> Tasks are skewed to one executor
>>>
>>> and see if it helps and we can take it from there.
>>>
>>> HTH
>>>
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 5 May 2021 at 15:46, Kapil Garg 
>>> wrote:
>>>
>>>> Hi,
>>>> I am facing issues while reading multiple HDFS directories. Please read
>>>> the problem statement and current approach below
>>>>
>>>> *Problem Statement*
>>>> There are N HDFS directories each having K files. We want to read data
>>>> from all directories such that when we read data from directory D, we map
>>>> all the data and augment it with additional information specific to that
>>>> directory.
>>>>
>>>> *Current Approach*
>>>> In current approach, we are iterating over the directories, reading it
>>>> in RDD, mapping the RDD and the putting the RDD into a list.
>>>> After all N directories have been read, we have a list of N RDDs
>>>> We call spark Union on the list to merge them together.
>>>>
>>>> This approach is causing data skewness because there is 1 directory of
>>>> size 12 GBs whereas other RDDs are less than 1 GB. So when the large RDD's
>>>> turn comes, spark submits its task on available executors causing the RDD
>>>> to present on few executors instead of spreading on all.
>>>>
>>>> Is there a way to avoid this data skewness ? I couldn't find any RDD
>>>> API, spark config which could enforce the data reading tasks evenly on all
>>>> executors.
>>>>
>>>> --
>>>> Regards
>>>> Kapil Garg
>>>>
>>>>

Re: How to read multiple HDFS directories

2021-05-05 Thread Mich Talebzadeh
When you are doing union on these RDDs, (each RDD has one to one
correspondence with an HDFS directory), do you have a common key across all?


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 5 May 2021 at 16:23, Kapil Garg  wrote:

> Hi Mich,
> I went through the thread and it doesn't relate to the problem statement I
> shared above.
>
> In my problem statement, there is a simple ETL job which doesn't use any
> external library (such as pandas)
> This is the flow
>
> *hdfsDirs := List(); //contains N directories*
>
> *rddList := List();*
> *for each directory in hdfsDirs:*
> *rdd = spark.read(directory)*
> *rdd.map() //augment the data with additional directory related data*
> *rddList.add(rdd)*
>
> *finalRdd = spark.union(rddList) // number of tasks = N*K // Files are
> distributed unevenly on executors here*
>
> *finalRdd.partionBy(hashpartitioner); // here tasks take uneven time*
>
> Is it possible to make the union step read the directory evenly on each
> executor, that way, each executor will have roughly the same amount of data
>
>
>
> On Wed, May 5, 2021 at 8:35 PM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> Have a look at this thread called
>>
>> Tasks are skewed to one executor
>>
>> and see if it helps and we can take it from there.
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 5 May 2021 at 15:46, Kapil Garg 
>> wrote:
>>
>>> Hi,
>>> I am facing issues while reading multiple HDFS directories. Please read
>>> the problem statement and current approach below
>>>
>>> *Problem Statement*
>>> There are N HDFS directories each having K files. We want to read data
>>> from all directories such that when we read data from directory D, we map
>>> all the data and augment it with additional information specific to that
>>> directory.
>>>
>>> *Current Approach*
>>> In current approach, we are iterating over the directories, reading it
>>> in RDD, mapping the RDD and the putting the RDD into a list.
>>> After all N directories have been read, we have a list of N RDDs
>>> We call spark Union on the list to merge them together.
>>>
>>> This approach is causing data skewness because there is 1 directory of
>>> size 12 GBs whereas other RDDs are less than 1 GB. So when the large RDD's
>>> turn comes, spark submits its task on available executors causing the RDD
>>> to present on few executors instead of spreading on all.
>>>
>>> Is there a way to avoid this data skewness ? I couldn't find any RDD
>>> API, spark config which could enforce the data reading tasks evenly on all
>>> executors.
>>>
>>> --
>>> Regards
>>> Kapil Garg
>>>
>>>

Re: How to read multiple HDFS directories

2021-05-05 Thread Kapil Garg
Hi Mich,
I went through the thread and it doesn't relate to the problem statement I
shared above.

In my problem statement, there is a simple ETL job which doesn't use any
external library (such as pandas)
This is the flow

*hdfsDirs := List(); //contains N directories*

*rddList := List();*
*for each directory in hdfsDirs:*
*rdd = spark.read(directory)*
*rdd.map() //augment the data with additional directory related data*
*rddList.add(rdd)*

*finalRdd = spark.union(rddList) // number of tasks = N*K // Files are
distributed unevenly on executors here*

*finalRdd.partionBy(hashpartitioner); // here tasks take uneven time*

Is it possible to make the union step read the directory evenly on each
executor, that way, each executor will have roughly the same amount of data



On Wed, May 5, 2021 at 8:35 PM Mich Talebzadeh 
wrote:

> Hi,
>
> Have a look at this thread called
>
> Tasks are skewed to one executor
>
> and see if it helps and we can take it from there.
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 May 2021 at 15:46, Kapil Garg 
> wrote:
>
>> Hi,
>> I am facing issues while reading multiple HDFS directories. Please read
>> the problem statement and current approach below
>>
>> *Problem Statement*
>> There are N HDFS directories each having K files. We want to read data
>> from all directories such that when we read data from directory D, we map
>> all the data and augment it with additional information specific to that
>> directory.
>>
>> *Current Approach*
>> In current approach, we are iterating over the directories, reading it in
>> RDD, mapping the RDD and the putting the RDD into a list.
>> After all N directories have been read, we have a list of N RDDs
>> We call spark Union on the list to merge them together.
>>
>> This approach is causing data skewness because there is 1 directory of
>> size 12 GBs whereas other RDDs are less than 1 GB. So when the large RDD's
>> turn comes, spark submits its task on available executors causing the RDD
>> to present on few executors instead of spreading on all.
>>
>> Is there a way to avoid this data skewness ? I couldn't find any RDD API,
>> spark config which could enforce the data reading tasks evenly on all
>> executors.
>>
>> --
>> Regards
>> Kapil Garg
>>
>>

Re: How to read multiple HDFS directories

2021-05-05 Thread Mich Talebzadeh
Hi,

Have a look at this thread called

Tasks are skewed to one executor

and see if it helps and we can take it from there.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 5 May 2021 at 15:46, Kapil Garg 
wrote:

> Hi,
> I am facing issues while reading multiple HDFS directories. Please read
> the problem statement and current approach below
>
> *Problem Statement*
> There are N HDFS directories each having K files. We want to read data
> from all directories such that when we read data from directory D, we map
> all the data and augment it with additional information specific to that
> directory.
>
> *Current Approach*
> In current approach, we are iterating over the directories, reading it in
> RDD, mapping the RDD and the putting the RDD into a list.
> After all N directories have been read, we have a list of N RDDs
> We call spark Union on the list to merge them together.
>
> This approach is causing data skewness because there is 1 directory of
> size 12 GBs whereas other RDDs are less than 1 GB. So when the large RDD's
> turn comes, spark submits its task on available executors causing the RDD
> to present on few executors instead of spreading on all.
>
> Is there a way to avoid this data skewness ? I couldn't find any RDD API,
> spark config which could enforce the data reading tasks evenly on all
> executors.
>
> --
> Regards
> Kapil Garg
>
>
>
>


How to read multiple HDFS directories

2021-05-05 Thread Kapil Garg
Hi,
I am facing issues while reading multiple HDFS directories. Please read the
problem statement and current approach below

*Problem Statement*
There are N HDFS directories each having K files. We want to read data from
all directories such that when we read data from directory D, we map all
the data and augment it with additional information specific to that
directory.

*Current Approach*
In the current approach, we are iterating over the directories, reading each
one into an RDD, mapping the RDD and then putting the RDD into a list.
After all N directories have been read, we have a list of N RDDs.
We call Spark's union on the list to merge them together.

This approach is causing data skewness because there is one directory of size
12 GB whereas the other RDDs are less than 1 GB. So when the large RDD's turn
comes, Spark submits its tasks on the available executors, causing that RDD to
end up on a few executors instead of being spread across all of them.

Is there a way to avoid this data skewness? I couldn't find any RDD API or
Spark config which could enforce that the data-reading tasks are spread evenly
across all executors.

-- 
Regards
Kapil Garg



Re: Spark standalone - reading kerberos hdfs

2021-01-24 Thread jelmer
The only way I ever got it to work with Spark standalone is via WebHDFS.

See
https://issues.apache.org/jira/browse/SPARK-5158?focusedCommentId=16516856=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16516856
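
As a hedged sketch only (the host, port and path are placeholders; 9870 is the
default NameNode HTTP port on Hadoop 3, 50070 on Hadoop 2, and on a kerberized
cluster the token handling described in the JIRA comment above is still needed),
a WebHDFS read looks like an ordinary read with a different scheme:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://spark-standalone:7077")
  .getOrCreate()

// Same Parquet read as in the question, but going through the WebHDFS REST gateway.
val df = spark.read.parquet("webhdfs://namenode:9870/test/parquet/")
df.show(5)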

On Fri, 8 Jan 2021 at 18:49, Sudhir Babu Pothineni 
wrote:

> I spin up a spark standalone cluster (spark.autheticate=false), submitted
> a job which reads remote kerberized HDFS,
>
> val spark = SparkSession.builder()
>   .master("spark://spark-standalone:7077")
>   .getOrCreate()
>
> UserGroupInformation.loginUserFromKeytab(principal, keytab)
> val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")
>
> Ran into following exception:
>
> Caused by:
> java.io.IOException: java.io.IOException: Failed on local exception:
> java.io.IOException: org.apache.hadoop.security.AccessControlException:
> Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host
> is: "..."; destination host is: "...":10346;
>
>
> Any suggestions?
>
> Thanks
> Sudhir
>


Re: Spark standalone - reading kerberos hdfs

2021-01-23 Thread Gábor Rőczei
Hi Sudhir,

> On 21 Jan 2021, at 16:24, Sudhir Babu Pothineni  wrote:
> 
> Any other insights into this issue? I tried multiple way to supply keytab to 
> executor 
> 
> Does spark standalone doesn’t support Kerberos?

Spark standalone mode does not support Kerberos authentication. Related source 
code:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L346


// Kerberos is not supported in standalone mode, and keytab support is not yet available
// in Mesos cluster mode.
if (clusterManager != STANDALONE
    && !isMesosCluster
    && args.principal != null
    && args.keytab != null) {
  // If client mode, make sure the keytab is just a local path.
  if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
    args.keytab = new URI(args.keytab).getPath()
  }

If you want to test your application with Kerberos, I recommend local mode.

https://spark.apache.org/docs/latest/submitting-applications.html#master-urls

For example:

spark-shell --master local

and if you want to access an HDFS filesystem, then you need to add the following 
parameter as well: spark.kerberos.access.hadoopFileSystems (in older Spark 
versions this is spark.yarn.access.hadoopFileSystems)

spark-shell --master local --conf 
spark.kerberos.access.hadoopFileSystems=hdfs://namenode.example.com:8020

This will create the necessary HDFS delegation token for you. Here is very good 
documentation about the delegation token handling in Spark:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/security/README.md
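
A minimal sketch of the same setup done programmatically (assuming a valid TGT
from kinit and a kerberized HDFS at the hypothetical
hdfs://namenode.example.com:8020; the option is the one mentioned above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KerberosLocalRead")
  .master("local[*]")
  // Tells Spark which kerberized filesystems it should obtain tokens for.
  .config("spark.kerberos.access.hadoopFileSystems",
          "hdfs://namenode.example.com:8020")
  .getOrCreate()

val df = spark.read.parquet("hdfs://namenode.example.com:8020/test/parquet/")
df.show(5)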

Best regards,

  Gabor



Re: Spark standalone - reading kerberos hdfs

2021-01-21 Thread Sudhir Babu Pothineni
Any other insights into this issue? I tried multiple ways to supply the keytab to 
the executors.

Does Spark standalone not support Kerberos?

> On Jan 8, 2021, at 1:53 PM, Sudhir Babu Pothineni  
> wrote:
> 
> 
> Incase of Spark on Yarn, Application Master shares the token. 
> 
> I think incase of spark stand alone the token is not shared to executor, any 
> example how to get the HDFS token for executor?
> 
>> On Fri, Jan 8, 2021 at 12:13 PM Gabor Somogyi  
>> wrote:
>> TGT is not enough, you need HDFS token which can be obtained by Spark. 
>> Please check the logs...
>> 
>>> On Fri, 8 Jan 2021, 18:51 Sudhir Babu Pothineni,  
>>> wrote:
>>> I spin up a spark standalone cluster (spark.autheticate=false), submitted a 
>>> job which reads remote kerberized HDFS, 
>>> 
>>> val spark = SparkSession.builder()
>>>   .master("spark://spark-standalone:7077")
>>>   .getOrCreate()
>>> 
>>> UserGroupInformation.loginUserFromKeytab(principal, keytab)
>>> val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")
>>> 
>>> Ran into following exception:
>>> 
>>> Caused by:
>>> java.io.IOException: java.io.IOException: Failed on local exception: 
>>> java.io.IOException: org.apache.hadoop.security.AccessControlException: 
>>> Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host 
>>> is: "..."; destination host is: "...":10346; 
>>> 
>>> 
>>> Any suggestions?
>>> 
>>> Thanks
>>> Sudhir


Re: Spark standalone - reading kerberos hdfs

2021-01-08 Thread Sudhir Babu Pothineni
In case of Spark on YARN, the Application Master shares the token.

I think in case of Spark standalone the token is not shared with the executors;
is there any example of how to get the HDFS token for the executors?

On Fri, Jan 8, 2021 at 12:13 PM Gabor Somogyi 
wrote:

> TGT is not enough, you need HDFS token which can be obtained by Spark.
> Please check the logs...
>
> On Fri, 8 Jan 2021, 18:51 Sudhir Babu Pothineni, 
> wrote:
>
>> I spin up a spark standalone cluster (spark.autheticate=false), submitted
>> a job which reads remote kerberized HDFS,
>>
>> val spark = SparkSession.builder()
>>   .master("spark://spark-standalone:7077")
>>   .getOrCreate()
>>
>> UserGroupInformation.loginUserFromKeytab(principal, keytab)
>> val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")
>>
>> Ran into following exception:
>>
>> Caused by:
>> java.io.IOException: java.io.IOException: Failed on local exception:
>> java.io.IOException: org.apache.hadoop.security.AccessControlException:
>> Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host
>> is: "..."; destination host is: "...":10346;
>>
>>
>> Any suggestions?
>>
>> Thanks
>> Sudhir
>>
>


Re: Spark standalone - reading kerberos hdfs

2021-01-08 Thread Gabor Somogyi
A TGT is not enough; you need an HDFS token, which can be obtained by Spark.
Please check the logs...

On Fri, 8 Jan 2021, 18:51 Sudhir Babu Pothineni, 
wrote:

> I spin up a spark standalone cluster (spark.autheticate=false), submitted
> a job which reads remote kerberized HDFS,
>
> val spark = SparkSession.builder()
>   .master("spark://spark-standalone:7077")
>   .getOrCreate()
>
> UserGroupInformation.loginUserFromKeytab(principal, keytab)
> val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")
>
> Ran into following exception:
>
> Caused by:
> java.io.IOException: java.io.IOException: Failed on local exception:
> java.io.IOException: org.apache.hadoop.security.AccessControlException:
> Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host
> is: "..."; destination host is: "...":10346;
>
>
> Any suggestions?
>
> Thanks
> Sudhir
>


Spark standalone - reading kerberos hdfs

2021-01-08 Thread Sudhir Babu Pothineni
I spun up a Spark standalone cluster (spark.authenticate=false) and submitted a
job which reads a remote kerberized HDFS:

val spark = SparkSession.builder()
  .master("spark://spark-standalone:7077")
  .getOrCreate()

UserGroupInformation.loginUserFromKeytab(principal, keytab)
val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")

Ran into following exception:

Caused by:
java.io.IOException: java.io.IOException: Failed on local exception:
java.io.IOException: org.apache.hadoop.security.AccessControlException:
Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host
is: "..."; destination host is: "...":10346;


Any suggestions?

Thanks
Sudhir


RE: Spark on Kubernetes : unable to write files to HDFS

2020-12-16 Thread Loic DESCOTTE
Everything is working fine now 
Thanks again

Loïc

De : German Schiavon 
Envoyé : mercredi 16 décembre 2020 19:23
À : Loic DESCOTTE 
Cc : user@spark.apache.org 
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

We all been there! no reason to be ashamed :)

On Wed, 16 Dec 2020 at 18:14, Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
Oh thank you you're right!! I feel shameful 


De : German Schiavon mailto:gschiavonsp...@gmail.com>>
Envoyé : mercredi 16 décembre 2020 18:01
À : Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>>
Cc : user@spark.apache.org<mailto:user@spark.apache.org> 
mailto:user@spark.apache.org>>
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

Hi,

seems that you have a typo no?

Exception in thread "main" java.io.IOException: No FileSystem for scheme: hfds

  
data.write.mode("overwrite").format("text").save("hfds://hdfs-namenode/user/loic/result.txt")
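
That is, once the scheme is corrected from hfds:// to hdfs://, the same write works:

data.write.mode("overwrite").format("text").save("hdfs://hdfs-namenode/user/loic/result.txt")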


On Wed, 16 Dec 2020 at 17:02, Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
So I've tried several other things, including building a fat jar with hdfs 
dependency inside my app jar, and added this to the Spark configuration in the 
code :

val spark = SparkSession
  .builder()
  .appName("Hello Spark 7")
  .config("fs.hdfs.impl", 
classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  .getOrCreate()


But still the same error...


De : Sean Owen mailto:sro...@gmail.com>>
Envoyé : mercredi 16 décembre 2020 14:27
À : Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>>
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

I think it'll have to be part of the Spark distro, but I'm not 100% sure. I 
also think these get registered via manifest files in the JARs; if some process 
is stripping those when creating a bundled up JAR, could be it. Could be that 
it's failing to initialize too for some reason.

On Wed, Dec 16, 2020 at 7:24 AM Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
I've tried with this spark-submit option :

--packages 
org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \

But it did't solve the issue.
Should I add more jars?

Thanks
Loïc

De : Sean Owen mailto:sro...@gmail.com>>
Envoyé : mercredi 16 décembre 2020 14:20
À : Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>>
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

Seems like your Spark cluster doesn't somehow have the Hadoop JARs?

On Wed, Dec 16, 2020 at 6:45 AM Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
Hello,

I am using Spark On Kubernetes and I have the following error when I try to 
write data on HDFS : "no filesystem for scheme hdfs"

More details :

I am submitting my application with Spark submit like this :

spark-submit --master k8s://https://myK8SMaster:6443 \
--deploy-mode cluster \
--name hello-spark \
--class Hello \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=gradiant/spark:2.4.4 
hdfs://hdfs-namenode/user/loic/jars/helloSpark.jar

Then the driver and the 2 executors are created in K8S.

But it fails when I look at the logs of the driver, I see this :

Exception in thread "main" java.io.IOException: No FileSystem for scheme: hfds
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at Hello$.main(hello.scala:24)
at Hello.main(hello.scala)


As you can see , my application jar helloSpark.jar file is correctly loaded on 
HDFS by the Spark submit, but writing to HDFS fails.

I have also tried to add the hadoop client dand hdfs dependencies in the spark 
submit command:

--packages 
org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:

Re: Spark on Kubernetes : unable to write files to HDFS

2020-12-16 Thread German Schiavon
We all been there! no reason to be ashamed :)

On Wed, 16 Dec 2020 at 18:14, Loic DESCOTTE <
loic.desco...@kaizen-solutions.net> wrote:

> Oh thank you you're right!! I feel shameful 
>
> --
> *De :* German Schiavon 
> *Envoyé :* mercredi 16 décembre 2020 18:01
> *À :* Loic DESCOTTE 
> *Cc :* user@spark.apache.org 
> *Objet :* Re: Spark on Kubernetes : unable to write files to HDFS
>
> Hi,
>
> seems that you have a typo no?
>
> Exception in thread "main" java.io.IOException: No FileSystem for scheme:
> hfds
>
>   data.write.mode("overwrite").format("text").save("hfds://
> hdfs-namenode/user/loic/result.txt")
>
>
> On Wed, 16 Dec 2020 at 17:02, Loic DESCOTTE <
> loic.desco...@kaizen-solutions.net> wrote:
>
> So I've tried several other things, including building a fat jar with hdfs
> dependency inside my app jar, and added this to the Spark configuration in
> the code :
>
> val spark = SparkSession
>   .builder()
>   .appName("Hello Spark 7")
>   .config("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.
> DistributedFileSystem].getName)
>   .getOrCreate()
>
>
> But still the same error...
>
> --
> *De :* Sean Owen 
> *Envoyé :* mercredi 16 décembre 2020 14:27
> *À :* Loic DESCOTTE 
> *Objet :* Re: Spark on Kubernetes : unable to write files to HDFS
>
> I think it'll have to be part of the Spark distro, but I'm not 100% sure.
> I also think these get registered via manifest files in the JARs; if some
> process is stripping those when creating a bundled up JAR, could be it.
> Could be that it's failing to initialize too for some reason.
>
> On Wed, Dec 16, 2020 at 7:24 AM Loic DESCOTTE <
> loic.desco...@kaizen-solutions.net> wrote:
>
> I've tried with this spark-submit option :
>
> --packages
> org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \
>
> But it did't solve the issue.
> Should I add more jars?
>
> Thanks
> Loïc
> --
> *De :* Sean Owen 
> *Envoyé :* mercredi 16 décembre 2020 14:20
> *À :* Loic DESCOTTE 
> *Objet :* Re: Spark on Kubernetes : unable to write files to HDFS
>
> Seems like your Spark cluster doesn't somehow have the Hadoop JARs?
>
> On Wed, Dec 16, 2020 at 6:45 AM Loic DESCOTTE <
> loic.desco...@kaizen-solutions.net> wrote:
>
> Hello,
>
> I am using Spark On Kubernetes and I have the following error when I try
> to write data on HDFS : "no filesystem for scheme hdfs"
>
> More details :
>
> I am submitting my application with Spark submit like this :
>
> spark-submit --master k8s://https://myK8SMaster:6443 \
> --deploy-mode cluster \
> --name hello-spark \
> --class Hello \
> --conf spark.executor.instances=2 \
> --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
> --conf spark.kubernetes.container.image=gradiant/spark:2.4.4
> hdfs://hdfs-namenode/user/loic/jars/helloSpark.jar
>
> Then the driver and the 2 executors are created in K8S.
>
> But it fails when I look at the logs of the driver, I see this :
>
> Exception in thread "main" java.io.IOException: No FileSystem for scheme:
> hfds
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
> at
> org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
> at
> org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> at Hello$.main(hello.scala:24)
> at Hello.main(hello.scala)
>
>
> As you can see , my application jar helloSpark.jar file is correctly
> loaded on HDFS by the Spark submit, but writing to HDFS fails.
>
> I have also tried to add the hadoop client dand hdfs dependencies in the
> spark submit command:
>
> --packages
> org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \
&

RE: Spark on Kubernetes : unable to write files to HDFS

2020-12-16 Thread Loic DESCOTTE
Oh thank you, you're right!! I feel shameful


De : German Schiavon 
Envoyé : mercredi 16 décembre 2020 18:01
À : Loic DESCOTTE 
Cc : user@spark.apache.org 
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

Hi,

seems that you have a typo no?

Exception in thread "main" java.io.IOException: No FileSystem for scheme: hfds

  
data.write.mode("overwrite").format("text").save("hfds://hdfs-namenode/user/loic/result.txt")


On Wed, 16 Dec 2020 at 17:02, Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
So I've tried several other things, including building a fat jar with hdfs 
dependency inside my app jar, and added this to the Spark configuration in the 
code :

val spark = SparkSession
  .builder()
  .appName("Hello Spark 7")
  .config("fs.hdfs.impl", 
classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  .getOrCreate()


But still the same error...


De : Sean Owen mailto:sro...@gmail.com>>
Envoyé : mercredi 16 décembre 2020 14:27
À : Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>>
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

I think it'll have to be part of the Spark distro, but I'm not 100% sure. I 
also think these get registered via manifest files in the JARs; if some process 
is stripping those when creating a bundled up JAR, could be it. Could be that 
it's failing to initialize too for some reason.

On Wed, Dec 16, 2020 at 7:24 AM Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
I've tried with this spark-submit option :

--packages 
org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \

But it did't solve the issue.
Should I add more jars?

Thanks
Loïc

De : Sean Owen mailto:sro...@gmail.com>>
Envoyé : mercredi 16 décembre 2020 14:20
À : Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>>
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

Seems like your Spark cluster doesn't somehow have the Hadoop JARs?

On Wed, Dec 16, 2020 at 6:45 AM Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
Hello,

I am using Spark On Kubernetes and I have the following error when I try to 
write data on HDFS : "no filesystem for scheme hdfs"

More details :

I am submitting my application with Spark submit like this :

spark-submit --master k8s://https://myK8SMaster:6443 \
--deploy-mode cluster \
--name hello-spark \
--class Hello \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=gradiant/spark:2.4.4 
hdfs://hdfs-namenode/user/loic/jars/helloSpark.jar

Then the driver and the 2 executors are created in K8S.

But it fails when I look at the logs of the driver, I see this :

Exception in thread "main" java.io.IOException: No FileSystem for scheme: hfds
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at Hello$.main(hello.scala:24)
at Hello.main(hello.scala)


As you can see , my application jar helloSpark.jar file is correctly loaded on 
HDFS by the Spark submit, but writing to HDFS fails.

I have also tried to add the hadoop client dand hdfs dependencies in the spark 
submit command:

--packages 
org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \

But the error is still here.


Here is the Scala code of my application :


import java.util.Calendar

import org.apache.spark.sql.SparkSession

case class Data(singleField: String)

object Hello
{
def main(args: Array[String])
{

val spark = SparkSession
  .builder()
  .appName("Hello Spark")
  .getOrCreate()

import spark.implicits._

val now = Calendar.getInstance().getTime().toString
val data = List(Data(now)).toDF()

data.write.mode("overwrite").format("text").save("hfds://hdfs-namenode/user/loic/result.txt")
}
}

Thanks for your help,
Loïc


Re: Spark on Kubernetes : unable to write files to HDFS

2020-12-16 Thread German Schiavon
Hi,

seems that you have a typo no?

Exception in thread "main" java.io.IOException: No FileSystem for scheme:
hfds

  data.write.mode("overwrite").format("text").save("hfds://
hdfs-namenode/user/loic/result.txt")


On Wed, 16 Dec 2020 at 17:02, Loic DESCOTTE <
loic.desco...@kaizen-solutions.net> wrote:

> So I've tried several other things, including building a fat jar with hdfs
> dependency inside my app jar, and added this to the Spark configuration in
> the code :
>
> val spark = SparkSession
>   .builder()
>   .appName("Hello Spark 7")
>   .config("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.
> DistributedFileSystem].getName)
>   .getOrCreate()
>
>
> But still the same error...
>
> --
> *De :* Sean Owen 
> *Envoyé :* mercredi 16 décembre 2020 14:27
> *À :* Loic DESCOTTE 
> *Objet :* Re: Spark on Kubernetes : unable to write files to HDFS
>
> I think it'll have to be part of the Spark distro, but I'm not 100% sure.
> I also think these get registered via manifest files in the JARs; if some
> process is stripping those when creating a bundled up JAR, could be it.
> Could be that it's failing to initialize too for some reason.
>
> On Wed, Dec 16, 2020 at 7:24 AM Loic DESCOTTE <
> loic.desco...@kaizen-solutions.net> wrote:
>
> I've tried with this spark-submit option :
>
> --packages
> org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \
>
> But it did't solve the issue.
> Should I add more jars?
>
> Thanks
> Loïc
> --
> *De :* Sean Owen 
> *Envoyé :* mercredi 16 décembre 2020 14:20
> *À :* Loic DESCOTTE 
> *Objet :* Re: Spark on Kubernetes : unable to write files to HDFS
>
> Seems like your Spark cluster doesn't somehow have the Hadoop JARs?
>
> On Wed, Dec 16, 2020 at 6:45 AM Loic DESCOTTE <
> loic.desco...@kaizen-solutions.net> wrote:
>
> Hello,
>
> I am using Spark On Kubernetes and I have the following error when I try
> to write data on HDFS : "no filesystem for scheme hdfs"
>
> More details :
>
> I am submitting my application with Spark submit like this :
>
> spark-submit --master k8s://https://myK8SMaster:6443 \
> --deploy-mode cluster \
> --name hello-spark \
> --class Hello \
> --conf spark.executor.instances=2 \
> --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
> --conf spark.kubernetes.container.image=gradiant/spark:2.4.4
> hdfs://hdfs-namenode/user/loic/jars/helloSpark.jar
>
> Then the driver and the 2 executors are created in K8S.
>
> But it fails when I look at the logs of the driver, I see this :
>
> Exception in thread "main" java.io.IOException: No FileSystem for scheme:
> hfds
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
> at
> org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
> at
> org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> at Hello$.main(hello.scala:24)
> at Hello.main(hello.scala)
>
>
> As you can see , my application jar helloSpark.jar file is correctly
> loaded on HDFS by the Spark submit, but writing to HDFS fails.
>
> I have also tried to add the hadoop client dand hdfs dependencies in the
> spark submit command:
>
> --packages
> org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \
>
> But the error is still here.
>
>
> Here is the Scala code of my application :
>
>
> import java.util.Calendar
>
> import org.apache.spark.sql.SparkSession
>
> case class Data(singleField: String)
>
> object Hello
> {
> def main(args: Array[String])
> {
>
> val spark = SparkSession
>   .builder()
>   .appName("Hello Spark")
>   .getOrCreate()
>
> import spark.implicits._
>
> val now = Calendar.getInstance().getTime().toString
> val data = List(Data(now)).toDF()
>
> data.write.mode("overwrite").format("text").save("hfds://hdfs-namenode/user/loic/result.txt")
> }
> }
>
> Thanks for your help,
> Loïc
>
>


RE: Spark on Kubernetes : unable to write files to HDFS

2020-12-16 Thread Loic DESCOTTE
So I've tried several other things, including building a fat jar with hdfs 
dependency inside my app jar, and added this to the Spark configuration in the 
code :

val spark = SparkSession
  .builder()
  .appName("Hello Spark 7")
  .config("fs.hdfs.impl", 
classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  .getOrCreate()


But still the same error...


De : Sean Owen 
Envoyé : mercredi 16 décembre 2020 14:27
À : Loic DESCOTTE 
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

I think it'll have to be part of the Spark distro, but I'm not 100% sure. I 
also think these get registered via manifest files in the JARs; if some process 
is stripping those when creating a bundled up JAR, could be it. Could be that 
it's failing to initialize too for some reason.

On Wed, Dec 16, 2020 at 7:24 AM Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
I've tried with this spark-submit option :

--packages 
org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \

But it did't solve the issue.
Should I add more jars?

Thanks
Loïc

De : Sean Owen mailto:sro...@gmail.com>>
Envoyé : mercredi 16 décembre 2020 14:20
À : Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>>
Objet : Re: Spark on Kubernetes : unable to write files to HDFS

Seems like your Spark cluster doesn't somehow have the Hadoop JARs?

On Wed, Dec 16, 2020 at 6:45 AM Loic DESCOTTE 
mailto:loic.desco...@kaizen-solutions.net>> 
wrote:
Hello,

I am using Spark On Kubernetes and I have the following error when I try to 
write data on HDFS : "no filesystem for scheme hdfs"

More details :

I am submitting my application with Spark submit like this :

spark-submit --master k8s://https://myK8SMaster:6443 \
--deploy-mode cluster \
--name hello-spark \
--class Hello \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=gradiant/spark:2.4.4 
hdfs://hdfs-namenode/user/loic/jars/helloSpark.jar

Then the driver and the 2 executors are created in K8S.

But it fails when I look at the logs of the driver, I see this :

Exception in thread "main" java.io.IOException: No FileSystem for scheme: hfds
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at Hello$.main(hello.scala:24)
at Hello.main(hello.scala)


As you can see , my application jar helloSpark.jar file is correctly loaded on 
HDFS by the Spark submit, but writing to HDFS fails.

I have also tried to add the hadoop client dand hdfs dependencies in the spark 
submit command:

--packages 
org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \

But the error is still here.


Here is the Scala code of my application :


import java.util.Calendar

import org.apache.spark.sql.SparkSession

case class Data(singleField: String)

object Hello
{
def main(args: Array[String])
{

val spark = SparkSession
  .builder()
  .appName("Hello Spark")
  .getOrCreate()

import spark.implicits._

val now = Calendar.getInstance().getTime().toString
val data = List(Data(now)).toDF()

data.write.mode("overwrite").format("text").save("hfds://hdfs-namenode/user/loic/result.txt")
}
}

Thanks for your help,
Loïc


Spark on Kubernetes : unable to write files to HDFS

2020-12-16 Thread Loic DESCOTTE
Hello,

I am using Spark on Kubernetes and I have the following error when I try to 
write data to HDFS: "no filesystem for scheme hdfs"

More details :

I am submitting my application with Spark submit like this :

spark-submit --master k8s://https://myK8SMaster:6443 \
--deploy-mode cluster \
--name hello-spark \
--class Hello \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=gradiant/spark:2.4.4 
hdfs://hdfs-namenode/user/loic/jars/helloSpark.jar

Then the driver and the 2 executors are created in K8S.

But it fails when I look at the logs of the driver, I see this :

Exception in thread "main" java.io.IOException: No FileSystem for scheme: hfds
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:424)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at Hello$.main(hello.scala:24)
at Hello.main(hello.scala)


As you can see, my application jar helloSpark.jar is correctly loaded from HDFS 
by spark-submit, but writing to HDFS fails.

I have also tried to add the hadoop-client and hadoop-hdfs dependencies in the 
spark-submit command:

--packages 
org.apache.hadoop:hadoop-client:2.6.5,org.apache.hadoop:hadoop-hdfs:2.6.5 \

But the error is still here.


Here is the Scala code of my application :


import java.util.Calendar

import org.apache.spark.sql.SparkSession

case class Data(singleField: String)

object Hello
{
def main(args: Array[String])
{

val spark = SparkSession
  .builder()
  .appName("Hello Spark")
  .getOrCreate()

import spark.implicits._

val now = Calendar.getInstance().getTime().toString
val data = List(Data(now)).toDF()

data.write.mode("overwrite").format("text").save("hfds://hdfs-namenode/user/loic/result.txt")
}
}

Thanks for your help,
Loïc


Re: Kafka Topic to Parquet HDFS with Structured Streaming

2020-11-19 Thread AlbertoMarq
Hi Chetan
I'm having the exact same issue with spark structured streaming and kafka
trying to write to HDFS.
Can you please tell me how you fixed it?
I'm using Spark 3.0.1 and Hadoop 3.3.0.

Thanks!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1

2020-10-13 Thread lec ssmi
Sorry, the mail title is a little problematic; it should read "How to disable or
replace ..."

lec ssmi  于2020年10月14日周三 上午9:27写道:

> I have written a demo using spark3.0.0, and the location where the
> checkpoint file  is saved has been explicitly specified   like
>>
>> stream.option("checkpointLocation","file:///C:\\Users\\Administrator
>> \\Desktop\\test")
>
> But the app still throws an   exception about the HDFS file system.
> Is it not possible to specify the local file system as a checkpoint
> location now?
>


how to disable replace HDFS checkpoint location in structured streaming in spark3.0.1

2020-10-13 Thread lec ssmi
I have written a demo using Spark 3.0.0, and the location where the
checkpoint file is saved has been explicitly specified, like:
>
> stream.option("checkpointLocation","file:///C:\\Users\\Administrator\\
> Desktop\\test")

But the app still throws an exception about the HDFS file system.
Is it not possible to specify the local file system as a checkpoint
location now?
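
For reference, a minimal sketch of pointing a streaming query's checkpoint at the
local filesystem (the rate source, console sink and the Windows path are
placeholders; forward slashes in the file: URI avoid escaping issues):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("LocalCheckpointDemo")
  .master("local[2]")
  .getOrCreate()

// Toy source; any streaming source is configured the same way.
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()

val query = stream.writeStream
  .format("console")
  // Local filesystem checkpoint location.
  .option("checkpointLocation", "file:///C:/Users/Administrator/Desktop/test")
  .start()

query.awaitTermination()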


Re: Spark3 on k8S reading encrypted data from HDFS with KMS in HA

2020-08-19 Thread Michel Sumbul
Hi Prashant,

I have the problem only on K8s; it works fine when Spark is executed on
top of YARN.
I'm wondering whether the delegation token gets saved. Any idea how to check that?
Could it be because KMS is in HA and Spark requests 2 delegation tokens?

For testing, just running Spark 3 on top of any K8s cluster reading data from any
Hadoop 3 cluster with KMS should be fine. I'm using an HDP 3 cluster, but
there is probably an easier way to test.
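
As a hedged debugging sketch only (not a fix), one way to check whether the
kms-dt tokens seen in the driver log ever reach the executors is to print the
tokens each executor's UGI actually holds:

import scala.collection.JavaConverters._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.SparkSession

object TokenCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TokenCheck").getOrCreate()
    val sc = spark.sparkContext

    // Run a few dummy tasks; we only care about the UGI state on the executors.
    val report = sc.parallelize(1 to 10, 10).mapPartitions { _ =>
      val ugi = UserGroupInformation.getCurrentUser
      val tokens = ugi.getCredentials.getAllTokens.asScala
        .map(t => s"${t.getKind} -> ${t.getService}")
      Iterator(s"$ugi : ${tokens.mkString(", ")}")
    }.collect()

    report.distinct.foreach(println)
    spark.stop()
  }
}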

Michel

Le mer. 19 août 2020 à 09:50, Prashant Sharma  a
écrit :

> -dev
> Hi,
>
> I have used Spark with HDFS encrypted with Hadoop KMS, and it worked well.
> Somehow, I could not recall, if I had the kubernetes in the mix. Somehow,
> seeing the error, it is not clear what caused the failure. Can I reproduce
> this somehow?
>
> Thanks,
>
> On Sat, Aug 15, 2020 at 7:18 PM Michel Sumbul 
> wrote:
>
>> Hi guys,
>>
>> Does anyone have an idea on this issue? even some tips to troubleshoot it?
>> I got the impression that after the creation of the delegation for the
>> KMS, the token is not sent to the executor or maybe not saved?
>>
>> I'm sure I'm not the only one using Spark with HDFS encrypted with KMS :-)
>>
>> Thanks,
>> Michel
>>
>> Le jeu. 13 août 2020 à 14:32, Michel Sumbul  a
>> écrit :
>>
>>> Hi guys,
>>>
>>> Does anyone try Spark3 on k8s reading data from HDFS encrypted with KMS
>>> in HA mode (with kerberos)?
>>>
>>> I have a wordcount job running with Spark3 reading data on HDFS (hadoop
>>> 3.1) everything secure with kerberos. Everything works fine if the data
>>> folder is not encrypted (spark on k8s). If the data is on an encrypted
>>> folder, Spark3 on yarn is working fine but it doesn't work when Spark3 is
>>> running on K8S.
>>> I submit the job with spark-submit command and I provide the keytab and
>>> the principal to use.
>>> I got the kerberos error saying that there is no TGT to authenticate to
>>> the KMS (ranger kms, full stack trace of the error at the end of the mail)
>>> servers but in the log I can see that Spark get 2 delegation token, one for
>>> each KMS servers:
>>>
>>> -- --
>>>
>>> 20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Attempting to login
>>> to KDC using principal: mytestu...@paf.com
>>>
>>> 20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Successfully logged
>>> into KDC.
>>>
>>> 20/08/13 10:50:52 WARN DomainSocketFactory: The short-circuit local
>>> reads feature cannot be used because libhadoop cannot be loaded.
>>>
>>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token
>>> for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
>>> mytestu...@paf.com (auth:KERBEROS)]] with renewer testuser
>>>
>>> 20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
>>> HDFS_DELEGATION_TOKEN owner= mytestu...@paf.com, renewer=testuser,
>>> realUser=, issueDate=1597315852353, maxDate=1597920652353,
>>> sequenceNumber=55185062, masterKeyId=1964 on ha-hdfs:cluster2
>>>
>>> 20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind:
>>> kms-dt, Service: kms://ht...@server2.paf.com:9393/kms, Ident: (kms-dt
>>> owner=testuser, renewer=testuser, realUser=, issueDate=1597315852642,
>>> maxDate=1597920652642, sequenceNumber=3929883, masterKeyId=623))
>>>
>>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token
>>> for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
>>> testu...@paf.com (auth:KERBEROS)]] with renewer testu...@paf.com
>>>
>>> 20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
>>> HDFS_DELEGATION_TOKEN owner=testu...@paf.com, renewer=testuser,
>>> realUser=, issueDate=1597315852744, maxDate=1597920652744,
>>> sequenceNumber=55185063, masterKeyId=1964 on ha-hdfs:cluster2
>>>
>>> 20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind:
>>> kms-dt, Service: kms://ht...@server.paf.com:9393/kms, Ident: (kms-dt
>>> owner=testuser, renewer=testuser, realUser=, issueDate=1597315852839,
>>> maxDate=1597920652839, sequenceNumber=3929884, masterKeyId=624))
>>>
>>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval
>>> is 86400104 for token HDFS_DELEGATION_TOKEN
>>>
>>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval
>>> is 86400108 for token kms-dt
>>>
>>

Re: Spark3 on k8S reading encrypted data from HDFS with KMS in HA

2020-08-19 Thread Prashant Sharma
-dev
Hi,

I have used Spark with HDFS encrypted with Hadoop KMS, and it worked well.
However, I cannot recall whether I had Kubernetes in the mix. Looking at the
error, it is not clear what caused the failure. Can I reproduce this somehow?

Thanks,

On Sat, Aug 15, 2020 at 7:18 PM Michel Sumbul 
wrote:

> Hi guys,
>
> Does anyone have an idea on this issue? even some tips to troubleshoot it?
> I got the impression that after the creation of the delegation for the
> KMS, the token is not sent to the executor or maybe not saved?
>
> I'm sure I'm not the only one using Spark with HDFS encrypted with KMS :-)
>
> Thanks,
> Michel
>
> Le jeu. 13 août 2020 à 14:32, Michel Sumbul  a
> écrit :
>
>> Hi guys,
>>
>> Does anyone try Spark3 on k8s reading data from HDFS encrypted with KMS
>> in HA mode (with kerberos)?
>>
>> I have a wordcount job running with Spark3 reading data on HDFS (hadoop
>> 3.1) everything secure with kerberos. Everything works fine if the data
>> folder is not encrypted (spark on k8s). If the data is on an encrypted
>> folder, Spark3 on yarn is working fine but it doesn't work when Spark3 is
>> running on K8S.
>> I submit the job with spark-submit command and I provide the keytab and
>> the principal to use.
>> I got the kerberos error saying that there is no TGT to authenticate to
>> the KMS (ranger kms, full stack trace of the error at the end of the mail)
>> servers but in the log I can see that Spark get 2 delegation token, one for
>> each KMS servers:
>>
>> -- --
>>
>> 20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Attempting to login
>> to KDC using principal: mytestu...@paf.com
>>
>> 20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Successfully logged
>> into KDC.
>>
>> 20/08/13 10:50:52 WARN DomainSocketFactory: The short-circuit local reads
>> feature cannot be used because libhadoop cannot be loaded.
>>
>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token
>> for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
>> mytestu...@paf.com (auth:KERBEROS)]] with renewer testuser
>>
>> 20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
>> HDFS_DELEGATION_TOKEN owner= mytestu...@paf.com, renewer=testuser,
>> realUser=, issueDate=1597315852353, maxDate=1597920652353,
>> sequenceNumber=55185062, masterKeyId=1964 on ha-hdfs:cluster2
>>
>> 20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind:
>> kms-dt, Service: kms://ht...@server2.paf.com:9393/kms, Ident: (kms-dt
>> owner=testuser, renewer=testuser, realUser=, issueDate=1597315852642,
>> maxDate=1597920652642, sequenceNumber=3929883, masterKeyId=623))
>>
>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token
>> for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
>> testu...@paf.com (auth:KERBEROS)]] with renewer testu...@paf.com
>>
>> 20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
>> HDFS_DELEGATION_TOKEN owner=testu...@paf.com, renewer=testuser,
>> realUser=, issueDate=1597315852744, maxDate=1597920652744,
>> sequenceNumber=55185063, masterKeyId=1964 on ha-hdfs:cluster2
>>
>> 20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind:
>> kms-dt, Service: kms://ht...@server.paf.com:9393/kms, Ident: (kms-dt
>> owner=testuser, renewer=testuser, realUser=, issueDate=1597315852839,
>> maxDate=1597920652839, sequenceNumber=3929884, masterKeyId=624))
>>
>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval
>> is 86400104 for token HDFS_DELEGATION_TOKEN
>>
>> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval
>> is 86400108 for token kms-dt
>>
>> 20/08/13 10:50:54 INFO HiveConf: Found configuration file null
>>
>> 20/08/13 10:50:54 INFO HadoopDelegationTokenManager: Scheduling renewal
>> in 18.0 h.
>>
>> 20/08/13 10:50:54 INFO HadoopDelegationTokenManager: Updating delegation
>> tokens.
>>
>> 20/08/13 10:50:54 INFO SparkHadoopUtil: Updating delegation tokens for
>> current user.
>>
>> 20/08/13 10:50:55 INFO SparkHadoopUtil: Updating delegation tokens for
>> current user.
>> --- --
>>
>> In the core-site.xml, I have the following property for the 2 kms server
>>
>> --
>>
>> hadoop.security.key.provider.path
>>
>> kms://ht...@server.paf.com;server2.paf.com:9393/kms
>>
>> -
>>
>>
>>

Re: Spark3 on k8S reading encrypted data from HDFS with KMS in HA

2020-08-15 Thread Michel Sumbul
Hi guys,

Does anyone have an idea on this issue, or even some tips to troubleshoot it?
I got the impression that after the creation of the delegation token for the KMS,
the token is not sent to the executors, or maybe not saved?

I'm sure I'm not the only one using Spark with HDFS encrypted with KMS :-)

Thanks,
Michel

Le jeu. 13 août 2020 à 14:32, Michel Sumbul  a
écrit :

> Hi guys,
>
> Does anyone try Spark3 on k8s reading data from HDFS encrypted with KMS in
> HA mode (with kerberos)?
>
> I have a wordcount job running with Spark3 reading data on HDFS (hadoop
> 3.1) everything secure with kerberos. Everything works fine if the data
> folder is not encrypted (spark on k8s). If the data is on an encrypted
> folder, Spark3 on yarn is working fine but it doesn't work when Spark3 is
> running on K8S.
> I submit the job with spark-submit command and I provide the keytab and
> the principal to use.
> I got the kerberos error saying that there is no TGT to authenticate to
> the KMS (ranger kms, full stack trace of the error at the end of the mail)
> servers but in the log I can see that Spark get 2 delegation token, one for
> each KMS servers:
>
> -- --
>
> 20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Attempting to login
> to KDC using principal: mytestu...@paf.com
>
> 20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Successfully logged
> into KDC.
>
> 20/08/13 10:50:52 WARN DomainSocketFactory: The short-circuit local reads
> feature cannot be used because libhadoop cannot be loaded.
>
> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token for:
> DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
> mytestu...@paf.com (auth:KERBEROS)]] with renewer testuser
>
> 20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
> HDFS_DELEGATION_TOKEN owner= mytestu...@paf.com, renewer=testuser,
> realUser=, issueDate=1597315852353, maxDate=1597920652353,
> sequenceNumber=55185062, masterKeyId=1964 on ha-hdfs:cluster2
>
> 20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind:
> kms-dt, Service: kms://ht...@server2.paf.com:9393/kms, Ident: (kms-dt
> owner=testuser, renewer=testuser, realUser=, issueDate=1597315852642,
> maxDate=1597920652642, sequenceNumber=3929883, masterKeyId=623))
>
> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token for:
> DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
> testu...@paf.com (auth:KERBEROS)]] with renewer testu...@paf.com
>
> 20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
> HDFS_DELEGATION_TOKEN owner=testu...@paf.com, renewer=testuser,
> realUser=, issueDate=1597315852744, maxDate=1597920652744,
> sequenceNumber=55185063, masterKeyId=1964 on ha-hdfs:cluster2
>
> 20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind:
> kms-dt, Service: kms://ht...@server.paf.com:9393/kms, Ident: (kms-dt
> owner=testuser, renewer=testuser, realUser=, issueDate=1597315852839,
> maxDate=1597920652839, sequenceNumber=3929884, masterKeyId=624))
>
> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval
> is 86400104 for token HDFS_DELEGATION_TOKEN
>
> 20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval
> is 86400108 for token kms-dt
>
> 20/08/13 10:50:54 INFO HiveConf: Found configuration file null
>
> 20/08/13 10:50:54 INFO HadoopDelegationTokenManager: Scheduling renewal in
> 18.0 h.
>
> 20/08/13 10:50:54 INFO HadoopDelegationTokenManager: Updating delegation
> tokens.
>
> 20/08/13 10:50:54 INFO SparkHadoopUtil: Updating delegation tokens for
> current user.
>
> 20/08/13 10:50:55 INFO SparkHadoopUtil: Updating delegation tokens for
> current user.
> --- --
>
> In the core-site.xml, I have the following property for the 2 kms server
>
> --
>
> hadoop.security.key.provider.path
>
> kms://ht...@server.paf.com;server2.paf.com:9393/kms
>
> -
>
>
> Does anyone have an idea how to make it work? Or at least anyone has been
> able to make it work?
> Does anyone know where the delegation tokens are saved during the
> execution of jobs on k8s and how it is shared between the executors?
>
>
> Thanks,
> Michel
>
> PS: The full stack trace of the error:
>
> 
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 22 in stage 0.0 failed 4 times, most recent failure: Lost
> task 22.3 in stage 0.0 (TID 23, 10.5.5.5, executor 1): java.io.IOException:
> org.apache.hadoop.security.authentication.client.AuthenticationException:
> Error while authenticating with end

Spark3 on k8S reading encrypted data from HDFS with KMS in HA

2020-08-13 Thread Michel Sumbul
Hi guys,

Has anyone tried Spark 3 on K8s reading data from HDFS encrypted with KMS in
HA mode (with Kerberos)?

I have a wordcount job running with Spark3 reading data on HDFS (hadoop
3.1) everything secure with kerberos. Everything works fine if the data
folder is not encrypted (spark on k8s). If the data is on an encrypted
folder, Spark3 on yarn is working fine but it doesn't work when Spark3 is
running on K8S.
I submit the job with spark-submit command and I provide the keytab and the
principal to use.
I get the Kerberos error saying that there is no TGT to authenticate to the
KMS servers (Ranger KMS; full stack trace of the error at the end of the mail),
but in the log I can see that Spark gets 2 delegation tokens, one for
each KMS server:

-- --

20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Attempting to login to
KDC using principal: mytestu...@paf.com

20/08/13 10:50:50 INFO HadoopDelegationTokenManager: Successfully logged
into KDC.

20/08/13 10:50:52 WARN DomainSocketFactory: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.

20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token for:
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
mytestu...@paf.com (auth:KERBEROS)]] with renewer testuser

20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
HDFS_DELEGATION_TOKEN owner= mytestu...@paf.com, renewer=testuser,
realUser=, issueDate=1597315852353, maxDate=1597920652353,
sequenceNumber=55185062, masterKeyId=1964 on ha-hdfs:cluster2

20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind: kms-dt,
Service: kms://ht...@server2.paf.com:9393/kms, Ident: (kms-dt
owner=testuser, renewer=testuser, realUser=, issueDate=1597315852642,
maxDate=1597920652642, sequenceNumber=3929883, masterKeyId=623))

20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: getting token for:
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-237056190_16, ugi=
testu...@paf.com (auth:KERBEROS)]] with renewer testu...@paf.com

20/08/13 10:50:52 INFO DFSClient: Created token for testuser:
HDFS_DELEGATION_TOKEN owner=testu...@paf.com, renewer=testuser, realUser=,
issueDate=1597315852744, maxDate=1597920652744, sequenceNumber=55185063,
masterKeyId=1964 on ha-hdfs:cluster2

20/08/13 10:50:52 INFO KMSClientProvider: New token created: (Kind: kms-dt,
Service: kms://ht...@server.paf.com:9393/kms, Ident: (kms-dt
owner=testuser, renewer=testuser, realUser=, issueDate=1597315852839,
maxDate=1597920652839, sequenceNumber=3929884, masterKeyId=624))

20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval is
86400104 for token HDFS_DELEGATION_TOKEN

20/08/13 10:50:52 INFO HadoopFSDelegationTokenProvider: Renewal interval is
86400108 for token kms-dt

20/08/13 10:50:54 INFO HiveConf: Found configuration file null

20/08/13 10:50:54 INFO HadoopDelegationTokenManager: Scheduling renewal in
18.0 h.

20/08/13 10:50:54 INFO HadoopDelegationTokenManager: Updating delegation
tokens.

20/08/13 10:50:54 INFO SparkHadoopUtil: Updating delegation tokens for
current user.

20/08/13 10:50:55 INFO SparkHadoopUtil: Updating delegation tokens for
current user.
--- --

In the core-site.xml, I have the following property for the 2 KMS servers:

<property>
  <name>hadoop.security.key.provider.path</name>
  <value>kms://ht...@server.paf.com;server2.paf.com:9393/kms</value>
</property>


Does anyone have an idea how to make it work, or has anyone at least been
able to make it work?
Does anyone know where the delegation tokens are saved during the execution
of jobs on K8s and how they are shared between the executors?


Thanks,
Michel

PS: The full stack trace of the error:



Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 22 in stage 0.0 failed 4 times, most recent failure: Lost
task 22.3 in stage 0.0 (TID 23, 10.5.5.5, executor 1): java.io.IOException:
org.apache.hadoop.security.authentication.client.AuthenticationException:
Error while authenticating with endpoint:
https://server.paf.com:9393/kms/v1/keyversion/dir_tmp_key%400/_eek?eek_op=decrypt

at
org.apache.hadoop.crypto.key.kms.KMSClientProvider.createConnection(KMSClientProvider.java:525)

at
org.apache.hadoop.crypto.key.kms.KMSClientProvider.decryptEncryptedKey(KMSClientProvider.java:826)

at
org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider$5.call(LoadBalancingKMSClientProvider.java:351)

at
org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider$5.call(LoadBalancingKMSClientProvider.java:347)

at
org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider.doOp(LoadBalancingKMSClientProvider.java:172)

at
org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider.decryptEncryptedKey(LoadBalancingKMSClientProvider.java:347

Write to same hdfs dir from multiple spark jobs

2020-07-29 Thread Deepak Sharma
Hi
Is there any design pattern around writing to the same hdfs directory from
multiple spark jobs?

-- 
Thanks
Deepak
www.bigdatabig.com


Re: Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-20 Thread DB Tsai
In Spark 3.0, if you use the `with-hadoop` Spark distribution that has
embedded Hadoop 3.2, you can set
`spark.yarn.populateHadoopClasspath=false` to not populate the
cluster's hadoop classpath. In this scenario, Spark will use hadoop
3.2 client to connect to hadoop 2.6 which should work fine. In fact,
we have production deployment using this way for a while.

On Sun, Jul 19, 2020 at 8:10 PM Ashika Umanga  wrote:
>
> Greetings,
>
> Hadoop 2.6 has been removed according to this ticket 
> https://issues.apache.org/jira/browse/SPARK-25016
>
> We run our Spark cluster on K8s in standalone mode.
> We access HDFS/Hive running on a Hadoop 2.6 cluster.
> We've been using Spark 2.4.5 and planning on upgrading to Spark 3.0.0
> However, we dont have any control over the Hadoop cluster and it will remain 
> in 2.6
>
> Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6 ?
>
> Best Regards,



-- 
Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 42E5B25A8F7A82C1




Re: Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-20 Thread DB Tsai
If it's standalone mode, it's even easier. You should be able to
connect to Hadoop 2.6 HDFS using the 3.2 client. In your K8s cluster, just
don't put Hadoop 2.6 into your classpath.

On Sun, Jul 19, 2020 at 10:25 PM Ashika Umanga Umagiliya
 wrote:
>
> Hello
>
> "spark.yarn.populateHadoopClasspath" is used in YARN mode correct?
> However our Spark cluster is standalone cluster not using YARN.
> We only connect to HDFS/Hive to access data.Computation is done on our spark 
> cluster running on K8s (not Yarn)
>
>
> On Mon, Jul 20, 2020 at 2:04 PM DB Tsai  wrote:
>>
>> In Spark 3.0, if you use the `with-hadoop` Spark distribution that has
>> embedded Hadoop 3.2, you can set
>> `spark.yarn.populateHadoopClasspath=false` to not populate the
>> cluster's hadoop classpath. In this scenario, Spark will use hadoop
>> 3.2 client to connect to hadoop 2.6 which should work fine. In fact,
>> we have production deployment using this way for a while.
>>
>> On Sun, Jul 19, 2020 at 8:10 PM Ashika Umanga  
>> wrote:
>> >
>> > Greetings,
>> >
>> > Hadoop 2.6 has been removed according to this ticket 
>> > https://issues.apache.org/jira/browse/SPARK-25016
>> >
>> > We run our Spark cluster on K8s in standalone mode.
>> > We access HDFS/Hive running on a Hadoop 2.6 cluster.
>> > We've been using Spark 2.4.5 and planning on upgrading to Spark 3.0.0
>> > However, we dont have any control over the Hadoop cluster and it will 
>> > remain in 2.6
>> >
>> > Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6 ?
>> >
>> > Best Regards,
>>
>>
>>
>> --
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 42E5B25A8F7A82C1
>
>
>
> --
> Umanga
> http://jp.linkedin.com/in/umanga
> http://umanga.ifreepages.com



-- 
Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 42E5B25A8F7A82C1

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-19 Thread Ashika Umanga Umagiliya
Hello

"spark.yarn.populateHadoopClasspath" is used in YARN mode correct?
However our Spark cluster is standalone cluster not using YARN.
We only connect to HDFS/Hive to access data.Computation is done on our
spark cluster running on K8s (not Yarn)


On Mon, Jul 20, 2020 at 2:04 PM DB Tsai  wrote:

> In Spark 3.0, if you use the `with-hadoop` Spark distribution that has
> embedded Hadoop 3.2, you can set
> `spark.yarn.populateHadoopClasspath=false` to not populate the
> cluster's hadoop classpath. In this scenario, Spark will use hadoop
> 3.2 client to connect to hadoop 2.6 which should work fine. In fact,
> we have production deployment using this way for a while.
>
> On Sun, Jul 19, 2020 at 8:10 PM Ashika Umanga 
> wrote:
> >
> > Greetings,
> >
> > Hadoop 2.6 has been removed according to this ticket
> https://issues.apache.org/jira/browse/SPARK-25016
> >
> > We run our Spark cluster on K8s in standalone mode.
> > We access HDFS/Hive running on a Hadoop 2.6 cluster.
> > We've been using Spark 2.4.5 and planning on upgrading to Spark 3.0.0
> > However, we dont have any control over the Hadoop cluster and it will
> remain in 2.6
> >
> > Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6 ?
> >
> > Best Regards,
>
>
>
> --
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 42E5B25A8F7A82C1
>


-- 
Umanga
http://jp.linkedin.com/in/umanga
http://umanga.ifreepages.com


Re: Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-19 Thread Prashant Sharma
Hi Ashika,

Hadoop 2.6 is now no longer supported, and since it has not been maintained
in the last 2 years, it may have some unpatched security issues.
From Spark 3.0 onwards, we no longer support it; in other words, we have
modified our codebase in a way that Hadoop 2.6 won't work. However, if you
are determined, you can always apply a custom patch to the spark codebase and
support it. I would recommend moving to a newer Hadoop.

Thanks,

On Mon, Jul 20, 2020 at 8:41 AM Ashika Umanga 
wrote:

> Greetings,
>
> Hadoop 2.6 has been removed according to this ticket
> https://issues.apache.org/jira/browse/SPARK-25016
>
> We run our Spark cluster on K8s in standalone mode.
> We access HDFS/Hive running on a Hadoop 2.6 cluster.
> We've been using Spark 2.4.5 and planning on upgrading to Spark 3.0.0
> However, we dont have any control over the Hadoop cluster and it will
> remain in 2.6
>
> Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6 ?
>
> Best Regards,
>


Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-19 Thread Ashika Umanga
Greetings,

Hadoop 2.6 has been removed according to this ticket
https://issues.apache.org/jira/browse/SPARK-25016

We run our Spark cluster on K8s in standalone mode.
We access HDFS/Hive running on a Hadoop 2.6 cluster.
We've been using Spark 2.4.5 and planning on upgrading to Spark 3.0.0
However, we don't have any control over the Hadoop cluster and it will
remain in 2.6

Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6 ?

Best Regards,


Re: Spark dataframe hdfs vs s3

2020-05-30 Thread Anwar AliKhan
Learning, for example, requires a large number of computationally
intensive operation loops and stores the intermediate results in memory.

The operation of the Executor memory
<https://www.tutorialdocs.com/article/spark-memory-management.html> has two
main parts concerning storage and execution. Thanks to the *Unified Memory
Manager* mechanism, memory-storage and run-time memory share the same
space, allowing one to occupy the unused resources of the other.

   - The first is for storing data in the cache when using, for example,
   .cache() or broadcast().
   - The other part (execution) is used to store the temporary results of
   *shuffle*, *join*, aggregation, etc. processes.
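As a rough worked example of how that unified region is sized (assuming the default spark.memory.fraction of 0.6 and the fixed ~300 MB of reserved memory; actual values may differ per deployment):

  usable heap    = executor heap - 300 MB reserved
  unified region = usable heap * spark.memory.fraction
  e.g. for a 4 GB executor heap: (4096 MB - 300 MB) * 0.6 ≈ 2278 MB,
  shared between storage and execution, with storage protected up to
  spark.memory.storageFraction (0.5 by default) of that region.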

Memory allocation to Executors is closely related to CPU allocation: one
core performs a task on one partition, so if an Executor has 4 cores, it
must have the capacity to store all 4 partitions as well as intermediate
results, metadata, etc… Thus, the user has to fix the amount of memory and
cores allocated to each Executor according to the application he wants to
process and the source file: a file is partitioned by default according to
the total number of cores in the cluster.

This link
<https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/>
lists various best practices for cluster use and configuration. The following
diagram, taken from the previous link, gives an overview of how the memory of
an Executor works. An important note is that the memory allocated to an
Executor will always be higher than the specified value due to the
memoryOverhead, which defaults to 10% of the specified value.

[image: Spark memory]
<https://www.adaltas.com/static/db3f761e83688dadf5b5f3ccad2bbd3f/d48f1/sparkExecMemory.png>

How a Spark Application Works
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#how-a-spark-application-works>

In a multi-user cluster, the resources available to each user are not
unlimited. They are constrained to a given amount of memory, CPU and
storage space, in order to avoid monopolization of resources by a limited
number of users. These allocation rules are defined and managed by the
cluster administrator in charge of its operation.

In the case of Apache YARN, these resources can be allocated per queue.
This means that a user may only be allowed to submit applications in a
single YARN queue in which the amount of resources available is constrained
by a maximum memory and CPU size.

The components and their resources used by a Spark application are
configurable via:

   - the spark-submit command, using the arguments --executor-memory,
     --executor-cores, --num-executors, --driver-cores and --driver-memory.
   - the SparkSession object, by configuring, for example,
     .config("spark.executor.instances", "7") (see the scripts in the GitHub
     project <https://github.com/adaltas/spark-examples-resources/>).
   - the options in the spark-defaults.conf configuration file.
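
For example, a hypothetical spark-submit invocation combining the arguments from the first option above (the resource sizes and application file are made-up placeholders):

  spark-submit \
    --master yarn \
    --driver-memory 2g \
    --driver-cores 1 \
    --num-executors 7 \
    --executor-cores 2 \
    --executor-memory 4g \
    my_app.py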

The user can also let Spark decide how many Executors are needed, depending
on the processing to be done, via the following parameters:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", "2") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "5") \
    .getOrCreate()

Thus, the application does not monopolize more resources than necessary in
a multi-user environment. More details are described in this article
explaining how Facebook adjusts Apache Spark for large-scale workloads
<https://towardsdatascience.com/how-does-facebook-tune-apache-spark-for-large-scale-workloads-3238ddda0830>
.

Regarding the underlying filesystem where data is stored, two optimization rules
<https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html>
are important:

   - Partition size should be at least 128MB and, if possible, based on a
   key attribute.
   - The number of CPUs/Executor should be between 4 and 6.

In the Spark application presented below, we will use the 2018 New York
Green Taxi dataset. The following script will download the file and save it
in HDFS:

# Download the dataset
curl https://data.cityofnewyork.us/api/views/w7fs-fd9i/rows.csv?accessType=DOWNLOAD \
  -o ~/trip_taxi_green2018.csv
# Create an HDFS directory
hdfs dfs -mkdir ~/nyctrip
# Load the dataset into HDFS (the -D generic option must precede the -put subcommand)
hdfs dfs -D dfs.block.size=128M -put \
  ~/trip_taxi_green2018.csv \
  ~/nyctrip/trip_taxi_green2018.csv
# Remove the original dataset
rm ~/trip_taxi_green2018.csv

Our file of 793MB divided into 128MB blocks gives us 793/128 = 6.19 or 7
partitions.

If we ask for 7 Executors, each of them will hold roughly 113 MB in memory.
Wit

Re: Spark dataframe hdfs vs s3

2020-05-30 Thread Dark Crusader
Thanks all for the replies.
I am switching to hdfs since it seems like an easier solution.
To answer some of your questions, my hdfs space is a part of my nodes I use
for computation on spark.
From what I understand, this helps because of the data locality advantage.
Which means that there is less network IO and data redistribution on the
nodes.

Thanks for your help.
Aditya

On Sat, 30 May, 2020, 10:48 am Jörn Franke,  wrote:

> Maybe some aws network optimized instances with higher bandwidth will
> improve the situation.
>
> Am 27.05.2020 um 19:51 schrieb Dark Crusader  >:
>
> 
> Hi Jörn,
>
> Thanks for the reply. I will try to create a easier example to reproduce
> the issue.
>
> I will also try your suggestion to look into the UI. Can you guide on what
> I should be looking for?
>
> I was already using the s3a protocol to compare the times.
>
> My hunch is that multiple reads from S3 are required because of improper
> caching of intermediate data. And maybe hdfs is doing a better job at this.
> Does this make sense?
>
> I would also like to add that we built an extra layer on S3 which might be
> adding to even slower times.
>
> Thanks for your help.
>
> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>
>> Have you looked in Spark UI why this is the case ?
>> S3 Reading can take more time - it depends also what s3 url you are using
>> : s3a vs s3n vs S3.
>>
>> It could help after some calculation to persist in-memory or on HDFS. You
>> can also initially load from S3 and store on HDFS and work from there .
>>
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>> where the data is. Depending on what s3 „protocol“ you are using you might
>> be also more punished with performance.
>>
>> Try s3a as a protocol (replace all s3n with s3a).
>>
>> You can also use s3 url but this requires a special bucket configuration,
>> a dedicated empty bucket and it lacks some ineroperability with other AWS
>> services.
>>
>> Nevertheless, it could be also something else with the code. Can you post
>> an example reproducing the issue?
>>
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>> relinquisheddra...@gmail.com>:
>> >
>> > 
>> > Hi all,
>> >
>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>> and running an algorithm from the spark ml library.
>> >
>> > If I create the same spark dataframe by reading data from S3, the same
>> algorithm takes considerably more time.
>> >
>> > I don't understand why this is happening. Is this a chance occurence or
>> are the spark dataframes created different?
>> >
>> > I don't understand how the data store would effect the algorithm
>> performance.
>> >
>> > Any help would be appreciated. Thanks a lot.
>>
>


Re: Spark dataframe hdfs vs s3

2020-05-29 Thread Jörn Franke
Maybe some aws network optimized instances with higher bandwidth will improve 
the situation.

> Am 27.05.2020 um 19:51 schrieb Dark Crusader :
> 
> 
> Hi Jörn,
> 
> Thanks for the reply. I will try to create a easier example to reproduce the 
> issue.
> 
> I will also try your suggestion to look into the UI. Can you guide on what I 
> should be looking for? 
> 
> I was already using the s3a protocol to compare the times.
> 
> My hunch is that multiple reads from S3 are required because of improper 
> caching of intermediate data. And maybe hdfs is doing a better job at this. 
> Does this make sense?
> 
> I would also like to add that we built an extra layer on S3 which might be 
> adding to even slower times.
> 
> Thanks for your help.
> 
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>> Have you looked in Spark UI why this is the case ? 
>> S3 Reading can take more time - it depends also what s3 url you are using : 
>> s3a vs s3n vs S3.
>> 
>> It could help after some calculation to persist in-memory or on HDFS. You 
>> can also initially load from S3 and store on HDFS and work from there . 
>> 
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes 
>> where the data is. Depending on what s3 „protocol“ you are using you might 
>> be also more punished with performance.
>> 
>> Try s3a as a protocol (replace all s3n with s3a).
>> 
>> You can also use s3 url but this requires a special bucket configuration, a 
>> dedicated empty bucket and it lacks some ineroperability with other AWS 
>> services.
>> 
>> Nevertheless, it could be also something else with the code. Can you post an 
>> example reproducing the issue?
>> 
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader 
>> > :
>> > 
>> > 
>> > Hi all,
>> > 
>> > I am reading data from hdfs in the form of parquet files (around 3 GB) and 
>> > running an algorithm from the spark ml library.
>> > 
>> > If I create the same spark dataframe by reading data from S3, the same 
>> > algorithm takes considerably more time.
>> > 
>> > I don't understand why this is happening. Is this a chance occurence or 
>> > are the spark dataframes created different? 
>> > 
>> > I don't understand how the data store would effect the algorithm 
>> > performance.
>> > 
>> > Any help would be appreciated. Thanks a lot.


Re: Spark dataframe hdfs vs s3

2020-05-29 Thread randy clinton
HDFS is simply a better place to make performant reads, and on top of that
the data is closer to your Spark job. The Databricks link from above will
show you where they find a 6x read throughput difference between the
two.

If your HDFS is part of the same Spark cluster then it should be an
incredibly fast read vs reaching out to S3 for the data.

They are different types of storage solving different things.

Something I have seen in workflows, and which other people have suggested
above, is a stage where you load data from S3 into HDFS, then move on to
your other work with it and maybe finally persist outside of HDFS.
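
One concrete way to implement that S3-to-HDFS staging step, assuming the s3a connector and credentials are already configured (the bucket and paths below are hypothetical placeholders):

  hadoop distcp \
    -m 20 \
    s3a://my-bucket/input/dataset/ \
    hdfs:///staging/dataset/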

On Fri, May 29, 2020 at 2:09 PM Bin Fan  wrote:

> Try to deploy Alluxio as a caching layer on top of S3, providing Spark a
> similar HDFS interface?
> Like in this article:
>
> https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/
>
>
> On Wed, May 27, 2020 at 6:52 PM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi Randy,
>>
>> Yes, I'm using parquet on both S3 and hdfs.
>>
>> On Thu, 28 May, 2020, 2:38 am randy clinton, 
>> wrote:
>>
>>> Is the file Parquet on S3 or is it some other file format?
>>>
>>> In general I would assume that HDFS read/writes are more performant for
>>> spark jobs.
>>>
>>> For instance, consider how well partitioned your HDFS file is vs the S3
>>> file.
>>>
>>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>>> relinquisheddra...@gmail.com> wrote:
>>>
>>>> Hi Jörn,
>>>>
>>>> Thanks for the reply. I will try to create a easier example to
>>>> reproduce the issue.
>>>>
>>>> I will also try your suggestion to look into the UI. Can you guide on
>>>> what I should be looking for?
>>>>
>>>> I was already using the s3a protocol to compare the times.
>>>>
>>>> My hunch is that multiple reads from S3 are required because of
>>>> improper caching of intermediate data. And maybe hdfs is doing a better job
>>>> at this. Does this make sense?
>>>>
>>>> I would also like to add that we built an extra layer on S3 which might
>>>> be adding to even slower times.
>>>>
>>>> Thanks for your help.
>>>>
>>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
>>>> wrote:
>>>>
>>>>> Have you looked in Spark UI why this is the case ?
>>>>> S3 Reading can take more time - it depends also what s3 url you are
>>>>> using : s3a vs s3n vs S3.
>>>>>
>>>>> It could help after some calculation to persist in-memory or on HDFS.
>>>>> You can also initially load from S3 and store on HDFS and work from there 
>>>>> .
>>>>>
>>>>> HDFS offers Data locality for the tasks, ie the tasks start on the
>>>>> nodes where the data is. Depending on what s3 „protocol“ you are using you
>>>>> might be also more punished with performance.
>>>>>
>>>>> Try s3a as a protocol (replace all s3n with s3a).
>>>>>
>>>>> You can also use s3 url but this requires a special bucket
>>>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>>>> with other AWS services.
>>>>>
>>>>> Nevertheless, it could be also something else with the code. Can you
>>>>> post an example reproducing the issue?
>>>>>
>>>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>>>> relinquisheddra...@gmail.com>:
>>>>> >
>>>>> > 
>>>>> > Hi all,
>>>>> >
>>>>> > I am reading data from hdfs in the form of parquet files (around 3
>>>>> GB) and running an algorithm from the spark ml library.
>>>>> >
>>>>> > If I create the same spark dataframe by reading data from S3, the
>>>>> same algorithm takes considerably more time.
>>>>> >
>>>>> > I don't understand why this is happening. Is this a chance occurence
>>>>> or are the spark dataframes created different?
>>>>> >
>>>>> > I don't understand how the data store would effect the algorithm
>>>>> performance.
>>>>> >
>>>>> > Any help would be appreciated. Thanks a lot.
>>>>>
>>>>
>>>
>>> --
>>> I appreciate your time,
>>>
>>> ~Randy
>>>
>>

-- 
I appreciate your time,

~Randy


Re: Spark dataframe hdfs vs s3

2020-05-29 Thread Bin Fan
Try to deploy Alluxio as a caching layer on top of S3, providing Spark a
similar HDFS interface?
Like in this article:
https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/


On Wed, May 27, 2020 at 6:52 PM Dark Crusader 
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, 
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS read/writes are more performant for
>> spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create a easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on S3 which might
>>> be adding to even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
>>> wrote:
>>>
>>>> Have you looked in Spark UI why this is the case ?
>>>> S3 Reading can take more time - it depends also what s3 url you are
>>>> using : s3a vs s3n vs S3.
>>>>
>>>> It could help after some calculation to persist in-memory or on HDFS.
>>>> You can also initially load from S3 and store on HDFS and work from there .
>>>>
>>>> HDFS offers Data locality for the tasks, ie the tasks start on the
>>>> nodes where the data is. Depending on what s3 „protocol“ you are using you
>>>> might be also more punished with performance.
>>>>
>>>> Try s3a as a protocol (replace all s3n with s3a).
>>>>
>>>> You can also use s3 url but this requires a special bucket
>>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>>> with other AWS services.
>>>>
>>>> Nevertheless, it could be also something else with the code. Can you
>>>> post an example reproducing the issue?
>>>>
>>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>>> relinquisheddra...@gmail.com>:
>>>> >
>>>> > 
>>>> > Hi all,
>>>> >
>>>> > I am reading data from hdfs in the form of parquet files (around 3
>>>> GB) and running an algorithm from the spark ml library.
>>>> >
>>>> > If I create the same spark dataframe by reading data from S3, the
>>>> same algorithm takes considerably more time.
>>>> >
>>>> > I don't understand why this is happening. Is this a chance occurence
>>>> or are the spark dataframes created different?
>>>> >
>>>> > I don't understand how the data store would effect the algorithm
>>>> performance.
>>>> >
>>>> > Any help would be appreciated. Thanks a lot.
>>>>
>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>


Re: Spark dataframe hdfs vs s3

2020-05-28 Thread Kanwaljit Singh
You can’t play much if it is a streaming job. But in case of batch jobs, 
sometimes teams will copy their S3 data to HDFS in prep for the next run :D

From: randy clinton 
Date: Thursday, May 28, 2020 at 5:50 AM
To: Dark Crusader 
Cc: Jörn Franke , user 
Subject: Re: Spark dataframe hdfs vs s3

See if this helps

"That is to say, on a per node basis, HDFS can yield 6X higher read throughput 
than S3. Thus, given that the S3 is 10x cheaper than HDFS, we find that S3 is 
almost 2x better compared to HDFS on performance per dollar."

https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html


On Wed, May 27, 2020, 9:51 PM Dark Crusader 
mailto:relinquisheddra...@gmail.com>> wrote:
Hi Randy,

Yes, I'm using parquet on both S3 and hdfs.

On Thu, 28 May, 2020, 2:38 am randy clinton, 
mailto:randyclin...@gmail.com>> wrote:
Is the file Parquet on S3 or is it some other file format?

In general I would assume that HDFS read/writes are more performant for spark 
jobs.

For instance, consider how well partitioned your HDFS file is vs the S3 file.

On Wed, May 27, 2020 at 1:51 PM Dark Crusader 
mailto:relinquisheddra...@gmail.com>> wrote:
Hi Jörn,

Thanks for the reply. I will try to create a easier example to reproduce the 
issue.

I will also try your suggestion to look into the UI. Can you guide on what I 
should be looking for?

I was already using the s3a protocol to compare the times.

My hunch is that multiple reads from S3 are required because of improper 
caching of intermediate data. And maybe hdfs is doing a better job at this. 
Does this make sense?

I would also like to add that we built an extra layer on S3 which might be 
adding to even slower times.

Thanks for your help.

On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
mailto:jornfra...@gmail.com>> wrote:
Have you looked in Spark UI why this is the case ?
S3 Reading can take more time - it depends also what s3 url you are using : s3a 
vs s3n vs S3.

It could help after some calculation to persist in-memory or on HDFS. You can 
also initially load from S3 and store on HDFS and work from there .

HDFS offers Data locality for the tasks, ie the tasks start on the nodes where 
the data is. Depending on what s3 „protocol“ you are using you might be also 
more punished with performance.

Try s3a as a protocol (replace all s3n with s3a).

You can also use s3 url but this requires a special bucket configuration, a 
dedicated empty bucket and it lacks some ineroperability with other AWS 
services.

Nevertheless, it could be also something else with the code. Can you post an 
example reproducing the issue?

> Am 27.05.2020 um 18:18 schrieb Dark Crusader 
> mailto:relinquisheddra...@gmail.com>>:
>
>
> Hi all,
>
> I am reading data from hdfs in the form of parquet files (around 3 GB) and 
> running an algorithm from the spark ml library.
>
> If I create the same spark dataframe by reading data from S3, the same 
> algorithm takes considerably more time.
>
> I don't understand why this is happening. Is this a chance occurence or are 
> the spark dataframes created different?
>
> I don't understand how the data store would effect the algorithm performance.
>
> Any help would be appreciated. Thanks a lot.


--
I appreciate your time,

~Randy


Re: Spark dataframe hdfs vs s3

2020-05-28 Thread randy clinton
See if this helps

"That is to say, on a per node basis, HDFS can yield 6X higher read
throughput than S3. Thus, *given that the S3 is 10x cheaper than HDFS, we
find that S3 is almost 2x better compared to HDFS on performance per
dollar."*

*https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html
<https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html>*


On Wed, May 27, 2020, 9:51 PM Dark Crusader 
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, 
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS read/writes are more performant for
>> spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create a easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on S3 which might
>>> be adding to even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, 
>>> wrote:
>>>
>>>> Have you looked in Spark UI why this is the case ?
>>>> S3 Reading can take more time - it depends also what s3 url you are
>>>> using : s3a vs s3n vs S3.
>>>>
>>>> It could help after some calculation to persist in-memory or on HDFS.
>>>> You can also initially load from S3 and store on HDFS and work from there .
>>>>
>>>> HDFS offers Data locality for the tasks, ie the tasks start on the
>>>> nodes where the data is. Depending on what s3 „protocol“ you are using you
>>>> might be also more punished with performance.
>>>>
>>>> Try s3a as a protocol (replace all s3n with s3a).
>>>>
>>>> You can also use s3 url but this requires a special bucket
>>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>>> with other AWS services.
>>>>
>>>> Nevertheless, it could be also something else with the code. Can you
>>>> post an example reproducing the issue?
>>>>
>>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>>> relinquisheddra...@gmail.com>:
>>>> >
>>>> > 
>>>> > Hi all,
>>>> >
>>>> > I am reading data from hdfs in the form of parquet files (around 3
>>>> GB) and running an algorithm from the spark ml library.
>>>> >
>>>> > If I create the same spark dataframe by reading data from S3, the
>>>> same algorithm takes considerably more time.
>>>> >
>>>> > I don't understand why this is happening. Is this a chance occurence
>>>> or are the spark dataframes created different?
>>>> >
>>>> > I don't understand how the data store would effect the algorithm
>>>> performance.
>>>> >
>>>> > Any help would be appreciated. Thanks a lot.
>>>>
>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi Randy,

Yes, I'm using parquet on both S3 and hdfs.

On Thu, 28 May, 2020, 2:38 am randy clinton,  wrote:

> Is the file Parquet on S3 or is it some other file format?
>
> In general I would assume that HDFS read/writes are more performant for
> spark jobs.
>
> For instance, consider how well partitioned your HDFS file is vs the S3
> file.
>
> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi Jörn,
>>
>> Thanks for the reply. I will try to create a easier example to reproduce
>> the issue.
>>
>> I will also try your suggestion to look into the UI. Can you guide on
>> what I should be looking for?
>>
>> I was already using the s3a protocol to compare the times.
>>
>> My hunch is that multiple reads from S3 are required because of improper
>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>> Does this make sense?
>>
>> I would also like to add that we built an extra layer on S3 which might
>> be adding to even slower times.
>>
>> Thanks for your help.
>>
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>>
>>> Have you looked in Spark UI why this is the case ?
>>> S3 Reading can take more time - it depends also what s3 url you are
>>> using : s3a vs s3n vs S3.
>>>
>>> It could help after some calculation to persist in-memory or on HDFS.
>>> You can also initially load from S3 and store on HDFS and work from there .
>>>
>>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>>> where the data is. Depending on what s3 „protocol“ you are using you might
>>> be also more punished with performance.
>>>
>>> Try s3a as a protocol (replace all s3n with s3a).
>>>
>>> You can also use s3 url but this requires a special bucket
>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>> with other AWS services.
>>>
>>> Nevertheless, it could be also something else with the code. Can you
>>> post an example reproducing the issue?
>>>
>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>> relinquisheddra...@gmail.com>:
>>> >
>>> > 
>>> > Hi all,
>>> >
>>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>>> and running an algorithm from the spark ml library.
>>> >
>>> > If I create the same spark dataframe by reading data from S3, the same
>>> algorithm takes considerably more time.
>>> >
>>> > I don't understand why this is happening. Is this a chance occurence
>>> or are the spark dataframes created different?
>>> >
>>> > I don't understand how the data store would effect the algorithm
>>> performance.
>>> >
>>> > Any help would be appreciated. Thanks a lot.
>>>
>>
>
> --
> I appreciate your time,
>
> ~Randy
>


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread randy clinton
Is the file Parquet on S3 or is it some other file format?

In general I would assume that HDFS read/writes are more performant for
spark jobs.

For instance, consider how well partitioned your HDFS file is vs the S3
file.

On Wed, May 27, 2020 at 1:51 PM Dark Crusader 
wrote:

> Hi Jörn,
>
> Thanks for the reply. I will try to create a easier example to reproduce
> the issue.
>
> I will also try your suggestion to look into the UI. Can you guide on what
> I should be looking for?
>
> I was already using the s3a protocol to compare the times.
>
> My hunch is that multiple reads from S3 are required because of improper
> caching of intermediate data. And maybe hdfs is doing a better job at this.
> Does this make sense?
>
> I would also like to add that we built an extra layer on S3 which might be
> adding to even slower times.
>
> Thanks for your help.
>
> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>
>> Have you looked in Spark UI why this is the case ?
>> S3 Reading can take more time - it depends also what s3 url you are using
>> : s3a vs s3n vs S3.
>>
>> It could help after some calculation to persist in-memory or on HDFS. You
>> can also initially load from S3 and store on HDFS and work from there .
>>
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>> where the data is. Depending on what s3 „protocol“ you are using you might
>> be also more punished with performance.
>>
>> Try s3a as a protocol (replace all s3n with s3a).
>>
>> You can also use s3 url but this requires a special bucket configuration,
>> a dedicated empty bucket and it lacks some ineroperability with other AWS
>> services.
>>
>> Nevertheless, it could be also something else with the code. Can you post
>> an example reproducing the issue?
>>
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>> relinquisheddra...@gmail.com>:
>> >
>> > 
>> > Hi all,
>> >
>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>> and running an algorithm from the spark ml library.
>> >
>> > If I create the same spark dataframe by reading data from S3, the same
>> algorithm takes considerably more time.
>> >
>> > I don't understand why this is happening. Is this a chance occurence or
>> are the spark dataframes created different?
>> >
>> > I don't understand how the data store would effect the algorithm
>> performance.
>> >
>> > Any help would be appreciated. Thanks a lot.
>>
>

-- 
I appreciate your time,

~Randy


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi Jörn,

Thanks for the reply. I will try to create an easier example to reproduce
the issue.

I will also try your suggestion to look into the UI. Can you guide on what
I should be looking for?

I was already using the s3a protocol to compare the times.

My hunch is that multiple reads from S3 are required because of improper
caching of intermediate data. And maybe hdfs is doing a better job at this.
Does this make sense?

I would also like to add that we built an extra layer on S3 which might be
adding to even slower times.

Thanks for your help.

On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:

> Have you looked in Spark UI why this is the case ?
> S3 Reading can take more time - it depends also what s3 url you are using
> : s3a vs s3n vs S3.
>
> It could help after some calculation to persist in-memory or on HDFS. You
> can also initially load from S3 and store on HDFS and work from there .
>
> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
> where the data is. Depending on what s3 „protocol“ you are using you might
> be also more punished with performance.
>
> Try s3a as a protocol (replace all s3n with s3a).
>
> You can also use s3 url but this requires a special bucket configuration,
> a dedicated empty bucket and it lacks some ineroperability with other AWS
> services.
>
> Nevertheless, it could be also something else with the code. Can you post
> an example reproducing the issue?
>
> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
> relinquisheddra...@gmail.com>:
> >
> > 
> > Hi all,
> >
> > I am reading data from hdfs in the form of parquet files (around 3 GB)
> and running an algorithm from the spark ml library.
> >
> > If I create the same spark dataframe by reading data from S3, the same
> algorithm takes considerably more time.
> >
> > I don't understand why this is happening. Is this a chance occurence or
> are the spark dataframes created different?
> >
> > I don't understand how the data store would effect the algorithm
> performance.
> >
> > Any help would be appreciated. Thanks a lot.
>


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Jörn Franke
Have you looked in Spark UI why this is the case ? 
S3 Reading can take more time - it depends also what s3 url you are using : s3a 
vs s3n vs S3.

It could help after some calculation to persist in-memory or on HDFS. You can 
also initially load from S3 and store on HDFS and work from there . 

HDFS offers Data locality for the tasks, ie the tasks start on the nodes where 
the data is. Depending on what s3 „protocol“ you are using you might be also 
more punished with performance.

Try s3a as a protocol (replace all s3n with s3a).

You can also use the s3 url but this requires a special bucket configuration, a
dedicated empty bucket, and it lacks some interoperability with other AWS
services.

Nevertheless, it could be also something else with the code. Can you post an 
example reproducing the issue?
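
To make the "load from S3 once, then work from HDFS" suggestion concrete, here is a minimal hedged sketch (the bucket and paths are hypothetical placeholders, and it assumes the s3a connector and credentials are configured):

import org.apache.spark.sql.SparkSession

object S3ToHdfsStaging {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("S3ToHdfsStaging").getOrCreate()

    // Read the source data once over the s3a connector (hypothetical bucket).
    val raw = spark.read.parquet("s3a://my-bucket/input/dataset/")

    // Stage it on HDFS so later iterative work benefits from data locality (hypothetical path).
    raw.write.mode("overwrite").parquet("hdfs:///staging/dataset/")

    // Subsequent (e.g. ML) passes read the HDFS copy instead of going back to S3.
    val staged = spark.read.parquet("hdfs:///staging/dataset/")
    println(staged.count())

    spark.stop()
  }
}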

> Am 27.05.2020 um 18:18 schrieb Dark Crusader :
> 
> 
> Hi all,
> 
> I am reading data from hdfs in the form of parquet files (around 3 GB) and 
> running an algorithm from the spark ml library.
> 
> If I create the same spark dataframe by reading data from S3, the same 
> algorithm takes considerably more time.
> 
> I don't understand why this is happening. Is this a chance occurence or are 
> the spark dataframes created different? 
> 
> I don't understand how the data store would effect the algorithm performance.
> 
> Any help would be appreciated. Thanks a lot.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark dataframe hdfs vs s3

2020-05-27 Thread Dark Crusader
Hi all,

I am reading data from hdfs in the form of parquet files (around 3 GB) and
running an algorithm from the spark ml library.

If I create the same spark dataframe by reading data from S3, the same
algorithm takes considerably more time.

I don't understand why this is happening. Is this a chance occurrence or are
the spark dataframes created differently?

I don't understand how the data store would affect the algorithm
performance.

Any help would be appreciated. Thanks a lot.


Re: How can I add extra mounted disk to HDFS

2020-04-30 Thread Chetan Khatri
I've got 3 disks now:
disk1 - already has data
disk2 - newly added

I want to shift the data from disk1 to disk2; obviously both are DataNode disks.
Please suggest the steps for hot DataNode disk migration.
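
A hedged sketch of one possible approach, assuming a Hadoop release that ships the HDFS DiskBalancer and that dfs.disk.balancer.enabled is set to true in hdfs-site.xml (the hostname is a hypothetical placeholder):

  # Add the new disk to dfs.datanode.data.dir on that DataNode and restart it,
  # then let the DiskBalancer spread the existing blocks across both disks.
  hdfs diskbalancer -plan datanode1.example.com
  # -plan prints where it wrote the plan JSON; pass that path to -execute
  hdfs diskbalancer -execute <path-to-plan.json>
  hdfs diskbalancer -query datanode1.example.com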

On Wed, Apr 29, 2020 at 2:38 AM JB Data31  wrote:

> Use Hadoop NFSv3 gateway to mount FS.
>
> @*JB*Δ 
>
>
>
> Le mar. 28 avr. 2020 à 23:18, Chetan Khatri 
> a écrit :
>
>> Hi Spark Users,
>>
>> My spark job gave me an error No Space left on the device
>>
>


Re: How can I add extra mounted disk to HDFS

2020-04-29 Thread JB Data31
Use Hadoop NFSv3 gateway to mount FS.

@*JB*Δ 



Le mar. 28 avr. 2020 à 23:18, Chetan Khatri  a
écrit :

> Hi Spark Users,
>
> My spark job gave me an error No Space left on the device
>


How can I add extra mounted disk to HDFS

2020-04-28 Thread Chetan Khatri
Hi Spark Users,

My spark job gave me an error No Space left on the device


Re: HDFS file hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt

2020-04-06 Thread jane thorpe

Hi Som,

The HdfsWordCount program counts words from files you place in a directory
named by args(args.length - 1), while the program runs in an infinite loop
until the user presses Ctrl-C.

Why does the program name have the prefix HDFS (Hadoop Distributed File
System)? Is it a program which demonstrates HDFS, or streaming?

I am really confused by this program, ExceptionHandlingTest.

What exception handling is being tested: the JVM's throw-new-exception
syntax when a value is greater than 0.75, or is it something meant to test
the Spark API's exception handling?


spark.sparkContext.parallelize(0 until spark.sparkContext.defaultParallelism).foreach { i =>
  if (math.random > 0.75) {
    throw new Exception("Testing exception handling")
  }
}


package org.apache.spark.examples

import org.apache.spark.sql.SparkSession

object ExceptionHandlingTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ExceptionHandlingTest")
      .getOrCreate()

    spark.sparkContext.parallelize(0 until spark.sparkContext.defaultParallelism).foreach { i =>
      if (math.random > 0.75) {
        throw new Exception("Testing exception handling")
      }
    }

    spark.stop()
  }
}


On Monday, 6 April 2020 Som Lima  wrote:
Ok Try this one instead. (link below) 
It has both  an EXIT which we know is  rude and abusive  instead of graceful 
structured programming and also includes half hearted  user input validation.
Do you think millions of spark users download and test these programmes and 
repeat this rude programming behaviour.
I don't think they have any coding rules like the safety critical software 
industry But they do have strict emailing rules.
Do you think email rules are far more important than programming rules and 
guidelines  ?

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala



On Mon, 6 Apr 2020, 07:04 jane thorpe,  wrote:

Hi Som ,
Did you know that simple demo program of reading characters from file didn't 
work ?
Who wrote that simple hello world type little program ?
 
jane thorpe
janethor...@aol.com
 
 
-Original Message-
From: jane thorpe 
To: somplasticllc ; user 
Sent: Fri, 3 Apr 2020 2:44
Subject: Re: HDFS file hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt

 
Thanks darling
I tried this and worked 

hdfs getconf -confKey fs.defaultFS
hdfs://localhost:9000


scala> :paste
// Entering paste mode (ctrl-D to finish)

val textFile = 
sc.textFile("hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://127.0.0.1:9000/hdfs/spark/examples/README7.out")

// Exiting paste mode, now interpreting.

textFile: org.apache.spark.rdd.RDD[String] = 
hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt MapPartitionsRDD[91] at 
textFile at :27
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[94] at 
reduceByKey at :30

scala> :quit

 
jane thorpe
janethor...@aol.com
 
 
-Original Message-
From: Som Lima 
CC: user 
Sent: Tue, 31 Mar 2020 23:06
Subject: Re: HDFS file

Hi Jane
Try this example 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala

Som
On Tue, 31 Mar 2020, 21:34 jane thorpe,  wrote:

 hi,
Are there setup instructions on the website for 
spark-3.0.0-preview2-bin-hadoop2.7I can run same program for hdfs format
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

val textFile = sc.textFile("/data/README.md")val counts = textFile.flatMap(line 
=> line.split(" ")) .map(word => (word, 1)) 
.reduceByKey(_ + _)counts.saveAsTextFile("/data/wordcount")
textFile: org.apache.spark.rdd.RDD[String] = /data/README.md 
MapPartitionsRDD[23] at textFile at :28counts: 
org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at 
:31


br
Jane 




Re: HDFS file hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt

2020-04-06 Thread Som Lima
Ok Try this one instead. (link below)

It has both an EXIT, which we know is rude and abusive instead of graceful
structured programming, and half-hearted user input validation.

Do you think millions of Spark users download and test these programmes and
repeat this rude programming behaviour?

I don't think they have any coding rules like the safety-critical software
industry, but they do have strict emailing rules.

Do you think email rules are far more important than programming rules and
guidelines?


https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala



On Mon, 6 Apr 2020, 07:04 jane thorpe,  wrote:

> Hi Som ,
>
> Did you know that simple demo program of reading characters from file
> didn't work ?
> Who wrote that simple hello world type little program ?
>
> jane thorpe
> janethor...@aol.com
>
>
> -Original Message-
> From: jane thorpe 
> To: somplasticllc ; user 
> Sent: Fri, 3 Apr 2020 2:44
> Subject: Re: HDFS file hdfs://
> 127.0.0.1:9000/hdfs/spark/examples/README.txt
>
>
> Thanks darling
>
> I tried this and worked
>
> hdfs getconf -confKey fs.defaultFS
> hdfs://localhost:9000
>
>
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
>
> val textFile = sc.textFile("hdfs://
> 127.0.0.1:9000/hdfs/spark/examples/README.txt")
> val counts = textFile.flatMap(line => line.split(" "))
>  .map(word => (word, 1))
>  .reduceByKey(_ + _)
> counts.saveAsTextFile("hdfs://
> 127.0.0.1:9000/hdfs/spark/examples/README7.out")
>
> // Exiting paste mode, now interpreting.
>
> textFile: org.apache.spark.rdd.RDD[String] = hdfs://
> 127.0.0.1:9000/hdfs/spark/examples/README.txt MapPartitionsRDD[91] at
> textFile at :27
> counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[94] at
> reduceByKey at :30
>
> scala> :quit
>
>
> jane thorpe
> janethor...@aol.com
>
>
> -Original Message-
> From: Som Lima 
> CC: user 
> Sent: Tue, 31 Mar 2020 23:06
> Subject: Re: HDFS file
>
> Hi Jane
>
> Try this example
>
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
>
>
> Som
>
> On Tue, 31 Mar 2020, 21:34 jane thorpe, 
> wrote:
>
> hi,
>
> Are there setup instructions on the website for
> spark-3.0.0-preview2-bin-hadoop2.7
> I can run same program for hdfs format
>
> val textFile = sc.textFile("hdfs://...")val counts = textFile.flatMap(line => 
> line.split(" "))
>  .map(word => (word, 1))
>  .reduceByKey(_ + _)counts.saveAsTextFile("hdfs://...")
>
>
>
> val textFile = sc.textFile("/data/README.md")
> val counts = textFile.flatMap(line => line.split(" "))
>  .map(word => (word, 1))
>  .reduceByKey(_ + _)
> counts.saveAsTextFile("/data/wordcount")
>
> textFile: org.apache.spark.rdd.RDD[String] = /data/README.md
> MapPartitionsRDD[23] at textFile at :28
>
> counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at 
> reduceByKey at :31
>
> br
> Jane
>
>


Fwd: HDFS file hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt

2020-04-06 Thread jane thorpe
Hi Som ,
Did you know that simple demo program of reading characters from file didn't 
work ?
Who wrote that simple hello world type little program ?
 
jane thorpe
janethor...@aol.com
 
 
-Original Message-
From: jane thorpe 
To: somplasticllc ; user 
Sent: Fri, 3 Apr 2020 2:44
Subject: Re: HDFS file hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt

 
Thanks darling
I tried this and worked 

hdfs getconf -confKey fs.defaultFS
hdfs://localhost:9000


scala> :paste
// Entering paste mode (ctrl-D to finish)

val textFile = 
sc.textFile("hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://127.0.0.1:9000/hdfs/spark/examples/README7.out")

// Exiting paste mode, now interpreting.

textFile: org.apache.spark.rdd.RDD[String] = 
hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt MapPartitionsRDD[91] at 
textFile at :27
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[94] at 
reduceByKey at :30

scala> :quit

 
jane thorpe
janethor...@aol.com
 
 
-Original Message-
From: Som Lima 
CC: user 
Sent: Tue, 31 Mar 2020 23:06
Subject: Re: HDFS file

Hi Jane
Try this example 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala

Som
On Tue, 31 Mar 2020, 21:34 jane thorpe,  wrote:

 hi,
Are there setup instructions on the website for 
spark-3.0.0-preview2-bin-hadoop2.7I can run same program for hdfs format
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

val textFile = sc.textFile("/data/README.md")val counts = textFile.flatMap(line 
=> line.split(" ")) .map(word => (word, 1)) 
.reduceByKey(_ + _)counts.saveAsTextFile("/data/wordcount")
textFile: org.apache.spark.rdd.RDD[String] = /data/README.md 
MapPartitionsRDD[23] at textFile at :28counts: 
org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at 
:31


br
Jane 



Re: HDFS file hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt

2020-04-02 Thread jane thorpe
 
Thanks darling
I tried this and worked 

hdfs getconf -confKey fs.defaultFS
hdfs://localhost:9000


scala> :paste
// Entering paste mode (ctrl-D to finish)

val textFile = 
sc.textFile("hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://127.0.0.1:9000/hdfs/spark/examples/README7.out")

// Exiting paste mode, now interpreting.

textFile: org.apache.spark.rdd.RDD[String] = 
hdfs://127.0.0.1:9000/hdfs/spark/examples/README.txt MapPartitionsRDD[91] at 
textFile at :27
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[94] at 
reduceByKey at :30

scala> :quit

 
jane thorpe
janethor...@aol.com
 
 
-Original Message-
From: Som Lima 
CC: user 
Sent: Tue, 31 Mar 2020 23:06
Subject: Re: HDFS file

Hi Jane
Try this example 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala

Som
On Tue, 31 Mar 2020, 21:34 jane thorpe,  wrote:

 hi,
Are there setup instructions on the website for 
spark-3.0.0-preview2-bin-hadoop2.7I can run same program for hdfs format
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

val textFile = sc.textFile("/data/README.md")val counts = textFile.flatMap(line 
=> line.split(" ")) .map(word => (word, 1)) 
.reduceByKey(_ + _)counts.saveAsTextFile("/data/wordcount")
textFile: org.apache.spark.rdd.RDD[String] = /data/README.md 
MapPartitionsRDD[23] at textFile at :28counts: 
org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at 
:31


br
Jane 



Re: HDFS file

2020-03-31 Thread Som Lima
Hi Jane

Try this example

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala


Som

On Tue, 31 Mar 2020, 21:34 jane thorpe,  wrote:

> hi,
>
> Are there setup instructions on the website for
> spark-3.0.0-preview2-bin-hadoop2.7
> I can run same program for hdfs format
>
> val textFile = sc.textFile("hdfs://...")val counts = textFile.flatMap(line => 
> line.split(" "))
>  .map(word => (word, 1))
>  .reduceByKey(_ + _)counts.saveAsTextFile("hdfs://...")
>
>
>
> val textFile = sc.textFile("/data/README.md")
> val counts = textFile.flatMap(line => line.split(" "))
>  .map(word => (word, 1))
>  .reduceByKey(_ + _)
> counts.saveAsTextFile("/data/wordcount")
>
> textFile: org.apache.spark.rdd.RDD[String] = /data/README.md
> MapPartitionsRDD[23] at textFile at :28
>
> counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at 
> reduceByKey at :31
>
> br
> Jane
>


HDFS file

2020-03-31 Thread jane thorpe
hi,

Are there setup instructions on the website for
spark-3.0.0-preview2-bin-hadoop2.7 so I can run the same program for hdfs
format?

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

val textFile = sc.textFile("/data/README.md")
val counts = textFile.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
counts.saveAsTextFile("/data/wordcount")

textFile: org.apache.spark.rdd.RDD[String] = /data/README.md MapPartitionsRDD[23] at textFile at :28
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at :31


br
Jane 


Ceph / Lustre VS hdfs comparison

2020-02-12 Thread Nicolas PARIS
Hi

Does anyone have experience with ceph / lustre as a replacement for hdfs for
spark storage (parquet, orc..)?

Is hdfs still far superior to the former?

Thanks

-- 
nicolas paris

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Structured Streaming - HDFS State Store Performance Issues

2020-01-14 Thread Gourav Sengupta
Hi Will,

have you tried using S3 as the state store, with the EMR option for faster
file sync enabled? There is also now an option of using FSx for Lustre.

Thanks and Regards,
Gourav Sengupta

On Wed, Jan 15, 2020 at 5:17 AM William Briggs  wrote:

> Hi all, I've got a problem that really has me stumped. I'm running a
> Structured Streaming query that reads from Kafka, performs some
> transformations and stateful aggregations (using flatMapGroupsWithState),
> and outputs any updated aggregates to another Kafka topic.
>
> I'm running this job using Spark 2.4.4 on Amazon EMR 5.28.1.
> Semi-regularly, all the tasks except one will complete, and the one
> remaining task will take 1-2 minutes, instead of 1-2 seconds to complete.
> I've checked the number of input records (and overall size) for that task,
> and everything seems in-line with all the other tasks - there's no visible
> skew.
>
> The only thing I have to go on at the moment is that the thread dump on
> the executor that is hung shows a 'state-store-maintenance-task' thread,
> which is blocked on an "Executor task launch worker" thread - that second
> thread shows as TIMED_WAITING, with the following locks:
>
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1569026152}),
>> Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171}),
>> Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316}),
>> Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777
>> })
>>
>
> And a stack of:
>
> java.lang.Object.wait(Native Method)
>>
>> org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:877)
>>
>> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:736)
>> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:846)
>> => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
>> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805) =>
>> holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
>>
>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145)
>> => holding
>> Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316
>> })
>> net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
>> java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>>
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
>>
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287)
>> => holding
>> Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777
>> })
>>
>> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
>>
>> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
>>
>
> Based on this, I'm guessing that there's some kind of delay happening with
> the HDFSStateStore, but my NameNode and DataNode metrics all look good (no
> large GCs, plenty of free memory, network bandwidth isn't saturated, no
> under-replicated blocks).
>
> Has anyone run into a problem like this before? Any help would be greatly
> appreciated!
>
> Regards,
> Will
>


Structured Streaming - HDFS State Store Performance Issues

2020-01-14 Thread William Briggs
Hi all, I've got a problem that really has me stumped. I'm running a
Structured Streaming query that reads from Kafka, performs some
transformations and stateful aggregations (using flatMapGroupsWithState),
and outputs any updated aggregates to another Kafka topic.

I'm running this job using Spark 2.4.4 on Amazon EMR 5.28.1.
Semi-regularly, all the tasks except one will complete, and the one
remaining task will take 1-2 minutes, instead of 1-2 seconds to complete.
I've checked the number of input records (and overall size) for that task,
and everything seems in-line with all the other tasks - there's no visible
skew.

The only thing I have to go on at the moment is that the thread dump on the
executor that is hung shows a 'state-store-maintenance-task' thread, which
is blocked on an "Executor task launch worker" thread - that second thread
shows as TIMED_WAITING, with the following locks:

Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1569026152}),
> Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171}),
> Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316}),
> Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777
> })
>

And a stack of:

java.lang.Object.wait(Native Method)
>
> org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:877)
>
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:736)
> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:846)
> => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805) =>
> holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
>
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145)
> => holding
> Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316
> })
> net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
> java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
>
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287)
> => holding
> Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777
> })
>
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
>
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
>

Based on this, I'm guessing that there's some kind of delay happening with
the HDFSStateStore, but my NameNode and DataNode metrics all look good (no
large GCs, plenty of free memory, network bandwidth isn't saturated, no
under-replicated blocks).

Has anyone run into a problem like this before? Any help would be greatly
appreciated!

Regards,
Will


Re: Out of memory HDFS Read and Write

2019-12-22 Thread Ruijing Li
@Chris destPaths is just a Seq[String] that holds the paths we wish to copy
the output to. Even if the collection only holds one path, it does not
work. However, the job runs fine if we don’t copy the output. The pipeline
succeeds in read input -> perform logic as dataframe -> write output. As
for your second question, I’m not sure how Spark handles it: do the
executors come back to the driver to read it, or do they have their own copy?
I don’t see any driver issues, but I will try experimenting with making that
Seq into a Dataset[String] instead, if it helps.

@Sumedh That issue seems interesting to me. I need to dive into it further.
From a quick glance, that issue describes large parquet files, but our data
is rather small. Additionally, as described above, our pipeline can run
fine with the given resources if it runs read input -> perform logic as
dataframe -> write output, but fails on the additional reads. It seems the jira
describes a job that should fail or see issues at the start. Lastly, I found
increasing off-heap helped more than increasing the heap size for the executor
(executor.memoryOverhead vs executor.memory), but we use Spark 2.3.
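
For anyone following along, this is roughly how I understand the knobs being
discussed to differ; the values and app name below are illustrative, not our
actual configuration, and the off-heap pair is the configuration Sumedh
mentions for Spark 2.4.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values only. Heap, overhead, and Spark off-heap are separate pools:
//   spark.executor.memory                -> executor JVM heap
//   spark.mesos.executor.memoryOverhead  -> extra container memory (MiB) for non-heap/native use
//   spark.memory.offHeap.*               -> Spark-managed off-heap pool (the Spark 2.4 option above)
// These are typically passed via spark-submit --conf; setting them in the builder
// only takes effect if it runs before the first SparkContext is created.
val spark = SparkSession.builder()
  .appName("memory-config-sketch")
  .config("spark.executor.memory", "12g")
  .config("spark.mesos.executor.memoryOverhead", "4096")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "4g")
  .getOrCreate()
```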

On Sun, Dec 22, 2019 at 7:44 AM Sumedh Wale  wrote:

> Parquet reads in Spark need lots of temporary heap memory due to
> ColumnVectors and the write block size. See a similar issue:
> https://jira.snappydata.io/browse/SNAP-3111
>
> In addition, writes too consume a significant amount of heap due to
> parquet.block.size. One solution is to reduce spark.executor.cores in
> such a job (note the approximate heap calculation in the ticket). Another
> solution is an increased executor heap. Or use the off-heap configuration with
> Spark 2.4, which will remove the pressure for reads but not for writes.
>
> regards
> sumedh
>
> On Sun, 22 Dec, 2019, 14:29 Ruijing Li,  wrote:
>
>> I was experimenting and found something interesting. I have executor OOM
>> even if I don’t write to remote clusters. So it is purely a dataframe read
>> and write issue
>> —
>> To recap, I have an ETL data pipeline that does some logic, repartitions
>> to reduce the amount of files written, writes the output to HDFS as parquet
>> files. After, it reads the output and writes it to other locations, doesn’t
>> matter if on the same hadoop cluster or multiple. This is a simple piece of
>> code
>> ```
>> destPaths.foreach(path =>
>> Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path))
>> match {
>> //log failure or success
>> }
>> ```
>> However this stage - read from sourceOutput and write to different
>> locations - is failing in Spark, despite all other stages succeeding,
>> including the heavy duty logic. And the data is not too big to handle for
>> spark.
>>
>> Only bumping memoryOverhead, and also repartitioning output to more
>> partitions, 40 precisely (when it failed, we partitioned the output to 20
>> after logic is finished but before writing to HDFS) have made the
>> read stage succeed.
>>
>> Not understanding how spark read stage can experience OOM issues.
>> Hoping to shed some light on why.
>>
>> On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh  wrote:
>>
>>> I'm not entirely sure what the behaviour is when writing to remote
>>> cluster. It could be that the connections are being established for every
>>> element in your dataframe, perhaps having to use for each partition may
>>> reduce the number of connections? You may have to look at what the
>>> executors do when they reach out to the remote cluster.
>>>
>>> On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:
>>>
>>>> I managed to make the failing stage work by increasing memoryOverhead
>>>> to something ridiculous > 50%. Our spark.executor.memory  = 12gb and I
>>>> bumped spark.mesos.executor.memoryOverhead=8G
>>>>
>>>> *Can someone explain why this solved the issue?* As I understand,
>>>> usage of memoryOverhead is for VM overhead and non heap items, which a
>>>> simple read and write should not use (albeit to different hadoop clusters,
>>>> but network should be nonissue since they are from the same machines).
>>>>
>>>> We use spark defaults for everything else.
>>>>
>>>> We are calling df.repartition(20) in our write after logic is done
>>>> (before failing stage of multiple cluster write) to prevent spark’s small
>>>> files problem. We reduce from 4000 partitions to 20.
>>>>
>>>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li 
>>>> wrote:
>>>>
>>>>> Not for the stage that fails, all it does is read and write - the number
>>>>> of tasks is # of cores * # of executor instances. For us that is 60 (3
>>>>> cores 20 executors)

Re: Out of memory HDFS Read and Write

2019-12-22 Thread Chris Teoh
Does it work for just a single path input and single output?

Is the destPath a collection that is sitting on the driver?

On Sun, 22 Dec 2019, 7:59 pm Ruijing Li,  wrote:

> I was experimenting and found something interesting. I have executor OOM
> even if I don’t write to remote clusters. So it is purely a dataframe read
> and write issue
> —
> To recap, I have an ETL data pipeline that does some logic, repartitions
> to reduce the amount of files written, writes the output to HDFS as parquet
> files. After, it reads the output and writes it to other locations, doesn’t
> matter if on the same hadoop cluster or multiple. This is a simple piece of
> code
> ```
> destPaths.foreach(path =>
> Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path))
> match {
> //log failure or success
> }
> ```
> However this stage - read from sourceOutput and write to different
> locations - is failing in Spark, despite all other stages succeeding,
> including the heavy duty logic. And the data is not too big to handle for
> spark.
>
> Only bumping memoryOverhead, and also repartitioning output to more
> partitions, 40 precisely (when it failed, we partitioned the output to 20
> after logic is finished but before writing to HDFS) have made the
> read stage succeed.
>
> Not understanding how spark read stage can experience OOM issues.
> Hoping to shed some light on why.
>
> On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh  wrote:
>
>> I'm not entirely sure what the behaviour is when writing to remote
>> cluster. It could be that the connections are being established for every
>> element in your dataframe, perhaps having to use for each partition may
>> reduce the number of connections? You may have to look at what the
>> executors do when they reach out to the remote cluster.
>>
>> On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:
>>
>>> I managed to make the failing stage work by increasing memoryOverhead to
>>> something ridiculous > 50%. Our spark.executor.memory  = 12gb and I bumped
>>> spark.mesos.executor.memoryOverhead=8G
>>>
>>> *Can someone explain why this solved the issue?* As I understand, usage
>>> of memoryOverhead is for VM overhead and non heap items, which a simple
>>> read and write should not use (albeit to different hadoop clusters, but
>>> network should be nonissue since they are from the same machines).
>>>
>>> We use spark defaults for everything else.
>>>
>>> We are calling df.repartition(20) in our write after logic is done
>>> (before failing stage of multiple cluster write) to prevent spark’s small
>>> files problem. We reduce from 4000 partitions to 20.
>>>
>>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li 
>>> wrote:
>>>
>>>> Not for the stage that fails, all it does is read and write - the
>>>> number of tasks is # of cores * # of executor instances. For us that is 60
>>>> (3 cores 20 executors)
>>>>
>>>> The input partition size for the failing stage, when spark reads the 20
>>>> files each 132M, it comes out to be 40 partitions.
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh 
>>>> wrote:
>>>>
>>>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>>>> if the number of your input partitions to the write is larger than that
>>>>> configuration.
>>>>>
>>>>> Is there anything in the executor logs or the Spark UI DAG that
>>>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>>>> What's the input partition size?
>>>>>
>>>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li, 
>>>>> wrote:
>>>>>
>>>>>> Could you explain why shuffle partitions might be a good starting
>>>>>> point?
>>>>>>
>>>>>> Some more details: when I write the output the first time after logic
>>>>>> is complete, I repartition the files to 20 after having
>>>>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small 
>>>>>> files.
>>>>>> Data is small about 130MB per file. When spark reads it reads in 40
>>>>>> partitions and tries to output that to the different cluster. 
>>>>>> Unfortunately
>>>>>> during that read and write stage executors drop off.
>>>>>>
>>>>>> We keep hdfs block 128Mb

Re: Out of memory HDFS Read and Write

2019-12-22 Thread Ruijing Li
I was experimenting and found something interesting. I have executor OOM
even if I don’t write to remote clusters. So it is purely a dataframe read
and write issue
—
To recap, I have an ETL data pipeline that does some logic, repartitions to
reduce the amount of files written, writes the output to HDFS as parquet
files. After, it reads the output and writes it to other locations, doesn’t
matter if on the same hadoop cluster or multiple. This is a simple piece of
code
```
import scala.util.{Failure, Success, Try}

destPaths.foreach { path =>
  Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path)) match {
    case Success(_) => // log success
    case Failure(e) => // log failure
  }
}
```
However this stage - read from sourceOutput and write to different
locations - is failing in Spark, despite all other stages succeeding,
including the heavy duty logic. And the data is not too big to handle for
spark.

Only bumping memoryOverhead, and also repartitioning output to more
partitions, 40 precisely (when it failed, we partitioned the output to 20
after logic is finished but before writing to HDFS) have made the
read stage succeed.

Not understanding how spark read stage can experience OOM issues.
Hoping to shed some light on why.

On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh  wrote:

> I'm not entirely sure what the behaviour is when writing to remote
> cluster. It could be that the connections are being established for every
> element in your dataframe, perhaps having to use for each partition may
> reduce the number of connections? You may have to look at what the
> executors do when they reach out to the remote cluster.
>
> On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:
>
>> I managed to make the failing stage work by increasing memoryOverhead to
>> something ridiculous > 50%. Our spark.executor.memory  = 12gb and I bumped
>> spark.mesos.executor.memoryOverhead=8G
>>
>> *Can someone explain why this solved the issue?* As I understand, usage
>> of memoryOverhead is for VM overhead and non heap items, which a simple
>> read and write should not use (albeit to different hadoop clusters, but
>> network should be nonissue since they are from the same machines).
>>
>> We use spark defaults for everything else.
>>
>> We are calling df.repartition(20) in our write after logic is done
>> (before failing stage of multiple cluster write) to prevent spark’s small
>> files problem. We reduce from 4000 partitions to 20.
>>
>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li 
>> wrote:
>>
>>> Not for the stage that fails, all it does is read and write - the number
>>> of tasks is # of cores * # of executor instances. For us that is 60 (3
>>> cores 20 executors)
>>>
>>> The input partition size for the failing stage, when spark reads the 20
>>> files each 132M, it comes out to be 40 partitions.
>>>
>>>
>>>
>>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>>>
>>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>>> if the number of your input partitions to the write is larger than that
>>>> configuration.
>>>>
>>>> Is there anything in the executor logs or the Spark UI DAG that
>>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>>> What's the input partition size?
>>>>
>>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li, 
>>>> wrote:
>>>>
>>>>> Could you explain why shuffle partitions might be a good starting
>>>>> point?
>>>>>
>>>>> Some more details: when I write the output the first time after logic
>>>>> is complete, I repartition the files to 20 after having
>>>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>>>>> Data is small about 130MB per file. When spark reads it reads in 40
>>>>> partitions and tries to output that to the different cluster. 
>>>>> Unfortunately
>>>>> during that read and write stage executors drop off.
>>>>>
>>>>> We keep hdfs block 128Mb
>>>>>
>>>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh 
>>>>> wrote:
>>>>>
>>>>>> spark.sql.shuffle.partitions might be a start.
>>>>>>
>>>>>> Is there a difference in the number of partitions when the parquet is
>>>>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>>>>> spark.sql.shuffle.partitions?
>>>>>>
>>>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, 
>>>>>> wrote:
>

Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Chris Teoh
I'm not entirely sure what the behaviour is when writing to a remote cluster.
It could be that the connections are being established for every element in
your dataframe; perhaps using foreachPartition may reduce the
number of connections? You may have to look at what the executors do when
they reach out to the remote cluster.
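
Something along these lines is what I mean; RemoteClient here is a made-up
stand-in for whatever client the sink actually uses, stubbed out so the sketch
compiles, and pushToRemote/endpoint are placeholder names.

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical client, stubbed so the sketch is self-contained.
trait RemoteClient { def send(r: Row): Unit; def close(): Unit }
object RemoteClient { def connect(endpoint: String): RemoteClient = ??? }

// One connection per partition, reused for every row, instead of one per element.
def pushToRemote(df: DataFrame, endpoint: String): Unit = {
  val perPartition: Iterator[Row] => Unit = { rows =>
    val client = RemoteClient.connect(endpoint)
    try rows.foreach(client.send)
    finally client.close()
  }
  df.foreachPartition(perPartition)
}
```

For a straight DataFrameWriter.parquet write this shouldn't be needed, since
each task already writes its own file; the pattern matters more when the sink
opens its own client connections.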

On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:

> I managed to make the failing stage work by increasing memoryOverhead to
> something ridiculous > 50%. Our spark.executor.memory  = 12gb and I bumped
> spark.mesos.executor.memoryOverhead=8G
>
> *Can someone explain why this solved the issue?* As I understand, usage
> of memoryOverhead is for VM overhead and non heap items, which a simple
> read and write should not use (albeit to different hadoop clusters, but
> network should be nonissue since they are from the same machines).
>
> We use spark defaults for everything else.
>
> We are calling df.repartition(20) in our write after logic is done (before
> failing stage of multiple cluster write) to prevent spark’s small files
> problem. We reduce from 4000 partitions to 20.
>
> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li  wrote:
>
>> Not for the stage that fails, all it does is read and write - the number
>> of tasks is # of cores * # of executor instances. For us that is 60 (3
>> cores 20 executors)
>>
>> The input partition size for the failing stage, when spark reads the 20
>> files each 132M, it comes out to be 40 partitions.
>>
>>
>>
>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>>
>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>> if the number of your input partitions to the write is larger than that
>>> configuration.
>>>
>>> Is there anything in the executor logs or the Spark UI DAG that
>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>> What's the input partition size?
>>>
>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>>>
>>>> Could you explain why shuffle partitions might be a good starting point?
>>>>
>>>> Some more details: when I write the output the first time after logic
>>>> is complete, I repartition the files to 20 after having
>>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>>>> Data is small about 130MB per file. When spark reads it reads in 40
>>>> partitions and tries to output that to the different cluster. Unfortunately
>>>> during that read and write stage executors drop off.
>>>>
>>>> We keep hdfs block 128Mb
>>>>
>>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh 
>>>> wrote:
>>>>
>>>>> spark.sql.shuffle.partitions might be a start.
>>>>>
>>>>> Is there a difference in the number of partitions when the parquet is
>>>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>>>> spark.sql.shuffle.partitions?
>>>>>
>>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have encountered a strange executor OOM error. I have a data
>>>>>> pipeline using Spark 2.3 Scala 2.11.12. This pipeline writes the output 
>>>>>> to
>>>>>> one HDFS location as parquet then reads the files back in and writes to
>>>>>> multiple hadoop clusters (all co-located in the same datacenter).  It
>>>>>> should be a very simple task, but executors are being killed off 
>>>>>> exceeding
>>>>>> container thresholds. From logs, it is exceeding given memory (using 
>>>>>> Mesos
>>>>>> as the cluster manager).
>>>>>>
>>>>>> The ETL process works perfectly fine with the given resources, doing
>>>>>> joins and adding columns. The output is written successfully the first
>>>>>> time. *Only when the pipeline at the end reads the output from HDFS
>>>>>> and writes it to different HDFS cluster paths does it fail.* (It
>>>>>> does a spark.read.parquet(source).write.parquet(dest))
>>>>>>
>>>>>> This doesn't really make sense and I'm wondering what configurations
>>>>>> I should start looking at.
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>>
>>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
>> Cheers,
>> Ruijing Li
>>
> --
> Cheers,
> Ruijing Li
> --
> Cheers,
> Ruijing Li
>


Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Ruijing Li
I managed to make the failing stage work by increasing memoryOverhead to
something ridiculous > 50%. Our spark.executor.memory  = 12gb and I bumped
spark.mesos.executor.memoryOverhead=8G

*Can someone explain why this solved the issue?* As I understand it,
memoryOverhead is for VM overhead and non-heap items, which a simple read
and write should not use (albeit to different hadoop clusters, but the network
should be a non-issue since they are on the same machines).

We use spark defaults for everything else.

We are calling df.repartition(20) in our write after logic is done (before
failing stage of multiple cluster write) to prevent spark’s small files
problem. We reduce from 4000 partitions to 20.
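
Concretely, the write after the logic finishes looks roughly like this; the
path is a placeholder and resultDf stands in for the dataframe our logic
produces.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

val resultDf: DataFrame = ???  // stand-in for the dataframe produced by the ETL logic

// Collapse ~4000 partitions from the logic down to 20 files before the first write,
// to avoid the small-files problem; the later multi-cluster copy reads this output back.
resultDf.repartition(20)
  .write
  .mode(SaveMode.Overwrite)
  .parquet("hdfs://clusterA/path/to/output")  // placeholder path
```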

On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li  wrote:

> Not for the stage that fails, all it does is read and write - the number
> of tasks is # of cores * # of executor instances. For us that is 60 (3
> cores 20 executors)
>
> The input partition size for the failing stage, when spark reads the 20
> files each 132M, it comes out to be 40 partitions.
>
>
>
> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>
>> If you're using Spark SQL, that configuration setting causes a shuffle if
>> the number of your input partitions to the write is larger than that
>> configuration.
>>
>> Is there anything in the executor logs or the Spark UI DAG that indicates
>> a shuffle? I don't expect a shuffle if it is a straight write. What's the
>> input partition size?
>>
>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>>
>>> Could you explain why shuffle partitions might be a good starting point?
>>>
>>> Some more details: when I write the output the first time after logic is
>>> complete, I repartition the files to 20 after having
>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>>> Data is small about 130MB per file. When spark reads it reads in 40
>>> partitions and tries to output that to the different cluster. Unfortunately
>>> during that read and write stage executors drop off.
>>>
>>> We keep hdfs block 128Mb
>>>
>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>>>
>>>> spark.sql.shuffle.partitions might be a start.
>>>>
>>>> Is there a difference in the number of partitions when the parquet is
>>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>>> spark.sql.shuffle.partitions?
>>>>
>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have encountered a strange executor OOM error. I have a data
>>>>> pipeline using Spark 2.3 Scala 2.11.12. This pipeline writes the output to
>>>>> one HDFS location as parquet then reads the files back in and writes to
>>>>> multiple hadoop clusters (all co-located in the same datacenter).  It
>>>>> should be a very simple task, but executors are being killed off exceeding
>>>>> container thresholds. From logs, it is exceeding given memory (using Mesos
>>>>> as the cluster manager).
>>>>>
>>>>> The ETL process works perfectly fine with the given resources, doing
>>>>> joins and adding columns. The output is written successfully the first
>>>>> time. *Only when the pipeline at the end reads the output from HDFS
>>>>> and writes it to different HDFS cluster paths does it fail.* (It does
>>>>> a spark.read.parquet(source).write.parquet(dest))
>>>>>
>>>>> This doesn't really make sense and I'm wondering what configurations I
>>>>> should start looking at.
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>
-- 
Cheers,
Ruijing Li
-- 
Cheers,
Ruijing Li


Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Ruijing Li
Not for the stage that fails, all it does is read and write - the number of
tasks is # of cores * # of executor instances. For us that is 60 (3 cores
20 executors)

The input partition size for the failing stage, when spark reads the 20
files each 132M, it comes out to be 40 partitions.



On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:

> If you're using Spark SQL, that configuration setting causes a shuffle if
> the number of your input partitions to the write is larger than that
> configuration.
>
> Is there anything in the executor logs or the Spark UI DAG that indicates
> a shuffle? I don't expect a shuffle if it is a straight write. What's the
> input partition size?
>
> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>
>> Could you explain why shuffle partitions might be a good starting point?
>>
>> Some more details: when I write the output the first time after logic is
>> complete, I repartition the files to 20 after having
>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>> Data is small about 130MB per file. When spark reads it reads in 40
>> partitions and tries to output that to the different cluster. Unfortunately
>> during that read and write stage executors drop off.
>>
>> We keep hdfs block 128Mb
>>
>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>>
>>> spark.sql.shuffle.partitions might be a start.
>>>
>>> Is there a difference in the number of partitions when the parquet is
>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>> spark.sql.shuffle.partitions?
>>>
>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have encountered a strange executor OOM error. I have a data pipeline
>>>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>>>> location as parquet then reads the files back in and writes to multiple
>>>> hadoop clusters (all co-located in the same datacenter).  It should be a
>>>> very simple task, but executors are being killed off exceeding container
>>>> thresholds. From logs, it is exceeding given memory (using Mesos as the
>>>> cluster manager).
>>>>
>>>> The ETL process works perfectly fine with the given resources, doing
>>>> joins and adding columns. The output is written successfully the first
>>>> time. *Only when the pipeline at the end reads the output from HDFS
>>>> and writes it to different HDFS cluster paths does it fail.* (It does
>>>> a spark.read.parquet(source).write.parquet(dest))
>>>>
>>>> This doesn't really make sense and I'm wondering what configurations I
>>>> should start looking at.
>>>>
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
If you're using Spark SQL, that configuration setting causes a shuffle if
the number of your input partitions to the write is larger than that
configuration.

Is there anything in the executor logs or the Spark UI DAG that indicates a
shuffle? I don't expect a shuffle if it is a straight write. What's the
input partition size?

On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:

> Could you explain why shuffle partitions might be a good starting point?
>
> Some more details: when I write the output the first time after logic is
> complete, I repartition the files to 20 after having
> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
> Data is small about 130MB per file. When spark reads it reads in 40
> partitions and tries to output that to the different cluster. Unfortunately
> during that read and write stage executors drop off.
>
> We keep hdfs block 128Mb
>
> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>
>> spark.sql.shuffle.partitions might be a start.
>>
>> Is there a difference in the number of partitions when the parquet is
>> read to spark.sql.shuffle.partitions? Is it much higher than
>> spark.sql.shuffle.partitions?
>>
>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>
>>> Hi all,
>>>
>>> I have encountered a strange executor OOM error. I have a data pipeline
>>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>>> location as parquet then reads the files back in and writes to multiple
>>> hadoop clusters (all co-located in the same datacenter).  It should be a
>>> very simple task, but executors are being killed off exceeding container
>>> thresholds. From logs, it is exceeding given memory (using Mesos as the
>>> cluster manager).
>>>
>>> The ETL process works perfectly fine with the given resources, doing
>>> joins and adding columns. The output is written successfully the first
>>> time. *Only when the pipeline at the end reads the output from HDFS and
>>> writes it to different HDFS cluster paths does it fail.* (It does a
>>> spark.read.parquet(source).write.parquet(dest))
>>>
>>> This doesn't really make sense and I'm wondering what configurations I
>>> should start looking at.
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Ruijing Li
Could you explain why shuffle partitions might be a good starting point?

Some more details: when I write the output the first time after logic is
complete, I repartition the files to 20 after having
spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
Data is small, about 130MB per file. When spark reads it, it reads in 40
partitions and tries to output that to the different cluster. Unfortunately,
during that read and write stage executors drop off.

We keep the HDFS block size at 128MB.
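
For what it's worth, the 40 read partitions seem to come from the file split
size rather than the shuffle setting; this is how I'd check it (the path is a
placeholder).

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// 20 files of ~130MB come back as ~40 partitions because each file is split at the
// default spark.sql.files.maxPartitionBytes (128MB); spark.sql.shuffle.partitions
// does not control this read-side count.
val readBack = spark.read.parquet("hdfs://clusterA/path/to/output")  // placeholder path
println(s"read partitions   = ${readBack.rdd.getNumPartitions}")
println(s"maxPartitionBytes = ${spark.conf.get("spark.sql.files.maxPartitionBytes")}")
```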

On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:

> spark.sql.shuffle.partitions might be a start.
>
> Is there a difference in the number of partitions when the parquet is read
> to spark.sql.shuffle.partitions? Is it much higher than
> spark.sql.shuffle.partitions?
>
> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>
>> Hi all,
>>
>> I have encountered a strange executor OOM error. I have a data pipeline
>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>> location as parquet then reads the files back in and writes to multiple
>> hadoop clusters (all co-located in the same datacenter).  It should be a
>> very simple task, but executors are being killed off exceeding container
>> thresholds. From logs, it is exceeding given memory (using Mesos as the
>> cluster manager).
>>
>> The ETL process works perfectly fine with the given resources, doing
>> joins and adding columns. The output is written successfully the first
>> time. *Only when the pipeline at the end reads the output from HDFS and
>> writes it to different HDFS cluster paths does it fail.* (It does a
>> spark.read.parquet(source).write.parquet(dest))
>>
>> This doesn't really make sense and I'm wondering what configurations I
>> should start looking at.
>>
>> --
>> Cheers,
>> Ruijing Li
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
spark.sql.shuffle.partitions might be a start.

Is there a difference in the number of partitions when the parquet is read
to spark.sql.shuffle.partitions? Is it much higher than
spark.sql.shuffle.partitions?
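
Something like this would show both numbers and whether the plan contains a
shuffle; the path is a placeholder and it assumes an existing SparkSession.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

val df = spark.read.parquet("hdfs://clusterA/path/to/output")  // placeholder path
df.explain()  // an Exchange operator in the physical plan would indicate a shuffle
println(s"read partitions    = ${df.rdd.getNumPartitions}")
println(s"shuffle partitions = ${spark.conf.get("spark.sql.shuffle.partitions")}")
```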

On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:

> Hi all,
>
> I have encountered a strange executor OOM error. I have a data pipeline
> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
> location as parquet then reads the files back in and writes to multiple
> hadoop clusters (all co-located in the same datacenter).  It should be a
> very simple task, but executors are being killed off exceeding container
> thresholds. From logs, it is exceeding given memory (using Mesos as the
> cluster manager).
>
> The ETL process works perfectly fine with the given resources, doing joins
> and adding columns. The output is written successfully the first time. *Only
> when the pipeline at the end reads the output from HDFS and writes it to
> different HDFS cluster paths does it fail.* (It does a
> spark.read.parquet(source).write.parquet(dest))
>
> This doesn't really make sense and I'm wondering what configurations I
> should start looking at.
>
> --
> Cheers,
> Ruijing Li
> --
> Cheers,
> Ruijing Li
>

