Re: An alternative logic to collaborative filtering works fine but we are facing run time issues in executing the job

2019-04-16 Thread Ankit Khettry
Hi Balakumar

Two things.

One - It seems like your cluster is running out of memory and then
eventually out of disk, likely while materializing the dataframe to write
(what's the volume of data created by the join?).
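
For reference, a rough way to estimate that volume (a sketch, run against the
same dframe and using the column names from the code further down the thread;
the self-join on _c0 produces roughly the square of each key's row count):

    // Sketch: estimate how many rows the self-join on _c0 would produce before
    // writing it out; the inequality filter only trims the exact self-pairs.
    import org.apache.spark.sql.functions.{col, sum}

    val perKey = dframe.groupBy("_c0").count()
    perKey.agg(sum(col("count") * col("count")).as("estimated_joined_rows")).show()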

Two - Your job is running in local mode, so it is able to utilize only the
master node's resources.

Try running the job in YARN mode, and if the issue persists, try increasing
the disk volumes.
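
As a sketch of that change (assuming the cluster runs YARN and the job is
packaged as a jar; the jar name below is illustrative): drop the hardcoded
master from the code and pass it at submit time instead.

    import org.apache.spark.sql.SparkSession

    // Sketch: no .master("local[*]") here; the master comes from spark-submit.
    val ss = SparkSession.builder
      .appName("join_association")
      .getOrCreate()

    // submitted with, for example:
    //   spark-submit --master yarn --deploy-mode cluster --class <main class> join_association.jar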

Best Regards
Ankit Khettry

On Wed, 17 Apr, 2019, 9:44 AM Balakumar iyer S wrote:

> Hi,
>
> While running the following Spark code on the cluster with the
> configuration below, it is split into 3 job IDs.
>
> CLUSTER CONFIGURATION
>
> 3 NODE CLUSTER
>
> NODE 1 - 64 GB, 16 cores
>
> NODE 2 - 64 GB, 16 cores
>
> NODE 3 - 64 GB, 16 cores
>
> At job ID 2 the job gets stuck at stage 51 of 254 and then starts
> consuming disk space. I am not sure why this is happening, and my work is
> completely stalled. Could someone help me with this?
>
> I have attached a screenshot of the stuck Spark stages for reference.
>
> Please let me know if you have further questions about the setup and code.
> Thanks


An alternative logic to collaborative filtering works fine but we are facing run time issues in executing the job

2019-04-16 Thread Balakumar iyer S
Hi,


While running the following Spark code on the cluster with the
configuration below, it is split into 3 job IDs.

CLUSTER CONFIGURATION

3 NODE CLUSTER

NODE 1 - 64 GB, 16 cores

NODE 2 - 64 GB, 16 cores

NODE 3 - 64 GB, 16 cores


At job ID 2 the job gets stuck at stage 51 of 254 and then starts
consuming disk space. I am not sure why this is happening, and my work is
completely stalled. Could someone help me with this?

I have attached a screenshot of the stuck Spark stages for reference.

Please let me know if you have further questions about the setup and code.
Thanks



code:

   import org.apache.log4j.{Level, Logger}
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.col

   def main(args: Array[String]) {

     Logger.getLogger("org").setLevel(Level.ERROR)

     // NOTE: master("local[*]") limits the job to a single machine.
     val ss = SparkSession
       .builder
       .appName("join_association").master("local[*]")
       .getOrCreate()

     import ss.implicits._

     val dframe = ss.read.option("inferSchema", value = true)
       .option("delimiter", ",").csv("in/matrimony.txt")

     dframe.show()
     dframe.printSchema()

     // Two views of the same data for the self-join
     val dfLeft = dframe.withColumnRenamed("_c1", "left_data")
     val dfRight = dframe.withColumnRenamed("_c1", "right_data")

     // Self-join on _c0, dropping pairs where both sides carry the same value
     val joined = dfLeft.join(dfRight, dfLeft.col("_c0") === dfRight.col("_c0"))
       .filter(col("left_data") !== col("right_data"))

     joined.show()

     val result = joined.select(col("left_data"), col("right_data") as "similar_ids")

     result.write.csv("/output")

     ss.stop()
   }



-- 
REGARDS
BALAKUMAR SEETHARAMAN


Re: [External Sender] How to use same SparkSession in another app?

2019-04-16 Thread Femi Anthony
Why not save the dataframe to persistent storage (S3/HDFS) in the first
application and read it back in the second?
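
For instance, a minimal sketch (the paths are made up for illustration; df is
the dataframe from the first app and spark is the second app's own SparkSession):

    // First application: persist the dataframe to shared storage.
    df.write.mode("overwrite").parquet("s3a://my-bucket/shared/result")

    // Second application: read it back with its own SparkSession.
    val df2 = spark.read.parquet("s3a://my-bucket/shared/result")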

On Tue, Apr 16, 2019 at 8:58 PM Rishikesh Gawade wrote:

> Hi.
> I wish to use a SparkSession created by one app in another app so that I
> can use the dataframes belonging to that session. Is it possible to use the
> same SparkSession in another app?
> Thanks,
> Rishikesh
>
-- 
Card Machine Learning (ML) Team, Capital One




Re: How to use same SparkSession in another app?

2019-04-16 Thread Jacek Laskowski
Hi,

Not possible. What are you really trying to do? Why do you need to share
dataframes? They're nothing but the metadata of a distributed computation (no
data inside), so what would be the purpose of such sharing?
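
A quick way to see that (a sketch; spark is any active SparkSession):

    // A DataFrame is a logical plan plus a schema, not a container of rows.
    val df = spark.range(10).selectExpr("id", "id * 2 AS doubled")
    df.explain(true)  // prints only the query plans; no data is computed or moved
    df.show()         // an action like show() is what actually runs the computation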

Regards,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Tue, Apr 16, 2019 at 1:57 PM Rishikesh Gawade wrote:

> Hi.
> I wish to use a SparkSession created by one app in another app so that I
> can use the dataframes belonging to that session. Is it possible to use the
> same SparkSession in another app?
> Thanks,
> Rishikesh
>


Re: Reading RDD by (key, data) from s3

2019-04-16 Thread yujhe.li
You can't; SparkContext is a singleton object (and it isn't usable inside tasks
running on the executors). You have to use the Hadoop library or an AWS client
to read the files from S3.
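
A minimal Scala sketch of the Hadoop-library option (the bucket name is made
up; it assumes the S3A connector and credentials are already configured and
that s3Keys is an RDD[String] of object keys; the original snippet is PySpark,
but the shape is the same):

    // Read each gzipped S3 object inside the tasks with the Hadoop FileSystem API,
    // since SparkContext itself cannot be used on the executors.
    import java.net.URI
    import java.nio.charset.StandardCharsets
    import java.util.zip.GZIPInputStream

    import org.apache.commons.io.IOUtils
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val bucket = "s3a://my-bucket"
    val data = s3Keys.mapPartitions { keys =>
      val fs = FileSystem.get(new URI(bucket), new Configuration())
      keys.map { key =>
        val in = new GZIPInputStream(fs.open(new Path(s"$bucket/$key")))
        try (key, IOUtils.toString(in, StandardCharsets.UTF_8))
        finally in.close()
      }
    }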






Dynamic executor scaling spark/Kubernetes

2019-04-16 Thread purna pradeep
Hello,

Is Kubernetes dynamic executor scaling for Spark available in the latest
release of Spark?

I mean scaling the executors based on the workload, versus preallocating a
fixed number of executors for a Spark job.

Thanks,
Purna


How to use same SparkSession in another app?

2019-04-16 Thread Rishikesh Gawade
Hi.
I wish to use a SparkSession created by one app in another app so that I
can use the dataframes belonging to that session. Is it possible to use the
same SparkSession in another app?
Thanks,
Rishikesh


Reading RDD by (key, data) from s3

2019-04-16 Thread Gorka Bravo Martinez
Hi,

I am trying to read gzipped JSON data from S3. My idea would be to do something like:

data = (s3_keys
    .mapValues(lambda x: s3_read_data(x))
)

For that I thought about using sc.textFile instead of s3_read_data, but that
wouldn't work. Any idea how to achieve this?

Cheers, Gorka.


K8s-Spark client mode : Executor image not able to download application jar from driver

2019-04-16 Thread Nikhil Chinnapa
Environment:
Spark: 2.4.0
Kubernetes:1.14

Query: Does the application jar need to be part of both the driver and the
executor images?

Invocation point (from Java code):
sparkLaunch = new SparkLauncher()
        .setMaster(LINUX_MASTER)
        .setAppResource(LINUX_APP_RESOURCE)
        .setConf("spark.app.name", APP_NAME)
        .setMainClass(MAIN_CLASS)
        .setConf("spark.executor.instances", EXECUTOR_COUNT)
        .setConf("spark.kubernetes.container.image", CONTAINER_IMAGE)
        .setConf("spark.kubernetes.driver.pod.name", DRIVER_POD_NAME)
        .setConf("spark.kubernetes.container.image.pullSecrets", REGISTRY_SECRET)
        .setConf("spark.kubernetes.authenticate.driver.serviceAccountName", SERVICE_ACCOUNT_NAME)
        .setConf("spark.driver.host", SERVICE_NAME + "." + NAMESPACE + ".svc.cluster.local")
        .setConf("spark.driver.port", DRIVER_PORT)
        .setDeployMode("client");

Scenario:
I am trying to run Spark on K8s in client mode. When I put the application
jar in both the driver and executor images, the program works fine.

But if I put the application jar only in the driver image, then I get the
following error:

2019-04-16 06:36:44 INFO  Executor:54 - Fetching file:/opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar with timestamp 1555396592768
2019-04-16 06:36:44 INFO  Utils:54 - Copying /opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar to /var/data/spark-d24c8fbc-4fe7-4968-9310-f891a097d1e7/spark-31ba5cbb-3132-408c-991a-795
2019-04-16 06:36:44 ERROR Executor:91 - Exception in task 0.1 in stage 0.0 (TID 2)
java.nio.file.NoSuchFileException: /opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar
    at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
    at java.base/sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:548)
    at java.base/sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:254)
    at java.base/java.nio.file.Files.copy(Files.java:1294)
    at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$copyRecursive(Utils.scala:664)
    at org.apache.spark.util.Utils$.copyFile(Utils.scala:635)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:719)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:805)
    at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:797)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:797)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:369)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)





