Delete checkpointed data for a single dataset?
Hello,

In a non-streaming application, I am using the checkpoint feature to truncate the lineage of complex datasets. At the end of the job, the checkpointed data, which is stored in HDFS, is deleted. I am looking for a way to delete unused checkpointed data before the end of the job. If I know that a dataset won't be used anymore, is there a way to delete its checkpointed data in the middle of the application?

Thank you,
Isabelle
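A minimal sketch of one way to do this for an RDD. It assumes the RDD was checkpointed with `RDD.checkpoint()` into a directory set by `SparkContext.setCheckpointDir`. Once an action has materialized the checkpoint, `RDD.getCheckpointFile` returns its directory, and that directory can be removed with the Hadoop `FileSystem` API. The object and helper names are hypothetical. The sketch also assumes nothing touches the RDD again: its lineage has been truncated, so a later action would try to read the deleted files. `Dataset.checkpoint()` checkpoints an internal RDD that is not exposed, so this approach does not carry over to Datasets directly. Recent Spark versions also offer a `spark.cleaner.referenceTracking.cleanCheckpoints` setting (default `false`), which removes checkpoint files once the RDD is garbage-collected on the driver. Check the configuration page of the version in use.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointCleanup {
  // Deletes the checkpoint directory of an RDD whose checkpoint has already been written.
  // Returns true if a directory was found and removed, false otherwise.
  def deleteCheckpoint(sc: SparkContext, rdd: RDD[_]): Boolean =
    rdd.getCheckpointFile match {
      case Some(dir) =>
        val path = new Path(dir)
        val fs = path.getFileSystem(sc.hadoopConfiguration)
        fs.delete(path, true) // recursive: removes every part file under the directory
      case None =>
        false // never marked for checkpointing, or no action has run yet
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-cleanup"))
    try {
      // Hypothetical default directory; pass an HDFS path as the first argument.
      sc.setCheckpointDir(args.headOption.getOrElse("/tmp/checkpoint-sketch"))
      val data = sc.parallelize(1 to 1000).map(_ * 2)
      data.checkpoint()     // only marks the RDD
      println(data.count()) // the first action writes the checkpoint files
      // ... stages that still need `data` run here ...
      // After this call, do not run further actions on `data`.
      println(s"deleted: ${deleteCheckpoint(sc, data)}")
    } finally {
      sc.stop()
    }
  }
}
```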
Read ORC file with subset of schema
Hello,

When reading an older ORC file whose schema is a subset of the current schema, the reader throws an error. Please see the sample code below (run on Spark 2.1). The same commands on a Parquet file do not error out; they return the new column with null values. Is there a setting I can add to the ORC reader to get the same behavior?

Thanks,
Isabelle

scala> case class Schema1(id: String)
defined class Schema1

scala> case class Schema2(id: String, new_column: String)
defined class Schema2

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> spark.read.schema(Encoders.product[Schema1].schema).format("orc").load("path_to_orc_file").show
+---------+
|       id|
+---------+
|  9930481|
|119474983|
|  4665129|
| 27531864|
| 41562099|
|110502758|
+---------+

scala> spark.read.schema(Encoders.product[Schema2].schema).format("orc").load("path_to_orc_file").show
19/08/31 03:26:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-10-x-y-z.ap-northeast-2.compute.internal, executor 1): java.lang.IllegalArgumentException: Field "new_column" does not exist.
    at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:290)
    at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:290)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:289)
    at org.apache.spark.sql.hive.orc.OrcRelation$$anonfun$6.apply(OrcFileFormat.scala:308)
    at org.apache.spark.sql.hive.orc.OrcRelation$$anonfun$6.apply(OrcFileFormat.scala:308)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:96)
    at org.apache.spark.sql.hive.orc.OrcRelation$.setRequiredColumns(OrcFileFormat.scala:308)
    at org.apache.spark.sql.hive.orc.OrcFileFormat$$anonfun$buildReader$2.apply(OrcFileFormat.scala:140)
    at org.apache.spark.sql.hive.orc.OrcFileFormat$$anonfun$buildReader$2.apply(OrcFileFormat.scala:129)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

// Instead of an error, I expected the following:
+---------+----------+
|       id|new_column|
+---------+----------+
|  9930481|      null|
|119474983|      null|
|  4665129|      null|
| 27531864|      null|
| 41562099|      null|
|110502758|      null|
+---------+----------+
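A possible workaround on Spark 2.1, sketched under one assumption: the files can be read without an explicit schema, so the reader only requests columns that actually exist in them. The approach reads each file with its own schema, then adds every missing field of the target schema as a typed null column. `OrcSchemaPad`, `padToSchema`, and the top-level `Schema2` (mirroring the REPL class) are hypothetical names, and the input path is passed in place of the post's `path_to_orc_file` placeholder. Newer Spark releases ship a native ORC reader (`spark.sql.orc.impl=native`, Spark 2.3+) that may already return nulls for missing columns. That is worth checking before adopting this workaround.

```scala
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Hypothetical top-level copy of the REPL's Schema2, so Encoders.product can derive its schema.
case class Schema2(id: String, new_column: String)

object OrcSchemaPad {
  // Adds each field of `target` that is missing from `df` as a null column of the
  // field's type, then returns the columns in `target` order.
  def padToSchema(df: DataFrame, target: StructType): DataFrame = {
    val present = df.columns.toSet
    val padded = target.fields.foldLeft(df) { (acc, field) =>
      if (present.contains(field.name)) acc
      else acc.withColumn(field.name, lit(null).cast(field.dataType))
    }
    padded.select(target.fieldNames.map(col): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("orc-schema-pad").getOrCreate()
    val target = Encoders.product[Schema2].schema
    // Read with the file's own schema, then pad it to the current one.
    val df = padToSchema(spark.read.format("orc").load(args(0)), target)
    df.show()
    spark.stop()
  }
}
```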
ClassCastException when using SparkSQL Window function
Hello,

I have a simple session table, which tracks pages users visited with a sessionId. I would like to apply a window function by sessionId, but am hitting a type cast exception. I am using Spark 1.5.0.

Here is sample code:

scala> df.printSchema
root
 |-- sessionid: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- page: string (nullable = true)

scala> df.withColumn("num", rowNumber.over(Window.partitionBy("sessionid"))).show(10)

Here is the error stacktrace:

Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getInt(JoinedRow.scala:82)
    at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:45)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:121)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
    at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:330)
    at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Has anyone encountered this problem before? Any pointers would be greatly appreciated.

Thanks!
Isabelle
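A sketch of the same row numbering against the Spark 2.x API, where `row_number()` replaced 1.x's `rowNumber()`. It adds an explicit ordering on `timestamp` so the numbers within each session are deterministic. The sample rows, the object name, and the choice of `timestamp` as the ordering column are assumptions. Whether adding an ordering alone avoids the 1.5.0 exception is not established here. Note also that on 1.4/1.5, window functions required a `HiveContext`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object SessionRowNumber {
  // Numbers the page views inside each session, ordered by timestamp.
  // The timestamps are "yyyy-MM-dd HH:mm:ss" strings, so string order matches time order.
  def numberPages(df: DataFrame): DataFrame = {
    val bySession = Window.partitionBy("sessionid").orderBy(col("timestamp"))
    df.withColumn("num", row_number().over(bySession))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("session-row-number").getOrCreate()
    import spark.implicits._
    // Hypothetical sample rows with a subset of the session table's columns.
    val df = Seq(
      ("s1", "2015-10-01 10:00:00", "/home"),
      ("s1", "2015-10-01 10:01:00", "/cart"),
      ("s2", "2015-10-01 11:00:00", "/home")
    ).toDF("sessionid", "timestamp", "page")
    numberPages(df).orderBy("sessionid", "num").show()
    spark.stop()
  }
}
```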
Re: DataFrame creation delay?
Hi Harsh,

Thanks a lot for your reply.

I added a predicate to my query to select a single partition in the table, and tested with the "spark.sql.hive.metastorePartitionPruning" setting both on and off. There is no difference in DataFrame creation time.

Yes, Michael's proposed workaround works. But I was under the impression that this workaround was only needed for Spark versions < 1.5. With the Hive metastore partition pruning feature from Spark 1.5, I thought there would be no more delay, so I could create DataFrames left and right.

I noticed that regardless of the setting, when I create a DataFrame with or without a predicate, I get a log message from the HadoopFsRelation class which lists the HDFS file paths of ALL partitions in my table (see the logInfo call in the code <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L770>). Is this expected? I am not sure who is creating this Array of file paths, but I am guessing this is the source of my delay.

Thanks,
Isabelle

On Thu, Dec 10, 2015 at 7:38 PM, Harsh J <ha...@cloudera.com> wrote:
> The option of "spark.sql.hive.metastorePartitionPruning=true" will not
> work unless you have a partition column predicate in your query. Your query
> of "select * from temp.log" does not do this. The slowdown appears to be
> due to the need of loading all partition metadata.
>
> Have you also tried to see if Michael's temp-table suggestion helps you
> cache the expensive partition lookup? (re-quoted below)
>
> """
> If you run sqlContext.table("...").registerTempTable("...") that temptable
> will cache the lookup of partitions [the first time is slow, but subsequent
> lookups will be faster].
> """ - X-Ref: Permalink
> <https://issues.apache.org/jira/browse/SPARK-6910?focusedCommentId=14529666=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14529666>
>
> Also, do you absolutely need to use "select * from temp.log"? Adding a
> where clause to the query with a partition condition will help Spark prune
> the request to just the required partitions (vs. all, which is proving
> expensive).
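To check whether the partition predicate or the temp-table workaround from this thread actually helps, DataFrame creation time can be measured directly. A minimal sketch, assuming a Spark 1.5-era `SQLContext` backed by the Hive metastore (for example a `HiveContext`). `CreationTimeCheck`, `timed`, `compare`, and the `log_cached` temp table name are hypothetical, and the caller supplies the predicate string (for example a condition on `dt`).

```scala
import org.apache.spark.sql.SQLContext

object CreationTimeCheck {
  // Evaluates `body` and returns its result with the elapsed wall-clock time in milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  // Times DataFrame creation (not query execution) for a full-table query, a query with a
  // partition predicate, and two queries against a registered temp table.
  def compare(sqlContext: SQLContext, table: String, partitionPredicate: String): Unit = {
    sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
    val (_, fullMs) = timed(sqlContext.sql(s"select * from $table"))
    val (_, prunedMs) = timed(sqlContext.sql(s"select * from $table where $partitionPredicate"))
    // Temp-table workaround: the first lookup pays the cost, later ones should reuse it.
    sqlContext.table(table).registerTempTable("log_cached")
    val (_, firstMs) = timed(sqlContext.sql("select * from log_cached"))
    val (_, secondMs) = timed(sqlContext.sql("select * from log_cached"))
    println(s"full: $fullMs ms, pruned: $prunedMs ms, temp table: $firstMs ms then $secondMs ms")
  }
}
```

The first query may warm metadata caches for the ones after it, so timing each variant in a fresh session gives a cleaner comparison.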
Re: DataFrame creation delay?
Hi Michael,

We have just upgraded to Spark 1.5.0 (actually 1.5.0_cdh-5.5, since we are on Cloudera), and use Parquet-formatted tables. I turned on spark.sql.hive.metastorePartitionPruning=true, but DataFrame creation still takes a long time. Is there any other configuration to consider?

Thanks a lot for your help,
Isabelle

On Fri, Sep 4, 2015 at 1:42 PM, Michael Armbrust <mich...@databricks.com> wrote:
> If you run sqlContext.table("...").registerTempTable("...") that
> temptable will cache the lookup of partitions.
Re: SparkSQL API to insert DataFrame into a static partition?
Thanks all for your replies!

I tested both approaches: registering a temp table and then executing SQL, vs. saving directly to the HDFS file path. The problem with the second approach is that I am inserting data into a Hive table, so if I create a new partition this way, the Hive metadata is not updated. So I will go with the first approach.

Follow-up question in this case: what is the cost of registering a temp table? Is there a limit to the number of temp tables that can be registered by a Spark context?

Thanks again for your input.

Isabelle

On Wed, Dec 2, 2015 at 10:30 AM, Michael Armbrust <mich...@databricks.com> wrote:
> you might also coalesce to 1 (or some small number) before writing to
> avoid creating a lot of files in that partition if you know that there is
> not a ton of data.
>
> On Wed, Dec 2, 2015 at 12:59 AM, Rishi Mishra <rmis...@snappydata.io>
> wrote:
>
>> As long as all your data is being inserted by Spark , hence using the
>> same hash partitioner, what Fengdong mentioned should work.
>>
>> On Wed, Dec 2, 2015 at 9:32 AM, Fengdong Yu <fengdo...@everstring.com>
>> wrote:
>>
>>> Hi
>>> you can try:
>>>
>>> if your table under location "/test/table/" on HDFS
>>> and has partitions:
>>>
>>> "/test/table/dt=2012"
>>> "/test/table/dt=2013"
>>>
>>> df.write.mode(SaveMode.Append).partitionBy("date").save("/test/table")
>>>
>>> On Dec 2, 2015, at 10:50 AM, Isabelle Phan <nlip...@gmail.com> wrote:
>>>
>>> df.write.partitionBy("date").insertInto("my_table")
>>
>> --
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
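The first approach can be wrapped in a small helper: register a temp table, then run an `INSERT ... PARTITION` statement so the Hive metastore learns about the partition. A minimal sketch, assuming a Hive-backed `SQLContext` and a `df` that holds only the data columns, in the same order as the table's non-partition columns. `StaticPartitionInsert`, `insertSql`, `insertIntoPartition`, and the `staging_for_insert` name are hypothetical. The `coalesce(1)` follows Michael's suggestion and only makes sense when the partition's data is small.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

object StaticPartitionInsert {
  // Builds "INSERT INTO TABLE t PARTITION (k='v', ...) SELECT c1, c2 FROM source".
  // Partition values are written as quoted literals and are assumed not to contain quotes.
  def insertSql(table: String, partition: Seq[(String, String)],
                columns: Seq[String], source: String): String = {
    val spec = partition.map { case (k, v) => s"$k='$v'" }.mkString(", ")
    s"INSERT INTO TABLE $table PARTITION ($spec) SELECT ${columns.mkString(", ")} FROM $source"
  }

  // Registers `df` as a temp table and inserts it into one static partition of a Hive table.
  def insertIntoPartition(sqlContext: SQLContext, df: DataFrame, table: String,
                          partition: Seq[(String, String)]): Unit = {
    val staging = "staging_for_insert"        // hypothetical temp table name
    df.coalesce(1).registerTempTable(staging) // fewer output files in the partition
    try sqlContext.sql(insertSql(table, partition, df.columns.toSeq, staging))
    finally sqlContext.dropTempTable(staging) // the temp table is only needed for the insert
  }
}
```

For example, `StaticPartitionInsert.insertIntoPartition(sqlContext, df, "my_table", Seq("date" -> "2015-12-01"))`, where the partition value is hypothetical.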
How to catch error during Spark job?
Hello,

I have a question about error handling in Spark jobs: if an exception occurs during a job, what is the best way to be notified of the failure? Can Spark jobs return different exit codes?

For example, I wrote a dummy Spark job that just throws an Exception:

package com.test

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ExampleJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test Job")
    val sc = new SparkContext(conf)
    try {
      val count = sc.parallelize(1 to 100).count
      println(s"Count: $count")
      throw new Exception("Fail!")
    } finally {
      sc.stop
    }
  }
}

The spark-submit execution trace shows the error:

spark-submit --class com.test.ExampleJob test.jar
15/10/03 03:13:16 INFO SparkContext: Running Spark version 1.4.0
15/10/03 03:13:19 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/10/03 03:13:19 WARN SparkConf:
...
15/10/03 03:13:59 INFO DAGScheduler: Job 0 finished: count at ExampleJob.scala:12, took 18.879104 s
Count: 100
15/10/03 03:13:59 INFO SparkUI: Stopped Spark web UI at []
15/10/03 03:13:59 INFO DAGScheduler: Stopping DAGScheduler
15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Shutting down all executors
15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
15/10/03 03:13:59 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/10/03 03:13:59 INFO Utils: path = /data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596/blockmgr-d8e40805-3b8c-45f4-97b3-b89874158796, already present as root for deletion.
15/10/03 03:13:59 INFO MemoryStore: MemoryStore cleared
15/10/03 03:13:59 INFO BlockManager: BlockManager stopped
15/10/03 03:13:59 INFO BlockManagerMaster: BlockManagerMaster stopped
15/10/03 03:13:59 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/10/03 03:13:59 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.Exception: Fail!
    at com.test.ExampleJob$.main(ExampleJob.scala:14)
    at com.test.ExampleJob.main(ExampleJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/10/03 03:13:59 INFO Utils: Shutdown hook called
15/10/03 03:13:59 INFO Utils: Deleting directory /data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596
15/10/03 03:14:00 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

However, the Spark UI just shows the status as "FINISHED". Is this a configuration error on my side?

Thanks,
Isabelle
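One common pattern, sketched here as an assumption rather than a documented Spark mechanism, works in three steps:

1. Catch the failure in `main`.
2. Stop the `SparkContext`.
3. Exit with a non-zero status, so `spark-submit` (and whatever launched it) can detect the failure.

`ExampleJobWithExitCode` and `runJob` are hypothetical names. This controls the driver's exit status. Whether the standalone master's UI then reports something other than FINISHED depends on the deploy mode and Spark version, and this sketch does not guarantee it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.{Failure, Success, Try}

object ExampleJobWithExitCode {
  // Runs the job body and maps its outcome to a process exit code (0 = success, 1 = failure).
  // Try only catches non-fatal exceptions; fatal errors still propagate.
  def runJob(body: => Unit): Int = Try(body) match {
    case Success(_) => 0
    case Failure(e) =>
      System.err.println(s"Job failed: ${e.getMessage}")
      e.printStackTrace()
      1
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Test Job"))
    val exitCode =
      try {
        runJob {
          val count = sc.parallelize(1 to 100).count()
          println(s"Count: $count")
          throw new Exception("Fail!")
        }
      } finally {
        sc.stop()
      }
    // A non-zero status lets a scheduler or wrapper script see the failure.
    sys.exit(exitCode)
  }
}
```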
Re: How to distinguish columns when joining DataFrames with shared parent?
Thanks Michael and Ali for the reply!

I'll make sure to use unresolved columns when working with self joins, then.

As pointed out by Ali, isn't there still an issue with the aliasing? It works when using the org.apache.spark.sql.functions.col(colName: String) method, but not when using org.apache.spark.sql.DataFrame.apply(colName: String):

scala> j.select(col("lv.value")).show
+-----+
|value|
+-----+
|   10|
|   20|
+-----+

scala> j.select(largeValues("lv.value")).show
+-----+
|value|
+-----+
|    1|
|    5|
+-----+

Or does this behavior have the same root cause as detailed in Michael's email?

-Isabelle

On Wed, Oct 21, 2015 at 11:46 AM, Michael Armbrust <mich...@databricks.com> wrote:
> Unfortunately, the mechanisms that we use to differentiate columns
> automatically don't work particularly well in the presence of self joins.
> However, you can get it work if you use the $"column" syntax consistently:
>
> val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", "value")
> val smallValues = df.filter('value < 10).as("sv")
> val largeValues = df.filter('value >= 10).as("lv")
>
> smallValues
>   .join(largeValues, $"sv.key" === $"lv.key")
>   .select($"sv.key".as("key"), $"sv.value".as("small_value"), $"lv.value".as("large_value"))
>   .withColumn("diff", $"small_value" - $"large_value")
>   .show()
>
> +---+-----------+-----------+----+
> |key|small_value|large_value|diff|
> +---+-----------+-----------+----+
> |  1|          1|         10|  -9|
> |  3|          5|         20| -15|
> +---+-----------+-----------+----+
>
> The problem with the other cases is that calling smallValues("columnName")
> or largeValues("columnName") is eagerly resolving the attribute to the
> same column (since the data is actually coming from the same place). By
> the time we realize that you are joining the data with itself (at which
> point we rewrite one side of the join to use different expression ids) its
> too late. At the core the problem is that in Scala we have no easy way to
> differentiate largeValues("columnName") from smallValues("columnName").
> This is because the data is coming from the same DataFrame and we don't
> actually know which variable name you are using. There are things we can
> change here, but its pretty hard to change the semantics without breaking
> other use cases.
>
> So, this isn't a straight forward "bug", but its definitely a usability
> issue. For now, my advice would be: only use unresolved columns (i.e.
> $"[alias.]column" or col("[alias.]column")) when working with self joins.
>
> Michael
Re: How to distinguish columns when joining DataFrames with shared parent?
Ok, got it. Thanks a lot, Michael, for the detailed reply!

On Oct 21, 2015 1:54 PM, "Michael Armbrust" <mich...@databricks.com> wrote:
> Yeah, I was suggesting that you avoid using
> org.apache.spark.sql.DataFrame.apply(colName: String) when you are working
> with selfjoins as it eagerly binds to a specific column in a what that
> breaks when we do the rewrite of one side of the query. Using the apply
> method constructs a resolved column eagerly (which looses the alias
> information).
How to distinguish columns when joining DataFrames with shared parent?
Hello,

When joining 2 DataFrames that originate from the same initial DataFrame, why can't the org.apache.spark.sql.DataFrame.apply(colName: String) method distinguish which column to read?

Let me illustrate this question with a simple example (run on Spark 1.5.1):

// my initial DataFrame
scala> df
res39: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> df.show
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  1|   10|
|  2|    3|
|  3|   20|
|  3|    5|
|  4|   10|
+---+-----+

// 2 child DataFrames
scala> val smallValues = df.filter('value < 10)
smallValues: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> smallValues.show
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  2|    3|
|  3|    5|
+---+-----+

scala> val largeValues = df.filter('value >= 10)
largeValues: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> largeValues.show
+---+-----+
|key|value|
+---+-----+
|  1|   10|
|  3|   20|
|  4|   10|
+---+-----+

// Joining the children
scala> smallValues
  .join(largeValues, smallValues("key") === largeValues("key"))
  .withColumn("diff", smallValues("value") - largeValues("value"))
  .show
15/10/20 16:59:59 WARN Column: Constructing trivially true equals predicate, 'key#41 = key#41'. Perhaps you need to use aliases.
+---+-----+---+-----+----+
|key|value|key|value|diff|
+---+-----+---+-----+----+
|  1|    1|  1|   10|   0|
|  3|    5|  3|   20|   0|
+---+-----+---+-----+----+

This last command issued a warning, but still executed the join correctly (rows with keys 2 and 4 don't appear in the result set). However, the "diff" column is incorrect.

Is this a bug, or am I missing something here?

Thanks a lot for any input,
Isabelle
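Michael Armbrust's advice in this thread can be applied to the original example. Alias each child DataFrame, and refer to columns as `"alias.column"` through `functions.col`. This keeps the references unresolved until the join is analyzed. A sketch against the Spark 2.x `SparkSession` API; `SelfJoinDiff` and `diffs` are hypothetical names. Per Michael's output table, keys 1 and 3 should give diffs of -9 and -15.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SelfJoinDiff {
  // Splits `df` into small/large children, joins them on key, and computes small - large.
  // Each side gets an alias, and every column is referenced as "alias.column", so the
  // references stay unresolved until the self join is analyzed.
  def diffs(df: DataFrame): DataFrame = {
    val smallValues = df.filter(col("value") < 10).as("sv")
    val largeValues = df.filter(col("value") >= 10).as("lv")
    smallValues
      .join(largeValues, col("sv.key") === col("lv.key"))
      .select(
        col("sv.key").as("key"),
        col("sv.value").as("small_value"),
        col("lv.value").as("large_value"))
      .withColumn("diff", col("small_value") - col("large_value"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("self-join-diff").getOrCreate()
    import spark.implicits._
    val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", "value")
    diffs(df).orderBy("key").show()
    spark.stop()
  }
}
```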
Re: DataFrame creation delay?
Hi Michael,

Thanks a lot for your reply.

This table is stored as a text file with tab-delimited columns.

You are correct: the problem is that my table has too many partitions (1825 in total). Since I am on Spark 1.4, I think I am hitting bug 6984 <https://issues.apache.org/jira/browse/SPARK-6984>.

I am not sure when my company can move to 1.5. Would you know of a workaround for this bug? If I cannot find one, we will have to change our schema design to reduce the number of partitions.

Thanks,
Isabelle

On Fri, Sep 4, 2015 at 12:56 PM, Michael Armbrust <mich...@databricks.com> wrote:
> Also, do you mean two partitions or two partition columns? If there are
> many partitions it can be much slower. In Spark 1.5 I'd consider setting
> spark.sql.hive.metastorePartitionPruning=true
> if you have predicates over the partition columns.
>
> On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> What format is this table. For parquet and other optimized formats we
>> cache a bunch of file metadata on first access to make interactive queries
>> faster.
DataFrame creation delay?
Hello,

I am using SparkSQL to query some Hive tables. Most of the time, when I create a DataFrame using the sqlContext.sql("select * from table") command, DataFrame creation takes less than 0.5 second. But for this one table it takes almost 12 seconds!

scala> val start = scala.compat.Platform.currentTime; val logs = sqlContext.sql("select * from temp.log"); val execution = scala.compat.Platform.currentTime - start
15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from temp.log
15/09/04 12:07:02 INFO ParseDriver: Parse Completed
start: Long = 1441336022731
logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int, log_time: string, tag: string, dt: string, test_id: int]
execution: Long = 11567

This table has 3.6 B rows and 2 partitions (on the dt and test_id columns). I have created DataFrames on even larger tables and do not see such a delay. So my questions are:
- What can impact DataFrame creation time?
- Is it related to the table partitions?

Thanks very much for your help!
Isabelle
Re: How to determine the value for spark.sql.shuffle.partitions?
+1 I had the exact same question as I am working on my first Spark applications. Hope someone can share some best practices.

Thanks!
Isabelle

On Tue, Sep 1, 2015 at 2:17 AM, Romi Kuntsman wrote:
> Hi all,
>
> The number of partition greatly affect the speed and efficiency of
> calculation, in my case in DataFrames/SparkSQL on Spark 1.4.0.
>
> Too few partitions with large data cause OOM exceptions.
> Too many partitions on small data cause a delay due to overhead.
>
> How do you programmatically determine the optimal number of partitions and
> cores in Spark, as a function of:
>
>    1. available memory per core
>    2. number of records in input data
>    3. average/maximum record size
>    4. cache configuration
>    5. shuffle configuration
>    6. serialization
>    7. etc?
>
> Any general best practices?
>
> Thanks!
>
> Romi K.
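There is no single formula, but one rule of thumb is to size shuffle partitions by bytes. This is an assumption, not an official Spark recommendation. Divide the expected shuffle size by a target partition size (128 MB in the example below, a hypothetical choice), and keep at least a floor such as the total number of executor cores. `ShufflePartitions` and `suggestedPartitions` are hypothetical names.

```scala
object ShufflePartitions {
  // Rule of thumb: one partition per `targetBytesPerPartition` bytes of shuffle data,
  // but never fewer than `minPartitions` (for example, the total executor cores).
  def suggestedPartitions(shuffleBytes: Long, targetBytesPerPartition: Long, minPartitions: Int): Int = {
    require(targetBytesPerPartition > 0, "target partition size must be positive")
    val bySize = math.ceil(shuffleBytes.toDouble / targetBytesPerPartition).toLong
    math.max(minPartitions.toLong, bySize).min(Int.MaxValue).toInt
  }

  def main(args: Array[String]): Unit = {
    val tenGiB = 10L * 1024 * 1024 * 1024
    val target = 128L * 1024 * 1024
    println(suggestedPartitions(tenGiB, target, minPartitions = 16))
  }
}
```

The result can then be applied with `sqlContext.setConf("spark.sql.shuffle.partitions", n.toString)` on Spark 1.4, or `spark.conf.set(...)` on 2.x. The shuffle size has to come from elsewhere, for example the stage metrics of a previous run in the Spark UI.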
DataFrame rollup with alias?
Hello,

I am new to Spark and just running some tests to get familiar with the APIs. When calling the rollup function on my DataFrame, I get different results when I alias the columns I am grouping on (see below for an example data set). I was expecting the alias function to only affect the column name. Why is it also affecting the rollup results? (I know I can rename my columns after the rollup call using the withColumnRenamed function; my question is just to get a better understanding of the alias function.)

scala> df.show
+----+------+-----+
|Name|  Game|Score|
+----+------+-----+
| Bob|Game 1|   20|
| Bob|Game 2|   30|
| Lea|Game 1|   25|
| Lea|Game 2|   30|
| Ben|Game 1|    5|
| Ben|Game 3|   35|
| Bob|Game 3|   15|
+----+------+-----+

// rollup results as expected
scala> df.rollup(df("Name"), df("Game")).sum().orderBy("Name", "Game").show
+----+------+----------+
|Name|  Game|SUM(Score)|
+----+------+----------+
|null|  null|       160|
| Ben|  null|        40|
| Ben|Game 1|         5|
| Ben|Game 3|        35|
| Bob|  null|        65|
| Bob|Game 1|        20|
| Bob|Game 2|        30|
| Bob|Game 3|        15|
| Lea|  null|        55|
| Lea|Game 1|        25|
| Lea|Game 2|        30|
+----+------+----------+

// rollup with aliases returns strange results
scala> df.rollup(df("Name") as "Player", df("Game") as "Round").sum().orderBy("Player", "Round").show
+------+------+----------+
|Player| Round|SUM(Score)|
+------+------+----------+
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 2|        30|
|   Lea|Game 2|        30|
+------+------+----------+

Thanks in advance for your help,
Isabelle
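Below is the workaround mentioned in the post, rolling up on the original columns and renaming afterwards, as a sketch against the Spark 2.x API. `RollupThenRename` and the `Total` column name are hypothetical. On the sample data it should yield the same 11 rows as the un-aliased rollup, including the grand total of 160.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum

object RollupThenRename {
  // Rolls up on the original column names, then renames the grouping columns,
  // instead of aliasing them inside rollup().
  def scoresByPlayerAndRound(df: DataFrame): DataFrame =
    df.rollup("Name", "Game")
      .agg(sum("Score").as("Total"))
      .withColumnRenamed("Name", "Player")
      .withColumnRenamed("Game", "Round")
      .orderBy("Player", "Round")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rollup-then-rename").getOrCreate()
    import spark.implicits._
    val df = Seq(
      ("Bob", "Game 1", 20), ("Bob", "Game 2", 30), ("Lea", "Game 1", 25), ("Lea", "Game 2", 30),
      ("Ben", "Game 1", 5), ("Ben", "Game 3", 35), ("Bob", "Game 3", 15)
    ).toDF("Name", "Game", "Score")
    scoresByPlayerAndRound(df).show()
    spark.stop()
  }
}
```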