Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

2022-05-24 Thread Ori Popowski
Hi,

Both jobs use spark.dynamicAllocation.enabled so there's no need to change
the number of executors. There are 702 executors in the Dataproc cluster so
this is not the problem.
As for the number of partitions - I didn't change it, and it's still 400. While
writing this I realise that I have more partitions than executors, but the
same was true on EMR.

I am also observing 1 task in the final stage on EMR. The difference is
that on EMR that task receives about 50K of data, while on Dataproc it
receives 700GB. I don't understand why this is happening. It could mean the
execution graph is different, but the job is exactly the same. Could it be
because the minor version of Spark is different?
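
As an aside, a quick way to see how rows are spread across the partitions
feeding that stage (a rough sketch only; `df` stands in for the DataFrame
that feeds it):

  import org.apache.spark.sql.functions.{col, spark_partition_id}

  // Rows per partition; one huge partition points at data skew rather than
  // cluster sizing.
  df.groupBy(spark_partition_id().alias("partition"))
    .count()
    .orderBy(col("count").desc)
    .show(20, truncate = false)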

On Wed, May 25, 2022 at 12:27 AM Ranadip Chatterjee 
wrote:

> Hi Ori,
>
> A single task for the final step can result from various scenarios, such as
> an aggregate operation that produces only 1 value (e.g. a global count) or a
> key-based aggregate with only 1 key. There could be other scenarios as well.
> However, that would be the case in both EMR and Dataproc if the same code is
> run on the same data in both cases.
>
> On a separate note, since you have now changed the size and number of
> nodes, you may need to re-optimize the number and size of executors for the
> job and perhaps the number of partitions as well to optimally use the
> cluster resources.
>
> Regards,
> Ranadip
>
> On Tue, 24 May 2022, 10:45 Ori Popowski,  wrote:
>
>> Hello
>>
>> I migrated a job from EMR with Spark 2.4.4 to Dataproc with Spark 2.4.8.
>> I am creating a cluster with the exact same configuration, where the only
>> difference is that the original cluster uses 78 workers with 96 CPUs and
>> 768GiB memory each, and in the new cluster I am using 117 machines with 64
>> CPUs and 512GiB each, to achieve the same amount of resources in the
>> cluster.
>>
>> The job is run with the same configuration (num of partitions,
>> parallelism, etc.) and reads the same data. However, something strange
>> happens and the job takes 20 hours. What I observed is that there is a
>> stage where the driver instantiates a single task, and this task never
>> starts because the shuffle of moving all the data to it takes forever.
>>
>> I also compared the runtime configuration and found some minor
>> differences (due to Dataproc being different from EMR) but I haven't found
>> any substantial difference.
>>
>> In other stages the cluster utilizes all the partitions (400), and it's
>> not clear to me why it decides to invoke a single task.
>>
>> Can anyone provide an insight as to why such a thing would happen?
>>
>> Thanks
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Push-Based Shuffle causing multiple stage failures

2022-05-24 Thread Ye Zhou
Hi Han,
Thanks for trying out push-based shuffle.
Please make sure you apply both the Spark client-side and the server-side
configurations.
The client-side configuration looks good; from the error message, it looks
like you are missing the server-side configuration.
Please refer to this blog post about how to try out push-based shuffle:
https://engineering.linkedin.com/blog/2021/push-based-shuffle-in-apache-spark.
And also to this documentation on how to properly configure the external
shuffle service in a YARN environment:
https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
In our environment, we added this property to yarn-site.xml in the NodeManager
configuration to enable server-side push-based shuffle:

  <property>
    <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
    <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
  </property>
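
For reference, a minimal sketch of the other pieces involved (property names
are taken from the docs linked above; merge with any existing aux-services
value and adapt to your environment):

On the NodeManagers (yarn-site.xml), the external shuffle service itself:

  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>

On the Spark (client) side:

  spark.shuffle.service.enabled=true
  spark.shuffle.push.enabled=true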

On Tue, May 24, 2022 at 3:30 PM Mridul Muralidharan 
wrote:

> +CC zhouye...@gmail.com
>
>
> On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran  wrote:
>
>> Hi,
>>
>> First of all, I am very thankful for all of the amazing work that goes
>> into this project! It has opened up so many doors for me! I am a long-time
>> Spark user and was very excited to start working with the push-based
>> shuffle service for an academic paper we are working on, but I encountered
>> some difficulties along the way and am wondering if someone could help me
>> get this new feature working. I was able to get push-based shuffle running
>> on my YARN setup (I am using Dataproc, but I added an additional Spark 3.2
>> installation on top of the Dataproc base installation using a custom
>> image, and then removed the old 3.1.2 Spark shuffle YARN jar and replaced
>> it with the new one for Spark 3.2). However, the main issue is that when I
>> actually try to run shuffles with push-based shuffle enabled, I
>> consistently encounter errors of the following sort:
>>
>> 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost
>> task 163.0 in stage 3.1 (TID 16729)
>> (cluster-fast-w-0.c.altaeth-biolux.internal executor 1):
>> FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal,
>> 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
>> org.apache.spark.shuffle.FetchFailedException
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
>> at
>> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>> at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>> at
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
>> Source)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
>> Source)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
>> Source)
>> at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>> at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>> at
>> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>> at org.apache.spark.scheduler.Task.run(Task.scala:131)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>> at
>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at 

Re: Spark Push-Based Shuffle causing multiple stage failures

2022-05-24 Thread Mridul Muralidharan
+CC zhouye...@gmail.com


On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran  wrote:

> Hi,
>
> First of all, I am very thankful for all of the amazing work that goes
> into this project! It has opened up so many doors for me! I am a long-time
> Spark user and was very excited to start working with the push-based
> shuffle service for an academic paper we are working on, but I encountered
> some difficulties along the way and am wondering if someone could help me
> get this new feature working. I was able to get push-based shuffle running
> on my YARN setup (I am using Dataproc, but I added an additional Spark 3.2
> installation on top of the Dataproc base installation using a custom
> image, and then removed the old 3.1.2 Spark shuffle YARN jar and replaced
> it with the new one for Spark 3.2). However, the main issue is that when I
> actually try to run shuffles with push-based shuffle enabled, I
> consistently encounter errors of the following sort:
>
> 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost
> task 163.0 in stage 3.1 (TID 16729)
> (cluster-fast-w-0.c.altaeth-biolux.internal executor 1):
> FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal,
> 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
> org.apache.spark.shuffle.FetchFailedException
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
> at
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
> at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.IOException: Failed to send RPC
> StreamChunkId[streamId=1514743314249,chunkIndex=59] to
> cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337:
> java.io.IOException: Connection reset by peer
> at
> org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
> at
> org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> at
> io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
> at
> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
> at
> 

GCP Dataproc - adding multiple packages(kafka, mongodb) while submitting spark jobs not working

2022-05-24 Thread karan alang
Hello All,
I have a Structured Streaming job on GCP Dataproc, and I'm trying to pass
multiple packages (Kafka, MongoDB) to the dataproc submit command, but
that is not working.

Command that works (when I add a single dependency, e.g. Kafka):
```

gcloud dataproc jobs submit pyspark main.py \
  --cluster versa-structured-stream  \
  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true

```

However, when I add the MongoDB package as well (I tried a few options), it
seems to be failing, e.g.:
```
Option 1 :
gcloud dataproc jobs submit pyspark main.py \

  --cluster versa-structured-stream  \
  --properties 
^#^spark:spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2,spark:spark.dynamicAllocation.enabled=true,spark:spark.shuffle.service.enabled=true,spark:spark.executor.memory=20g,spark:spark.driver.memory=5g,spark:spark.executor.cores=2
\
  
--jars=gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar,gs://dataproc-spark-jars/isolation-forest_2.4.3_2.12-2.0.8.jar,gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar
\
  
--files=gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani.p12,gs://kafka-certs/alarm-compression-user.p12,gs://kafka-certs/appstats-user.p12,gs://kafka-certs/insights-user.p12,gs://kafka-certs/intfutil-user.p12,gs://kafka-certs/reloadpred-chkpoint-user.p12,gs://kafka-certs/reloadpred-user.p12,gs://dataproc-spark-configs/topic-customer-map.cfg,gs://dataproc-spark-configs/params.cfg
 \
  --region us-east1 \
  --py-files streams.zip,utils.zip


Option 2 :
gcloud dataproc jobs submit pyspark main.py \
  --cluster versa-structured-stream \
  --properties
spark.jars.packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2',spark:spark.dynamicAllocation.enabled=true,spark:spark.shuffle.service.enabled=true,spark:spark.executor.memory=20g,spark:spark.driver.memory=5g,spark:spark.executor.cores=2 \

  
--jars=gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar,gs://dataproc-spark-jars/isolation-forest_2.4.3_2.12-2.0.8.jar,gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar
\
  
--files=gs://kafka-certs/versa-kafka-gke-ca.p12,gs://kafka-certs/syslog-vani.p12,gs://kafka-certs/alarm-compression-user.p12,gs://kafka-certs/appstats-user.p12,gs://kafka-certs/insights-user.p12,gs://kafka-certs/intfutil-user.p12,gs://kafka-certs/reloadpred-chkpoint-user.p12,gs://kafka-certs/reloadpred-user.p12,gs://dataproc-spark-configs/topic-customer-map.cfg,gs://dataproc-spark-configs/params.cfg
 \
  --region us-east1 \
  --py-files streams.zip,utils.zip


```
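
For reference, a hedged sketch based on `gcloud topic escaping`: the ^#^ prefix
changes the list delimiter to #, so the individual properties would then be
separated by # while the commas stay inside the spark.jars.packages value
(other flags unchanged; not verified against this cluster):
```

gcloud dataproc jobs submit pyspark main.py \
  --cluster versa-structured-stream \
  --properties ^#^spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2#spark.dynamicAllocation.enabled=true#spark.shuffle.service.enabled=true \
  --region us-east1 \
  --py-files streams.zip,utils.zip

```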

Any pointers on how to fix/debug this?

Details are also in the Stack Overflow post:
https://stackoverflow.com/questions/72369619/gcp-dataproc-adding-multiple-packageskafka-mongodb-while-submitting-jobs-no

tia!


Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

2022-05-24 Thread Ranadip Chatterjee
Hi Ori,

A single task for the final step can result from various scenarios, such as
an aggregate operation that produces only 1 value (e.g. a global count) or a
key-based aggregate with only 1 key. There could be other scenarios as well.
However, that would be the case in both EMR and Dataproc if the same code is
run on the same data in both cases.
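
For instance, a global aggregate like the sketch below ends in a
single-partition exchange, so its final stage always runs as exactly one task
(illustrative only):

  import org.apache.spark.sql.functions.count

  // Partial counts from every partition are shuffled to a single partition
  // for the final merge, so the last stage of this query has one task.
  df.agg(count("*")).show()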

On a separate note, since you have now changed the size and number of
nodes, you may need to re-optimize the number and size of executors for the
job and perhaps the number of partitions as well to optimally use the
cluster resources.
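
For illustration only (rough numbers, assuming the 64-core / 512GiB Dataproc
workers and leaving headroom for the OS and YARN daemons), a common starting
point is about 5 cores per executor, which gives roughly 12 executors per node
and around 40GiB per executor slot, split between heap and overhead:

  --conf spark.executor.cores=5 \
  --conf spark.executor.memory=36g \
  --conf spark.executor.memoryOverhead=4g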

Regards,
Ranadip

On Tue, 24 May 2022, 10:45 Ori Popowski,  wrote:

> Hello
>
> I migrated a job from EMR with Spark 2.4.4 to Dataproc with Spark 2.4.8. I
> am creating a cluster with the exact same configuration, where the only
> difference is that the original cluster uses 78 workers with 96 CPUs and
> 768GiB memory each, and in the new cluster I am using 117 machines with 64
> CPUs and 512GiB each, to achieve the same amount of resources in the
> cluster.
>
> The job is run with the same configuration (num of partitions,
> parallelism, etc.) and reads the same data. However, something strange
> happens and the job takes 20 hours. What I observed is that there is a
> stage where the driver instantiates a single task, and this task never
> starts because the shuffle of moving all the data to it takes forever.
>
> I also compared the runtime configuration and found some minor differences
> (due to Dataproc being different from EMR) but I haven't found any
> substantial difference.
>
> In other stages the cluster utilizes all the partitions (400), and it's
> not clear to me why it decides to invoke a single task.
>
> Can anyone provide an insight as to why such a thing would happen?
>
> Thanks
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Problem with implementing the Datasource V2 API for Salesforce

2022-05-24 Thread Gourav Sengupta
Hi,

In the spirit of not fitting the solution to the problem, would it not be
better to first create a producer for your job and use a broker like Kafka,
Kinesis, or Pulsar?


Regards,
Gourav Sengupta

On Sat, May 21, 2022 at 3:46 PM Rohit Pant  wrote:

> Hi all,
>
> I am trying to implement a custom Spark Datasource for Salesforce by
> implementing the Spark Datasource V2 interfaces. For querying Salesforce
> data parallelly I am planning to use the Salesforce Bulk V1 API
> https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm
> .
>
> I am aware that there is an existing Salesforce Spark library by Springml,
> but it doesn't support the authentication types I need and isn't compatible
> with Spark 3.x.
>
> I am not using the Bulk V2 Salesforce API as it is serial in nature. You
> submit a query, Salesforce automatically creates the batches, you then need
> to poll for results and iterate over the batches using batch locators
> returned in the header.
>
> I am planning to make it work like this -
>
>
>1. The user specifies the options numPartitions and dbtable. Using
>this, internally I will fetch the record counts for that Salesforce object
>and deduce the chunk size to be used for querying the data using PK
>chunking.
>
> https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
>2. The Bulk API is asynchronous. You need to create a Job and then
>create query batches within the job. If you specify that you want to use PK
>Chunking along with the chunk size you want then Salesforce automatically
>creates the batches for you. You then need to poll for and fetch the
>results for each batch using a batch-specific URL.
>    3. I am planning to pass the batch ID to each executor using an
>    InputPartition object. Each executor will then poll for and fetch the
>    results (a rough sketch of this structure follows the list).
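>
> A rough sketch of the structure I have in mind, using the DataSource V2
> Batch and InputPartition interfaces (the Salesforce Bulk client calls are
> placeholders, not a real library):
>
> import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
>
> // One Bulk API batch per Spark partition; shipped to the executors.
> case class SalesforceBatchPartition(jobId: String, batchId: String) extends InputPartition
>
> class SalesforceBulkBatch(objectName: String, numPartitions: Int) extends Batch {
>
>   override def planInputPartitions(): Array[InputPartition] = {
>     // Create the Bulk job with PK chunking; Salesforce then creates the
>     // query batches automatically (placeholder calls below).
>     val jobId = createJobWithPkChunking(objectName, numPartitions)
>     val batchIds = pollForBatchIds(jobId)
>     // Each executor receives one batch ID and polls/fetches that batch.
>     batchIds.map(id => SalesforceBatchPartition(jobId, id): InputPartition).toArray
>   }
>
>   override def createReaderFactory(): PartitionReaderFactory =
>     ??? // readers that poll for and stream the results of a single batch
>
>   // Placeholders standing in for Salesforce Bulk V1 API calls.
>   private def createJobWithPkChunking(obj: String, chunks: Int): String = ???
>   private def pollForBatchIds(jobId: String): Seq[String] = ???
> }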
>
> I am having trouble deciding how I would go about creating the Bulk job
> and submitting the batches on the driver node before dispatching the batch
> ids to the executors. I tried doing this in the implementation of the
> planInputPartitions method of the Batch interface, but it turns out that
> it is called 2-3 times for each action (show, collect, etc.), thus creating
> unnecessary Bulk jobs.
>
> One potential solution that might work is maintaining a set of hashed user
> options in the static scope for the implementation of the Batch interface
> (using a companion object) and only creating the job if it doesn't exist in
> the set. However, I find this solution to be very clumsy. Also, what
> happens if a user submits multiple actions on a DataFrame? I could maybe
> also add a TTL for the set entries, but you can see how it gets complicated.
>
> Would really appreciate any pointers on the ideal way to achieve what I
> want.
>
> Regards,
>
> Rohit Pant
>