Fwd: Cassandra driver upgrade

2022-01-24 Thread Amit Sharma
I am upgrading my Cassandra Java driver to the latest version, 4.13. I have
a Cassandra cluster running Cassandra 3.11.11.
I am getting the runtime error below while connecting to Cassandra.
Before 4.13 I was on driver version 3.9 and things were working fine.

 c.d.o.d.i.c.c.ControlConnection - [s0] Error connecting to Node(endPoint=/
127.0.0.1:9042, hostId=null, hashCode=5495a763), trying next node
(ConnectionInitException: [s0|control|connecting...] Protocol
initialization request, step 1 (OPTIONS): failed to send request
(io.netty.channel.StacklessClosedChannelException))


Please advise; this has blocked my production release.


Thanks
Amit


Re: What are your experiences using google cloud platform

2022-01-24 Thread Mich Talebzadeh
OK,

What configuration do you have for Dataproc master and worker nodes, what
machine types are they?

What storage have you allocated for each? Have you specified the Cloud
Storage staging bucket?

Have you considered autoscaling?

https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/autoscaling#enable_autoscaling_on_an_existing_cluster

HTH



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: What are your experiences using google cloud platform

2022-01-24 Thread Andrew Davidson
I think my problem has to do with the mega-mem machine. It was hard to get quota
for mega-mem machines. I wonder if they are unstable? Any suggestions for how
I should look at the 'hardware'?

I ran the same job several times. They all failed in different ways. One failure
looked like some sort of networking problem accessing GCP buckets.

Several times my jobs appeared to fail when I call df.checkpoint(): basically
no progress in my driver log files after 30 mins, and CPU utilization crashes
from 60% to almost zero. I terminated those jobs.

One time the checkpoint seemed to hang after a series of narrow
transformations on a single data frame.

Most of the time the checkpoint seems to fail while calculating rowSums; I have
reworked the rowSum code several times. See below for the final version.

Based on Google searches, it seems that on GCP Dataproc people set the
checkpoint dir to something like gs://myBucket/checkpoint/
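As a sketch, the checkpoint directory is set once on the SparkContext before any checkpoint() call; the bucket path here is just the example from this thread, and the function name is illustrative:

```python
# Sketch only. Assumes pyspark is installed and a GCS connector is available;
# gs://myBucket/checkpoint/ is the example path from the thread.
def configure_checkpointing(checkpoint_dir="gs://myBucket/checkpoint/"):
    # imported lazily so this sketch loads even where Spark is absent
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("rowSums").getOrCreate()
    # Must be set before DataFrame.checkpoint() is ever called; an HDFS path
    # such as hdfs:///checkpoint/ also works on a Dataproc cluster.
    spark.sparkContext.setCheckpointDir(checkpoint_dir)
    return spark
```

Whether GCS or HDFS is faster for checkpoints will depend on the cluster; the API call is the same either way.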

I see the cluster has a lot of HDFS storage. As my job runs, memory utilization
hits 100%. My cluster has 2.8 TB of memory, so Spark will eventually start
writing something to HDFS. As a newbie I would think we would want to set the
checkpoint dir to HDFS. I do not think HDFS is the limiting resource; it never
seems to be fully exhausted. I did a lot of googling and was unable to find an
HDFS example URL. The checkpoint() calls are really slow: they take twice as
long as when I call cache().

Comments and suggestions appreciated

Andy

###
def rowSums(self, countsSparkDF, columnNames, columnBatchSize):
    '''
    The GTEx training data set has 10409 numeric columns. This causes a
    java.lang.StackOverflowError because the DAG is too big; increasing
    spark driver memory does not help. The workaround is to sum smaller
    batches of columns and checkpoint the results of each batch.
    '''
    self.logger.warn("rowSums BEGIN")
    totalColName = "rowSum"
    for i in range(0, len(columnNames), columnBatchSize):
        tmpColName = "tmpSum" + str(i)
        batch = columnNames[i:i + columnBatchSize]
        countsSparkDF = self.rowSumsImpl(countsSparkDF, tmpColName, batch)

        if i == 0:
            countsSparkDF = countsSparkDF.withColumnRenamed(tmpColName,
                                                            totalColName)
        else:
            # calculate rolling total
            countsSparkDF = countsSparkDF.withColumn(
                totalColName, col(totalColName) + col(tmpColName))
            # save space
            countsSparkDF = countsSparkDF.drop(tmpColName)

        # use an action to force execution
        numRows = countsSparkDF.count()
        self.logger.warn("rowSums:batch:{} numRows:{}".format(i, numRows))

        # checkpoint will save the df data but not its lineage. Note that
        # DataFrame.checkpoint() returns a new DataFrame; the result must be
        # assigned back, otherwise the lineage is never actually truncated.
        # countsSparkDF.cache()
        countsSparkDF = countsSparkDF.checkpoint()

    self.logger.warn("rowSums END")
    return countsSparkDF


###
def rowSumsImpl(self, countsSparkDF, newColName, columnNames):
    '''
    Calculates the actual sum of the columns.

    arguments
        countsSparkDF

        newColName:
            results of the column sum will be stored here

        columnNames:
            list of columns to sum

    returns
        amended countsSparkDF
    '''
    self.logger.warn("rowSumsImpl BEGIN")

    # requires: from functools import reduce
    #           from operator import add
    #           from pyspark.sql.functions import col
    # https://stackoverflow.com/a/54283997/4586180
    retDF = countsSparkDF.na.fill(0).withColumn(
        newColName, reduce(add, [col(x) for x in columnNames]))

    self.logger.warn("rowSumsImpl END\n")
    return retDF
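The batch-and-accumulate pattern itself is independent of Spark; a plain-Python sketch of the same idea (illustrative only, not the author's code):

```python
# Illustrative only: sum wide rows in column batches and accumulate a
# rolling total, mirroring the rowSums batching above without Spark.
def row_sums(rows, num_cols, batch_size):
    """rows: list of equal-length numeric lists; returns per-row totals."""
    totals = [0] * len(rows)
    for start in range(0, num_cols, batch_size):
        for r, row in enumerate(rows):
            # each pass adds one batch of columns to the rolling total
            totals[r] += sum(row[start:start + batch_size])
    return totals

# row_sums([[1, 2, 3, 4], [5, 6, 7, 8]], num_cols=4, batch_size=2) -> [10, 26]
```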





Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread Mich Talebzadeh
Hadoop core comprises HDFS (the storage layer), MapReduce (the parallel
execution algorithm) and YARN (the resource manager).

Spark can run on YARN in either cluster or client mode and can use HDFS for
temporary or permanent storage. As HDFS is available and accessible on
all nodes, Spark can take advantage of that. Spark does its MapReduce-style
processing in memory rather than on disk, which speeds up queries by an order
of magnitude. Spark is just an application on Hadoop and not much more.
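From PySpark, handing execution to YARN is one line in the session builder; a minimal sketch (assumes a configured Hadoop/YARN cluster; the app name is illustrative):

```python
# Sketch: build a SparkSession that asks YARN for resources. Deploy mode
# (client vs cluster) is chosen at spark-submit time, not here.
def yarn_session(app_name="example"):
    # imported lazily so this sketch loads even where Spark is absent
    from pyspark.sql import SparkSession

    return (SparkSession.builder
            .master("yarn")          # YARN becomes the resource manager
            .appName(app_name)
            .getOrCreate())
```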

HTH









Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
spark-submit a Spark application on Hadoop (cluster mode); that's what I
mean by executing on Hadoop.



Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
I mean the DAG order is somehow altered when executing on Hadoop



Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread Sean Owen
Code is not executed by Hadoop, nor passed through Hadoop somehow. Do you
mean data? Data is read as-is. There is typically no guarantee about the
ordering of data in files, but you can order data. Still not sure what
specifically you are worried about here, but I don't think the kind of
thing you're contemplating can happen, no.




triggering spark python app using native REST api

2022-01-24 Thread Michael Williams (SSI)
Hello,

I've been trying for a couple of weeks, without success, to work out how to
replicate execution of a Python app via spark-submit on the CLI using the
native Spark REST API (http://localhost:6066/v1/submissions/create). The
environment is Docker, using the latest Docker image for Spark 3.2 from
Bitnami: one master and 2 workers in a standalone cluster.

These are two spark-submit commands via the CLI on the master for the same
application; both successfully execute the app and process the file.

bin/spark-submit file:///opt/bitnami/spark/hostfiles/bronze.py --filename 
"filename.json"

bin/spark-submit --deploy-mode "client" 
file:///opt/bitnami/spark/hostfiles/bronze.py --filename "filename.json"


Here's the body for the API request trying to accomplish the same thing as the
CLI spark-submit:

{
  "action": "CreateSubmissionRequest",
  "appArgs": [
    "/opt/bitnami/spark/hostfiles/bronze.py", "-filename", "filename.json"
  ],
  "appResource": "file:/opt/bitnami/spark/hostfiles/bronze.py",
  "clientSparkVersion": "3.2.0",
  "environmentVariables": {
    "SPARK_ENV_LOADED": "1"
  },
  "mainClass": "org.apache.spark.deploy.SparkSubmit",
  "sparkProperties": {
    "spark.driver.supervise": "false",
    "spark.app.name": "Spark REST API - Bronze Load",
    "spark.submit.deployMode": "client",
    "spark.master": "spark://spark:7077",
    "spark.eventLog.enabled": "true"
  }
}
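For reference, the same body can be POSTed from Python with the standard library; this is a minimal sketch mirroring the request in this email (host, port, paths and property values come from the thread; adjust for your cluster):

```python
# Sketch: POST a CreateSubmissionRequest to the standalone master's REST
# endpoint. The payload mirrors the body quoted in the thread.
import json
from urllib import request

payload = {
    "action": "CreateSubmissionRequest",
    "appArgs": ["/opt/bitnami/spark/hostfiles/bronze.py",
                "-filename", "filename.json"],
    "appResource": "file:/opt/bitnami/spark/hostfiles/bronze.py",
    "clientSparkVersion": "3.2.0",
    "environmentVariables": {"SPARK_ENV_LOADED": "1"},
    "mainClass": "org.apache.spark.deploy.SparkSubmit",
    "sparkProperties": {
        "spark.driver.supervise": "false",
        "spark.app.name": "Spark REST API - Bronze Load",
        "spark.submit.deployMode": "client",
        "spark.master": "spark://spark:7077",
        "spark.eventLog.enabled": "true",
    },
}

def submit(url="http://localhost:6066/v1/submissions/create"):
    req = request.Request(url,
                          data=json.dumps(payload).encode("utf-8"),
                          headers={"Content-Type": "application/json"})
    with request.urlopen(req) as resp:  # CreateSubmissionResponse JSON
        return json.load(resp)

if __name__ == "__main__":
    print(submit())
```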

The CreateSubmissionResponse is successful, indicating that a driver was
submitted, but the driver then fails with the following log output, in which
I cannot see where the actual error is indicated.

> INFO Worker: Asked to launch driver driver-20220118233748-0006

> INFO DriverRunner: Copying user jar 
> file:/opt/bitnami/spark/hostfiles/bronze.py to 
> /opt/bitnami/spark/work/driver-20220118233748-0006/bronze.py

> INFO Utils: Copying /opt/bitnami/spark/hostfiles/bronze.py to 
> /opt/bitnami/spark/work/driver-20220118233748-0006/bronze.py

> INFO DriverRunner: Launch Command: "/opt/bitnami/java/bin/java" "-cp" 
> "/opt/bitnami/spark/conf/:/opt/bitnami/spark/jars/*" "-Xmx1024M" 
> "-Dspark.eventLog.enabled=true" "-Dspark.app.name=Spark REST API - Bronze 
> Load" "-Dspark.driver.supervise=false" "-Dspark.master=spark://spark:7077" 
> "-Dspark.submit.deployMode=client" 
> "org.apache.spark.deploy.worker.DriverWrapper" 
> "spark://Worker@192.168.32.4:35803" 
> "/opt/bitnami/spark/work/driver-20220118233748-0006/bronze.py" 
> "org.apache.spark.deploy.SparkSubmit" "/opt/bitnami/spark/hostfiles/bronze.py 
> --filename 'filename.json'"

> WARN Worker: Driver driver-20220118233748-0006 exited with failure

Is there something obvious I am missing? I originally considered trying to use
Airflow, but was told that was overkill and to find another, simpler mechanism
for triggering our Spark jobs (from outside of Spark). Would it be more
straightforward to launch a Spark worker image via Kubernetes and pass the
filename argument?

Thanks,
Mike



This electronic message may contain information that is Proprietary, 
Confidential, or legally privileged or protected. It is intended only for the 
use of the individual(s) and entity named in the message. If you are not an 
intended recipient of this message, please notify the sender immediately and 
delete the material from your computer. Do not deliver, distribute or copy this 
message and do not disclose its contents or take any action in reliance on the 
information it contains. Thank You.


Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
I am aware of that, but whenever the chunks of code are returned to Spark
from Hadoop (after processing), could they be returned out of order? Could
this ever happen?



Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread Sean Owen
Hadoop does not run Spark programs, Spark does. How or why would something,
what, modify the byte code? No



Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
My point is: could Hadoop go wrong about one Spark execution? Meaning that
it gets confused (given the concurrent distributed tasks) and then adds a
wrong instruction to the program, or executes an instruction out of order
(shuffling the order of execution by executing previous ones when it
shouldn't)? For example, before finishing and returning the results from
one node, it returns the results of the other in the wrong way.



Re: Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread Sean Owen
Not clear what you mean here. A Spark program is a program, so what are the
alternatives here? program execution order is still program execution
order. You are not guaranteed anything about order of concurrent tasks.
Failed tasks can be reexecuted so should be idempotent. I think the answer
is 'no' but not sure what you are thinking of here.



Spark execution on Hadoop cluster (many nodes)

2022-01-24 Thread sam smith
Hello guys,

I hope my question does not sound weird, but could a Spark execution on a
Hadoop cluster give different output than the program actually specifies? I
mean by that: the execution order gets messed up by Hadoop, or an instruction
gets executed twice, etc.?

Thanks for your enlightenment


Re: may I need a join here?

2022-01-24 Thread Gary Liu
You can use a left anti join instead. isin accepts a list, not a column.
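A sketch of that suggestion, using the column names from the traceback below (word and stopword); the function name is illustrative:

```python
# Sketch: a left anti join keeps only the rows of df whose "word" has no
# match in df2's "stopword" column -- the complement of an inner join.
def remove_stopwords(df, df2):
    return df.join(df2, df["word"] == df2["stopword"], "left_anti")
```

Alternatively, isin() works if the stopwords are first collected into a Python list on the driver (e.g. `[r.stopword for r in df2.collect()]`), but the anti join avoids pulling data to the driver.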

On Mon, Jan 24, 2022 at 01:38 Bitfox  wrote:

> >>> df.show(3)
>
> ++-+
>
> |word|count|
>
> ++-+
>
> |  on|1|
>
> | dec|1|
>
> |2020|1|
>
> ++-+
>
> only showing top 3 rows
>
>
> >>> df2.show(3)
>
> ++-+
>
> |stopword|count|
>
> ++-+
>
> |able|1|
>
> |   about|1|
>
> |   above|1|
>
> ++-+
>
> only showing top 3 rows
>
>
> >>> df3=df.filter(~col("word").isin(df2.stopword ))
>
> Traceback (most recent call last):
>
>   File "", line 1, in 
>
>   File "/opt/spark/python/pyspark/sql/dataframe.py", line 1733, in filter
>
> jdf = self._jdf.filter(condition._jc)
>
>   File
> "/opt/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line
> 1310, in __call__
>
>   File "/opt/spark/python/pyspark/sql/utils.py", line 117, in deco
>
> raise converted from None
>
> pyspark.sql.utils.AnalysisException: Resolved attribute(s) stopword#4
> missing from word#0,count#1L in operator !Filter NOT word#0 IN
> (stopword#4).;
>
> !Filter NOT word#0 IN (stopword#4)
>
> +- LogicalRDD [word#0, count#1L], false
>
>
>
>
>
> The filter method doesn't work here.
>
> Maybe I need a join for two DF?
>
> What's the syntax for this?
>
>
>
> Thank you and regards,
>
> Bitfox
>
-- 
Gary Liu


Re: What are your experiences using google cloud platform

2022-01-24 Thread Mich Talebzadeh
Dataproc works fine. The current version is Spark 3.1.2. Look at your
code, hardware and scaling.


HTH






On Sun, 23 Jan 2022 at 21:19, Andrew Davidson 
wrote:

> Hi recently started using GCP dataproc spark.
>
>
>
> Seem to have trouble getting big jobs to complete. I am using check
> points. I am wondering if maybe I should look for another cloud solution
>
>
>
> Kind regards
>
>
>
> Andy
>


Re: What happens when a partition that holds data under a task fails

2022-01-24 Thread Mich Talebzadeh
Hm,

I don't see what partition failure means here.

You can have a node or executor failure etc. So let us look at a scenario
here irrespective of being a streaming or micro-batch

Spark replicates the partitions among multiple nodes. *If one executor
fails*, it moves the processing over to another executor. However, if the
data is lost, it re-executes the processing that generated the data, and it
will have to go back to the source. *In case of failure, there will be a
delay in getting results*. The amount of delay depends on how much
reprocessing Spark has to do.


- Driver executing an action


When the driver executes an action, it submits a job to the Cluster
Manager. The Cluster Manager starts submitting tasks to executors and
monitoring them. In case, if an executor dies, the Cluster Manager does the
work of reassigning the tasks.


- Scaling


Spark does not add executors when executors fail. It just moves the tasks
to other executors. If you are installing Spark on your own cluster, you
will need to figure out how to bring back executors. For example, Spark on
Kubernetes will replace the failed nodes. However, if the driver dies, the
Spark job dies and there is no recovery from that. The only way to recover is
to run the job again. Batch jobs do not have checkpointing, so they will need
to reprocess everything from the beginning and must be idempotent. Streaming
jobs have checkpointing (they write intermediate progress to persistent
storage such as a directory) and will restart from the last microbatch. This
means that they might have to repeat the last microbatch.



- What to run in case of task(s) failure


RDD lineage has all the history of what is run. In a multi-stage job, Spark
may have to rerun all the stages again. For example, if you have done a
groupBy, you will have 2 stages. After the first stage, the data will be
shuffled by hashing the groupBy key, so that data for the same value of the
key lands in the same partition. Now, if one of those partitions is lost
during execution of the second stage, Spark will have to go back and
re-execute all the tasks in the first stage.
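One way to bound that re-execution is to materialize the shuffled result so a later partition loss does not re-run stage 1; a sketch (assumes pyspark and a checkpoint directory already configured on the SparkContext; the function name is illustrative):

```python
# Sketch: checkpoint the post-shuffle DataFrame so losing a partition in a
# later stage does not force re-execution of the groupBy's first stage.
def checkpoint_after_shuffle(df, group_col, agg_expr):
    grouped = df.groupBy(group_col).agg(agg_expr)
    # checkpoint() returns a new DataFrame and must be used in place of the
    # original; the checkpointed data, not the lineage, is the recovery point.
    return grouped.checkpoint()
```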


HTH






On Fri, 21 Jan 2022 at 17:54, Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:

> Hello team,
>
> I am aware that when a task fails due to memory issues it will be retried 4
> times (the default number), and if it still fails then the entire job
> fails.
>
> But suppose if I am reading a file that is distributed across nodes in
> partitions. So, what will happen if a partition fails that holds some data?
> Will it re-read the entire file and get that specific subset of data since
> the driver has the complete information? or will it copy the data to the
> other working nodes or tasks and try to run it?
>