Delete checkpointed data for a single dataset?

2019-10-23 Thread Isabelle Phan
Hello,

In a non-streaming application, I am using the checkpoint feature to
truncate the lineage of complex datasets. At the end of the job, the
checkpointed data, which is stored in HDFS, is deleted.
I am looking for a way to delete the unused checkpointed data earlier than
the end of the job. If I know that one dataset won't be used anymore, is
there a way to delete its checkpointed data in the middle of the
application?
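
For reference, here is a rough sketch of the kind of manual cleanup I have in
mind. It works at the RDD level, where getCheckpointFile exposes the checkpoint
location; whether the same is safe for Dataset.checkpoint is exactly what I am
unsure about (all names below are illustrative):

import org.apache.hadoop.fs.{FileSystem, Path}

intermediateRdd.checkpoint()   // mark the RDD for checkpointing
intermediateRdd.count()        // an action forces the checkpoint to be written

// ... later, once this RDD will never be recomputed again ...
intermediateRdd.getCheckpointFile.foreach { dir =>
  val fs = FileSystem.get(sc.hadoopConfiguration)
  fs.delete(new Path(dir), true)   // recursively remove this RDD's checkpoint files
}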

Thank you,

Isabelle


Read ORC file with subset of schema

2019-08-30 Thread Isabelle Phan
Hello,

When reading an older ORC file whose schema is a subset of the current
schema, the reader throws an error. Please see the sample code below (run on
Spark 2.1).
The same commands on a Parquet file do not error out; they return the new
column filled with null values.
Is there a setting to add to the reader so that ORC files behave the same
way?

Thanks,

Isabelle


scala> case class Schema1(id: String)
defined class Schema1

scala> case class Schema2(id: String, new_column: String)
defined class Schema2

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> spark.read.schema(Encoders.product[Schema1].schema).format("orc").load("path_to_orc_file").show
+----------+
|        id|
+----------+
|   9930481|
| 119474983|
|   4665129|
|  27531864|
|  41562099|
| 110502758|
+----------+

scala> spark.read.schema(Encoders.product[Schema2].schema).format("orc").load("path_to_orc_file").show
19/08/31 03:26:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-10-x-y-z.ap-northeast-2.compute.internal, executor 1):
java.lang.IllegalArgumentException: Field "new_column" does not exist.
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:290)
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:290)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:289)
at org.apache.spark.sql.hive.orc.OrcRelation$$anonfun$6.apply(OrcFileFormat.scala:308)
at org.apache.spark.sql.hive.orc.OrcRelation$$anonfun$6.apply(OrcFileFormat.scala:308)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at org.apache.spark.sql.types.StructType.map(StructType.scala:96)
at org.apache.spark.sql.hive.orc.OrcRelation$.setRequiredColumns(OrcFileFormat.scala:308)
at org.apache.spark.sql.hive.orc.OrcFileFormat$$anonfun$buildReader$2.apply(OrcFileFormat.scala:140)
at org.apache.spark.sql.hive.orc.OrcFileFormat$$anonfun$buildReader$2.apply(OrcFileFormat.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

// Instead of the error, I expected the following:
+----------+----------+
|        id|new_column|
+----------+----------+
|   9930481|      null|
| 119474983|      null|
|   4665129|      null|
|   4665129|      null|
|  27531864|      null|
|  41562099|      null|
|  41562099|      null|
| 110502758|      null|
+----------+----------+
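
In case it helps, the workaround I am falling back on for now is to read the
file with its own (older) schema and add the missing column explicitly; a rough
sketch using the example above:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

val old = spark.read.schema(Encoders.product[Schema1].schema)
  .format("orc").load("path_to_orc_file")

// Manually append the column missing from the old file, filled with nulls,
// so the result matches Schema2.
val patched = old.withColumn("new_column", lit(null).cast(StringType))
patched.show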


ClassCastException when using SparkSQL Window function

2016-11-17 Thread Isabelle Phan
Hello,

I have a simple session table, which tracks pages users visited with a
sessionId. I would like to apply a window function by sessionId, but am
hitting a type cast exception. I am using Spark 1.5.0.

Here is sample code:
scala> df.printSchema
root
 |-- sessionid: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- page: string (nullable = true)

scala> df.withColumn("num", rowNumber.over(Window.partitionBy("sessionid"))).show(10)
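
For completeness, here is the same snippet with the imports it assumes (Spark
1.5 API); the commented lines are only a guess at a variant worth trying, not
something I have confirmed:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

// Same call as above: row number per session, no explicit ordering.
df.withColumn("num", rowNumber.over(Window.partitionBy("sessionid"))).show(10)

// Guess at a variant to try: give the window an explicit ordering.
// df.withColumn("num",
//   rowNumber.over(Window.partitionBy("sessionid").orderBy("timestamp"))).show(10)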

Here is the error stacktrace:
Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getInt(JoinedRow.scala:82)
at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:45)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:121)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:330)
at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Has anyone encountered this problem before? Any pointers would be greatly
appreciated.


Thanks!

Isabelle


Re: DataFrame creation delay?

2015-12-11 Thread Isabelle Phan
Hi Harsh,

Thanks a lot for your reply.

I added a predicate to my query to select a single partition of the table,
and tested with the "spark.sql.hive.metastorePartitionPruning" setting both on
and off: there is no difference in DataFrame creation time.

Yes, Michael's proposed workaround works. But I was under the impression
that this workaround was only for Spark version < 1.5. With the Hive
metastore partition pruning feature from Spark 1.5, I thought there would
be no more delay, so I could create DataFrames left and right.

I noticed that regardless of the setting, when I create a DataFrame with or
without a predicate, I get a log message from HadoopFsRelation class which
lists hdfs filepaths to ALL partitions in my table (see logInfo call in the
code
<https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L770>).
Is this expected? I am not sure who is creating this Array of filepaths,
but I am guessing this is the source of my delay.
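
For reference, the predicate test I ran looked roughly like this (the partition
value is made up):

sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")

// Select a single partition; creation time was the same with the setting on or off.
val start = scala.compat.Platform.currentTime
val logs = sqlContext.sql("select * from temp.log where dt = '2015-12-01'")
val execution = scala.compat.Platform.currentTime - start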


Thanks,

Isabelle



On Thu, Dec 10, 2015 at 7:38 PM, Harsh J <ha...@cloudera.com> wrote:

> The option of "spark.sql.hive.metastorePartitionPruning=true" will not
> work unless you have a partition column predicate in your query. Your query
> of "select * from temp.log" does not do this. The slowdown appears to be
> due to the need of loading all partition metadata.
>
> Have you also tried to see if Michael's temp-table suggestion helps you
> cache the expensive partition lookup? (re-quoted below)
>
> """
> If you run sqlContext.table("...").registerTempTable("...") that temptable
> will cache the lookup of partitions [the first time is slow, but subsequent
> lookups will be faster].
> """ - X-Ref: Permalink
> <https://issues.apache.org/jira/browse/SPARK-6910?focusedCommentId=14529666=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14529666>
>
> Also, do you absolutely need to use "select * from temp.log"? Adding a
> where clause to the query with a partition condition will help Spark prune
> the request to just the required partitions (vs. all, which is proving
> expensive).
>
> On Fri, Dec 11, 2015 at 3:59 AM Isabelle Phan <nlip...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> We have just upgraded to Spark 1.5.0 (actually 1.5.0_cdh-5.5, since we are
>> on Cloudera) and Parquet-formatted tables. I turned on
>> spark.sql.hive.metastorePartitionPruning=true, but DataFrame creation still
>> takes a long time.
>> Is there any other configuration to consider?
>>
>>
>> Thanks a lot for your help,
>>
>> Isabelle
>>
>> On Fri, Sep 4, 2015 at 1:42 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> If you run sqlContext.table("...").registerTempTable("...") that
>>> temptable will cache the lookup of partitions.
>>>
>>> On Fri, Sep 4, 2015 at 1:16 PM, Isabelle Phan <nlip...@gmail.com> wrote:
>>>
>>>> Hi Michael,
>>>>
>>>> Thanks a lot for your reply.
>>>>
>>>> This table is stored as text file with tab delimited columns.
>>>>
>>>> You are correct, the problem is because my table has too many
>>>> partitions (1825 in total). Since I am on Spark 1.4, I think I am hitting
>>>> bug 6984 <https://issues.apache.org/jira/browse/SPARK-6984>.
>>>>
>>>> Not sure when my company can move to 1.5. Would you know some
>>>> workaround for this bug?
>>>> If I cannot find workaround for this, will have to change our schema
>>>> design to reduce number of partitions.
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Isabelle
>>>>
>>>>
>>>>
>>>> On Fri, Sep 4, 2015 at 12:56 PM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> Also, do you mean two partitions or two partition columns?  If there
>>>>> are many partitions it can be much slower.  In Spark 1.5 I'd consider
>>>>> setting spark.sql.hive.metastorePartitionPruning=true if you have
>>>>> predicates over the partition columns.
>>>>>
>>>>> On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> What format is this table.  For parquet and other optimized formats
>>>>>> we cache a bunch of file metadata on first access to make interactive
>>>>>> queries faster.

Re: DataFrame creation delay?

2015-12-10 Thread Isabelle Phan
Hi Michael,

We have just upgraded to Spark 1.5.0 (actually 1.5.0_cdh-5.5, since we are
on Cloudera) and Parquet-formatted tables. I turned on
spark.sql.hive.metastorePartitionPruning=true, but DataFrame creation still
takes a long time.
Is there any other configuration to consider?


Thanks a lot for your help,

Isabelle

On Fri, Sep 4, 2015 at 1:42 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> If you run sqlContext.table("...").registerTempTable("...") that
> temptable will cache the lookup of partitions.
>
> On Fri, Sep 4, 2015 at 1:16 PM, Isabelle Phan <nlip...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> Thanks a lot for your reply.
>>
>> This table is stored as text file with tab delimited columns.
>>
>> You are correct, the problem is because my table has too many partitions
>> (1825 in total). Since I am on Spark 1.4, I think I am hitting bug 6984
>> <https://issues.apache.org/jira/browse/SPARK-6984>.
>>
>> Not sure when my company can move to 1.5. Would you know some workaround
>> for this bug?
>> If I cannot find workaround for this, will have to change our schema
>> design to reduce number of partitions.
>>
>>
>> Thanks,
>>
>> Isabelle
>>
>>
>>
>> On Fri, Sep 4, 2015 at 12:56 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> Also, do you mean two partitions or two partition columns?  If there are
>>> many partitions it can be much slower.  In Spark 1.5 I'd consider setting 
>>> spark.sql.hive.metastorePartitionPruning=true
>>> if you have predicates over the partition columns.
>>>
>>> On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> What format is this table.  For parquet and other optimized formats we
>>>> cache a bunch of file metadata on first access to make interactive queries
>>>> faster.
>>>>
>>>> On Thu, Sep 3, 2015 at 8:17 PM, Isabelle Phan <nlip...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am using SparkSQL to query some Hive tables. Most of the time, when
>>>>> I create a DataFrame using sqlContext.sql("select * from table") command,
>>>>> DataFrame creation is less than 0.5 second.
>>>>> But I have this one table with which it takes almost 12 seconds!
>>>>>
>>>>> scala>  val start = scala.compat.Platform.currentTime; val logs =
>>>>> sqlContext.sql("select * from temp.log"); val execution =
>>>>> scala.compat.Platform.currentTime - start
>>>>> 15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from
>>>>> temp.log
>>>>> 15/09/04 12:07:02 INFO ParseDriver: Parse Completed
>>>>> start: Long = 1441336022731
>>>>> logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
>>>>> log_time: string, tag: string, dt: string, test_id: int]
>>>>> execution: Long = *11567*
>>>>>
>>>>> This table has 3.6 B rows, and 2 partitions (on dt and test_id
>>>>> columns).
>>>>> I have created DataFrames on even larger tables and do not see such
>>>>> delay.
>>>>> So my questions are:
>>>>> - What can impact DataFrame creation time?
>>>>> - Is it related to the table partitions?
>>>>>
>>>>>
>>>>> Thanks much your help!
>>>>>
>>>>> Isabelle
>>>>>
>>>>
>>>>
>>>
>>
>


Re: SparkSQL API to insert DataFrame into a static partition?

2015-12-04 Thread Isabelle Phan
Thanks all for your reply!

I tested both approaches: registering a temp table and then executing SQL,
vs. saving directly to an HDFS filepath. The problem with the second approach
is that I am inserting data into a Hive table, so if I create a new partition
with this method, the Hive metadata is not updated.
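
For reference, the first approach (register a temp table, then run the insert
through SQL) looks roughly like this; the table, column, and partition names
are just for illustration:

df.registerTempTable("staging")

// Static partition insert through HiveQL, so the metastore learns about
// the new partition.
sqlContext.sql("""
  INSERT OVERWRITE TABLE my_table PARTITION (dt = '2015-12-04')
  SELECT * FROM staging
""")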

So I will be going with the first approach.
Follow-up question in this case: what is the cost of registering a temp
table? Is there a limit to the number of temp tables that can be registered
by the Spark context?


Thanks again for your input.

Isabelle



On Wed, Dec 2, 2015 at 10:30 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> you might also coalesce to 1 (or some small number) before writing to
> avoid creating a lot of files in that partition if you know that there is
> not a ton of data.
>
> On Wed, Dec 2, 2015 at 12:59 AM, Rishi Mishra <rmis...@snappydata.io>
> wrote:
>
>> As long as all your data is being inserted by Spark , hence using the
>> same hash partitioner,  what Fengdong mentioned should work.
>>
>> On Wed, Dec 2, 2015 at 9:32 AM, Fengdong Yu <fengdo...@everstring.com>
>> wrote:
>>
>>> Hi
>>> you can try:
>>>
>>> if your table is under location "/test/table/" on HDFS
>>> and has partitions:
>>>
>>>  "/test/table/dt=2012"
>>>  "/test/table/dt=2013"
>>>
>>> df.write.mode(SaveMode.Append).partitionBy("date").save("/test/table")
>>>
>>>
>>>
>>> On Dec 2, 2015, at 10:50 AM, Isabelle Phan <nlip...@gmail.com> wrote:
>>>
>>> df.write.partitionBy("date").insertInto("my_table")
>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>


How to catch error during Spark job?

2015-10-27 Thread Isabelle Phan
Hello,

I had a question about error handling in a Spark job: if an exception occurs
during the job, what is the best way to get notified of the failure?
Can Spark jobs return different exit codes?

For example, I wrote a dummy Spark job that just throws an Exception, as
follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ExampleJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test Job")
    val sc = new SparkContext(conf)
    try {
      val count = sc.parallelize(1 to 100).count
      println(s"Count: $count")

      throw new Exception("Fail!")

    } finally {
      sc.stop
    }
  }
}

The spark-submit execution trace shows the error:
spark-submit --class com.test.ExampleJob test.jar
15/10/03 03:13:16 INFO SparkContext: Running Spark version 1.4.0
15/10/03 03:13:19 WARN SparkConf: In Spark 1.0 and later spark.local.dir
will be overridden by the value set by the cluster manager (via
SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/10/03 03:13:19 WARN SparkConf:
...
15/10/03 03:13:59 INFO DAGScheduler: Job 0 finished: count at
ExampleJob.scala:12, took 18.879104 s
Count: 100
15/10/03 03:13:59 INFO SparkUI: Stopped Spark web UI at []
15/10/03 03:13:59 INFO DAGScheduler: Stopping DAGScheduler
15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Shutting down all
executors
15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Asking each executor to
shut down
15/10/03 03:13:59 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
15/10/03 03:13:59 INFO Utils: path =
/data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596/blockmgr-d8e40805-3b8c-45f4-97b3-b89874158796,
already present as root for deletion.
15/10/03 03:13:59 INFO MemoryStore: MemoryStore cleared
15/10/03 03:13:59 INFO BlockManager: BlockManager stopped
15/10/03 03:13:59 INFO BlockManagerMaster: BlockManagerMaster stopped
15/10/03 03:13:59 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
15/10/03 03:13:59 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.Exception: Fail!
at com.test.ExampleJob$.main(ExampleJob.scala:14)
at com.test.ExampleJob.main(ExampleJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
15/10/03 03:13:59 INFO Utils: Shutdown hook called
15/10/03 03:13:59 INFO Utils: Deleting directory
/data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596
15/10/03 03:14:00 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.


However, the Spark UI just shows the status as "FINISHED". Is this a
configuration error on my side?
[image: Inline image 1]
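
For what it is worth, one variant I am considering (just a sketch, using the
same imports as the job above; I have not confirmed it is the recommended
pattern) is to catch the failure in main, stop the context, and exit with a
nonzero status so that the spark-submit exit code reflects it:

object ExampleJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test Job")
    val sc = new SparkContext(conf)
    try {
      val count = sc.parallelize(1 to 100).count
      println(s"Count: $count")
      throw new Exception("Fail!")
    } catch {
      case e: Exception =>
        e.printStackTrace()
        sc.stop()
        sys.exit(1)   // nonzero exit code so the failure is visible to the caller
    }
    sc.stop()
  }
}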


Thanks,

Isabelle


Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-21 Thread Isabelle Phan
Thanks Michael and Ali for the reply!

I'll make sure to use unresolved columns when working with self joins then.

As pointed out by Ali, isn't there still an issue with the aliasing? It works
when using the org.apache.spark.sql.functions.col(colName: String) method, but
not when using org.apache.spark.sql.DataFrame.apply(colName: String):

scala> j.select(col("lv.value")).show
+-----+
|value|
+-----+
|   10|
|   20|
+-----+


scala> j.select(largeValues("lv.value")).show
+-----+
|value|
+-----+
|    1|
|    5|
+-----+
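
(For context, j above is the self-join from Michael's snippet quoted below; my
assumption of its exact construction is roughly:)

val j = smallValues.join(largeValues, $"sv.key" === $"lv.key")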

Or does this behavior have the same root cause as detailed in Michael's
email?


-Isabelle




On Wed, Oct 21, 2015 at 11:46 AM, Michael Armbrust 
wrote:

> Unfortunately, the mechanisms that we use to differentiate columns
> automatically don't work particularly well in the presence of self joins.
> However, you can get it work if you use the $"column" syntax consistently:
>
> val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", "value")
> val smallValues = df.filter('value < 10).as("sv")
> val largeValues = df.filter('value >= 10).as("lv")
>
> smallValues
>   .join(largeValues, $"sv.key" === $"lv.key")
>   .select($"sv.key".as("key"), $"sv.value".as("small_value"), $"lv.value".as("large_value"))
>   .withColumn("diff", $"small_value" - $"large_value")
>   .show()
>
> +---+-----------+-----------+----+
> |key|small_value|large_value|diff|
> +---+-----------+-----------+----+
> |  1|          1|         10|  -9|
> |  3|          5|         20| -15|
> +---+-----------+-----------+----+
>
>
> The problem with the other cases is that calling smallValues("columnName")
> or largeValues("columnName") is eagerly resolving the attribute to the
> same column (since the data is actually coming from the same place).  By
> the time we realize that you are joining the data with itself (at which
> point we rewrite one side of the join to use different expression ids) it's
> too late.  At the core the problem is that in Scala we have no easy way to
> differentiate largeValues("columnName") from smallValues("columnName").
> This is because the data is coming from the same DataFrame and we don't
> actually know which variable name you are using.  There are things we can
> change here, but it's pretty hard to change the semantics without breaking
> other use cases.
>
> So, this isn't a straightforward "bug", but it's definitely a usability
> issue.  For now, my advice would be: only use unresolved columns (i.e.
> $"[alias.]column" or col("[alias.]column")) when working with self joins.
>
> Michael
>


Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-21 Thread Isabelle Phan
Ok, got it.
Thanks a lot Michael for the detailed reply!
On Oct 21, 2015 1:54 PM, "Michael Armbrust" <mich...@databricks.com> wrote:

> Yeah, I was suggesting that you avoid using
> org.apache.spark.sql.DataFrame.apply(colName: String) when you are working
> with self joins, as it eagerly binds to a specific column in a way that
> breaks when we do the rewrite of one side of the query.  Using the apply
> method constructs a resolved column eagerly (which loses the alias
> information).
>
> On Wed, Oct 21, 2015 at 1:49 PM, Isabelle Phan <nlip...@gmail.com> wrote:
>
>> Thanks Michael and Ali for the reply!
>>
>> I'll make sure to use unresolved columns when working with self joins
>> then.
>>
>> As pointed out by Ali, isn't there still an issue with the aliasing? It works
>> when using org.apache.spark.sql.functions.col(colName: String) method, but
>> not when using org.apache.spark.sql.DataFrame.apply(colName: String):
>>
>> scala> j.select(col("lv.value")).show
>> +-----+
>> |value|
>> +-----+
>> |   10|
>> |   20|
>> +-----+
>>
>>
>> scala> j.select(largeValues("lv.value")).show
>> +-----+
>> |value|
>> +-----+
>> |    1|
>> |    5|
>> +-----+
>>
>> Or does this behavior have the same root cause as detailed in Michael's
>> email?
>>
>>
>> -Isabelle
>>
>>
>>
>>
>> On Wed, Oct 21, 2015 at 11:46 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Unfortunately, the mechanisms that we use to differentiate columns
>>> automatically don't work particularly well in the presence of self joins.
>>> However, you can get it work if you use the $"column" syntax
>>> consistently:
>>>
>>> val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", "value")
>>> val smallValues = df.filter('value < 10).as("sv")
>>> val largeValues = df.filter('value >= 10).as("lv")
>>>
>>> smallValues
>>>   .join(largeValues, $"sv.key" === $"lv.key")
>>>   .select($"sv.key".as("key"), $"sv.value".as("small_value"), $"lv.value".as("large_value"))
>>>   .withColumn("diff", $"small_value" - $"large_value")
>>>   .show()
>>>
>>> +---+-----------+-----------+----+
>>> |key|small_value|large_value|diff|
>>> +---+-----------+-----------+----+
>>> |  1|          1|         10|  -9|
>>> |  3|          5|         20| -15|
>>> +---+-----------+-----------+----+
>>>
>>>
>>> The problem with the other cases is that calling
>>> smallValues("columnName") or largeValues("columnName") is eagerly
>>> resolving the attribute to the same column (since the data is actually
>>> coming from the same place).  By the time we realize that you are joining
>>> the data with itself (at which point we rewrite one side of the join to use
>>> different expression ids) it's too late.  At the core the problem is that in
>>> Scala we have no easy way to differentiate largeValues("columnName")
>>> from smallValues("columnName").  This is because the data is coming
>>> from the same DataFrame and we don't actually know which variable name you
>>> are using.  There are things we can change here, but it's pretty hard to
>>> change the semantics without breaking other use cases.
>>>
>>> So, this isn't a straightforward "bug", but it's definitely a usability
>>> issue.  For now, my advice would be: only use unresolved columns (i.e.
>>> $"[alias.]column" or col("[alias.]column")) when working with self
>>> joins.
>>>
>>> Michael
>>>
>>
>>
>


How to distinguish columns when joining DataFrames with shared parent?

2015-10-20 Thread Isabelle Phan
Hello,

When joining two DataFrames that originate from the same initial DataFrame,
why can't the org.apache.spark.sql.DataFrame.apply(colName: String) method
distinguish which column to read?

Let me illustrate this question with a simple example (ran on Spark 1.5.1):

//my initial DataFrame
scala> df
res39: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> df.show
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  1|   10|
|  2|    3|
|  3|   20|
|  3|    5|
|  4|   10|
+---+-----+


//2 children DataFrames
scala> val smallValues = df.filter('value < 10)
smallValues: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> smallValues.show
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  2|    3|
|  3|    5|
+---+-----+


scala> val largeValues = df.filter('value >= 10)
largeValues: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> largeValues.show
+---+-----+
|key|value|
+---+-----+
|  1|   10|
|  3|   20|
|  4|   10|
+---+-----+


//Joining the children
scala> smallValues
  .join(largeValues, smallValues("key") === largeValues("key"))
  .withColumn("diff", smallValues("value") - largeValues("value"))
  .show
15/10/20 16:59:59 WARN Column: Constructing trivially true equals
predicate, 'key#41 = key#41'. Perhaps you need to use aliases.
+---+-----+---+-----+----+
|key|value|key|value|diff|
+---+-----+---+-----+----+
|  1|    1|  1|   10|   0|
|  3|    5|  3|   20|   0|
+---+-----+---+-----+----+


This last command issued a warning, but still executed the join correctly
(rows with keys 2 and 4 don't appear in the result set). However, the "diff"
column is incorrect.

Is this a bug or am I missing something here?
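
Per the replies further up in this archive, the variant that avoids the problem
aliases each side and then refers only to unresolved columns; a rough sketch:

val sv = df.filter('value < 10).as("sv")
val lv = df.filter('value >= 10).as("lv")

// Qualified, unresolved column references distinguish the two sides.
sv.join(lv, $"sv.key" === $"lv.key")
  .select($"sv.key".as("key"), $"sv.value".as("small_value"), $"lv.value".as("large_value"))
  .withColumn("diff", $"small_value" - $"large_value")
  .show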


Thanks a lot for any input,

Isabelle


Re: DataFrame creation delay?

2015-09-04 Thread Isabelle Phan
Hi Michael,

Thanks a lot for your reply.

This table is stored as text file with tab delimited columns.

You are correct, the problem is because my table has too many partitions
(1825 in total). Since I am on Spark 1.4, I think I am hitting bug 6984
<https://issues.apache.org/jira/browse/SPARK-6984>.

Not sure when my company can move to 1.5. Would you know some workaround
for this bug?
If I cannot find workaround for this, will have to change our schema design
to reduce number of partitions.


Thanks,

Isabelle



On Fri, Sep 4, 2015 at 12:56 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Also, do you mean two partitions or two partition columns?  If there are
> many partitions it can be much slower.  In Spark 1.5 I'd consider setting 
> spark.sql.hive.metastorePartitionPruning=true
> if you have predicates over the partition columns.
>
> On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> What format is this table.  For parquet and other optimized formats we
>> cache a bunch of file metadata on first access to make interactive queries
>> faster.
>>
>> On Thu, Sep 3, 2015 at 8:17 PM, Isabelle Phan <nlip...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using SparkSQL to query some Hive tables. Most of the time, when I
>>> create a DataFrame using sqlContext.sql("select * from table") command,
>>> DataFrame creation is less than 0.5 second.
>>> But I have this one table with which it takes almost 12 seconds!
>>>
>>> scala>  val start = scala.compat.Platform.currentTime; val logs =
>>> sqlContext.sql("select * from temp.log"); val execution =
>>> scala.compat.Platform.currentTime - start
>>> 15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from
>>> temp.log
>>> 15/09/04 12:07:02 INFO ParseDriver: Parse Completed
>>> start: Long = 1441336022731
>>> logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
>>> log_time: string, tag: string, dt: string, test_id: int]
>>> execution: Long = *11567*
>>>
>>> This table has 3.6 B rows, and 2 partitions (on dt and test_id columns).
>>> I have created DataFrames on even larger tables and do not see such
>>> delay.
>>> So my questions are:
>>> - What can impact DataFrame creation time?
>>> - Is it related to the table partitions?
>>>
>>>
>>> Thanks much your help!
>>>
>>> Isabelle
>>>
>>
>>
>


DataFrame creation delay?

2015-09-03 Thread Isabelle Phan
Hello,

I am using SparkSQL to query some Hive tables. Most of the time, when I
create a DataFrame using sqlContext.sql("select * from table") command,
DataFrame creation is less than 0.5 second.
But I have this one table with which it takes almost 12 seconds!

scala>  val start = scala.compat.Platform.currentTime; val logs =
sqlContext.sql("select * from temp.log"); val execution =
scala.compat.Platform.currentTime - start
15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from temp.log
15/09/04 12:07:02 INFO ParseDriver: Parse Completed
start: Long = 1441336022731
logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
log_time: string, tag: string, dt: string, test_id: int]
execution: Long = *11567*

This table has 3.6 B rows, and 2 partitions (on dt and test_id columns).
I have created DataFrames on even larger tables and do not see such delay.
So my questions are:
- What can impact DataFrame creation time?
- Is it related to the table partitions?


Thanks so much for your help!

Isabelle


Re: How to determine the value for spark.sql.shuffle.partitions?

2015-09-03 Thread Isabelle Phan
+1

I had the exact same question as I am working on my first Spark
applications.
Hope someone can share some best practices.

Thanks!

Isabelle

On Tue, Sep 1, 2015 at 2:17 AM, Romi Kuntsman  wrote:

> Hi all,
>
> The number of partition greatly affect the speed and efficiency of
> calculation, in my case in DataFrames/SparkSQL on Spark 1.4.0.
>
> Too few partitions with large data cause OOM exceptions.
> Too many partitions on small data cause a delay due to overhead.
>
> How do you programmatically determine the optimal number of partitions and
> cores in Spark, as a function of:
>
>1. available memory per core
>2. number of records in input data
>3. average/maximum record size
>4. cache configuration
>5. shuffle configuration
>6. serialization
>7. etc?
>
> Any general best practices?
>
> Thanks!
>
> Romi K.
>
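
For reference, the knob itself can be changed per SQLContext while
experimenting; picking the right value is exactly the open question:

// Try a few values and measure; 200 is the default for spark.sql.shuffle.partitions.
sqlContext.setConf("spark.sql.shuffle.partitions", "64")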


DataFrame rollup with alias?

2015-08-23 Thread Isabelle Phan
Hello,

I am new to Spark and just running some tests to get familiar with the APIs.

When calling the rollup function on my DataFrame, I get different results
when I alias the columns I am grouping on (see below for example data set).
I was expecting the alias function to only affect the column name. Why is it
also affecting the rollup results?
(I know I can rename my columns after the rollup call using the
withColumnRenamed function; my question is just to get a better understanding
of the alias function.)

scala> df.show
+----+------+-----+
|Name|  Game|Score|
+----+------+-----+
| Bob|Game 1|   20|
| Bob|Game 2|   30|
| Lea|Game 1|   25|
| Lea|Game 2|   30|
| Ben|Game 1|    5|
| Ben|Game 3|   35|
| Bob|Game 3|   15|
+----+------+-----+

//rollup results as expected
scala> df.rollup(df("Name"), df("Game")).sum().orderBy("Name", "Game").show
+----+------+----------+
|Name|  Game|SUM(Score)|
+----+------+----------+
|null|  null|       160|
| Ben|  null|        40|
| Ben|Game 1|         5|
| Ben|Game 3|        35|
| Bob|  null|        65|
| Bob|Game 1|        20|
| Bob|Game 2|        30|
| Bob|Game 3|        15|
| Lea|  null|        55|
| Lea|Game 1|        25|
| Lea|Game 2|        30|
+----+------+----------+

//rollup with aliases return strange results
scala> df.rollup(df("Name") as "Player", df("Game") as "Round").sum().orderBy("Player", "Round").show
+------+------+----------+
|Player| Round|SUM(Score)|
+------+------+----------+
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 2|        30|
|   Lea|Game 2|        30|
+------+------+----------+
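
For completeness, the rename-after-rollup workaround mentioned above looks
roughly like this; since the renaming happens after the aggregation, the
rollup results stay as expected:

df.rollup(df("Name"), df("Game")).sum()
  .withColumnRenamed("Name", "Player")
  .withColumnRenamed("Game", "Round")
  .orderBy("Player", "Round")
  .show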


Thanks in advance for your help,

Isabelle