Does explode lead to more usage of memory

2020-01-18 Thread V0lleyBallJunki3
I am using a dataframe that has a structure like this:

root
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- language: string (nullable = true)

Each user has multiple orders. Now if I explode orders like this:

df.select($"user", explode($"orders").as("order")) . Each order element will
become a row with a duplicated user and language. Was wondering if spark
actually converts each order element into a single row in memory or it just
logical. Because if a single user has 1000 orders  then wouldn't it lead to
a lot more memory consumption since it is duplicating user and language a
1000 times (once for each order) in memory?
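
For concreteness, a minimal, self-contained sketch of the query in question
(the local session, case classes, and sample data below are made up purely for
illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Hypothetical local session and toy data, only to make the question concrete.
val spark = SparkSession.builder().master("local[2]").appName("explode-sketch").getOrCreate()
import spark.implicits._

case class Order(amount: Double, id: String)
case class UserOrders(orders: Seq[Order], user: String, language: String)

val df = Seq(
  UserOrders(Seq(Order(10.0, "o1"), Order(20.0, "o2")), "alice", "en")
).toDF()

// One output row per array element; user and language are repeated on every row.
val exploded = df.select($"user", $"language", explode($"orders").as("order"))
exploded.show(false)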






Re: Can reduced parallelism lead to no shuffle spill?

2019-11-07 Thread V0lleyBallJunki3
I am just using the above example to understand how Spark handles partitions.






Can reduced parallelism lead to no shuffle spill?

2019-11-07 Thread V0lleyBallJunki3
Consider an example where I have a cluster with 5 nodes, each with 64 cores
and 244 GB of memory. I decide to run 3 executors on each node, setting
executor-cores to 21 and executor memory to 80 GB, so that each executor can
execute 21 tasks in parallel. Now consider 315 (63 * 5) partitions of data, of
which 314 are 3 GB in size but one is 30 GB (due to data skew). Each executor
that received only 3 GB partitions has 63 GB occupied (21 * 3, since each
executor runs 21 tasks in parallel and each task needs 3 GB of memory). But
the one executor that received the 30 GB partition will need 90 GB
(20 * 3 + 30) of memory. So will this executor first execute the 20 tasks of
3 GB and then load the 30 GB task, or will it try to load all 21 tasks at once
and find that for one task it has to spill to disk? If I set executor-cores to
just 15, the executor that receives the 30 GB partition will only need
14 * 3 + 30 = 72 GB and hence won't spill to disk. So in this case, will
reduced parallelism lead to no shuffle spill?
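
To make the arithmetic above explicit, a tiny sketch (all sizes in GB, using
the numbers assumed in this example):

// Peak memory needed by the executor that gets the skewed partition, assuming
// it runs (coresPerExecutor - 1) normal tasks plus the skewed task at once.
val normalPartitionGb = 3
val skewedPartitionGb = 30

def peakGb(coresPerExecutor: Int): Int =
  (coresPerExecutor - 1) * normalPartitionGb + skewedPartitionGb

peakGb(21)  // 90 GB, over the assumed 80 GB executor memory
peakGb(15)  // 72 GB, fits within 80 GB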






spark.sql.autoBroadcastJoinThreshold not taking effect

2019-05-10 Thread V0lleyBallJunki3
Hello,
   I have set spark.sql.autoBroadcastJoinThreshold to 1 GB and am running the
Spark job. However, my application fails with:

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 8 GB
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:103)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:101)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

If I am running with a 1 GB limit, how can I hit the 8 GB limit? I verified in
the Spark History Server, by printing the value of
spark.sql.autoBroadcastJoinThreshold, that the value is set correctly, and the
explain plan also shows that Spark is attempting a broadcast join. Any ideas?
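
For reference, the threshold was set along these lines (a sketch; the value is
given in bytes and spark is the existing SparkSession):

// 1 GB threshold in bytes; how the real job sets it is assumed here.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1024L * 1024 * 1024)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))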






Re: Spark not doing a broadcast join inspite of the table being well below spark.sql.autoBroadcastJoinThreshold

2019-05-10 Thread V0lleyBallJunki3
What I discovered is that if I write the table being joined to disk and then
read it back, Spark correctly broadcasts it. I think this is because when
Spark estimates the size of the smaller table, it overestimates it as much
bigger than it actually is and therefore decides to do a SortMergeJoin.
Writing the table to disk and reading it back gives Spark the correct size, so
it then goes ahead and does a broadcast join.
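
A minimal sketch of this workaround (the DataFrame names, path, and join key
below are placeholders, not the real job):

// Persist the small side and re-read it so Spark picks up accurate size statistics.
smallDf.write.mode("overwrite").parquet("/tmp/small_table")   // hypothetical path
val smallReloaded = spark.read.parquet("/tmp/small_table")

// With the corrected size estimate, the join is planned as a broadcast join as
// long as the re-read table is under spark.sql.autoBroadcastJoinThreshold.
val joined = largeDf.join(smallReloaded, Seq("id"))           // "id" is an assumed key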






Spark not doing a broadcast join inspite of the table being well below spark.sql.autoBroadcastJoinThreshold

2019-05-09 Thread V0lleyBallJunki3
I have a small table, well below 50 MB, that I want to broadcast join with a
larger table. However, even if I set spark.sql.autoBroadcastJoinThreshold to
100 MB, Spark still decides to do a SortMergeJoin instead of a broadcast join.
I have to set an explicit broadcast hint on the table for it to do the
broadcast join, but I don't want to do that because the smaller table might
sometimes be bigger than 100 MB, in which case I want Spark to fall back to a
SortMergeJoin. Are there any other properties that I need to set?
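
For reference, the explicit hint mentioned above looks roughly like this
(small and large are hypothetical DataFrames):

import org.apache.spark.sql.functions.broadcast

// Forces a broadcast of the small side regardless of
// spark.sql.autoBroadcastJoinThreshold, which is exactly why it is undesirable
// here if the small side can sometimes exceed 100 MB.
val joined = large.join(broadcast(small), Seq("key"))   // "key" is an assumed join column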






Best notebook for developing for apache spark using scala on Amazon EMR Cluster

2019-04-30 Thread V0lleyBallJunki3
Hello. I am using Zeppelin on an Amazon EMR cluster while developing Apache
Spark programs in Scala. The problem is that once the cluster is destroyed, I
lose all the notebooks on it. So over time I accumulate a lot of notebooks
that have to be manually exported to my local disk and then imported into each
new EMR cluster I create. Is there a notebook repository or tool I can use to
keep all my notebooks in one place and access them even on new EMR clusters?
I know Jupyter exists, but it doesn't support auto-complete for Scala.






Re: Unable to broadcast a very large variable

2019-04-11 Thread V0lleyBallJunki3
I am not using PySpark; the job is written in Scala.






Re: Unable to broadcast a very large variable

2019-04-10 Thread V0lleyBallJunki3
I am using spark.sparkContext.broadcast() to broadcast. Is it really true
that, even though each of our machines has 244 GB of memory, a 70 GB variable
can't be broadcast even with a fast network?
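
For reference, the broadcast in question is done roughly like this (a sketch;
lookup is a small stand-in for the real 70 GB value and spark is the existing
SparkSession):

// The broadcast value is built and serialized on the driver, then fully
// deserialized on every executor that reads bc.value, so driver and executor
// memory (not just total machine memory) bound how large it can usefully be.
val lookup: Map[String, Double] = Map("a" -> 1.0)   // placeholder for the 70 GB structure
val bc = spark.sparkContext.broadcast(lookup)
// Tasks then read it via bc.value.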






Unable to broadcast a very large variable

2019-04-10 Thread V0lleyBallJunki3
Hello,
   I have a 110-node cluster, with each executor having 50 GB of memory and
each machine having 244 GB of memory, and I want to broadcast a variable of
70 GB. I am having difficulty doing that. I was wondering at what size it
becomes unwise to broadcast a variable. Is there a general rule of thumb?






Re: Any way to see the size of the broadcast variable?

2018-10-09 Thread V0lleyBallJunki3
Yes, each of the executors has 60 GB.






Any way to see the size of the broadcast variable?

2018-10-09 Thread V0lleyBallJunki3
Hello,
   I have set spark.sql.autoBroadcastJoinThreshold to a very high value of
20 GB. I am joining a table that I am sure is below this threshold, yet Spark
does a SortMergeJoin. If I set a broadcast hint, Spark does a broadcast join
and the job finishes much faster. However, when run in production with some
large tables, I run into errors. Is there a way to see the actual size of the
table being broadcast? I wrote the table being broadcast to disk and it took
only 32 MB in Parquet. I tried to cache this table in Zeppelin and run a
table.count() operation, but nothing shows up on the Storage tab of the Spark
History Server. org.apache.spark.util.SizeEstimator doesn't seem to give
accurate numbers for this table either. Is there any way to figure out the
size of the table being broadcast?
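
One way to spot-check the in-memory size, keeping in mind that it may not
match what the planner uses, is to measure the collected rows with
SizeEstimator; a sketch, with smallDf as a placeholder for the table being
broadcast:

import org.apache.spark.util.SizeEstimator

// Estimates the JVM heap footprint of the driver-side copy of the rows. This
// is neither the Parquet size nor the planner's estimate, so treat it only as
// a rough upper-bound sanity check.
val rows = smallDf.collect()
println(SizeEstimator.estimate(rows))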






Set can be passed in as an input argument but not as output

2018-09-03 Thread V0lleyBallJunki3
I find that if the input to a method is a Set, Spark doesn't try to find an
encoder for the Set, but if the output of the method is a Set, it does try to
find an encoder and errors out when none is found. My understanding is that
even the input Set has to be transferred to the executors, so shouldn't it
need an encoder too?

The first method, testMethodSet, takes a Row and an Array[Set[String]] and
returns a Set[String]:

def testMethodSet(info: Row, arrSetString: Array[Set[String]]): Set[String] = {
  val assignments = info.getAs[Seq[String]](0).toSet
  for (setString <- arrSetString) {
    if (setString.subsetOf(assignments)) {
      return setString
    }
  }
  Set[String]()
}

Another method takes the same arguments as the first one but returns a
Seq[String]:

def testMethodSeq(info: Row, arrSetString: Array[Set[String]]): Seq[String] = {
  val assignments = info.getAs[Seq[String]](0).toSet
  for (setString <- arrSetString) {
    if (setString.subsetOf(assignments)) {
      return setString.toSeq
    }
  }
  Seq[String]()
}

The testMethodSet method throws an error:

testRows.map(s => testMethodSet(s, Array(Set("", ""))))
<console>:20: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
       testRows.map(s => testMethodSet(s, Array(Set("", ""))))

The testMethodSeq method works fine:

testRows.map(s => testMethodSeq(s, Array(Set("", ""))))
res12: org.apache.spark.sql.Dataset[Seq[String]] = [value: array]






java.lang.UnsupportedOperationException: No Encoder found for Set[String]

2018-08-15 Thread V0lleyBallJunki3
Hello,
  I am using Spark 2.2.2 with Scala 2.11.8. I wrote this short program:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").getOrCreate()

case class TestCC(i: Int, ss: Set[String])

import spark.implicits._
import spark.sqlContext.implicits._

val testCCDS = Seq(TestCC(1, Set("SS", "Salil")), TestCC(2, Set("xx", "XYZ"))).toDS()


I get :
java.lang.UnsupportedOperationException: No Encoder found for Set[String]
- field (class: "scala.collection.immutable.Set", name: "ss")
- root class: "TestCC"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:455)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:455)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:626)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:614)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

To the best of my knowledge implicit support for Set has been added in Spark
2.2. Am I missing something?
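
If the running Spark version does not provide an implicit encoder for Set, one
commonly suggested workaround, assuming an opaque binary representation of the
whole row is acceptable, is to supply a Kryo encoder explicitly, as in this
sketch:

import org.apache.spark.sql.{Encoders, SparkSession}

case class TestCC(i: Int, ss: Set[String])

val spark = SparkSession.builder().master("local[4]").getOrCreate()

// Kryo serializes each TestCC into a single binary column, sidestepping the
// missing Set[String] encoder at the cost of losing the columnar layout.
val testCCDS = spark.createDataset(
  Seq(TestCC(1, Set("SS", "Salil")), TestCC(2, Set("xx", "XYZ")))
)(Encoders.kryo[TestCC])

Another option is to convert the Set to a Seq before creating the Dataset,
since Seq[String] has a built-in encoder.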


