Does explode lead to more memory usage?
I am using a DataFrame with the following schema:

root
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- language: string (nullable = true)

Each user has multiple orders. Now if I explode orders like this:

df.select($"user", explode($"orders").as("order"))

each order element becomes a row with a duplicated user and language. I was wondering whether Spark actually materializes each order element as a separate row in memory, or whether the expansion is only logical. If a single user has 1000 orders, wouldn't this lead to much higher memory consumption, since user and language are duplicated 1000 times (once for each order) in memory?
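A minimal runnable sketch of the shape described above; the sample data is made up, and the inner struct fields come out as _1/_2 here rather than amount/id:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// One row per user, each carrying an array of order structs.
val df = Seq(
  ("alice", "en", Seq((10.0, "o1"), (20.0, "o2"))),
  ("bob",   "fr", Seq((5.0,  "o3")))
).toDF("user", "language", "orders")

// explode() emits one output row per array element; the non-exploded columns
// (user, language) are repeated on every resulting row.
val exploded = df.select($"user", $"language", explode($"orders").as("order"))
exploded.show(truncate = false)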
Re: Can reduced parallelism lead to no shuffle spill?
I am just using the above example to understand how Spark handles partitions.
Can reduced parallelism lead to no shuffle spill?
Consider an example where I have a cluster with 5 nodes, each with 64 cores and 244 GB of memory. I decide to run 3 executors on each node, with executor-cores set to 21 and executor memory of 80 GB, so that each executor can execute 21 tasks in parallel.

Now suppose there are 315 (63 * 5) partitions of data, of which 314 partitions are 3 GB each but one is 30 GB (due to data skew). The executors that received only 3 GB partitions have 63 GB occupied (21 * 3, since each executor runs 21 tasks in parallel and each task takes 3 GB of memory). But the one executor that received the 30 GB partition will need 90 GB (20 * 3 + 30).

So will this executor first execute the 20 tasks of 3 GB and then load the 30 GB task, or will it try to load all 21 tasks and find that for one task it has to spill to disk? If I set executor-cores to just 15, the executor that receives the 30 GB partition will only need 14 * 3 + 30 = 72 GB and hence won't spill to disk. So in this case, will reduced parallelism lead to no shuffle spill?
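For concreteness, a sketch of the resource settings described above, expressed through the SparkSession builder (assuming the YARN cluster in the scenario; in practice these values are usually passed to spark-submit instead, and the application name is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("skew-example")                    // hypothetical application name
  .master("yarn")                             // assuming the YARN cluster described above
  .config("spark.executor.instances", "15")   // 3 executors per node * 5 nodes
  .config("spark.executor.cores", "21")       // 21 tasks can run in parallel per executor
  .config("spark.executor.memory", "80g")
  .getOrCreate()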
spark.sql.autoBroadcastJoinThreshold not taking effect
Hello, I have set spark.sql.autoBroadcastJoinThreshold=1GB and I am running the Spark job. However, my application is failing with:

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 8 GB
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:103)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:101)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:98)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

When I am running with a 1 GB limit, how can I hit an 8 GB limit? I verified in the Spark History Server, by printing out the value of spark.sql.autoBroadcastJoinThreshold, that the value is correctly set, and the explain plan also shows that it is trying to do a broadcast join. Any ideas?
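A minimal sketch of how the threshold can be set and read back for verification (session setup is simplified for the sketch, and the commented DataFrame names are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()   // master set only for the sketch

// 1 GB threshold, expressed in bytes.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (1024L * 1024 * 1024).toString)

// Read the value back from the running session to confirm it was applied.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// The chosen physical join can be checked with explain():
// largeDf.join(smallDf, Seq("key")).explain()   // placeholder DataFrames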
Re: Spark not doing a broadcast join despite the table being well below spark.sql.autoBroadcastJoinThreshold
What I discovered is that if I write the table being joined to disk and then read it back, Spark correctly broadcasts it. I think this is because when Spark estimates the size of the smaller table, it overestimates it to be much bigger than it actually is and hence decides to do a SortMergeJoin. Writing it to disk and then reading it back gives Spark the correct size, and it then goes ahead and does a broadcast join.
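A sketch of the workaround described above, with toy data standing in for the real tables and an illustrative output path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val smallDf = (0 until 10).map(i => (i, s"dim$i")).toDF("key", "name")          // stand-in small table
val largeDf = (1 to 1000).map(i => (i % 10, s"row$i")).toDF("key", "payload")   // stand-in large table

// Round-trip the small table through Parquet so the optimizer sees its real size.
smallDf.write.mode("overwrite").parquet("/tmp/small_table")   // illustrative path
val smallFromDisk = spark.read.parquet("/tmp/small_table")

// If the on-disk size is below spark.sql.autoBroadcastJoinThreshold, this should
// now be planned as a BroadcastHashJoin; verify with explain().
largeDf.join(smallFromDisk, Seq("key")).explain()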
Spark not doing a broadcast join despite the table being well below spark.sql.autoBroadcastJoinThreshold
I have a small table, well below 50 MB, that I want to broadcast join with a larger table. However, even if I set spark.sql.autoBroadcastJoinThreshold to 100 MB, Spark still decides to do a SortMergeJoin instead of a broadcast join. I have to set an explicit broadcast hint on the table for it to do the broadcast join, but I don't want to do that because the smaller table might grow beyond 100 MB, in which case I want it to fall back to SortMergeJoin. Are there any other properties that I need to set?
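For reference, a sketch of the explicit hint mentioned above (largeDf, smallDf, and the join key "key" are placeholders; broadcast() comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.broadcast

// The hint forces the small side to be broadcast regardless of
// spark.sql.autoBroadcastJoinThreshold or Spark's size estimate,
// which is why it keeps working even if the table grows past 100 MB.
val hinted = largeDf.join(broadcast(smallDf), Seq("key"))
hinted.explain()   // should show BroadcastHashJoin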
Best notebook for developing Apache Spark programs in Scala on an Amazon EMR cluster
Hello. I am using Zeppelin on an Amazon EMR cluster while developing Apache Spark programs in Scala. The problem is that once the cluster is destroyed, I lose all the notebooks on it. So over time I end up with a lot of notebooks that have to be manually exported to my local disk and then imported into each new EMR cluster I create. Is there a notebook repository or tool I can use to keep all my notebooks in one place and access them even on new EMR clusters? I know Jupyter exists, but it doesn't support auto-complete for Scala.
Re: Unable to broadcast a very large variable
I am not using PySpark. The job is written in Scala.
Re: Unable to broadcast a very large variable
I am using spark.sparkContext.broadcast() to broadcast. Is it really true that a 70 GB variable can't be broadcast even when each of our machines has 244 GB of memory and a fast network?
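For context, a minimal sketch of the broadcast pattern in question; the small map here is a stand-in for the real 70 GB structure:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Driver-side value to be shipped to the executors (placeholder data).
val lookup: Map[String, Double] = Map("a" -> 1.0, "b" -> 2.0)

// Broadcast once; each executor keeps a single deserialized copy in memory,
// so it has to fit in executor memory alongside everything else running there.
val lookupBc = spark.sparkContext.broadcast(lookup)

// Tasks read it via .value instead of closing over the driver-side map.
val scored = spark.sparkContext
  .parallelize(Seq("a", "b", "c"))
  .map(k => k -> lookupBc.value.getOrElse(k, 0.0))
  .collect()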
Unable to broadcast a very large variable
Hello, I have a 110-node cluster where each executor has 50 GB of memory and each machine has 244 GB of memory, and I want to broadcast a variable of 70 GB. I am having difficulty doing that. I was wondering at what size it becomes unwise to broadcast a variable. Is there a general rule of thumb?
Re: Any way to see the size of the broadcast variable?
Yes, each of the executors has 60 GB.
Any way to see the size of the broadcast variable?
Hello, I have set the value of spark.sql.autoBroadcastJoinThreshold to a very high value of 20 GB. I am joining a table that I am sure is below this threshold, but Spark still does a SortMergeJoin. If I set a broadcast hint, Spark does a broadcast join and the job finishes much faster. However, when run in production against some large tables, I run into errors. Is there a way to see the actual size of the table being broadcast? I wrote the table being broadcast to disk and it took only 32 MB in Parquet. I tried to cache this table in Zeppelin and run a table.count() operation, but nothing shows up on the Storage tab of the Spark History Server. spark.util.SizeEstimator doesn't seem to give accurate numbers for this table either. Is there any way to figure out the size of the table being broadcast?
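A sketch of two ways to probe the size (both approximate; the table here is a toy stand-in, and the exact statistics accessor differs slightly across Spark 2.x versions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SizeEstimator

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val smallDf = (0 until 1000).map(i => (i, s"name$i")).toDF("id", "name")   // stand-in for the broadcast table

// 1. Rough JVM in-memory footprint of a driver-side copy
//    (only practical when the table comfortably fits on the driver).
println(SizeEstimator.estimate(smallDf.collect()))

// 2. The size the optimizer itself believes, which is what the broadcast
//    decision is based on (the .stats accessor varies by 2.x version).
println(smallDf.queryExecution.optimizedPlan.stats.sizeInBytes)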
Set can be passed in as an input argument but not returned as output
I find that if the input is a Set, Spark doesn't try to find an encoder for it, but if the output of a method is a Set, it does try to find an encoder and errors out if none is found. My understanding is that even the input Set has to be transferred to the executors?

The first method, testMethodSet, takes an Array[Set[String]] and returns a Set[String]:

def testMethodSet(info: Row, arrSetString: Array[Set[String]]): Set[String] = {
  val assignments = info.getAs[Seq[String]](0).toSet
  for (setString <- arrSetString) {
    if (setString.subsetOf(assignments)) {
      return setString
    }
  }
  Set[String]()
}

The second method takes the same arguments as the first one but returns a Seq[String]:

def testMethodSeq(info: Row, arrSetString: Array[Set[String]]): Seq[String] = {
  val assignments = info.getAs[Seq[String]](0).toSet
  for (setString <- arrSetString) {
    if (setString.subsetOf(assignments)) {
      return setString.toSeq
    }
  }
  Seq[String]()
}

Mapping with testMethodSet throws an error:

testRows.map(s => testMethodSet(s, Array(Set("", ""))))
<console>:20: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
       testRows.map(s => testMethodSet(s, Array(Set("", ""))))

Mapping with testMethodSeq works fine:

testRows.map(s => testMethodSeq(s, Array(Set("", ""))))
res12: org.apache.spark.sql.Dataset[Seq[String]] = [value: array<string>]
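Not from the original message, but one way to make the Set-returning map compile is to pass an encoder explicitly instead of relying on spark.implicits; a sketch, assuming testRows is a Dataset[Row] as above:

import org.apache.spark.sql.Encoders

// Kryo-based encoder for Set[String]; note the result is stored as a single
// binary column rather than a native array<string> column.
val result = testRows.map(s => testMethodSet(s, Array(Set("", ""))))(Encoders.kryo[Set[String]])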
java.lang.UnsupportedOperationException: No Encoder found for Set[String]
Hello, I am using Spark 2.2.2 with Scala 2.11.8. I wrote a short program:

val spark = SparkSession.builder().master("local[4]").getOrCreate()

case class TestCC(i: Int, ss: Set[String])

import spark.implicits._
import spark.sqlContext.implicits._

val testCCDS = Seq(TestCC(1, Set("SS", "Salil")), TestCC(2, Set("xx", "XYZ"))).toDS()

I get:

java.lang.UnsupportedOperationException: No Encoder found for Set[String]
- field (class: "scala.collection.immutable.Set", name: "ss")
- root class: "TestCC"
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:455)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:455)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:626)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:614)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

To the best of my knowledge, implicit support for Set has been added in Spark 2.2. Am I missing something?
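Not part of the original message, but a commonly suggested workaround in this situation is to supply a Kryo-based encoder for the whole case class instead of relying on the derived Product encoder; a sketch:

import org.apache.spark.sql.Encoders

// Each TestCC row is stored as one binary column, which bypasses the
// reflective Product encoder that fails on the Set[String] field.
val testCCDS = spark.createDataset(
  Seq(TestCC(1, Set("SS", "Salil")), TestCC(2, Set("xx", "XYZ")))
)(Encoders.kryo[TestCC])

Alternatively, modeling the field as a Seq[String] (and converting to a Set where needed) keeps the native array<string> column layout.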