Re: String operation in filter with a special character

2015-10-05 Thread Michael Armbrust
Double quotes (") are used to create string literals in HiveQL / Spark
SQL.  So you are asking if the string A+B equals the number 2.0.

You should use backticks (`) to escape weird characters in column names.
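
As a minimal sketch of the backtick rule above: the helpers `quote_column` and `equals_filter` below are hypothetical (they are not part of Spark) and only build the filter string. The sketch assumes the column name itself contains no backtick and rejects it otherwise.

```python
def quote_column(name):
    """Wrap a column name in backticks so Spark SQL parses it as an identifier.

    Hypothetical helper, not a Spark API. Names that already contain a
    backtick are rejected rather than guessed at.
    """
    if "`" in name:
        raise ValueError("column name contains a backtick: %r" % name)
    return "`" + name + "`"


def equals_filter(column, value):
    """Build a filter expression comparing one column to a numeric literal."""
    return "%s = %r" % (quote_column(column), value)


expr = equals_filter("A+B", 2.0)
print(expr)  # `A+B` = 2.0
# With a DataFrame in scope this would be passed as: dataFrame.filter(expr)
```

With double quotes instead of backticks, the same expression compares a string literal to a number, which is why the original filter matched no rows.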

On Mon, Oct 5, 2015 at 12:59 AM, Hemminger Jeff  wrote:

> I have a rather odd use case. I have a DataFrame column name with a +
> character in it.
> The app performs some processing steps before determining the column name,
> and it
> would be much easier to code if I could use the DataFrame filter
> operations with a String.
>
> This demonstrates the issue I am having:
>
> dataFrame.filter(renamed("A+B").equalTo(2.0)).show()
>
> This will return all rows with the column value matching 2.0, as expected.
>
> dataFrame.filter("\"A+B\"=2.0").show()
>
> This executes but does not return the correct results. It returns an empty
> result.
>
> dataFrame.filter("\"A+C\"=2.0").show()
>
> Referencing a non-existent column name returns the same empty result.
>
> Any suggestions?
>
> Jeff
>


Re: Spark context on thrift server

2015-10-05 Thread Michael Armbrust
Isolation for different sessions will hopefully be fixed by
https://github.com/apache/spark/pull/8909

On Mon, Oct 5, 2015 at 8:38 AM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Hi,
>
>
>
> We’re using a spark thrift server and we connect using jdbc to run queries.
>
> Every time we run a set query, like “set schema”, it seems to affect the
> server, and not the session only.
>
>
>
> Is that an expected behavior? Or am I missing something.
>
> Younes Naguib
>


Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-10-05 Thread Michael Armbrust
Looks correct to me.  Try for example:

from pyspark.sql.functions import *
df.withColumn("value", explode(df['values'])).show()

On Mon, Oct 5, 2015 at 2:15 PM, Fernando Paladini <fnpalad...@gmail.com>
wrote:

> Update:
>
> I've updated my code and now I have the following JSON:
> https://gist.github.com/paladini/27bb5636d91dec79bd56
> In the same link you can check the output from "spark-submit
> myPythonScript.py", where I call "myDataframe.show()". The following is
> printed by Spark (among other useless debug information):
>
>
> Is that correct for the given JSON input
> <https://gist.github.com/paladini/27bb5636d91dec79bd56> (gist link
> above)? How can I test whether Spark understands this DataFrame and can
> perform complex manipulations on it?
>
> Thank you! Hope you can help me soon :3
> Fernando Paladini.
>
> 2015-10-05 15:23 GMT-03:00 Fernando Paladini <fnpalad...@gmail.com>:
>
>> Thank you for the replies, and sorry about the delay; my e-mail client
>> sent this conversation to Spam (??).
>>
>> I'll take a look at your tips and come back later to post my questions /
>> progress. Again, thank you so much!
>>
>> 2015-09-30 18:37 GMT-03:00 Michael Armbrust <mich...@databricks.com>:
>>
>>> I think the problem here is that you are passing in parsed JSON that is
>>> stored as a dictionary (which is converted to a hashmap when going into the
>>> JVM).  You should instead be passing in the path to the json file
>>> (formatted as Akhil suggests) so that Spark can do the parsing in
>>> parallel.  The other option would be to construct an RDD of JSON strings
>>> and pass that to the JSON method.
>>>
>>> On Wed, Sep 30, 2015 at 2:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Each Json Doc should be in a single line i guess.
>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
>>>>
>>>> Note that the file that is offered as *a json file* is not a typical
>>>> JSON file. Each line must contain a separate, self-contained valid JSON
>>>> object. As a consequence, a regular multi-line JSON file will most often
>>>> fail.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, Sep 29, 2015 at 11:07 AM, Fernando Paladini <
>>>> fnpalad...@gmail.com> wrote:
>>>>
>>>>> Hello guys,
>>>>>
>>>>> I'm very new to Spark and I'm having some trouble reading JSON into a
>>>>> DataFrame in PySpark.
>>>>>
>>>>> I'm getting a JSON object from an API response and I would like to
>>>>> store it in Spark as a DataFrame (I've read that DataFrame is better than
>>>>> RDD; is that accurate?). From what I've read
>>>>> <http://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sqlcontext>
>>>>> in the documentation, I just need to call the method sqlContext.read.json
>>>>> in order to do what I want.
>>>>>
>>>>> *Following is the code from my test application:*
>>>>> json_object = json.loads(response.text)
>>>>> sc = SparkContext("local", appName="JSON to RDD")
>>>>> sqlContext = SQLContext(sc)
>>>>> dataframe = sqlContext.read.json(json_object)
>>>>> dataframe.show()
>>>>>
>>>>> *The problem is that when I run **"spark-submit myExample.py" I got
>>>>> the following error:*
>>>>> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
>>>>> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
>>>>> localhost, 48634)
>>>>> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
>>>>> Traceback (most recent call last):
>>>>>   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
>>>>> line 35, in <module>
>>>>> dataframe = sqlContext.read.json(json_object)
>>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
>>>>> line 144, in json
>>>>>   File
>>>>> "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
>>>>> 538, in __call__
>>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
>>>>> 36, in deco
>>>>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>>>>> line 304, in get_return_value
>>>>> py4j.protocol.Py4JError: An error occurred while calling o21.json.
>>>>> Trace:
>>>>> py4j.Py4JException: Method json([class java.util.HashMap]) does not
>>>>> exist
>>>>> at
>>>>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>>>> at
>>>>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>>>>> at py4j.Gateway.invoke(Gateway.java:252)
>>>>> at
>>>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> *What am I doing wrong?*
>>>>> Check out this gist
>>>>> <https://gist.github.com/paladini/2e2ea913d545a407b842> to see the
>>>>> JSON I'm trying to load.
>>>>>
>>>>> Thanks!
>>>>> Fernando Paladini
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Fernando Paladini
>>
>
>
>
> --
> Fernando Paladini
>


Re: Spark SQL "SELECT ... LIMIT" scans the entire Hive table?

2015-10-05 Thread Michael Armbrust
It does do a take.  Run explain to make sure that is the case.  Why do you
think it's reading the whole table?

On Mon, Oct 5, 2015 at 1:53 PM, YaoPau  wrote:

> I'm using SqlCtx connected to Hive in CDH 5.4.4.  When I run "SELECT * FROM
> my_db.my_tbl LIMIT 5", it scans the entire table like Hive would instead of
> doing a .take(5) on it and returning results immediately.
>
> Is there a way to get Spark SQL to use .take(5) instead of the Hive logic
> of
> scanning the full table when running a SELECT?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-SELECT-LIMIT-scans-the-entire-Hive-table-tp24938.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: performance difference between Thrift server and SparkSQL?

2015-10-03 Thread Michael Armbrust
Under the covers, the thrift server is just calling hiveContext.sql(...), so
this is surprising.  Maybe running EXPLAIN or EXPLAIN EXTENDED in both modes
would be helpful in debugging?



On Sat, Oct 3, 2015 at 1:08 PM, Jeff Thompson <
jeffreykeatingthomp...@gmail.com> wrote:

> Hi,
>
> I'm running a simple SQL query over a ~700 million row table of the form:
>
> SELECT * FROM my_table WHERE id = '12345';
>
> When I submit the query via beeline & the JDBC thrift server it returns in
> 35s
> When I submit the exact same query using sparkSQL from a pyspark shell
> (sqlContext.sql("SELECT * FROM ")) it returns in 3s.
>
> Both times are obtained from the spark web UI.  The query only returns 43
> rows, a small amount of data.
>
> The table was created by saving a sparkSQL dataframe as a parquet file and
> then calling createExternalTable.
>
> I have tried to ensure that all relevant cluster parameters are equivalent
> across the two queries:
> spark.executor.memory = 6g
> spark.executor.instances = 100
> no explicit caching (storage tab in web UI is empty)
> spark version: 1.4.1
> Hadoop v2.5.0-cdh5.3.0, running spark on top of YARN
> jobs run on the same physical cluster (on-site hardware)
>
> From the web UIs, I can see that the query plans are clearly different,
> and I think this may be the source of the performance difference.
>
> Thrift server job:
> 1 stage only, stage 1 (35s) map -> Filter -> mapPartitions
>
> SparkSQL job:
> 2 stages, stage 1 (2s): map -> filter -> Project -> Aggregate -> Exchange,
> stage 2 (0.4s): Exchange -> Aggregate -> mapPartitions
>
> Is this a known issue?  Is there anything I can do to get the Thrift server
> to use the same query optimizer as the one used by sparkSQL?  I'd love to
> pick up a ~10x performance gain for my jobs submitted via the Thrift server.
>
> Best regards,
>
> Jeff
>


Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-02 Thread Michael Armbrust
import static org.apache.spark.sql.functions.*;

callUDF("MyUDF", col("col1"), col("col2"))

On Fri, Oct 2, 2015 at 6:25 AM, unk1102  wrote:

> Hi, I have registered my hive UDF using the following code:
>
> hiveContext.udf().register("MyUDF", new UDF1<String, String>() {
> public String call(String o) throws Exception {
> //bla bla
> }
> }, DataTypes.StringType);
>
> Now I want to use the above MyUDF in a DataFrame. How do we use it? I know
> how to use it in SQL, and it works fine:
>
> hiveContext.sql("select MyUDF('test') from myTable");
>
> My hiveContext.sql() query involves a group by on multiple columns, so for
> scaling purposes I am trying to convert this query to the DataFrame API:
>
>
> dataframe.select("col1","col2","coln").groupBy("col1","col2","coln").count();
>
> Can we do the following: dataframe.select(MyUDF("col1"))? Please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-registered-Hive-UDF-in-Spark-DataFrame-tp24907.html
>
>


Re: SparkSQL: Reading data from hdfs and storing into multiple paths

2015-10-02 Thread Michael Armbrust
Once you convert your data to a dataframe (look at spark-csv), try
df.write.partitionBy("", "mm").save("...").
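
To illustrate what partitionBy produces, here is a small sketch of the Hive-style directory layout (one `column=value` level per partition column) used for partitioned output. The column names `year` and `month` and the helper `partition_dir` are placeholders chosen for this example; they are assumptions, not names taken from the thread.

```python
def partition_dir(base, row, partition_cols):
    """Return the directory a row would land in under column=value partitioning."""
    parts = ["%s=%s" % (col, row[col]) for col in partition_cols]
    return "/".join([base.rstrip("/")] + parts)


# Sample rows mirroring the file in the question, with placeholder column names.
rows = [
    {"a": "a1", "b": "b1", "c": "c1", "year": "2015", "month": "09"},
    {"a": "a2", "b": "b2", "c": "c2", "year": "2014", "month": "08"},
]
for row in rows:
    print(partition_dir("/path/to/hdfs", row, ["year", "month"]))
# /path/to/hdfs/year=2015/month=09
# /path/to/hdfs/year=2014/month=08
```

Note that the directory names carry the column name as well as the value, so the result is not the bare two-level layout sketched in the question.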

On Thu, Oct 1, 2015 at 4:11 PM, haridass saisriram <
haridass.saisri...@gmail.com> wrote:

> Hi,
>
>   I am trying to find a simple example to read a data file on HDFS. The
> file has the following format
> a , b  , c ,,mm
> a1,b1,c1,2015,09
> a2,b2,c2,2014,08
>
>
> I would like to read this file and store it in HDFS partitioned by year
> and month. Something like this
> /path/to/hdfs//mm
>
> I want to specify the "/path/to/hdfs/" and /mm should be populated
> automatically based on those columns. Could someone point me in the right
> direction?
>
> Thank you,
> Sri Ram
>
>


Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-02 Thread Michael Armbrust
callUDF("MyUDF", col("col1")).as("name")

or

callUDF("MyUDF", col("col1")).alias("name")

On Fri, Oct 2, 2015 at 3:29 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi Michael,
>
> Thanks much. How do we give alias name for resultant columns? For e.g.
> when using
>
> hiveContext.sql("select MyUDF('test') as mytest from myTable");
>
> how do we do that in DataFrame callUDF
>
> callUDF("MyUDF", col("col1"))???
>
> On Fri, Oct 2, 2015 at 8:23 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> import static org.apache.spark.sql.functions.*;
>>
>> callUDF("MyUDF", col("col1"), col("col2"))
>>
>> On Fri, Oct 2, 2015 at 6:25 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>>> Hi, I have registered my hive UDF using the following code:
>>>
>>> hiveContext.udf().register("MyUDF", new UDF1<String, String>() {
>>> public String call(String o) throws Exception {
>>> //bla bla
>>> }
>>> }, DataTypes.StringType);
>>>
>>> Now I want to use the above MyUDF in a DataFrame. How do we use it? I know
>>> how to use it in SQL, and it works fine:
>>>
>>> hiveContext.sql("select MyUDF('test') from myTable");
>>>
>>> My hiveContext.sql() query involves a group by on multiple columns, so for
>>> scaling purposes I am trying to convert this query to the DataFrame API:
>>>
>>>
>>> dataframe.select("col1","col2","coln").groupBy("col1","col2","coln").count();
>>>
>>> Can we do the following: dataframe.select(MyUDF("col1"))? Please guide.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-registered-Hive-UDF-in-Spark-DataFrame-tp24907.html
>>>
>>>
>>
>


Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON on PySpark

2015-09-30 Thread Michael Armbrust
I think the problem here is that you are passing in parsed JSON that is
stored as a dictionary (which is converted to a hashmap when going into the
JVM).  You should instead be passing in the path to the json file (formatted
as Akhil suggests) so that Spark can do the parsing in parallel.  The other
option would be to construct an RDD of JSON strings and pass that to the
JSON method.
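
A rough sketch of the second option, assuming the API response has already been parsed into a Python dict or list as in the original post. The helper name `to_json_lines` is hypothetical; it only prepares the strings, and the Spark call is left as a comment because it needs a live SparkContext.

```python
import json


def to_json_lines(parsed):
    """Serialize parsed JSON into one self-contained JSON string per record.

    A top-level list becomes one string per element; anything else becomes
    a single string. Hypothetical helper for illustration.
    """
    records = parsed if isinstance(parsed, list) else [parsed]
    return [json.dumps(record) for record in records]


# Stand-in for json.loads(response.text) from the original post.
json_object = json.loads('[{"name": "a", "values": [1, 2]}, {"name": "b", "values": [3]}]')
lines = to_json_lines(json_object)
for line in lines:
    print(line)

# With sc and sqlContext in scope, the strings can then be handed to Spark as
# an RDD. Depending on the Spark version this is, as far as I know, either
#   sqlContext.jsonRDD(sc.parallelize(lines))
# or
#   sqlContext.read.json(sc.parallelize(lines))
```

Each string is a complete JSON object on its own, which matches the one-object-per-line format Akhil describes for files.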

On Wed, Sep 30, 2015 at 2:28 AM, Akhil Das 
wrote:

> Each JSON document should be on a single line, I guess.
> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
>
> Note that the file that is offered as *a json file* is not a typical JSON
> file. Each line must contain a separate, self-contained valid JSON object.
> As a consequence, a regular multi-line JSON file will most often fail.
>
> Thanks
> Best Regards
>
> On Tue, Sep 29, 2015 at 11:07 AM, Fernando Paladini 
> wrote:
>
>> Hello guys,
>>
>> I'm very new to Spark and I'm having some trouble reading JSON into a
>> DataFrame in PySpark.
>>
>> I'm getting a JSON object from an API response and I would like to store
>> it in Spark as a DataFrame (I've read that DataFrame is better than RDD;
>> is that accurate?). From what I've read in the documentation, I just need
>> to call the method sqlContext.read.json in order to do what I want.
>>
>> *Following is the code from my test application:*
>> json_object = json.loads(response.text)
>> sc = SparkContext("local", appName="JSON to RDD")
>> sqlContext = SQLContext(sc)
>> dataframe = sqlContext.read.json(json_object)
>> dataframe.show()
>>
>> *The problem is that when I run **"spark-submit myExample.py" I got the
>> following error:*
>> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
>> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
>> localhost, 48634)
>> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
>> Traceback (most recent call last):
>>   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
>> line 35, in <module>
>> dataframe = sqlContext.read.json(json_object)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
>> line 144, in json
>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> line 538, in __call__
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36,
>> in deco
>>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling o21.json. Trace:
>> py4j.Py4JException: Method json([class java.util.HashMap]) does not exist
>> at
>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>> at
>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>> at py4j.Gateway.invoke(Gateway.java:252)
>> at
>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> *What am I doing wrong?*
>> Check out this gist to see the JSON I'm trying to load.
>>
>> Thanks!
>> Fernando Paladini
>>
>
>


Re: Spark SQL: Implementing Custom Data Source

2015-09-29 Thread Michael Armbrust
That's a pretty advanced example that uses experimental APIs.  I'd suggest
looking at https://github.com/databricks/spark-avro as a reference.

On Mon, Sep 28, 2015 at 9:00 PM, Ted Yu  wrote:

> See this thread:
>
> http://search-hadoop.com/m/q3RTttmiYDqGc202
>
> And:
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
>
> On Sep 28, 2015, at 8:22 PM, Jerry Lam  wrote:
>
> Hi spark users and developers,
>
> I'm trying to learn how to implement a custom data source for Spark SQL. Is
> there documentation that I can use as a reference? I'm not sure exactly
> what needs to be extended/implemented. A general workflow would be greatly
> helpful!
>
> Best Regards,
>
> Jerry
>
>


Re: unintended consequence of using coalesce operation

2015-09-29 Thread Michael Armbrust
coalesce is generally used to avoid launching too many tasks on a bunch of
small files.  As a result, the goal is to reduce parallelism (when the
overhead of that parallelism is more costly than the gain).  You are
correct that in your case repartition sounds like a better choice.
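
The difference can be pictured with a toy model. This is not Spark code and not a description of Spark internals, only a sketch of the task counts discussed in this thread: without a shuffle the reduction is folded into the same stage, while a shuffle (which is what repartition adds) leaves the upstream stage at its original parallelism. The function name `stage_task_counts` is made up for this example.

```python
def stage_task_counts(input_partitions, target_partitions, shuffle):
    """Toy model of how many tasks each stage runs.

    Without a shuffle (coalesce), reading, mapping and filtering all run
    with the reduced task count. With a shuffle (repartition), the upstream
    stage keeps its parallelism and only the stage after the shuffle is
    reduced.
    """
    if shuffle:
        return [input_partitions, target_partitions]
    # A shuffle-free coalesce cannot increase the partition count.
    return [min(input_partitions, target_partitions)]


print(stage_task_counts(5, 1, shuffle=False))  # [1]
print(stage_task_counts(5, 1, shuffle=True))   # [5, 1]
```

For the job in the question this corresponds to one task doing all the reading and filtering with coalesce(1), versus five tasks reading and one task writing with repartition(1).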

On Tue, Sep 29, 2015 at 4:33 PM, Lan Jiang  wrote:

> Hi, there
>
> I ran into an issue when using Spark (v 1.3) to load avro file through
> Spark SQL. The code sample is below
>
> val df = sqlContext.load("path-to-avro-file","com.databricks.spark.avro")
> val myrdd = df.select("Key", "Name", "binaryfield").rdd
> val results = myrdd.map(...)
> val finalResults = results.filter(...)
> finalResults.*coalesce(1)*.toDF().saveAsParquetFile("path-to-parquet")
>
> The avro file is 645M. The HDFS block size is 128M. Thus the total is 5 HDFS
> blocks, which means there should be 5 partitions. Please note that I use
> coalesce because I expect the previous filter transformation should filter
> out almost all the data and I would like to write to 1 single parquet file.
>
> YARN cluster has 3 datanodes. I use the below configuration for spark
> submit
>
> spark-submit --class  --num-executors 3 --executor-cores 2
> --executor-memory 8g --master yarn-client mytest.jar
>
> I do see 3 executors being created, one on each data/worker node. However,
> there is only one task running within the cluster.  After I remove the
> coalesce(1) call from the code, I can see 5 tasks generated, spread
> across 3 executors.  I was surprised by the result. coalesce is usually
> thought to be a better choice than repartition when reducing the number
> of partitions. However, in this case, it causes a performance issue
> because Spark creates only one task when the final partition count is
> coalesced to 1.  Thus there is only one thread reading HDFS files instead
> of 5.
>
> Is my understanding correct? In this case, I think repartition is a better
> choice than coalesce.
>
> Lan
>
>
>
>


Re: Spark SQL: Implementing Custom Data Source

2015-09-29 Thread Michael Armbrust
Yep, we've designed it so that we take care of any translation that needs
to be done for you.

On Tue, Sep 29, 2015 at 10:39 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Michael and Ted,
>
> Thank you for the reference. Is it true that once I implement a custom
> data source, it can be used in all Spark-supported languages? That is, Scala,
> Java, Python and R. :)
> I want to take advantage of the interoperability that is already built in
> spark.
>
> Thanks!
>
> Jerry
>
> On Tue, Sep 29, 2015 at 11:31 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> That's a pretty advanced example that uses experimental APIs.  I'd suggest
>> looking at https://github.com/databricks/spark-avro as a reference.
>>
>> On Mon, Sep 28, 2015 at 9:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> See this thread:
>>>
>>> http://search-hadoop.com/m/q3RTttmiYDqGc202
>>>
>>> And:
>>>
>>>
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
>>>
>>> On Sep 28, 2015, at 8:22 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>> Hi spark users and developers,
>>>
>>> I'm trying to learn how to implement a custom data source for Spark SQL. Is
>>> there documentation that I can use as a reference? I'm not sure exactly
>>> what needs to be extended/implemented. A general workflow would be greatly
>>> helpful!
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>
>


Re: Spark SQL deprecating Hive? How will I access Hive metadata in the future?

2015-09-29 Thread Michael Armbrust
We are not deprecating HiveQL, nor the ability to read metadata from the
metastore.

On Tue, Sep 29, 2015 at 12:24 PM, YaoPau  wrote:

> I've heard that Spark SQL will be or has already started deprecating HQL.
> We
> have Spark SQL + Python jobs that currently read from the Hive metastore to
> get things like table location and partition values.
>
> Will we have to re-code these functions in future releases of Spark (maybe
> by connecting to Hive directly), or will fetching Hive metastore data be
> supported in future releases via regular SQL?
>
> Jon
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-deprecating-Hive-How-will-I-access-Hive-metadata-in-the-future-tp24874.html
>
>


Re: Performance when iterating over many parquet files

2015-09-28 Thread Michael Armbrust
Another note: for best performance you are going to want your parquet files
to be pretty big (100s of MB).  You could coalesce them and write them out
for more efficient repeat querying.

On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> sqlContext.read.parquet
> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
> takes lists of files.
>
> // this works, but using spark is possibly overkill
> val fileList = sc.textFile("file_list.txt").collect()
> val dataFrame = sqlContext.read.parquet(fileList: _*)
>
> On Mon, Sep 28, 2015 at 1:35 PM, jwthomas <jordan.tho...@accenture.com>
> wrote:
>
>> We are working with use cases where we need to do batch processing on a
>> large
>> number (hundreds of thousands) of Parquet files.  The processing is quite
>> similar per file.  There are many aggregates that are very SQL-friendly
>> (computing averages, maxima, minima, aggregations on single columns with
>> some selection criteria).  There is also some processing that is more
>> advanced time-series processing (continuous wavelet transforms and the
>> like).  This all seems like a good use case for Spark.
>>
>> But I'm having performance problems.  Let's take a look at something very
>> simple, which simply checks whether the parquet files are readable.
>>
>> Code that seems natural but doesn't work:
>>
>> import scala.util.{Try, Success, Failure}
>> val parquetFiles = sc.textFile("file_list.txt")
>> val successes = parquetFiles.map(x => (x, Try(sqlContext.read.parquet(x))))
>>   .filter(_._2.isSuccess).map(x => x._1)
>>
>> My understanding is that this doesn't work because sqlContext can't be
>> used
>> inside of a transformation like "map" (or inside an action).  That it only
>> makes sense in the driver.  Thus, it becomes a null reference in the above
>> code, so all reads fail.
>>
>> Code that works:
>>
>> import scala.util.{Try, Success, Failure}
>> val parquetFiles = sc.textFile("file_list.txt")
>> val successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
>>   .filter(_._2.isSuccess).map(x => x._1)
>>
>>
>> This works because the collect() means that everything happens back on the
>> driver.  So the sqlContext object makes sense and everything works fine.
>>
>> But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
>> executors, 40 GB ram on driver, 19GB on executors.  And it takes about 1
>> minute to execute for 100 parquet files.  Which is too long.  Recall we
>> need
>> to do this across hundreds of thousands of files.
>>
>> I realize it is possible to parallelize after the read:
>>
>> import scala.util.{Try, Success, Failure}
>> val parquetFiles = sc.textFile("file_list.txt")
>> val intermediate_successes =
>>   parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
>> val dist_successes = sc.parallelize(intermediate_successes)
>> val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
>>
>>
>> But this does not improve performance substantially.  It seems the
>> bottleneck is that the reads are happening sequentially.
>>
>> Is there a better way to do this?
>>
>> Thanks,
>> Jordan
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-when-iterating-over-many-parquet-files-tp24850.html
>>
>>
>


Re: Performance when iterating over many parquet files

2015-09-28 Thread Michael Armbrust
sqlContext.read.parquet takes lists of files.

// this works, but using spark is possibly overkill
val fileList = sc.textFile("file_list.txt").collect()
val dataFrame = sqlContext.read.parquet(fileList: _*)
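
For PySpark users, a hedged sketch of the same idea: the file list is read with plain Python (no Spark needed for that step) and unpacked into a variadic call with `*`, the counterpart of Scala's `: _*`. The function `read_parquet` below is a stand-in that only echoes its arguments so the sketch runs without Spark, and `read_paths` and the sample paths are made up for illustration.

```python
import os
import tempfile


def read_paths(list_file):
    """Read one path per line from a local text file, skipping blank lines."""
    with open(list_file) as handle:
        return [line.strip() for line in handle if line.strip()]


def read_parquet(*paths):
    """Stand-in for a variadic reader; returns the paths it was given."""
    return list(paths)


# Write a small sample list so the sketch is self-contained.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("/data/part-0.parquet\n/data/part-1.parquet\n\n")
file_list = read_paths(tmp.name)
os.remove(tmp.name)

print(read_parquet(*file_list))
# With sqlContext in scope, the real call would be:
#   dataFrame = sqlContext.read.parquet(*file_list)
```

Passing all paths in one call lets Spark plan a single read over the whole list instead of one read per file on the driver.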

On Mon, Sep 28, 2015 at 1:35 PM, jwthomas 
wrote:

> We are working with use cases where we need to do batch processing on a
> large
> number (hundreds of thousands) of Parquet files.  The processing is quite
> similar per file.  There are many aggregates that are very SQL-friendly
> (computing averages, maxima, minima, aggregations on single columns with
> some selection criteria).  There is also some processing that is more
> advanced time-series processing (continuous wavelet transforms and the
> like).  This all seems like a good use case for Spark.
>
> But I'm having performance problems.  Let's take a look at something very
> simple, which simply checks whether the parquet files are readable.
>
> Code that seems natural but doesn't work:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val successes = parquetFiles.map(x => (x, Try(sqlContext.read.parquet(x))))
>   .filter(_._2.isSuccess).map(x => x._1)
>
> My understanding is that this doesn't work because sqlContext can't be used
> inside of a transformation like "map" (or inside an action).  That it only
> makes sense in the driver.  Thus, it becomes a null reference in the above
> code, so all reads fail.
>
> Code that works:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
>   .filter(_._2.isSuccess).map(x => x._1)
>
>
> This works because the collect() means that everything happens back on the
> driver.  So the sqlContext object makes sense and everything works fine.
>
> But it is slow.  I'm using yarn-client mode on a 6-node cluster with 17
> executors, 40 GB ram on driver, 19GB on executors.  And it takes about 1
> minute to execute for 100 parquet files.  Which is too long.  Recall we
> need
> to do this across hundreds of thousands of files.
>
> I realize it is possible to parallelize after the read:
>
> import scala.util.{Try, Success, Failure}
> val parquetFiles = sc.textFile("file_list.txt")
> val intermediate_successes =
>   parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
> val dist_successes = sc.parallelize(intermediate_successes)
> val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
>
>
> But this does not improve performance substantially.  It seems the
> bottleneck is that the reads are happening sequentially.
>
> Is there a better way to do this?
>
> Thanks,
> Jordan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-when-iterating-over-many-parquet-files-tp24850.html
>
>


Re: Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-27 Thread Michael Armbrust
No, you would just have to do another select to pull out the fields you are
interested in.
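
A plain-Python sketch of the two steps (explode, then a second select), using made-up sample data. The field names `product_id` and `price` come from the question; the `qty` field, the sample values and both helper functions are assumptions for illustration only.

```python
def explode(rows, array_col, alias):
    """Yield one output row per element of row[array_col], like explode()."""
    for row in rows:
        for element in row[array_col]:
            yield {alias: element}


def select_fields(rows, struct_col, fields):
    """Pull the named fields out of a struct column, like a second select."""
    return [dict((f, row[struct_col][f]) for f in fields) for row in rows]


purchases = [
    {"purchase_items": [
        {"product_id": 1, "price": 9.5, "qty": 2},
        {"product_id": 2, "price": 3.0, "qty": 1},
    ]},
    {"purchase_items": [{"product_id": 3, "price": 7.25, "qty": 4}]},
]

items = list(explode(purchases, "purchase_items", "item"))
selected = select_fields(items, "item", ["product_id", "price"])
for row in selected:
    print(row)

# The DataFrame version of the same two steps would look roughly like:
#   df.select(explode(df["purchase_items"]).alias("item")) \
#     .select("item.product_id", "item.price")
```

The first step yields one row per array element; the second keeps only the wanted fields of each exploded item.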

On Sat, Sep 26, 2015 at 11:11 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Michael,
>
> Thanks for the tip. With dataframe, is it possible to explode some
> selected fields in each purchase_items?
> Since purchase_items is an array of item and each item has a number of
> fields (for example product_id and price), is it possible to just explode
> these two fields directly using dataframe?
>
> Best Regards,
>
>
> Jerry
>
> On Fri, Sep 25, 2015 at 7:53 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> The SQL parser without HiveContext is really simple, which is why I
>> generally recommend users use HiveContext.  However, you can do it with
>> dataframes:
>>
>> import org.apache.spark.sql.functions._
>> table("purchases").select(explode(df("purchase_items")).as("item"))
>>
>>
>>
>> On Fri, Sep 25, 2015 at 4:21 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi sparkers,
>>>
>>> Does anyone know how to do LATERAL VIEW EXPLODE without HiveContext?
>>> I don't want to start up a metastore and derby just because I need
>>> LATERAL VIEW EXPLODE.
>>>
>>> I have been trying but I always get the exception like this:
>>>
>>> Name: java.lang.RuntimeException
>>> Message: [1.68] failure: ``union'' expected but identifier view found
>>>
>>> with a query that looks like:
>>>
>>> "select items from purhcases lateral view explode(purchase_items) tbl as
>>> items"
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>
>


Re: Reading Hive Tables using SQLContext

2015-09-25 Thread Michael Armbrust
Eventually I'd like to eliminate HiveContext, but for now I just recommend
that most users use it instead of SQLContext.

On Thu, Sep 24, 2015 at 5:41 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Thanks Michael. Just want to check if there is a roadmap to include Hive
> tables from SQLContext.
>
> -Sathish
>
> On Thu, Sep 24, 2015 at 7:46 PM Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> No, you have to use a HiveContext.
>>
>> On Thu, Sep 24, 2015 at 2:47 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Is it possible to access Hive tables directly from SQLContext instead of
>>> HiveContext? I am facing errors while doing it.
>>>
>>> Please let me know
>>>
>>>
>>> Thanks
>>>
>>> Sathish
>>>
>>
>>


Re: Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-25 Thread Michael Armbrust
The SQL parser without HiveContext is really simple, which is why I
generally recommend users use HiveContext.  However, you can do it with
dataframes:

import org.apache.spark.sql.functions._
sqlContext.table("purchases").select(explode(col("purchase_items")).as("item"))
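
As a rough picture of what explode does to each row, here is a plain-Python sketch (no Spark involved). The sample rows and the "id" field are made up for illustration; only purchase_items and item come from the thread.

```python
# Plain-Python sketch of explode semantics; this is not Spark code.
# The sample rows below are hypothetical.
purchases = [
    {"id": 1, "purchase_items": ["apple", "pear"]},
    {"id": 2, "purchase_items": []},
    {"id": 3, "purchase_items": ["plum"]},
]


def explode_items(rows):
    """Emit one output row per array element; empty arrays yield no rows."""
    return [{"item": item} for row in rows for item in row["purchase_items"]]


items = explode_items(purchases)
```

Each element of purchase_items becomes its own output row, and a row whose array is empty contributes nothing to the result.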



On Fri, Sep 25, 2015 at 4:21 PM, Jerry Lam  wrote:

> Hi sparkers,
>
> Anyone knows how to do LATERAL VIEW EXPLODE without HiveContext?
> I don't want to start up a metastore and derby just because I need LATERAL
> VIEW EXPLODE.
>
> I have been trying but I always get the exception like this:
>
> Name: java.lang.RuntimeException
> Message: [1.68] failure: ``union'' expected but identifier view found
>
> with the query look like:
>
> "select items from purhcases lateral view explode(purchase_items) tbl as
> items"
>
> Best Regards,
>
> Jerry
>
>


Re: Spark for Oracle sample code

2015-09-25 Thread Michael Armbrust
In most cases predicates that you add to jdbcDF will be pushed down into
Oracle, preventing the whole table from being sent over.

df.where("column = 1")

Another common pattern is to save the table to Parquet (or a similar format)
for repeated querying.

Michael

On Fri, Sep 25, 2015 at 3:13 PM, Cui Lin  wrote:

>
>
> Hello, All,
>
> I found the examples for JDBC connection are mostly read the whole table
> and then do operations like joining.
>
> val jdbcDF = sqlContext.read.format("jdbc").options(
>   Map("url" -> "jdbc:postgresql:dbserver",
>   "dbtable" -> "schema.tablename")).load()
>
>
> Sometimes it is not practical since the whole table data is too big and
> not necessary.
>
> What makes sense to me is to use sparksql to get subset data from oracle
> tables using sql-like statement.
> I couldn't find such examples. Can someone show me?
>
>
>
> --
> Best regards!
>
> Lin,Cui
>
>


Re: Querying on multiple Hive stores using Apache Spark

2015-09-24 Thread Michael Armbrust
This is not supported yet, though we laid a lot of the groundwork for
doing this in Spark 1.4.

On Wed, Sep 23, 2015 at 11:17 PM, Karthik 
wrote:

> Any ideas or suggestions?
>
> Thanks,
> Karthik.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Querying-on-multiple-Hive-stores-using-Apache-Spark-tp24765p24797.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Reading Hive Tables using SQLContext

2015-09-24 Thread Michael Armbrust
No, you have to use a HiveContext.

On Thu, Sep 24, 2015 at 2:47 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Hello,
>
> Is it possible to access Hive tables directly from SQLContext instead of
> HiveContext? I am facing with errors while doing it.
>
> Please let me know
>
>
> Thanks
>
> Sathish
>


Re: Spark 1.5 UDAF ArrayType

2015-09-22 Thread Michael Armbrust
Check out:
http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

On Tue, Sep 22, 2015 at 12:49 PM, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Michael
>
> Thank you for your prompt answer. I will repost after I try this again on
> 1.5.1 or branch-1.5. In addition a blog post on SparkSQL data types would
> be very helpful. I am familiar with the Hive data types, but there is very
> little documentation on Spark SQL data types.
>
> Regards
> Deenar
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 22 September 2015 at 19:28, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> I think that you are hitting a bug (which should be fixed in Spark
>> 1.5.1).  I'm hoping we can cut an RC for that this week.  Until then you
>> could try building branch-1.5.
>>
>> On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am trying to write an UDAF ArraySum, that does element wise sum of
>>> arrays of Doubles returning an array of Double following the sample in
>>>
>>> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
>>> I am getting the following error. Any guidance on handle complex type in
>>> Spark SQL would be appreciated.
>>>
>>> Regards
>>> Deenar
>>>
>>> import org.apache.spark.sql.expressions.MutableAggregationBuffer
>>> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.functions._
>>>
>>> class ArraySum extends UserDefinedAggregateFunction {
>>>def inputSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>>
>>>   def bufferSchema: StructType =
>>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>>
>>>   def dataType: DataType = ArrayType(DoubleType, false)
>>>
>>>   def deterministic: Boolean = true
>>>
>>>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>>> buffer(0) = Nil
>>>   }
>>>
>>>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
>>> val currentSum : Seq[Double] = buffer.getSeq(0)
>>> val currentRow : Seq[Double] = input.getSeq(0)
>>> buffer(0) = (currentSum, currentRow) match {
>>>   case (Nil, Nil) => Nil
>>>   case (Nil, row) => row
>>>   case (sum, Nil) => sum
>>>   case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
>>>   // TODO handle different sizes arrays here
>>> }
>>>   }
>>>
>>>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>>> val currentSum : Seq[Double] = buffer1.getSeq(0)
>>> val currentRow : Seq[Double] = buffer2.getSeq(0)
>>> buffer1(0) = (currentSum, currentRow) match {
>>>   case (Nil, Nil) => Nil
>>>   case (Nil, row) => row
>>>   case (sum, Nil) => sum
>>>   case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
>>>   // TODO handle different sizes arrays here
>>> }
>>>   }
>>>
>>>   def evaluate(buffer: Row): Any = {
>>> buffer.getSeq(0)
>>>   }
>>> }
>>>
>>> val arraySum = new ArraySum
>>> sqlContext.udf.register("ArraySum", arraySum)
>>>
>>> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
>>> '2015-05-22' limit 10*
>>>
>>> gives me the following error
>>>
>>>
>>> Error in SQL statement: SparkException: Job aborted due to stage
>>> failure: Task 0 in stage 219.0 failed 4 times, most recent failure: Lost
>>> task 0.3 in stage 219.0 (TID 11242, 10.172.255.236):
>>> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
>>> cannot be cast to org.apache.spark.sql.types.ArrayData at
>>> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
>>> at
>>> org.apache.spark.sql.catalys

Re: Spark 1.5 UDAF ArrayType

2015-09-22 Thread Michael Armbrust
I think that you are hitting a bug (which should be fixed in Spark 1.5.1).
I'm hoping we can cut an RC for that this week.  Until then you could try
building branch-1.5.
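
Separately from the bug, the update/merge step in the quoted code leaves arrays of different sizes as a TODO. A minimal plain-Python sketch of one way to handle that follows. It assumes missing elements count as 0.0; that padding rule and the helper name merge_sums are assumptions, not something the thread specifies.

```python
from itertools import zip_longest


def merge_sums(current_sum, row):
    """Element-wise sum of two sequences of floats.

    An empty sequence acts as the identity, mirroring the Nil cases in the
    quoted Scala. The shorter input is padded with 0.0 (an assumption).
    """
    return [a + b for a, b in zip_longest(current_sum, row, fillvalue=0.0)]


# Fold a few arrays of different lengths into one running total.
total = []
for arr in ([1.0, 2.0, 3.0], [10.0, 20.0], [0.5, 0.5, 0.5, 0.5]):
    total = merge_sums(total, arr)
```

The same function can serve both update and merge, since each simply combines the buffer with another array.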

On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar  wrote:

> Hi
>
> I am trying to write an UDAF ArraySum, that does element wise sum of
> arrays of Doubles returning an array of Double following the sample in
>
> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
> I am getting the following error. Any guidance on handle complex type in
> Spark SQL would be appreciated.
>
> Regards
> Deenar
>
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
>
> class ArraySum extends UserDefinedAggregateFunction {
>def inputSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>
>   def bufferSchema: StructType =
> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>
>   def dataType: DataType = ArrayType(DoubleType, false)
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = Nil
>   }
>
>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
> val currentSum : Seq[Double] = buffer.getSeq(0)
> val currentRow : Seq[Double] = input.getSeq(0)
> buffer(0) = (currentSum, currentRow) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
>   // TODO handle different sizes arrays here
> }
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> val currentSum : Seq[Double] = buffer1.getSeq(0)
> val currentRow : Seq[Double] = buffer2.getSeq(0)
> buffer1(0) = (currentSum, currentRow) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
>   // TODO handle different sizes arrays here
> }
>   }
>
>   def evaluate(buffer: Row): Any = {
> buffer.getSeq(0)
>   }
> }
>
> val arraySum = new ArraySum
> sqlContext.udf.register("ArraySum", arraySum)
>
> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
> '2015-05-22' limit 10*
>
> gives me the following error
>
>
> Error in SQL statement: SparkException: Job aborted due to stage failure:
> Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 219.0 (TID 11242, 10.172.255.236): java.lang.ClassCastException:
> scala.collection.mutable.WrappedArray$ofRef cannot be cast to
> org.apache.spark.sql.types.ArrayData at
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
> at
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
> at
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
> Source) at
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
> at
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:141)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
> scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at
> scala.collection.Iterator$class.foreach(Iterator.scala:727) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157) at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
> at
> 

Re: Count for select not matching count for group by

2015-09-22 Thread Michael Armbrust
This looks like something is wrong with predicate pushdown.  Can you
include the output of calling explain, and tell us what format the data is
stored in?

On Mon, Sep 21, 2015 at 8:06 AM, Michael Kelly 
wrote:

> Hi,
>
> I'm seeing some strange behaviour with spark 1.5, I have a dataframe
> that I have built from loading and joining some hive tables stored in
> s3.
>
> The dataframe is cached in memory, using df.cache.
>
> What I'm seeing is that the counts I get when I do a group by on a
> column are different from what I get when I filter/select and count.
>
> df.select("outcome").groupBy("outcome").count.show
> outcome | count
> --
> 'A'   |  100
> 'B'   |  200
>
> df.filter("outcome = 'A'").count
> # 50
>
> df.filter(df("outcome") === "A").count
> # 50
>
> I expect the count of columns that match 'A' in the groupBy to match
> the count when filtering. Any ideas what might be happening?
>
> Thanks,
>
> Michael
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: HiveQL Compatibility (0.12.0, 0.13.0???)

2015-09-21 Thread Michael Armbrust
In general we welcome pull requests for these kinds of updates.  In this
case it's already been fixed in master and branch-1.5 and will be updated
when we release 1.5.1 (hopefully soon).

On Mon, Sep 21, 2015 at 1:21 PM, Dominic Ricard <
dominic.ric...@tritondigital.com> wrote:

> Hi,
>here's a statement from the Spark 1.5.0  Spark SQL and DataFrame Guide
> <
> https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive
> >
> :
>
> *Compatibility with Apache Hive*
> Spark SQL is designed to be compatible with the Hive Metastore, SerDes and
> UDFs. Currently Spark SQL is based on Hive 0.12.0 and 0.13.1.
>
> After testing many functions available in 1.1.0 and 1.2.0, I tend to think
> that this is no longer true...
>
> Could someone update the documentation or tell me what these versions refer
> to as it appears that Spark SQL 1.5.0 support everything in Hive 1.2.0...
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/HiveQL-Compatibility-0-12-0-0-13-0-tp24757.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to add sparkSQL into a standalone application

2015-09-17 Thread Michael Armbrust
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.4.1"

Though, I would consider using spark-hive and HiveContext, as the
query parser is more powerful and you'll have access to window
functions and other features.


On Thu, Sep 17, 2015 at 10:59 AM, Cui Lin  wrote:

> Hello,
>
> I got stuck in adding spark sql into my standalone application.
> The build.sbt is defined as:
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
>
>
> I got the following error when building the package:
>
> *[error] /data/workspace/test/src/main/scala/TestMain.scala:6: object sql is 
> not a member of package org.apache.spark
> [error] import org.apache.spark.sql.SQLContext;
> [error] ^
> [error] /data/workspace/test/src/main/scala/TestMain.scala:19: object sql is 
> not a member of package org.apache.spark
> [error] val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> [error]   ^
> [error] two errors found
> [error] (compile:compile) Compilation failed*
>
>
> So sparksql is not part of spark core package? I have no issue when
> testing my codes in spark-shell. Thanks for the help!
>
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: How to add sparkSQL into a standalone application

2015-09-17 Thread Michael Armbrust
You don't need to set anything up; it'll create a local Hive metastore by
default if you don't explicitly configure one.

On Thu, Sep 17, 2015 at 11:45 AM, Cui Lin <icecreamlc...@gmail.com> wrote:

> Hi, Michael,
>
> It works to me! Thanks a lot!
> If I use spark-hive or HiveContext, do I have to setup Hive on server? Can
> I run this on my local laptop?
>
> On Thu, Sep 17, 2015 at 11:02 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.4.1"
>>
>> Though, I would consider using spark-hive and HiveContext, as the query 
>> parser is more powerful and you'll have access to window functions and other 
>> features.
>>
>>
>> On Thu, Sep 17, 2015 at 10:59 AM, Cui Lin <icecreamlc...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I got stuck in adding spark sql into my standalone application.
>>> The build.sbt is defined as:
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
>>>
>>>
>>> I got the following error when building the package:
>>>
>>> *[error] /data/workspace/test/src/main/scala/TestMain.scala:6: object sql 
>>> is not a member of package org.apache.spark
>>> [error] import org.apache.spark.sql.SQLContext;
>>> [error] ^
>>> [error] /data/workspace/test/src/main/scala/TestMain.scala:19: object sql 
>>> is not a member of package org.apache.spark
>>> [error] val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>> [error]   ^
>>> [error] two errors found
>>> [error] (compile:compile) Compilation failed*
>>>
>>>
>>> So sparksql is not part of spark core package? I have no issue when
>>> testing my codes in spark-shell. Thanks for the help!
>>>
>>>
>>>
>>> --
>>> Best regards!
>>>
>>> Lin,Cui
>>>
>>
>>
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: Can we do dataframe.query like Pandas dataframe in spark?

2015-09-17 Thread Michael Armbrust
from pyspark.sql.functions import *

df = sqlContext.range(10).select(rand().alias("a"), rand().alias("b"))

df.where("a > b").show()

+------------------+-------------------+
|                 a|                  b|
+------------------+-------------------+
|0.6697439215581628|0.23420961030968923|
|0.9248996796756386| 0.4146647917936366|
+------------------+-------------------+

On Thu, Sep 17, 2015 at 9:32 AM, Rex X  wrote:

> With Pandas dataframe, we can do query:
>
> >>> from numpy.random import randn
> >>> from pandas import DataFrame
> >>> df = DataFrame(randn(10, 2), columns=list('ab'))
> >>> df.query('a > b')
>
>
> This SQL-select-like query is very convenient. Can we do similar thing
> with the new dataframe of spark?
>
>
> Best,
> Rex
>


Re: selecting columns with the same name in a join

2015-09-11 Thread Michael Armbrust
Here is what I get on branch-1.5:

x = sc.parallelize([dict(k=1, v="Evert"), dict(k=2, v="Erik")]).toDF()
y = sc.parallelize([dict(k=1, v="Ruud"), dict(k=3, v="Vincent")]).toDF()
x.registerTempTable('x')
y.registerTempTable('y')
sqlContext.sql("select y.v, x.v FROM x INNER JOIN y ON x.k=y.k").collect()

Out[1]: [Row(v=u'Ruud', v=u'Evert')]

On Fri, Sep 11, 2015 at 3:14 AM, Evert Lammerts 
wrote:

> Am I overlooking something? This doesn't seem right:
>
> x = sc.parallelize([dict(k=1, v="Evert"), dict(k=2, v="Erik")]).toDF()
> y = sc.parallelize([dict(k=1, v="Ruud"), dict(k=3, v="Vincent")]).toDF()
> x.registerTempTable('x')
> y.registerTempTable('y')
> sqlContext.sql("select y.v, x.v FROM x INNER JOIN y ON x.k=y.k").collect()
>
> Out[26]: [Row(v=u'Evert', v=u'Evert')]
>
> May just be because I'm behind; I'm on:
>
> Spark 1.5.0-SNAPSHOT (git revision 27ef854) built for Hadoop 2.6.0 Build
> flags: -Pyarn -Psparkr -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive
> -Phive-thriftserver -DskipTests
>
> Can somebody check whether the above code does work on the latest release?
>
> Thanks!
> Evert
>


Re: Custom UDAF Evaluated Over Window

2015-09-10 Thread Michael Armbrust
The only way to do this today is to write it as a Hive UDAF.  We hope to
improve the window functions to use our native aggregation in a future
release.

On Thu, Sep 10, 2015 at 12:26 AM, xander92 
wrote:

> While testing out the new UserDefinedAggregateFunction in Spark 1.5.0, I
> successfully implemented a simple function to compute an average. I then
> tried to test this function by applying it over a simple window and I got
> an error saying that my function is not supported over window operation.
>
> So, is applying custom UDAFs over windows possible in Spark 1.5.0 and I
> simply have a mistake somewhere? If it is not possible, are there patches
> that make this sort of thing possible that are simply not included in the
> new release or is this functionality something that will hopefully come
> soon in a later release?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-UDAF-Evaluated-Over-Window-tp24637.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Creating Parquet external table using HiveContext API

2015-09-10 Thread Michael Armbrust
Easiest is to just use SQL:

hiveContext.sql("CREATE TABLE  USING parquet OPTIONS (path
'')")

When you specify the path it's automatically created as an external table.
The schema will be discovered.

On Wed, Sep 9, 2015 at 9:33 PM, Mohammad Islam 
wrote:

> Hi,
> I want to create  an external hive table using HiveContext. I have the
> following :
> 1. full path/location of parquet data directory
> 2. name of the new table
> 3. I can get the schema as well.
>
> What API will be the best (for 1,3.x or 1.4.x)? I can see 6
> createExternalTable() APIs but not sure which one will be the best.
> I didn't find any good documentation in source code or Java doc about the
> parameters of the APIs (i.e path, source, options etc);
>
> Any help will be appreciated.
>
>
> Regards,
> Mohammad
>
>


Re: spark 1.5 SQL slows down dramatically by 50%+ compared with spark 1.4.1 SQL

2015-09-10 Thread Michael Armbrust
I've been running TPC-DS SF=1500 daily on Spark 1.4.1 and Spark 1.5 on S3,
so this is surprising.  In my experiments Spark 1.5 is either the same or
faster than 1.4 with only small exceptions.  A few thoughts:

 - 600 partitions is probably way too many for 6G of data.
 - Providing the output of explain for both runs would be helpful whenever
reporting performance changes.
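
For a sense of scale on the first point, here is a back-of-the-envelope calculation. It treats 6G as 6 GiB, looks at the input size only (the join output will differ), and assumes the data spreads evenly over the 600 shuffle partitions from the quoted configuration.

```python
# Rough per-partition size; assumes an even spread and 6G == 6 GiB.
data_bytes = 6 * 1024**3
shuffle_partitions = 600  # spark.sql.shuffle.partitions in the quoted config
total_rows = 90_000_000   # "about 90 million records" per the question

mib_per_partition = data_bytes / shuffle_partitions / 1024**2
rows_per_partition = total_rows / shuffle_partitions
```

That works out to roughly 10 MiB and 150,000 input rows per partition, which is small enough that per-task overhead can become a noticeable share of the runtime.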

On Thu, Sep 10, 2015 at 1:24 AM, Todd  wrote:

> Hi,
>
> I am using data generated with sparksqlperf(
> https://github.com/databricks/spark-sql-perf) to test the spark sql
> performance (spark on yarn, with 10 nodes) with the following code (The
> table store_sales is about 90 million records, 6G in size)
>
> val outputDir = "hdfs://tmp/spark_perf/scaleFactor=30/useDecimal=true/store_sales"
> val name="store_sales"
> sqlContext.sql(
>   s"""
>   |CREATE TEMPORARY TABLE ${name}
>   |USING org.apache.spark.sql.parquet
>   |OPTIONS (
>   |  path '${outputDir}'
>   |)
> """.stripMargin)
>
> val sql="""
>  |select
>  |  t1.ss_quantity,
>  |  t1.ss_list_price,
>  |  t1.ss_coupon_amt,
>  |  t1.ss_cdemo_sk,
>  |  t1.ss_item_sk,
>  |  t1.ss_promo_sk,
>  |  t1.ss_sold_date_sk
>  |from store_sales t1 join store_sales t2 on t1.ss_item_sk =
> t2.ss_item_sk
>  |where
>  |  t1.ss_sold_date_sk between 2450815 and 2451179
>""".stripMargin
>
> val df = sqlContext.sql(sql)
> df.rdd.foreach(row=>Unit)
>
> With 1.4.1, I can finish the query in 6 minutes,  but  I need 10+ minutes
> with 1.5.
>
> The configuration are basically the same, since I copy the configuration
> from 1.4.1 to 1.5:
>
> sparkVersion                                   1.4.1   1.5.0
> scaleFactor                                    30      30
> spark.sql.shuffle.partitions                   600     600
> spark.sql.sources.partitionDiscovery.enabled   true    true
> spark.default.parallelism                      200     200
> spark.driver.memory                            4G      4G
> spark.executor.memory                          4G      4G
> spark.executor.instances                       10      10
> spark.shuffle.consolidateFiles                 true    true
> spark.storage.memoryFraction                   0.4     0.4
> spark.executor.cores                           3       3
>
> I am not sure where is going wrong,any ideas?
>
>
>


Re: Avoiding SQL Injection in Spark SQL

2015-09-10 Thread Michael Armbrust
Either that or use the DataFrame API, which directly constructs query plans
and thus doesn't suffer from injection attacks (and runs on the same
execution engine).
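
Since the problem is not engine-specific, here is a small illustration using Python's built-in sqlite3 module rather than Spark. The table, columns, and input value are invented for the example. It contrasts building the statement by string concatenation with passing the value separately from the query text, which is loosely analogous to building the query through the DataFrame API instead of a SQL string.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, secret TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "a1"), ("bob", "b2")],
)

user_input = "nobody' OR '1'='1"  # hostile value supplied by a user

# Unsafe: the value becomes part of the SQL text and rewrites the predicate.
unsafe_sql = "SELECT name FROM users WHERE name = '" + user_input + "'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the value is bound as data and is never parsed as SQL.
safe_rows = conn.execute(
    "SELECT name FROM users WHERE name = ?", (user_input,)
).fetchall()
```

The concatenated query returns every row because the injected OR clause is always true, while the bound version looks for a user literally named "nobody' OR '1'='1" and finds none.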

On Thu, Sep 10, 2015 at 12:10 AM, Sean Owen  wrote:

> I don't think this is Spark-specific. Mostly you need to escape /
> quote user-supplied values as with any SQL engine.
>
> On Thu, Sep 10, 2015 at 7:32 AM, V Dineshkumar
>  wrote:
> > Hi,
> >
> > What is the preferred way of avoiding SQL Injection while using Spark
> SQL?
> > In our use case we have to take the parameters directly from the users
> and
> > prepare the SQL Statement.I was not able to find any API for preparing
> the
> > SQL statement safely avoiding injection.
> >
> > Thanks,
> > Dinesh
> > Philips India
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame creation delay?

2015-09-04 Thread Michael Armbrust
Also, do you mean two partitions or two partition columns?  If there are
many partitions it can be much slower.  In Spark 1.5 I'd consider
setting spark.sql.hive.metastorePartitionPruning=true
if you have predicates over the partition columns.

On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> What format is this table.  For parquet and other optimized formats we
> cache a bunch of file metadata on first access to make interactive queries
> faster.
>
> On Thu, Sep 3, 2015 at 8:17 PM, Isabelle Phan <nlip...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using SparkSQL to query some Hive tables. Most of the time, when I
>> create a DataFrame using sqlContext.sql("select * from table") command,
>> DataFrame creation is less than 0.5 second.
>> But I have this one table with which it takes almost 12 seconds!
>>
>> scala>  val start = scala.compat.Platform.currentTime; val logs =
>> sqlContext.sql("select * from temp.log"); val execution =
>> scala.compat.Platform.currentTime - start
>> 15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from
>> temp.log
>> 15/09/04 12:07:02 INFO ParseDriver: Parse Completed
>> start: Long = 1441336022731
>> logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
>> log_time: string, tag: string, dt: string, test_id: int]
>> execution: Long = *11567*
>>
>> This table has 3.6 B rows, and 2 partitions (on dt and test_id columns).
>> I have created DataFrames on even larger tables and do not see such
>> delay.
>> So my questions are:
>> - What can impact DataFrame creation time?
>> - Is it related to the table partitions?
>>
>>
>> Thanks much your help!
>>
>> Isabelle
>>
>
>


Re: DataFrame creation delay?

2015-09-04 Thread Michael Armbrust
What format is this table?  For Parquet and other optimized formats we
cache a bunch of file metadata on first access to make interactive queries
faster.

On Thu, Sep 3, 2015 at 8:17 PM, Isabelle Phan  wrote:

> Hello,
>
> I am using SparkSQL to query some Hive tables. Most of the time, when I
> create a DataFrame using sqlContext.sql("select * from table") command,
> DataFrame creation is less than 0.5 second.
> But I have this one table with which it takes almost 12 seconds!
>
> scala>  val start = scala.compat.Platform.currentTime; val logs =
> sqlContext.sql("select * from temp.log"); val execution =
> scala.compat.Platform.currentTime - start
> 15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from temp.log
> 15/09/04 12:07:02 INFO ParseDriver: Parse Completed
> start: Long = 1441336022731
> logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
> log_time: string, tag: string, dt: string, test_id: int]
> execution: Long = *11567*
>
> This table has 3.6 B rows, and 2 partitions (on dt and test_id columns).
> I have created DataFrames on even larger tables and do not see such delay.
> So my questions are:
> - What can impact DataFrame creation time?
> - Is it related to the table partitions?
>
>
> Thanks much your help!
>
> Isabelle
>


Re: DataFrame creation delay?

2015-09-04 Thread Michael Armbrust
If you run sqlContext.table("...").registerTempTable("..."), that temp table
will cache the lookup of partitions.

On Fri, Sep 4, 2015 at 1:16 PM, Isabelle Phan <nlip...@gmail.com> wrote:

> Hi Michael,
>
> Thanks a lot for your reply.
>
> This table is stored as text file with tab delimited columns.
>
> You are correct, the problem is because my table has too many partitions
> (1825 in total). Since I am on Spark 1.4, I think I am hitting bug 6984
> <https://issues.apache.org/jira/browse/SPARK-6984>.
>
> Not sure when my company can move to 1.5. Would you know some workaround
> for this bug?
> If I cannot find workaround for this, will have to change our schema
> design to reduce number of partitions.
>
>
> Thanks,
>
> Isabelle
>
>
>
> On Fri, Sep 4, 2015 at 12:56 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Also, do you mean two partitions or two partition columns?  If there are
>> many partitions it can be much slower.  In Spark 1.5 I'd consider setting 
>> spark.sql.hive.metastorePartitionPruning=true
>> if you have predicates over the partition columns.
>>
>> On Fri, Sep 4, 2015 at 12:54 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> What format is this table.  For parquet and other optimized formats we
>>> cache a bunch of file metadata on first access to make interactive queries
>>> faster.
>>>
>>> On Thu, Sep 3, 2015 at 8:17 PM, Isabelle Phan <nlip...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using SparkSQL to query some Hive tables. Most of the time, when I
>>>> create a DataFrame using sqlContext.sql("select * from table") command,
>>>> DataFrame creation is less than 0.5 second.
>>>> But I have this one table with which it takes almost 12 seconds!
>>>>
>>>> scala>  val start = scala.compat.Platform.currentTime; val logs =
>>>> sqlContext.sql("select * from temp.log"); val execution =
>>>> scala.compat.Platform.currentTime - start
>>>> 15/09/04 12:07:02 INFO ParseDriver: Parsing command: select * from
>>>> temp.log
>>>> 15/09/04 12:07:02 INFO ParseDriver: Parse Completed
>>>> start: Long = 1441336022731
>>>> logs: org.apache.spark.sql.DataFrame = [user_id: string, option: int,
>>>> log_time: string, tag: string, dt: string, test_id: int]
>>>> execution: Long = *11567*
>>>>
>>>> This table has 3.6 B rows, and 2 partitions (on dt and test_id columns).
>>>> I have created DataFrames on even larger tables and do not see such
>>>> delay.
>>>> So my questions are:
>>>> - What can impact DataFrame creation time?
>>>> - Is it related to the table partitions?
>>>>
>>>>
>>>> Thanks much your help!
>>>>
>>>> Isabelle
>>>>
>>>
>>>
>>
>


Re: spark 1.5 sort slow

2015-09-02 Thread Michael Armbrust
Can you include the output of `explain()` for each of the runs?

On Tue, Sep 1, 2015 at 1:06 AM, patcharee  wrote:

> Hi,
>
> I found spark 1.5 sorting is very slow compared to spark 1.4. Below is my
> code snippet
>
> val sqlRDD = sql("select date, u, v, z from fino3_hr3 where zone == 2
> and z >= 2 and z <= order by date, z")
> println("sqlRDD " + sqlRDD.count())
>
> The fino3_hr3 (in the sql command) is a hive table in orc format,
> partitioned by zone and z.
>
> Spark 1.5 takes 4.5 mins to execute this sql, while spark 1.4 takes 1.5
> mins. I noticed that dissimilar to spark 1.4 when spark 1.5 sorted, data
> was shuffled into few tasks, not divided for all tasks. Do I need to set
> any configuration explicitly? Any suggestions?
>
> BR,
> Patcharee
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: wild cards in spark sql

2015-09-02 Thread Michael Armbrust
That query should work.
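
To show what the pattern in the question matches, here is a small sketch that translates a SQL LIKE pattern into a Python regular expression. It follows the usual SQL convention (% matches any run of characters, _ matches exactly one). It illustrates the semantics only and is not how Spark evaluates LIKE; the helper names are made up and escape characters are not handled.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern into a regex (no escape-character support)."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")   # any run of characters, including none
        elif ch == "_":
            parts.append(".")    # exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)


def like(value, pattern):
    """Return True when the whole value matches the LIKE pattern."""
    return like_to_regex(pattern).fullmatch(value) is not None


names = ["ahmad", "ad", "adam", "david", "a_d"]
matches = [n for n in names if like(n, "a%d")]
```

With 'a%d', only names that start with "a" and end with "d" are kept, so "adam" and "david" are filtered out.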

On Wed, Sep 2, 2015 at 1:50 PM, Hafiz Mujadid 
wrote:

> Hi
>
> does spark sql support wild cards to filter data in sql queries just like
> we can filter data in sql queries in RDBMS with different wild cards like %
> and ? etc. In other words how can i write following query in spark sql
>
> select * from employee where ename like 'a%d'
>
> thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/wild-cards-in-spark-sql-tp24563.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark DataFrame saveAsTable with partitionBy creates no ORC file in HDFS

2015-09-02 Thread Michael Armbrust
Before Spark 1.5, tables created using saveAsTable could not be queried by
Hive because we only stored Spark SQL metadata.  In Spark 1.5, for Parquet
and ORC we store both Spark SQL and Hive metadata, but this will not work
with partitioned tables because Hive does not support dynamic partition
discovery.

On Wed, Sep 2, 2015 at 1:34 PM, unk1102  wrote:

> Hi, I have a Spark DataFrame which I want to save as a Hive table with
> partitions. I tried the following two statements but they don't work: I
> don't see any ORC files in the HDFS directory; it's empty. I can see that
> baseTable is there in the Hive console, but obviously it's empty because
> there are no files inside HDFS.
> The following two lines, saveAsTable() and insertInto(), do not work.
> The registerDataFrameAsTable() method works, but it creates an in-memory
> table and causes OOM in my use case, as I have thousands of Hive partitions
> to process. Please guide me, I am new to Spark. Thanks in advance.
>
>
> dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").saveAsTable("baseTable");
>
>
> dataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity","date").insertInto("baseTable");
>
> // the following works but creates an in-memory table and seems to be the
> // reason for the OOM in my case
>
> hiveContext.registerDataFrameAsTable(dataFrame, "baseTable");
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-saveAsTable-with-partitionBy-creates-no-ORC-file-in-HDFS-tp24562.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: query avro hive table in spark sql

2015-08-27 Thread Michael Armbrust

 BTW, spark-avro works great in our experience, but still, some non-technical
 people just want to use a SQL shell in Spark, like the Hive CLI.


To clarify: you can still use the spark-avro library with pure SQL.  Just
use the CREATE TABLE ... USING com.databricks.spark.avro OPTIONS (path
'...') syntax.


Re: Data Frame support CSV or excel format ?

2015-08-27 Thread Michael Armbrust
Check out spark-csv: http://spark-packages.org/package/databricks/spark-csv

On Thu, Aug 27, 2015 at 11:48 AM, spark user spark_u...@yahoo.com.invalid
wrote:

 Hi all,

 Can we create a DataFrame from an Excel sheet or a CSV file? In the example
 below, it seems only JSON is supported.



 DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");





Re: Differing performance in self joins

2015-08-26 Thread Michael Armbrust
-dev +user

I'd suggest running .explain() on both DataFrames to understand the
performance better.  The problem is likely that we have a pattern that
looks for cases where you have an equality predicate in which each side of
the equality can be evaluated using only one side of the join.  We turn this
into a hash join.

(df("eday") - laggard("p_eday")) === 1 is pretty tricky for us to
understand, and so the pattern misses the possible optimized plan.
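
As a rough illustration of why the form of the predicate matters, here is a
toy model in plain Java. It is not Spark's planner; Row, sample,
nestedLoopCount and hashJoinCount are names invented for this sketch. When
the condition is written as eday = p_eday + lag, each side of the equality
depends on one input only, so one input can be hashed on its key and the
other probed. When both inputs are mixed inside one expression, a planner
that does not rearrange the arithmetic is left testing pairs of rows.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the self join from the thread (hypothetical names, no Spark).
final class LagJoinDemo {
    static final class Row {
        final String series;
        final int eday;
        Row(String series, int eday) { this.series = series; this.eday = eday; }
    }

    static List<Row> sample() {
        return Arrays.asList(new Row("a", 10), new Row("a", 11), new Row("a", 12),
                             new Row("b", 10), new Row("b", 17));
    }

    // The predicate mixes both inputs (eday - p_eday), so every pair is tested.
    static long nestedLoopCount(List<Row> df, List<Row> laggard, int minLag, int maxLag) {
        long matches = 0;
        for (Row l : df) {
            for (Row r : laggard) {
                int lag = l.eday - r.eday;
                if (l.series.equals(r.series) && lag >= minLag && lag <= maxLag) {
                    matches++;
                }
            }
        }
        return matches;
    }

    // Each side of the equality uses one input only:
    //   key(df row)      = (series, eday)
    //   key(laggard row) = (p_series, p_eday + lag)
    // so df can be hashed once and laggard rows probe it.
    static long hashJoinCount(List<Row> df, List<Row> laggard, int minLag, int maxLag) {
        Map<String, Integer> buildSide = new HashMap<>();
        for (Row l : df) {
            buildSide.merge(key(l.series, l.eday), 1, Integer::sum);
        }
        long matches = 0;
        for (Row r : laggard) {
            for (int lag = minLag; lag <= maxLag; lag++) {  // one equality probe per lag
                matches += buildSide.getOrDefault(key(r.series, r.eday + lag), 0);
            }
        }
        return matches;
    }

    private static String key(String series, int eday) {
        return series + "|" + eday;  // the trailing integer keeps the key unambiguous
    }

    public static void main(String[] args) {
        List<Row> rows = sample();
        System.out.println(nestedLoopCount(rows, rows, 1, 1));  // 2
        System.out.println(hashJoinCount(rows, rows, 1, 1));    // 2
        System.out.println(nestedLoopCount(rows, rows, 1, 7));  // 4
        System.out.println(hashJoinCount(rows, rows, 1, 7));    // 4
    }
}
```

Both methods return the same counts; the hash version does work proportional
to the number of rows times the number of lags instead of the number of row
pairs. Whether rewriting the 1-to-7 day window as a set of equalities pays
off in Spark itself would need to be checked with explain().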

On Wed, Aug 26, 2015 at 6:10 PM, David Smith das...@gmail.com wrote:

 I've noticed that two queries, which return identical results, have very
 different performance. I'd be interested in any hints about how to avoid
 problems like this.

 The DataFrame df contains a string field "series" and an integer "eday",
 the number of days since (or before) the 1970-01-01 epoch.

 I'm doing some analysis over a sliding date window and, for now, avoiding
 UDAFs. I'm therefore using a self join. First, I create

 val laggard = df.withColumnRenamed("series",
 "p_series").withColumnRenamed("eday", "p_eday")

 Then, the following query runs in 16s:

 df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") ===
 (laggard("p_eday") + 1))).count

 while the following query runs in 4 - 6 minutes:

 df.join(laggard, (df("series") === laggard("p_series")) && ((df("eday") -
 laggard("p_eday")) === 1)).count

 It's worth noting that the series term is necessary to keep the query from
 doing a complete cartesian product over the data.

 Ideally, I'd like to look at lags of more than one day, but the following
 is equally slow:

 df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") -
 laggard("p_eday")).between(1,7)).count

 Any advice about the general principle at work here would be welcome.

 Thanks,
 David



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/Differing-performance-in-self-joins-tp13864.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.





Re: query avro hive table in spark sql

2015-08-26 Thread Michael Armbrust
I'd suggest looking at
http://spark-packages.org/package/databricks/spark-avro

On Wed, Aug 26, 2015 at 11:32 AM, gpatcham gpatc...@gmail.com wrote:

 Hi,

 I'm trying to query hive table which is based on avro in spark SQL and
 seeing below errors.

 15/08/26 17:51:12 WARN avro.AvroSerdeUtils: Encountered AvroSerdeException
 determining schema. Returning signal schema to indicate problem
 org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Neither
 avro.schema.literal nor avro.schema.url specified, can't determine table
 schema
  at org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:68)
  at org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrReturnErrorSchema(AvroSerdeUtils.java:93)
  at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:60)
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:375)
  at org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249)


 It's not able to determine the schema. The Hive table points to its Avro
 schema using a URL. I'm stuck and couldn't find more info on this.

 Any pointers ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/query-avro-hive-table-in-spark-sql-tp24462.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Michael Armbrust
I'd suggest setting sbt to fork when running tests.

On Wed, Aug 26, 2015 at 10:51 AM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Thanks for your response Yana,

 I can increase the MaxPermSize parameter and it will allow me to run the
 unit test a few more times before I run out of memory.

 However, the primary issue is that running the same unit test in the same
 JVM (multiple times) results in increased memory (each run of the unit
 test) and I believe it has something to do with HiveContext not reclaiming
 memory after it is finished (or I'm not shutting it down properly).

 It could very well be related to sbt, however, it's not clear to me.


 On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 The PermGen space error is controlled with MaxPermSize parameter. I run
 with this in my pom, I think copied pretty literally from Spark's own
 tests... I don't know what the sbt equivalent is but you should be able to
 pass it...possibly via SBT_OPTS?


 <plugin>
   <groupId>org.scalatest</groupId>
   <artifactId>scalatest-maven-plugin</artifactId>
   <version>1.0</version>
   <configuration>
     <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
     <parallel>false</parallel>
     <junitxml>.</junitxml>
     <filereports>SparkTestSuite.txt</filereports>
     <argLine>-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m</argLine>
     <stderr/>
     <systemProperties>
       <java.awt.headless>true</java.awt.headless>
       <spark.testing>1</spark.testing>
       <spark.ui.enabled>false</spark.ui.enabled>
       <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
     </systemProperties>
   </configuration>
   <executions>
     <execution>
       <id>test</id>
       <goals>
         <goal>test</goal>
       </goals>
     </execution>
   </executions>
 </plugin>
 </plugins>


 On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hello,

 I am using sbt and created a unit test where I create a `HiveContext`,
 execute some query and then return. Each time I run the unit test the
 JVM increases its memory usage until I get the error:

 Internal error when running tests: java.lang.OutOfMemoryError: PermGen
 space
 Exception in thread "Thread-2" java.io.EOFException

 As a workaround, I can fork a new JVM each time I run the unit test;
 however, that seems like a bad solution as it takes a while to run the unit
 test.

 By the way, I tried importing the TestHiveContext:

- import org.apache.spark.sql.hive.test.TestHiveContext

 However, it suffers from the same memory issue. Has anyone else suffered
 from the same problem? Note that I am running these unit tests on my Mac.

 Cheers, Mike.






Re: What does Attribute and AttributeReference mean in Spark SQL

2015-08-25 Thread Michael Armbrust
Attribute is the Catalyst name for an input column from a child operator.
An AttributeReference has been resolved, meaning we know which input column
in particular it is referring to.  An AttributeReference also has a known
DataType.  In contrast, before analysis there might still exist
UnresolvedAttributes, which are just string identifiers from a parsed query.

An Expression can be more complex (like you suggested, a + b), though
technically just a is also a very simple Expression.  The following console
session shows how these types are composed:

$ build/sbt sql/console
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5adfe37d
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@20d05227
import sqlContext.implicits._
import sqlContext._
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val unresolvedAttr: UnresolvedAttribute = 'a
unresolvedAttr: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'a

scala> val relation = LocalRelation('a.int)
relation: org.apache.spark.sql.catalyst.plans.logical.LocalRelation =
LocalRelation [a#0]

scala> val parsedQuery = relation.select(unresolvedAttr)
parsedQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Project ['a]
 LocalRelation [a#0]

scala> parsedQuery.analyze
res11: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project [a#0]
 LocalRelation [a#0]

The #0 after a is a unique identifier (within this JVM) that says where the
data is coming from, even as plans are rearranged due to optimizations.

On Mon, Aug 24, 2015 at 6:13 PM, Todd bit1...@163.com wrote:

 There are many case classes or concepts such as
 Attribute/AttributeReference/Expression in Spark SQL.

 I would like to ask what Attribute/AttributeReference/Expression mean. Given
 a SQL query like select a,b from c, are a and b two Attributes? Is a + b an
 Expression?
 It looks like I misunderstand it, because Attribute extends Expression in
 the code, which means an Attribute is itself an Expression.


 Thanks.



Re: Drop table and Hive warehouse

2015-08-24 Thread Michael Armbrust
That's not the expected behavior.  What version of Spark?

On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung itsjb.j...@samsung.com wrote:

 When I store DataFrame as table with command saveAsTable and then
 execute DROP TABLE in SparkSQL, it doesn't actually delete files in hive
 warehouse.
 The table disappears from a table list but the data files are still alive.
 Because of this, I can't saveAsTable with a same name before dropping
 table.
 Is it a normal situation? If it is, I will delete files manually ;)

 Kevin





Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-24 Thread Michael Armbrust
No, starting with Spark 1.5 we should by default only be reading the
footers on the executor side (that is unless schema merging has been
explicitly turned on).

On Mon, Aug 24, 2015 at 12:20 PM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 @Michael: would listStatus calls read the actual parquet footers within
 the folders?

 On Mon, Aug 24, 2015 at 11:36 AM, Sereday, Scott 
 scott.sere...@nielsen.com wrote:

 Can you please remove me from this distribution list?



 (Filling up my inbox too fast)



 *From:* Michael Armbrust [mailto:mich...@databricks.com]
 *Sent:* Monday, August 24, 2015 2:13 PM
 *To:* Philip Weaver philip.wea...@gmail.com
 *Cc:* Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey 
 raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao 
 hao.ch...@intel.com

 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I think we are mostly bottlenecked at this point by how fast we can make
 listStatus calls to discover the folders.  That said, we are happy to
 accept suggestions or PRs to make this faster.  Perhaps you can describe
 how your home grown partitioning works?



 On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 1 minute to discover 1000s of partitions -- yes, that is what I have
 observed. And I would assert that is very slow.



 On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We should not be actually scanning all of the data of all of the
 partitions, but we do need to at least list all of the available
 directories so that we can apply your predicates to the actual values that
 are present when we are deciding which files need to be read in a given
 spark job.  While this is a somewhat expensive operation, we do it in
 parallel and we cache this information when you access the same relation
 more than once.



 Can you provide a little more detail about how exactly you are accessing
 the parquet data (are you using sqlContext.read or creating persistent
 tables in the metastore?), and how long it is taking?  It would also be
 good to know how many partitions we are talking about and how much data is
 in each.  Finally, I'd like to see the stacktrace where it is hanging to
 make sure my above assertions are correct.



 We have several tables internally that have 1000s of partitions and while
 it takes ~1 minute initially to discover the metadata, after that we are
 able to query the data interactively.







 On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 anybody has any suggestions?



 On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Is there a workaround without updating Hadoop? Would really appreciate if
 someone can explain what spark is trying to do here and what is an easy way
 to turn this off. Thanks all!



 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works
 really well with parquet which is available in 2.7. They fixed lot of
 issues related to metadata reading there...

 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!



 (table is partitioned by date_prefix and hour)

 explain select count(*) from test_table where date_prefix='20150819' and
 hour='00';



 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]

  TungstenExchange SinglePartition

   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]

Scan ParquetRelation[ .. about 1000 partition paths go here ]



 Why does Spark have to scan all partitions when the query only concerns
 1 partition? Doesn't it defeat the purpose of partitioning?



 Thanks!



 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling it?



 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Can you do some more profiling? I am wondering if the driver is busy
 scanning HDFS / S3.

 For example, run jstack <pid of driver process>.



 Also, it would be great if you could paste the physical plan for the
 simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
 CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-24 Thread Michael Armbrust
Follow the directions here: http://spark.apache.org/community.html

On Mon, Aug 24, 2015 at 11:36 AM, Sereday, Scott scott.sere...@nielsen.com
wrote:

 Can you please remove me from this distribution list?



 (Filling up my inbox too fast)



 *From:* Michael Armbrust [mailto:mich...@databricks.com]
 *Sent:* Monday, August 24, 2015 2:13 PM
 *To:* Philip Weaver philip.wea...@gmail.com
 *Cc:* Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey 
 raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao 
 hao.ch...@intel.com

 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I think we are mostly bottlenecked at this point by how fast we can make
 listStatus calls to discover the folders.  That said, we are happy to
 accept suggestions or PRs to make this faster.  Perhaps you can describe
 how your home grown partitioning works?



 On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 1 minute to discover 1000s of partitions -- yes, that is what I have
 observed. And I would assert that is very slow.



 On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We should not be actually scanning all of the data of all of the
 partitions, but we do need to at least list all of the available
 directories so that we can apply your predicates to the actual values that
 are present when we are deciding which files need to be read in a given
 spark job.  While this is a somewhat expensive operation, we do it in
 parallel and we cache this information when you access the same relation
 more than once.



 Can you provide a little more detail about how exactly you are accessing
 the parquet data (are you using sqlContext.read or creating persistent
 tables in the metastore?), and how long it is taking?  It would also be
 good to know how many partitions we are talking about and how much data is
 in each.  Finally, I'd like to see the stacktrace where it is hanging to
 make sure my above assertions are correct.



 We have several tables internally that have 1000s of partitions and while
 it takes ~1 minute initially to discover the metadata, after that we are
 able to query the data interactively.







 On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 anybody has any suggestions?



 On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Is there a workaround without updating Hadoop? Would really appreciate if
 someone can explain what spark is trying to do here and what is an easy way
 to turn this off. Thanks all!



 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works really
 well with parquet which is available in 2.7. They fixed lot of issues
 related to metadata reading there...

 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!



 (table is partitioned by date_prefix and hour)

 explain select count(*) from test_table where date_prefix='20150819' and
 hour='00';



 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]

  TungstenExchange SinglePartition

   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]

Scan ParquetRelation[ .. about 1000 partition paths go here ]



 Why does Spark have to scan all partitions when the query only concerns
 1 partition? Doesn't it defeat the purpose of partitioning?



 Thanks!



 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and
 I couldn't find much information about it online. What does it mean exactly
 to disable it? Are there any negative consequences to disabling it?



 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Can you do some more profiling? I am wondering if the driver is busy
 scanning HDFS / S3.

 For example, run jstack <pid of driver process>.



 Also, it would be great if you could paste the physical plan for the
 simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
 CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to
 false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user

Re: Array Out OF Bound Exception

2015-08-24 Thread Michael Armbrust
The top line of the stack trace indicates that the exception is being
thrown from your code (i.e. code written in the console):

at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:40)


Check to make sure that you are properly handling data that has fewer
columns than you would expect.
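
One possible cause, assuming the lines are parsed with String.split (the
parsing code is not shown in this thread): on the JVM, split with the
default limit drops trailing empty strings, so a row such as "A1| B1|"
yields two fields rather than three, and reading index 2 fails. That would
be consistent with the reported ArrayIndexOutOfBoundsException: 2. The
sketch below is plain Java with made-up class and method names; Scala's
split on a String delegates to the same java.lang.String method.

```java
import java.util.Arrays;

// Hypothetical helper names; only java.lang.String.split is real API here.
final class SplitDemo {
    // Default limit (0): trailing empty strings are removed from the result.
    static String[] naiveParse(String line) {
        return line.split("\\|");
    }

    // Negative limit: trailing empty strings are kept, so the column count is stable.
    static String[] safeParse(String line) {
        return line.split("\\|", -1);
    }

    // Defensive access for rows that are still shorter than expected.
    static String field(String[] cols, int i) {
        return i < cols.length ? cols[i].trim() : "";
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(naiveParse("A1| |C1")));  // [A1,  , C1]
        System.out.println(naiveParse("A1| B1|").length);            // 2
        System.out.println(safeParse("A1| B1|").length);             // 3
        System.out.println(field(naiveParse("A1| B1|"), 2).isEmpty()); // true
    }
}
```

Either keeping the trailing fields with a negative limit or guarding the
index before reading a column avoids the exception for Scenario 3.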



On Mon, Aug 24, 2015 at 12:41 PM, SAHA, DEBOBROTA ds3...@att.com wrote:

 Hi ,



 I am using SPARK 1.4 and I am getting an array out of bound Exception when
 I am trying to read from a registered table in SPARK.



 For example If I have 3 different text files with the content as below:



 *Scenario 1*:

 A1|B1|C1

 A2|B2|C2



 *Scenario 2*:

 A1| |C1

 A2| |C2



 *Scenario 3*:

 A1| B1|

 A2| B2|



 So for Scenario 1 and 2 it’s working fine but for Scenario 3 I am getting
 the following error:



 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 3.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 2
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:40)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:38)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
  at org.apache.spark.scheduler.Task.run(Task.scala:70)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

 Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



 Please help.



 Thanks,

 Debobrota









Re: DataFrame/JDBC very slow performance

2015-08-24 Thread Michael Armbrust

 Much appreciated! I am not comparing with select count(*) for
 performance, but it was one simple thing I tried to check the performance
 :). I think it now makes sense since Spark tries to extract all records
 before doing the count. I thought having an aggregated function query
 submitted over JDBC/Teradata would let Teradata do the heavy lifting.


We currently only push down filters since there is a lot of variability in
what types of aggregations various databases support.  You can manually
push down whatever you want by replacing the table name with a subquery
(i.e. (SELECT ... FROM ...)).
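
A minimal sketch of what that subquery string can look like. The helper
name, the sales table and its columns are placeholders, not taken from this
thread. Most databases require an alias on a derived table, so the sketch
adds one; it omits the AS keyword because some databases do not accept AS
before a table alias.

```java
// Hypothetical helper; it only builds the string used in place of a table name.
final class JdbcPushdownDemo {
    // Wraps a query in parentheses and gives it an alias so that it can be
    // used wherever a plain table name is expected.
    static String asDerivedTable(String query, String alias) {
        return "(" + query + ") " + alias;
    }

    public static void main(String[] args) {
        // Placeholder query: the aggregation runs inside the database.
        String dbtable = asDerivedTable(
            "SELECT region, COUNT(*) AS cnt FROM sales GROUP BY region", "pushed");
        System.out.println(dbtable);
        // prints: (SELECT region, COUNT(*) AS cnt FROM sales GROUP BY region) pushed

        // With Spark on the classpath, the string would be passed as the table:
        //   sqlContext.read().format("jdbc")
        //       .option("url", jdbcUrl)        // jdbcUrl is a placeholder
        //       .option("dbtable", dbtable)
        //       .load();
    }
}
```

Spark then sees only the already aggregated rows, and the heavy lifting
stays in the database.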

   - How come my second query for (5B) records didn't return anything
 even after a long processing? If I understood correctly, Spark would try to
 fit it in memory and if not then might use disk space, which I have
 available?


Nothing should be held in memory for a query like this (other than a single
count per partition), so I don't think that is the problem.  There is
likely an error buried somewhere.


  - Am I supposed to do any Spark related tuning to make it work?

 My main need is to access data from these large table(s) on demand and
 provide aggregated and calculated results much quicker; for that I was
 trying out Spark. As a next step I am thinking of exporting the data to
 Parquet files and giving that a try. Do you have any suggestions for
 dealing with the problem?


Exporting to Parquet will likely be a faster option than trying to query
through JDBC, since we have many more opportunities for parallelism here.


Re: SparkSQL concerning materials

2015-08-23 Thread Michael Armbrust
Here's a longer version of that talk that I gave, which goes into more
detail on the internals:
http://www.slideshare.net/databricks/spark-sql-deep-dive-melbroune

On Fri, Aug 21, 2015 at 8:28 AM, Sameer Farooqui same...@databricks.com
wrote:

 Have you seen the Spark SQL paper?:
 https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf

 On Thu, Aug 20, 2015 at 11:35 PM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 Thanks for the answers. I have read the answers you provided, but I am
 rather looking for materials on the internals, e.g. how the optimizer
 works, how a query is translated into RDD operations, etc. I am already
 quite familiar with the API.
 A good starting point for me was: Spark DataFrames: Simple and Fast
 Analysis of Structured Data
 https://www.brighttalk.com/webcast/12891/166495?utm_campaign=child-community-webcasts-feed&utm_content=Big+Data+and+Data+Management&utm_source=brighttalk-portal&utm_medium=web&utm_term=

 2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com:

 Or if you're a python lover then this is a good place -
 https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#



 On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote:

 See also
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

 Cheers

 On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif 
 muhammadatif...@gmail.com wrote:

 Hi Dawid

 The best place to get started is the Spark SQL Guide from Apache
 http://spark.apache.org/docs/latest/sql-programming-guide.html

 Regards
 Muhammad

 On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz 
 wysakowicz.da...@gmail.com wrote:

 Hi,

 I would like to dip into SparkSQL. Get to know better the
 architecture, good practices, some internals. Could you advise me some
 materials on this matter?

 Regards
 Dawid









Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Michael Armbrust
We should not be actually scanning all of the data of all of the
partitions, but we do need to at least list all of the available
directories so that we can apply your predicates to the actual values that
are present when we are deciding which files need to be read in a given
spark job.  While this is a somewhat expensive operation, we do it in
parallel and we cache this information when you access the same relation
more than once.
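
To make the listing step concrete, here is a toy sketch in plain Java. It
assumes Hive-style key=value directory names and uses the date_prefix and
hour columns from the quoted query; the class and method names are invented,
and this is not Spark's actual code. All directories have to be listed
first; the predicates are then applied to the values found in the paths,
and only the surviving directories are read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of partition discovery and pruning (hypothetical names, no Spark).
final class PartitionPruningDemo {
    // Parses "date_prefix=20150819/hour=00" into {date_prefix=20150819, hour=00}.
    static Map<String, String> partitionValues(String dir) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String segment : dir.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                values.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return values;
    }

    // Keeps only the listed directories whose values satisfy every equality predicate.
    static List<String> prune(List<String> listedDirs, Map<String, String> predicates) {
        List<String> selected = new ArrayList<>();
        for (String dir : listedDirs) {
            Map<String, String> values = partitionValues(dir);
            boolean keep = true;
            for (Map.Entry<String, String> p : predicates.entrySet()) {
                if (!p.getValue().equals(values.get(p.getKey()))) {
                    keep = false;
                    break;
                }
            }
            if (keep) {
                selected.add(dir);
            }
        }
        return selected;
    }

    // Stand-in for the result of listing the table's directories.
    static List<String> sampleListing() {
        return Arrays.asList(
            "date_prefix=20150818/hour=23",
            "date_prefix=20150819/hour=00",
            "date_prefix=20150819/hour=01");
    }

    // where date_prefix='20150819' and hour='00'
    static Map<String, String> samplePredicates() {
        Map<String, String> predicates = new LinkedHashMap<>();
        predicates.put("date_prefix", "20150819");
        predicates.put("hour", "00");
        return predicates;
    }

    public static void main(String[] args) {
        System.out.println(prune(sampleListing(), samplePredicates()));
        // prints: [date_prefix=20150819/hour=00]
    }
}
```

The expensive part in practice is producing the listing, not the filtering,
which is why the result is cached per relation.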

Can you provide a little more detail about how exactly you are accessing
the parquet data (are you using sqlContext.read or creating persistent
tables in the metastore?), and how long it is taking?  It would also be
good to know how many partitions we are talking about and how much data is
in each.  Finally, I'd like to see the stacktrace where it is hanging to
make sure my above assertions are correct.

We have several tables internally that have 1000s of partitions and while
it takes ~1 minute initially to discover the metadata, after that we are
able to query the data interactively.
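
To make the listing-then-pruning idea concrete, here is a toy sketch in plain Scala. It is not Spark's implementation: parsePartition, prune and the directory layout are made up for illustration, and the real code works on file system listings rather than strings.

```scala
// Toy model of partition pruning: every directory is listed once,
// but only directories whose partition values match are read.
// parsePartition and prune are hypothetical helpers, not Spark APIs.

// "date_prefix=20150819/hour=00" -> Map("date_prefix" -> "20150819", "hour" -> "00")
def parsePartition(path: String): Map[String, String] =
  path.split("/")
    .map(_.split("=", 2))
    .collect { case Array(key, value) => key -> value } // skip non key=value segments
    .toMap

def prune(paths: Seq[String], predicate: Map[String, String] => Boolean): Seq[String] =
  paths.filter(p => predicate(parsePartition(p)))

// Listing step: all partition directories are enumerated (made-up layout).
val allPaths = for {
  day  <- Seq("20150818", "20150819")
  hour <- Seq("00", "01")
} yield s"date_prefix=$day/hour=$hour"

// Pruning step: the predicate is applied to the partition values only.
val toRead = prune(
  allPaths,
  p => p.get("date_prefix").contains("20150819") && p.get("hour").contains("00"))

println(toRead)
```

The cost that grows with the number of partitions is the listing step; the pruning itself only touches the small map of partition values.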



On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 anybody has any suggestions?

 On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Is there a workaround without updating Hadoop? Would really appreciate if
 someone can explain what spark is trying to do here and what is an easy way
 to turn this off. Thanks all!

 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works
 really well with parquet which is available in 2.7. They fixed lot of
 issues related to metadata reading there...
 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819'
 and hour='00';

 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only concerns
 1 partition? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
  wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling 
 it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Can you make some more profiling? I am wondering if the driver is
 busy with scanning the HDFS / S3.

 Like jstack <pid of driver process>



 And also, it would be great if you could paste the physical plan for
 the simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
 of CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still 
 very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled
 to false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery
 with all partitions when the query only needs to look at one partition? 
 Is
 there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
 philip.wea...@gmail.com wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not 
 yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
 jerrickho...@gmail.com wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 
 seconds).
 However, as I added more partitions the query takes longer and longer. 
 When
 I added about 10,000 partitions, the query took way too long. I feel like
 querying for a single partition should not be affected by having more
 

Re: DataFrameWriter.jdbc is very slow

2015-08-20 Thread Michael Armbrust
We will probably fix this in Spark 1.6

https://issues.apache.org/jira/browse/SPARK-10040

On Thu, Aug 20, 2015 at 5:18 AM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com
 wrote:

 We want to migrate our data (approximately 20M rows) from parquet to postgres.
 When we use the DataFrame writer's jdbc method the execution time is very
 long; we tried the same with batch inserts, which was much more efficient.
 Is it intentionally implemented that way?



Re: Data frame created from hive table and its partition

2015-08-20 Thread Michael Armbrust
There is no such thing as primary keys in the Hive metastore, but Spark SQL
does support partitioned hive tables:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables

DataFrameWriter also has a partitionBy method.
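
A minimal sketch of partitionBy, assuming a local master and the SQLContext-style API from Spark 1.4+. The column names, data and temp directory are made up; run it as a standalone script (inside spark-shell, reuse the shell's own context instead of creating one).

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("partitionBy-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Made-up data; "country" plays the role of the partition column.
val df = Seq(("alice", "US"), ("bob", "DE"), ("carol", "US")).toDF("name", "country")

// One sub-directory per distinct value: .../country=US, .../country=DE
val out = Files.createTempDirectory("partitioned").resolve("people").toString
df.write.partitionBy("country").parquet(out)

// Reading it back recovers "country" from the directory names.
val readBack = sqlContext.read.parquet(out)
readBack.filter($"country" === "US").show()
```

Note this controls the on-disk layout of what is written; it does not change how test_df itself is partitioned in memory.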

On Thu, Aug 20, 2015 at 7:29 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io
 wrote:

 Hi

 I have a question regarding data frame partition. I read a hive table from
 spark and following spark api converts it as DF.

 test_df = sqlContext.sql("select * from hivetable1")

 How does spark decide partition of test_df? Is there a way to partition
 test_df based on some column while reading hive table? Second question is,
 if that hive table has primary key declared, does spark honor PK in hive
 table and partition based on PKs?

 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Json Serde used by Spark Sql

2015-08-18 Thread Michael Armbrust
Under the covers we use Jackson's Streaming API as of Spark 1.4.

On Tue, Aug 18, 2015 at 1:12 PM, Udit Mehta ume...@groupon.com wrote:

 Hi,

 I was wondering what json serde does spark sql use. I created a JsonRDD
 out of a json file and then registered it as a temp table to query. I can
 then query the table using dot notation for nested structs/arrays. I was
 wondering how does spark sql deserialize the json data based on the query.

 Thanks in advance,
 Udit



Re: Does Spark optimization might miss to run transformation?

2015-08-13 Thread Michael Armbrust
-dev

If you want to guarantee the side effects happen you should use foreach or
foreachPartition.  A `take`, for example, might only evaluate a subset of
the partitions until it finds enough results.
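
A rough sketch of the difference, assuming a local master. The counters use sc.longAccumulator, which is the Spark 2.x+ accumulator API (an assumption on my part; on 1.x the equivalent was sc.accumulator). The data and names are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("side-effects-sketch"))

val rdd = sc.parallelize(1 to 8, 4)

// Counters standing in for the "store / print" side effect.
val viaTake = sc.longAccumulator("seen by take")
val viaForeach = sc.longAccumulator("seen by foreachPartition")

// Side effect hidden in a transformation: take(1) may stop early,
// so not every element is guaranteed to pass through the map.
rdd.map { x => viaTake.add(1L); x }.take(1)

// Side effect inside an action: every partition is processed.
rdd.foreachPartition { iter => iter.foreach(_ => viaForeach.add(1L)) }

println(s"take saw ${viaTake.sum} elements, foreachPartition saw ${viaForeach.sum}")
```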

On Wed, Aug 12, 2015 at 7:06 AM, Eugene Morozov fathers...@list.ru wrote:

 Hi!

 I’d like to complete action (store / print smth) inside of transformation
 (map or mapPartitions). This approach has some flaws, but there is a
 question. Might it happen that Spark will optimise (RDD or DataFrame)
 processing so that my mapPartitions simply won’t happen?

 --
 Eugene Morozov
 fathers...@list.ru







Re: About Databricks's spark-sql-perf

2015-08-13 Thread Michael Armbrust
Hey sorry, I've been doing a bunch of refactoring on this project.  Most of
the data generation was a huge hack (it was done before we supported
partitioning natively) and used some private APIs that don't exist
anymore.  As a result, while doing the regression tests for 1.5 I deleted a
bunch of broken code.  Those files should be deleted too.

The code does work with Spark 1.4/1.5, but at least as of today mostly
requires that you have already created the data/tables. I'll work on
updating the README as the QA period moves forward.

On Thu, Aug 13, 2015 at 6:49 AM, Todd bit1...@163.com wrote:

 Hi,
 I got a question about the spark-sql-perf project by Databricks at 
 https://github.com/databricks/spark-sql-perf/


 The Tables.scala (
 https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala)
 and BigData (
 https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala)
 are  empty files.
 Is this intentional, or is it a bug?
 Also, the following code snippet from the README.MD won't compile, as there
 is no Tables class defined in the org.apache.spark.sql.parquet package:
 (I am using Spark1.4.1, is the code compatible with Spark 1.4.1?)

 import org.apache.spark.sql.parquet.Tables
 // Tables in TPC-DS benchmark used by experiments.
 val tables = Tables(sqlContext)
 // Setup TPC-DS experiment
 val tpcds = new TPCDS (sqlContext = sqlContext)





Re: Spark inserting into parquet files with different schema

2015-08-10 Thread Michael Armbrust
Older versions of Spark (i.e. when it was still called SchemaRDD instead of
DataFrame) did not support merging different parquet schema.  However,
Spark 1.4+ should.
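
For reference, a small sketch of schema merging, assuming a local master and the SQLContext-style API. The columns and directory layout are made up, and the explicit mergeSchema option is an assumption about the version in use (later releases require it to be turned on explicitly).

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("merge-schema-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val base = Files.createTempDirectory("merge-schema").toString

// Two writes with different but compatible schemas (made-up columns).
Seq((1, 1), (2, 4)).toDF("value", "square").write.parquet(s"$base/key=1")
Seq((3, 27), (4, 64)).toDF("value", "cube").write.parquet(s"$base/key=2")

// mergeSchema asks the reader to reconcile the file schemas.
val merged = sqlContext.read.option("mergeSchema", "true").parquet(base)
merged.printSchema()
```

Rows written without a given column simply come back with null in that column.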

On Sat, Aug 8, 2015 at 8:58 PM, sim s...@swoop.com wrote:

 Adam, did you find a solution for this?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20706p24181.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How to use custom Hadoop InputFormat in DataFrame?

2015-08-10 Thread Michael Armbrust
You can't create a DataFrame from an arbitrary object since we don't know
how to figure out the schema.  You can either create a JavaBean
https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
or manually create a row + specify the schema
https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
.
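
A minimal sketch of the second option (rows plus an explicit schema), written in Scala and assuming a local master. The tuples stand in for whatever your record type exposes; the field names and types are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("row-schema-sketch"))
val sqlContext = new SQLContext(sc)

// Stand-in for the values read through a custom InputFormat (made-up fields).
val records = sc.parallelize(Seq((1, "a"), (2, "b")))

// Convert each record into a generic Row...
val rows = records.map { case (id, payload) => Row(id, payload) }

// ...and describe the columns explicitly.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("payload", StringType, nullable = true)))

val df = sqlContext.createDataFrame(rows, schema)
df.printSchema()
```

The map step is where you would pull the fields out of each record coming from the custom format.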



On Mon, Aug 10, 2015 at 11:22 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have my own Hadoop custom InputFormat which I want to use in
 DataFrame.
 How do we do that? I know I can use sc.hadoopFile(..) but then how do I
 convert it into DataFrame

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
     jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, ??);

 In the above code, what should I put in place of ?? I tried
 MyRecordWritable.class, but it does not work because it is a RecordWritable,
 not a schema. Please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-custom-Hadoop-InputFormat-in-DataFrame-tp24198.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Pagination on big table, splitting joins

2015-08-10 Thread Michael Armbrust

 I think to use *toLocalIterator* method and something like that, but I
 have doubts about memory and parallelism and sure there is a better way to
 do it.


It will still run all earlier parts of the job in parallel.  Only the
actual retrieving of the final partitions will be serial.  This is how we
do pagination in the Spark SQL JDBC server.
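
A small sketch of paging with toLocalIterator, assuming a local master. The table, the page size and the use of grouped for paging are made up for illustration; this is not the JDBC server's code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("pagination-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Made-up table of ten ids spread over four partitions.
val df = sc.parallelize((1 to 10).map(Tuple1(_)), 4).toDF("id")

// The sort runs in parallel; toLocalIterator then pulls the result
// to the driver one partition at a time instead of all at once.
val pageSize = 3 // hypothetical page size
val pages = df.orderBy("id").rdd.toLocalIterator.grouped(pageSize)

val firstPage = pages.next().map(_.getInt(0))
println(firstPage)
```

Driver memory is bounded by the largest partition rather than by the whole result, at the price of fetching partitions serially.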


Re: Is there any external dependencies for lag() and lead() when using data frames?

2015-08-10 Thread Michael Armbrust
You will need to use a HiveContext for window functions to work.

On Mon, Aug 10, 2015 at 1:26 PM, Jerry jerry.c...@gmail.com wrote:

 Hello,

 Using Apache Spark 1.4.1 I'm unable to use lag or lead when making queries
 to a data frame and I'm trying to figure out if I just have a bad setup or
 if this is a bug. As for the exceptions I get: when using selectExpr() with
 a string as an argument, I get NoSuchElementException: key not found: lag
 and when using the select method and ...spark.sql.functions.lag I get an
 AnalysisException. If I replace lag with abs in the first case, Spark runs
 without exception, so none of the other syntax is incorrect.

 As for how I'm running it; the code is written in Java with a static
 method that takes the SparkContext as an argument which is used to create a
 JavaSparkContext which then is used to create an SQLContext which loads a
 json file from the local disk and runs those queries on that data frame
 object. FYI: the java code is compiled, jared and then pointed to with -cp
 when starting the spark shell, so all I do is Test.run(sc) in shell.

 Let me know what to look for to debug this problem. I'm not sure where to
 look to solve this problem.

 Thanks,
 Jerry



Re: Spark inserting into parquet files with different schema

2015-08-10 Thread Michael Armbrust
What is the error you are getting?  It would also be awesome if you could
try with Spark 1.5 when the first preview comes out (hopefully early next
week).

On Mon, Aug 10, 2015 at 11:41 AM, Simeon Simeonov s...@swoop.com wrote:

 Michael, is there an example anywhere that demonstrates how this works
 with the schema changing over time?

 Must the Hive tables be set up as external tables outside of saveAsTable?
 In my experience, in 1.4.1, writing to a table with SaveMode.Append fails
 if the schema don't match.

 Thanks,
 Sim

 From: Michael Armbrust mich...@databricks.com
 Date: Monday, August 10, 2015 at 2:36 PM
 To: Simeon Simeonov s...@swoop.com
 Cc: user user@spark.apache.org
 Subject: Re: Spark inserting into parquet files with different schema

 Older versions of Spark (i.e. when it was still called SchemaRDD instead
 of DataFrame) did not support merging different parquet schema.  However,
 Spark 1.4+ should.

 On Sat, Aug 8, 2015 at 8:58 PM, sim s...@swoop.com wrote:

 Adam, did you find a solution for this?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20706p24181.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Spark SQL query AVRO file

2015-08-07 Thread Michael Armbrust
You can register your data as a table using this library and then query it
using HiveQL

CREATE TEMPORARY TABLE episodes
USING com.databricks.spark.avro
OPTIONS (path "src/test/resources/episodes.avro")


On Fri, Aug 7, 2015 at 11:42 AM, java8964 java8...@hotmail.com wrote:

 Hi, Michael:

 I am not sure how spark-avro can help in this case.

 My understanding is that to use Spark-avro, I have to translate all the
 logic from this big Hive query into Spark code, right?

 If I have this big Hive query, how I can use spark-avro to run the query?

 Thanks

 Yong

 --
 From: mich...@databricks.com
 Date: Fri, 7 Aug 2015 11:32:21 -0700
 Subject: Re: Spark SQL query AVRO file
 To: java8...@hotmail.com
 CC: user@spark.apache.org


 Have you considered trying Spark SQL's native support for avro data?

 https://github.com/databricks/spark-avro

 On Fri, Aug 7, 2015 at 11:30 AM, java8964 java8...@hotmail.com wrote:

 Hi, Spark users:

 We currently are using Spark 1.2.2 + Hive 0.12 + Hadoop 2.2.0 on our
 production cluster, which has 42 data/task nodes.

 There is one dataset stored as Avro files about 3T. Our business has a
 complex query running for the dataset, which is stored in nest structure
 with Array of Struct in Avro and Hive.

 We can query it using Hive without any problem, but we like the SparkSQL's
 performance, so we in fact run the same query in the Spark SQL, and found
 out it is in fact much faster than Hive.

 But when we run it, we got the following error randomly from Spark
 executors, sometime seriously enough to fail the whole spark job.

 Below the stack trace, and I think it is a bug related to Spark due to:

 1) The error jumps out inconsistent, as sometimes we won't see it for this
 job. (We run it daily)
 2) Sometime it won't fail our job, as it recover after retry.
 3) Sometime it will fail our job, as I listed below.
 4) Is this due to the multithreading in Spark? The NullPointerException
 indicates Hive got a null ObjectInspector among the children of
 StructObjectInspector, as I read the Hive source code, but I know there is
 no null ObjectInspector among the children of StructObjectInspector. Googling
 this error didn't give me any hints. Does anyone know anything about this?

 Project
 [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcatWS(,,CAST(account_id#23L,
 StringType),CAST(gross_contact_count_a#4L, StringType),CASE WHEN IS NULL
 tag_cnt#21 THEN 0 ELSE CAST(tag_cnt#21, StringType),CAST(list_cnt_a#5L,
 StringType),CAST(active_contact_count_a#16L,
 StringType),CAST(other_api_contact_count_a#6L,
 StringType),CAST(fb_api_contact_count_a#7L,
 StringType),CAST(evm_contact_count_a#8L,
 StringType),CAST(loyalty_contact_count_a#9L,
 StringType),CAST(mobile_jmml_contact_count_a#10L,
 StringType),CAST(savelocal_contact_count_a#11L,
 StringType),CAST(siteowner_contact_count_a#12L,
 StringType),CAST(socialcamp_service_contact_count_a#13L,
 S...org.apache.spark.SparkException: Job aborted due to stage failure: Task
 58 in stage 1.0 failed 4 times, most recent failure: Lost task 58.3 in
 stage 1.0 (TID 257, 10.20.95.146): java.lang.NullPointerException
 at
 org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.supportedCategories(AvroObjectInspectorGenerator.java:139)
 at
 org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:89)
 at
 org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:101)
 at
 org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspectorWorker(AvroObjectInspectorGenerator.java:117)
 at
 org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.createObjectInspector(AvroObjectInspectorGenerator.java:81)
 at
 org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator.init(AvroObjectInspectorGenerator.java:55)
 at
 org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:69)
 at
 org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:112)
 at
 org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:109)
 at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)
 at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at 

Re: Spark SQL Hive - merge small files

2015-08-05 Thread Michael Armbrust
This feature isn't currently supported.

On Wed, Aug 5, 2015 at 8:43 AM, Brandon White bwwintheho...@gmail.com
wrote:

 Hello,

 I would love to have hive merge the small files in my managed hive context
 after every query. Right now, I am setting the hive configuration in my
 Spark Job configuration but hive is not managing the files. Do I need to
 set the hive fields in around place? How do you set Hive configurations in
 Spark?

 Here is what I'd like to set

 hive.merge.mapfiles            true
 hive.merge.mapredfiles         true
 hive.merge.size.per.task       25600
 hive.merge.smallfiles.avgsize  1600



Re: Spark SQL support for Hive 0.14

2015-08-04 Thread Michael Armbrust
I'll add that while Spark SQL 1.5 compiles against Hive 1.2.1, it has
support for reading from metastores for Hive 0.12 - 1.2.1

On Tue, Aug 4, 2015 at 9:59 AM, Steve Loughran ste...@hortonworks.com
wrote:

 Spark 1.3.1 & 1.4 only support Hive 0.13

 Spark 1.5 is going to be released against Hive 1.2.1; it'll skip Hive .14
 support entirely and go straight to the currently supported Hive release.

 See SPARK-8064 for the gory details

  On 3 Aug 2015, at 23:01, Ishwardeep Singh 
 ishwardeep.si...@impetus.co.in wrote:
 
  Hi,
 
  Does spark SQL support Hive 0.14? The documentation refers to Hive 0.13.
 Is
  there a way to compile spark with Hive 0.14?
 
  Currently we are using Spark 1.3.1.
 
  Thanks
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-support-for-Hive-0-14-tp24122.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
 






Re: shutdown local hivecontext?

2015-08-03 Thread Michael Armbrust
TestHive takes care of creating a temporary directory for each invocation
so that multiple test runs won't conflict.

On Mon, Aug 3, 2015 at 3:09 PM, Cesar Flores ces...@gmail.com wrote:


 We are using a local hive context in order to run unit tests. Our unit
 tests run perfectly fine if we run them one by one using sbt, as in the next
 example:

 sbt test-only com.company.pipeline.scalers.ScalerSuite.scala
 sbt test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala

 However, if we try to run them as:

 sbt test-only com.company.pipeline.*

 we start to run into issues. It appears that the issue is that the hive
 context is not properly shutdown after finishing the first test. Does any
 one know how to attack this problem? The test part in my build.sbt file
 looks like:

 libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
 parallelExecution in Test := false,
 fork := true,
 javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M",
 "-XX:+CMSClassUnloadingEnabled")

 We are working under Spark 1.3.0


 Thanks
 --
 Cesar Flores



Re: how to ignore MatchError then processing a large json file in spark-sql

2015-08-03 Thread Michael Armbrust
This sounds like a bug.  What version of spark? and can you provide the
stack trace?

On Sun, Aug 2, 2015 at 11:27 AM, fuellee lee lifuyu198...@gmail.com wrote:

 I'm trying to process a bunch of large json log files with spark, but it
 fails every time with `scala.MatchError`, whether I give it a schema or not.

 I just want to skip lines that do not match the schema, but I can't find how
 in the Spark docs.

 I know write a json parser and map it to json file RDD can get things
 done, but I want to use
 `sqlContext.read.schema(schema).json(fileNames).selectExpr(...)` because
 it's much easier to maintain.

 thanks



Re: how to convert a sequence of TimeStamp to a dataframe

2015-08-03 Thread Michael Armbrust
In general it needs to be a Seq of Tuples for the implicit toDF to work
(which is a little tricky when there is only one column).

scala> Seq(Tuple1(new java.sql.Timestamp(System.currentTimeMillis))).toDF("a")
res3: org.apache.spark.sql.DataFrame = [a: timestamp]

or with multiple columns

scala> Seq(("1", new java.sql.Timestamp(System.currentTimeMillis))).toDF("a", "b")
res4: org.apache.spark.sql.DataFrame = [a: string, b: timestamp]
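
Applied to an RDD of timestamps like the one in the question, the same trick might look like the sketch below. It assumes a local master; the two timestamps are sample values taken from the question.

```scala
import java.sql.Timestamp
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("timestamp-df-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val seqTimestamp = Seq(
  Timestamp.valueOf("2015-07-22 16:52:00"),
  Timestamp.valueOf("2015-07-22 16:53:00"))

// Wrap each value in a Tuple1 so the implicit toDF applies to the RDD.
val df = sc.parallelize(seqTimestamp).map(Tuple1(_)).toDF("minInterval")
df.printSchema()
```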

On Fri, Jul 31, 2015 at 2:50 PM, Joanne Contact joannenetw...@gmail.com
wrote:

 Hi Guys,

 I have struggled for a while on this seemingly simple thing:

 I have a sequence of timestamps and want to create a dataframe with 1
 column.

 Seq[java.sql.Timestamp]

 //import collection.breakOut

 var seqTimestamp = scala.collection.Seq(listTs:_*)

 seqTimestamp: Seq[java.sql.Timestamp] = List(2015-07-22 16:52:00.0,
 2015-07-22 16:53:00.0, ., )

 I tried a lot of ways to create a dataframe and below is another failed
 way:

 import sqlContext.implicits._
 var rddTs = sc.parallelize(seqTimestamp)
 rddTs.toDF("minInterval")

 <console>:108: error: value toDF is not a member of
 org.apache.spark.rdd.RDD[java.sql.Timestamp] rddTs.toDF("minInterval")

 So, any guru could please tell me how to do this

 I am not familiar with Scala or Spark. I wonder if learning Scala will
 help this at all? It just sounds a lot of time of trial/error and
 googling.

 docs like

 https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/DataFrame.html

 https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/SQLContext.html#createDataFrame(scala.collection.Seq
 ,
 scala.reflect.api.TypeTags.TypeTag)
 does not help.

 Btw, I am using Spark 1.4.

 Thanks in advance,

 J





Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Michael Armbrust
Perhaps I'm missing what you are trying to accomplish, but if you'd like to
avoid the null values do an inner join instead of an outer join.

Additionally, I'm confused about how the result of
joinedDF.filter(joinedDF("y").isNotNull).show still contains null values in
the column y. This doesn't really have anything to do with nullable, which is
only a hint to the system so that we can avoid null checking when we know
that there are no null values. If you provide the full code I can try and see
if this is a bug.
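
For comparison, a small sketch of the two approaches, assuming a local master and a schema that has not been altered by hand. It uses the plain join API rather than your leftOuterJoinWithRemovalOfEqualColumn helper, which I don't have; with this data I'd expect both variants to keep only the row for x = 2.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("join-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val recordDF  = Seq((1, "hello"), (2, "bob")).toDF("x", "a")
val mappingDF = Seq((2, 5)).toDF("x", "y")

val joinCond = recordDF("x") === mappingDF("x")

// Left outer join, then drop the rows that found no match.
val outerFiltered = recordDF
  .join(mappingDF, joinCond, "left_outer")
  .filter(mappingDF("y").isNotNull)

// Inner join: unmatched rows never appear in the first place.
val inner = recordDF.join(mappingDF, joinCond, "inner")

outerFiltered.show()
inner.show()
```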

On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne martin.se...@googlemail.com
wrote:

 Dear Michael, dear all,

 motivation:

 object OtherEntities {

   case class Record( x:Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 Now I want to perform a *left outer join* on records and mappings (with the
 join criterion on columns recordDF("x") === mappingDF("x")); the shorthand
 is in *leftOuterJoinWithRemovalOfEqualColumn*:

 val sqlContext = new SQLContext(sc)
 // used to implicitly convert an RDD to a DataFrame.
 import sqlContext.implicits._

 val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
 val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

 val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

 joinedDF.filter(joinedDF("y").isNotNull).show


 Currently, the output is

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 instead of

 +-+---+-+

 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+

 The last output can be achieved by the method of changing nullable=false
 to nullable=true described in my first post.

 *Thus, I need this schema modification as to make outer joins work.*

 Cheers and thanks,

 Martin



 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet update nullability information based on predicates as we
 don't actually leverage this information in many places yet.  Why do you
 want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
 martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in dataframes can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all
 non-null rows:*

   val df = ... // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull )

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunately, while this works for a nullable column (according to
 df.printSchema), it does not work for a column that is not nullable:


 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in terms of performance) to turn a non-nullable
 column into a nullable column?
 I came up with this:*

   /**
    * Set whether a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: [Parquet + Dataframes] Column names with spaces

2015-07-30 Thread Michael Armbrust
You can't use these names due to limitations in parquet (and the library
itself will silently generate corrupt files that can't be read, hence the
error we throw).

You can alias a column by df.select(df("old").alias("new")), which is
essentially what withColumnRenamed does.  Alias in this case means renaming.
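
A quick sketch of both spellings, in Scala and assuming a local master (the PySpark calls have the same names). The data and the target name "name" are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: local mode, standalone script.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("alias-sketch"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Made-up frame with a human-readable column name.
val df = Seq((1, "a"), (2, "b")).toDF("id", "Name with Space")

// Both calls return a new DataFrame; df itself is left untouched.
val renamed = df.withColumnRenamed("Name with Space", "name")
val aliased = df.select(df("id"), df("Name with Space").alias("name"))

renamed.printSchema()
aliased.printSchema()
```

Either result can then be written to parquet, since the offending name no longer appears in the schema.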

On Thu, Jul 30, 2015 at 11:49 AM, angelini alex.angel...@shopify.com
wrote:

 Hi all,

 Our data has lots of human readable column names (names that include
 spaces), is it possible to use these with Parquet and Dataframes?

 When I try and write the Dataframe I get the following error:

 (I am using PySpark)

 `AnalysisException: Attribute name Name with Space contains invalid
 character(s) among  ,;{}()\n\t=. Please use alias to rename it.`

 How can I alias that column name?

 `df['Name with Space'] = df['Name with Space'].alias('Name')` doesn't work
 as you can't assign to a dataframe column.

 `df.withColumnRenamed('Name with Space', 'Name')` overwrites the column and
 doesn't alias it.

 Any ideas?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-Dataframes-Column-names-with-spaces-tp24088.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Michael Armbrust
We don't yet update nullability information based on predicates as we
don't actually leverage this information in many places yet.  Why do you
want to update the schema?

On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 martin.se...@googlemail.com
wrote:

 Hi all,

 1. *Columns in dataframes can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all
 non-null rows:*

   val df = ... // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull )

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunately, while this works for a nullable column (according to
 df.printSchema), it does not work for a column that is not nullable:


 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in terms of performance) to turn a
 non-nullable column into a nullable column?
 I came up with this:*

   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.types.{StructField, StructType}

   /**
    * Sets whether a column is nullable.
    * @param df source DataFrame
    * @param cn name of the column to change
    * @param nullable flag to set, so that the column is either nullable or not
    */
   def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
     val schema = df.schema
     // Rebuild the schema, changing only the nullable flag of the named column.
     val newSchema = StructType(schema.map {
       case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame(df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html




Re: How to read a Json file with a specific format?

2015-07-29 Thread Michael Armbrust
This isn't totally correct.  Spark SQL does support JSON arrays and will
implicitly flatten them.  However, complete objects or arrays must exist
one per line and cannot be split with newlines.
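
For a file that is one pretty-printed JSON array, a small pre-processing
step can put each element on its own line. The following is only a sketch
in plain Python (no Spark involved); the function name array_to_json_lines
and the sample records are made up for illustration, and it assumes the
whole file fits in memory.

```python
import json

def array_to_json_lines(text):
    """Turn one JSON array document into newline-delimited JSON."""
    records = json.loads(text)
    if not isinstance(records, list):
        # A single top-level object becomes a one-line file.
        records = [records]
    # Compact separators keep every record on exactly one line.
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)

# Hypothetical input shaped like the file in the question below.
pretty = """
[
  {"IFAM": "EQR", "COL": 21, "DATA": [{"MLrate": 30, "up": null},
                                       {"MLrate": 30, "up": null}]},
  {"IFAM": "EQR", "COL": 22, "DATA": [{"MLrate": 30, "up": null}]}
]
"""
lines = array_to_json_lines(pretty).splitlines()
```

The resulting text, written to a file, should have the one-object-per-line
layout described above.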

On Wed, Jul 29, 2015 at 7:55 AM, Young, Matthew T matthew.t.yo...@intel.com
 wrote:

 The built-in Spark JSON functionality cannot read normal JSON arrays. The
 format it expects is a bunch of individual JSON objects without any outer
 array syntax, with one complete JSON object per line of the input file.

 AFAIK your options are to read the JSON in the driver and parallelize it
 out to the workers or to fix your input file to match the spec.

 For one-off conversions I usually use a combination of jq and
 regex-replaces to get the source file in the right format.

 
 From: SparknewUser [melanie.galloi...@gmail.com]
 Sent: Wednesday, July 29, 2015 6:37 AM
 To: user@spark.apache.org
 Subject: How to read a Json file with a specific format?

 I'm trying to read a Json file which is like :
 [

 {"IFAM":"EQR","KTM":143000640,"COL":21,"DATA":[{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ]}

 ,{"IFAM":"EQR","KTM":143000640,"COL":22,"DATA":[{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ,{"MLrate":30,"Nrout":0,"up":null,"Crate":2}
 ]}
 ]

 I've tried the command:
 val df = sqlContext.read.json(namefile)
 df.show()


 But this does not work : my columns are not recognized...





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-a-Json-file-with-a-specific-format-tp24061.html




Re: DataFrame DAG recomputed even though DataFrame is cached?

2015-07-28 Thread Michael Armbrust
We will try to address this before Spark 1.5 is released:
https://issues.apache.org/jira/browse/SPARK-9141

On Tue, Jul 28, 2015 at 11:50 AM, Kristina Rogale Plazonic kpl...@gmail.com
 wrote:

 Hi,

 I'm puzzling over the following problem: when I cache a small sample of a
 big dataframe, the small dataframe is recomputed when selecting a column
 (but not if show() or count() is invoked).

 Why is that so and how can I avoid recomputation of the small sample
 dataframe?

 More details:

 - I have a big dataframe df of ~190million rows and ~10 columns,
 obtained via 3 different joins; I cache it and invoke count() to make sure
 it really is in memory and confirm in web UI

 - val sdf = df.sample(false, 1e-6); sdf.cache(); sdf.count()  // 170
 lines; cached is also confirmed in webUI, size in memory is 150kB

 *- sdf.select(colname).show()   // this triggers a complete
 recomputation of sdf with 3 joins!*

 - show(), count() or take() do not trigger the recomputation of the 3
 joins, but select(), collect() or withColumn() do.

 I have --executor-memory 30G --driver-memory 10g, so memory is not a
 problem. I'm using Spark 1.4.0. Could anybody shed some light on this or
 where I can find more info?

 Many thanks,
 Kristina



Re: GenericRowWithSchema is too heavy

2015-07-27 Thread Michael Armbrust
Internally I believe that we only actually create one struct object for
each row, so you are really only paying the cost of the pointer in most use
cases (as shown below).

scala> val df = Seq((1,2), (3,4)).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala> df.collect()
res1: Array[org.apache.spark.sql.Row] = Array([1,2], [3,4])

scala> res1(0).schema eq res1(1).schema
res3: Boolean = true

I'd strongly suggest that you use something like parquet
(https://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files)
or avro (http://spark-packages.org/package/databricks/spark-avro) to store
DataFrames, as it is likely much more space efficient and faster than
generic serialization.

Michael

On Mon, Jul 27, 2015 at 9:02 PM, Kevin Jung itsjb.j...@samsung.com wrote:

 Hi all,

 SparkSQL usually creates DataFrame with GenericRowWithSchema(is that
 right?). And 'Row' is a super class of GenericRow and GenericRowWithSchema.
 The only difference is that GenericRowWithSchema has its schema information
 as StructType. But I think one DataFrame has only one schema then each row
 should not have to store schema in it. Because StructType is very heavy and
 most of RDD has many rows. To test this,
 1) create DataFrame and call rdd ( RDD[Row] ) => GenericRowWithSchema
 2) dataframe.map( row => Row(row.toSeq)) => GenericRow
 3) dataframe.map( row => row.toSeq) => underlying sequence of a row
 4) saveAsObjectFile or use org.apache.spark.util.SizeEstimator.estimate
 And my result is,
 (dataframe with 5columns)
 GenericRowWithSchema = 13gb
 GenericRow = 8.2gb
 Seq = 7gb

 Best regards
 Kevin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/GenericRowWithSchema-is-too-heavy-tp24018.html




Re: Issue with column named count in a DataFrame

2015-07-22 Thread Michael Armbrust
Additionally have you tried enclosing count in `backticks`?

On Wed, Jul 22, 2015 at 4:25 PM, Michael Armbrust mich...@databricks.com
wrote:

 I believe this will be fixed in Spark 1.5

 https://github.com/apache/spark/pull/7237

 On Wed, Jul 22, 2015 at 3:04 PM, Young, Matthew T 
 matthew.t.yo...@intel.com wrote:

 I'm trying to do some simple counting and aggregation in an IPython
 notebook with Spark 1.4.0 and I have encountered behavior that looks like a
 bug.

 When I try to filter rows out of an RDD with a column name of count I get
 a large error message. I would just avoid naming things count, except for
 the fact that this is the default column name created with the count()
 operation in pyspark.sql.GroupedData

 The small example program below demonstrates the issue.

 from pyspark.sql import SQLContext
 sqlContext = SQLContext(sc)
 dataFrame = sc.parallelize([("foo",), ("foo",), ("bar",)]).toDF(["title"])
 counts = dataFrame.groupBy('title').count()
 counts.filter("title = 'foo'").show() # Works
 counts.filter("count > 1").show() # Errors out


 I can even reproduce the issue in a PySpark shell session by entering
 these commands.

 I suspect that the error has something to do with Spark wanting to call the
 count() function in place of looking at the count column.

 The error message is as follows:


 Py4JJavaError                             Traceback (most recent call last)
 <ipython-input-29-62a1b7c71f21> in <module>()
 ----> 1 counts.filter("count > 1").show() # Errors Out

 C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc in filter(self, condition)
     774         """
     775         if isinstance(condition, basestring):
 --> 776             jdf = self._jdf.filter(condition)
     777         elif isinstance(condition, Column):
     778             jdf = self._jdf.filter(condition._jc)

 C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
     536         answer = self.gateway_client.send_command(command)
     537         return_value = get_return_value(answer, self.gateway_client,
 --> 538                 self.target_id, self.name)
     539
     540         for temp_arg in temp_args:

 C:\Python27\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
     298                 raise Py4JJavaError(
     299                     'An error occurred while calling {0}{1}{2}.\n'.
 --> 300                     format(target_id, '.', name), value)
     301             else:
     302                 raise Py4JError(

 Py4JJavaError: An error occurred while calling o229.filter.
 : java.lang.RuntimeException: [1.7] failure: ``('' expected but `>' found

 count > 1
       ^
 at scala.sys.package$.error(package.scala:27)
 at
 org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
 at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
 at java.lang.reflect.Method.invoke(Unknown Source)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Unknown Source)



 Is there a recommended workaround to the inability to filter on a column
 named count? Do I have to make a new DataFrame and rename the column just
 to work around this bug? What's the best way to do that?

 Thanks,

 -- Matthew Young






Re: Issue with column named count in a DataFrame

2015-07-22 Thread Michael Armbrust
I believe this will be fixed in Spark 1.5

https://github.com/apache/spark/pull/7237





Re: Add column to DF

2015-07-21 Thread Michael Armbrust
Try instead:

import java.util.{Calendar, Date}
import org.apache.spark.sql.functions._

// Wrap the bucketing logic in a UDF so it can be applied to Columns.
val determineDayPartID = udf((evntStDate: String, evntStHour: String) => {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  val stDateStr: String = evntStDate.substring(2, 8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1, 3).toDouble + 0.1
  var bucket = Math.ceil(stHour / 3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8      // treat Sunday (1) as day 8
  if (dayOfWeek > 6) bucket = bucket + 8 // shift weekend buckets by 8
  bucket
})

input.withColumn("DayPartID", determineDayPartID(col("StartDate"),
  col("EventStartHour")))


Re: Question on Spark SQL for a directory

2015-07-21 Thread Michael Armbrust
https://spark.apache.org/docs/latest/sql-programming-guide.html#loading-data-programmatically

On Tue, Jul 21, 2015 at 4:06 PM, Ron Gonzalez zlgonza...@yahoo.com.invalid
wrote:

 Hi,
   Question on using spark sql.
   Can someone give an example for creating table from a directory
 containing parquet files in HDFS instead of an actual parquet file?

 Thanks,
 Ron

 On 07/21/2015 01:59 PM, Brandon White wrote:

 A few questions about caching a table in Spark SQL.

 1) Is there any difference between caching the dataframe and the table?

 df.cache() vs sqlContext.cacheTable(tableName)

 2) Do you need to warm up the cache before seeing the performance
 benefits? Is the cache LRU? Do you need to run some queries on the table
 before it is cached in memory?

 3) Is caching the table much faster than .saveAsTable? I am only seeing a
 10 %- 20% performance increase.







Re: dataframes sql order by not total ordering

2015-07-20 Thread Michael Armbrust
An ORDER BY needs to be on the outermost query otherwise subsequent
operations (such as the join) could reorder the tuples.
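
As a rough illustration, the query from the question can be rewritten with
the ORDER BY moved out of the subquery to the end of the outer query. The
sketch below runs that rewritten SQL against Python's built-in sqlite3
module purely as a stand-in for Spark SQL; the tiny movies and ratings
tables are invented, and only the position of the ORDER BY is the point.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Made-up sample data: movie B has 3 distinct raters, C has 2, A has 1.
conn.executescript("""
    CREATE TABLE movies  (movieId INTEGER, title TEXT);
    CREATE TABLE ratings (user INTEGER, product INTEGER, rating REAL);
    INSERT INTO movies VALUES (1, 'A'), (2, 'B'), (3, 'C');
    INSERT INTO ratings VALUES
        (1, 1, 4.0),
        (1, 2, 5.0), (2, 2, 1.0), (3, 2, 3.0),
        (1, 3, 2.0), (2, 3, 5.0);
""")

# Same shape as the original query, but the ORDER BY is applied last,
# on the outermost query, after the join.
query = """
    SELECT movies.title, movierates.maxr, movierates.minr, movierates.cntu
    FROM (SELECT ratings.product,
                 max(ratings.rating)  AS maxr,
                 min(ratings.rating)  AS minr,
                 count(DISTINCT user) AS cntu
          FROM ratings
          GROUP BY ratings.product) movierates
    JOIN movies ON movierates.product = movies.movieId
    ORDER BY movierates.cntu DESC
"""
rows = conn.execute(query).fetchall()
titles = [row[0] for row in rows]
```

The same SQL text, passed to sqlContext.sql, is what the advice above
amounts to.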

On Mon, Jul 20, 2015 at 9:25 AM, Carol McDonald cmcdon...@maprtech.com
wrote:

 The following query on the MovieLens dataset sorts by the count of
 ratings for a movie.  It looks like the results are ordered by partition?
 scala> val results = sqlContext.sql("""select movies.title, movierates.maxr,
 movierates.minr, movierates.cntu from (SELECT ratings.product,
 max(ratings.rating) as maxr, min(ratings.rating) as minr, count(distinct
 user) as cntu FROM ratings group by ratings.product order by cntu desc)
 movierates join movies on movierates.product=movies.movieId""")

 scala> results.take(30).foreach(println)
 [Right Stuff, The (1983),5.0,1.0,750]
 [Lost in Space (1998),5.0,1.0,667]
 [Dumb  Dumber (1994),5.0,1.0,660]
 [Patch Adams (1998),5.0,1.0,474]
 [Carlito's Way (1993),5.0,1.0,369]
 [Rounders (1998),5.0,1.0,345]
 [Bedknobs and Broomsticks (1971),5.0,1.0,319]
 [Beverly Hills Ninja (1997),5.0,1.0,232]
 [Saving Grace (2000),5.0,1.0,186]
 [Dangerous Minds (1995),5.0,1.0,141]
 [Death Wish II (1982),5.0,1.0,85]
 [All Dogs Go to Heaven 2 (1996),5.0,1.0,75]
 [Repossessed (1990),4.0,1.0,53]
 [Assignment, The (1997),5.0,1.0,49]
 [$1,000,000 Duck (1971),5.0,1.0,37]
 [Stonewall (1995),5.0,1.0,20]
 [Dog of Flanders, A (1999),5.0,1.0,8]
 [Frogs for Snakes (1998),3.0,1.0,5]
 [It's in the Water (1998),3.0,2.0,3]
 [Twelve Monkeys (1995),5.0,1.0,1511]
 [Ransom (1996),5.0,1.0,564]
 [Alice in Wonderland (1951),5.0,1.0,525]
 [City Slickers II: The Legend of Curly's Gold (1994),5.0,1.0,392]
 [Eat Drink Man Woman (1994),5.0,1.0,346]
 [Cube (1997),5.0,1.0,233]
 [Omega Man, The (1971),5.0,1.0,224]
 [Stepmom (1998),5.0,1.0,146]
 [Metro (1997),5.0,1.0,100]
 [Death Wish 3 (1985),5.0,1.0,72]
 [Stalker (1979),5.0,1.0,52]



Re: Spark 1.3.1 + Hive: write output to CSV with header on S3

2015-07-17 Thread Michael Armbrust
Using a hive-site.xml file on the classpath.

On Fri, Jul 17, 2015 at 8:37 AM, spark user spark_u...@yahoo.com.invalid
wrote:

 Hi Roberto

 I have a question regarding HiveContext.

 When you create a HiveContext, where do you define the Hive connection
 properties? Suppose Hive is not on the local machine and I need to connect
 to it: how will HiveContext know the database info such as URL, username
 and password?

 String username = "";
 String password = "";

 String url = "jdbc:hive2://quickstart.cloudera:1/default";



   On Friday, July 17, 2015 2:29 AM, Roberto Coluccio 
 roberto.coluc...@gmail.com wrote:


 Hello community,

 I'm currently using Spark 1.3.1 with Hive support for outputting processed
 data on an external Hive table backed on S3. I'm using a manual
 specification of the delimiter, but I'd like to know if there is any
 clean way to write in CSV format:

 val sparkConf = new SparkConf()
 val sc = new SparkContext(sparkConf)
 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import hiveContext.implicits._
 hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS table_name(field1 " +
   "STRING, field2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' " +
   "LOCATION '" + path_on_s3 + "'")
 hiveContext.sql("an INSERT OVERWRITE query to write into the above table")

 I also need the header of the table to be printed on each written file. I
 tried with:

 hiveContext.sql("set hive.cli.print.header=true")

 But it didn't work.

 Any hint?

 Thank you.

 Best regards,
 Roberto






Re: MapType vs StructType

2015-07-17 Thread Michael Armbrust
I'll add that there is a JIRA to override the default past some threshold
number of unique keys: https://issues.apache.org/jira/browse/SPARK-4476

On Fri, Jul 17, 2015 at 1:32 PM, Michael Armbrust mich...@databricks.com
wrote:

 The difference between a map and a struct here is that in a struct all
 possible keys are defined as part of the schema and each can have a
 different type (and we don't support union types).  JSON doesn't have
 differentiated data structures so we go with the one that gives you more
 information when doing inference by default.  If you pass in a schema to
 JSON however, you can override this and have a JSON object parsed as a map.

 On Fri, Jul 17, 2015 at 11:02 AM, Corey Nolet cjno...@gmail.com wrote:

 I notice JSON objects are all parsed as Map[String,Any] in Jackson but
 for some reason, the inferSchema tools in Spark SQL extracts the schema
 of nested JSON objects as StructTypes.

 This makes it really confusing when trying to rectify the object
 hierarchy when I have maps because the Catalyst conversion layer underneath
 is expecting a Row or Product and not a Map.

 Why wasn't MapType used here? Is there any significant difference between
 the two of these types that would cause me not to use a MapType when I'm
 constructing my own schema representing a set of nested Map[String,_]'s?







Re: MapType vs StructType

2015-07-17 Thread Michael Armbrust
The difference between a map and a struct here is that in a struct all
possible keys are defined as part of the schema and each can have a
different type (and we don't support union types).  JSON doesn't have
differentiated data structures so we go with the one that gives you more
information when doing inference by default.  If you pass in a schema to
JSON however, you can override this and have a JSON object parsed as a map.

On Fri, Jul 17, 2015 at 11:02 AM, Corey Nolet cjno...@gmail.com wrote:

 I notice JSON objects are all parsed as Map[String,Any] in Jackson but for
 some reason, the inferSchema tools in Spark SQL extracts the schema of
 nested JSON objects as StructTypes.

 This makes it really confusing when trying to rectify the object hierarchy
 when I have maps because the Catalyst conversion layer underneath is
 expecting a Row or Product and not a Map.

 Why wasn't MapType used here? Is there any significant difference between
 the two of these types that would cause me not to use a MapType when I'm
 constructing my own schema representing a set of nested Map[String,_]'s?






Re: Data frames select and where clause dependency

2015-07-17 Thread Michael Armbrust
Each operation on a dataframe is completely independent and doesn't know
what operations happened before it.  When you do a selection, you are
removing other columns from the dataframe and so the filter has nothing to
operate on.

On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com
wrote:

 I'd like to understand why the where field must exist in the select
 clause.

 For example, the following select statement works fine

 - df.select("field1", "filter_field").filter(df("filter_field") ===
 "value").show()

 However, the next one fails with the error "in operator !Filter
 (filter_field#60 = value);"

 - df.select("field1").filter(df("filter_field") === "value").show()

 As a work-around, it seems that I can do the following

 - df.select("field1", "filter_field").filter(df("filter_field") ===
 "value").drop("filter_field").show()


 Thanks, Mike.



Re: PairRDDFunctions and DataFrames

2015-07-16 Thread Michael Armbrust
Instead of using that RDD operation just use the native DataFrame function
approxCountDistinct

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

On Thu, Jul 16, 2015 at 6:58 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Hi, could someone point me to the recommended way of using
 countApproxDistinctByKey with DataFrames?

 I know I can map to a pair RDD but I'm wondering if there is a simpler
 method? If someone knows whether this operation is expressible in SQL, that
 information would be most appreciated as well.



Re: Basic Spark SQL question

2015-07-13 Thread Michael Armbrust
I'd look at the JDBC server (a long running yarn job you can submit queries
to)

https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server

On Mon, Jul 13, 2015 at 6:31 PM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 Well for adhoc queries you can use the CLI

 On Mon, Jul 13, 2015 at 5:34 PM, Ron Gonzalez 
 zlgonza...@yahoo.com.invalid wrote:

 Hi,
   I have a question for Spark SQL. Is there a way to be able to use Spark
 SQL on YARN without having to submit a job?
   Bottom line here is I want to be able to reduce the latency of running
 queries as a job. I know that the spark sql default submission is like a
 job, but was wondering if it's possible to run queries like one would with
 a regular db like MySQL or Oracle.

 Thanks,
 Ron







Re: [Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?

2015-07-10 Thread Michael Armbrust
Metastore configuration should be set in hive-site.xml.

On Thu, Jul 9, 2015 at 8:59 PM, Terry Hole hujie.ea...@gmail.com wrote:

 Hi,

 I am trying to set the Hive metastore destination to a MySQL database in
 the Hive context. It works fine in Spark 1.3.1, but it seems broken in Spark
 1.4.1-rc1, where it always connects to the default metastore (local). Is
 this a regression, or must we set the connection in hive-site.xml?

 The code is very simple in spark shell:
 import org.apache.spark.sql.hive._
 val hiveContext = new HiveContext(sc)
 hiveContext.setConf("javax.jdo.option.ConnectionDriveName",
 "com.mysql.jdbc.Driver")
 hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
 hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
 hiveContext.setConf("javax.jdo.option.ConnectionURL",
 "jdbc:mysql://10.111.3.186:3306/hive")
 hiveContext.setConf("hive.metastore.warehouse.dir",
 "/user/hive/warehouse")
 hiveContext.sql("select * from mysqltable").show()

 *Thanks!*
 *-Terry*




Re: [SPARK-SQL] libgplcompression.so already loaded in another classloader

2015-07-08 Thread Michael Armbrust
Here's a related JIRA: https://issues.apache.org/jira/browse/SPARK-7819

Typically you can work around this by making sure that the classes are
shared across the isolation boundary, as discussed in the comments.

On Tue, Jul 7, 2015 at 3:29 AM, Sea 261810...@qq.com wrote:

 Hi, all
 I found an Exception when using spark-sql
 java.lang.UnsatisfiedLinkError: Native Library
 /data/lib/native/libgplcompression.so already loaded in another classloader
 ...

 I set  spark.sql.hive.metastore.jars=.  in file spark-defaults.conf

 It does not happen every time. Who knows why?

 Spark version: 1.4.0
 Hadoop version: 2.2.0




Re: DataFrame question

2015-07-07 Thread Michael Armbrust
You probably want to explode the array to produce one row per element:

df.select(explode(df("links")).alias("link"))
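
To make the intent concrete, here is a plain-Python model of what exploding
the array does to the nested rows: each element of the links array becomes
its own output row. It does not use Spark; the helper explode_links and the
sample row are made up for illustration.

```python
# One input row whose "links" field holds an array of structs,
# mirroring the schema in the question below.
rows = [
    {"links": [
        {"desc": "one", "id": "1"},
        {"desc": "two", "id": "2"},
        {"desc": "three", "id": "3"},
    ]}
]

def explode_links(rows):
    """Yield one (desc, id) pair per element of each row's links array."""
    for row in rows:
        # A missing or null array contributes no output rows.
        for link in row.get("links") or []:
            yield (link["desc"], link["id"])

pairs = list(explode_links(rows))
```

After exploding, selecting the struct's desc and id fields gives one pair
per link rather than one pair of lists per input row.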

On Tue, Jul 7, 2015 at 10:29 AM, Naveen Madhire vmadh...@umail.iu.edu
wrote:

 Hi All,

 I am working with dataframes and have been struggling with this thing, any
 pointers would be helpful.

 I've a Json file with the schema like this,

  |-- links: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- desc: string (nullable = true)
  |    |    |-- id: string (nullable = true)

 I want to fetch id and desc as an RDD like this: RDD[(String,String)]

 I am using dataframes: *df.select("links.desc", "links.id").rdd*

 The above dataframe is returning an RDD like this:
 RDD[(List(String),List(String))]


 So, the JSON links:[{one,1},{two,2},{three,3}] should return an
 RDD[(one,1),(two,2),(three,3)]
 can anyone tell me how the dataframe select should be modified?



Re: Converting spark JDBCRDD to DataFrame

2015-07-06 Thread Michael Armbrust
Use the built in JDBC data source:

https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

On Mon, Jul 6, 2015 at 6:42 AM, Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 Hi all!

 what is the most efficient way to convert jdbcRDD to DataFrame.

 any example?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Converting-spark-JDBCRDD-to-DataFrame-tp23647.html




Re: Spark Dataframe 1.4 (GroupBy partial match)

2015-07-01 Thread Michael Armbrust
You should probably write a UDF that uses a regular expression or other
string munging to canonicalize the subject and then group on that derived
column.
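
The canonicalization step might look something like the sketch below. It is
plain Python with no Spark dependency; the function name canonical_subject
and the prefix pattern (only "Re:", "Fw:" and "Fwd:" are handled) are
assumptions for illustration. In PySpark the function could presumably be
wrapped with pyspark.sql.functions.udf to derive the grouping column.

```python
import re
from collections import Counter

# One or more leading reply/forward markers such as "Re:" or "Fwd:".
_PREFIX = re.compile(r"^(?:\s*(?:re|fwd?)\s*:\s*)+", re.IGNORECASE)

def canonical_subject(subject):
    """Strip leading reply/forward markers so a thread shares one key."""
    return _PREFIX.sub("", subject).strip()

# Subjects taken from the sample data in the question below.
subjects = [
    "SEC Inquiry",
    "Happy birthday",
    "Re: Happy birthday",
    "Re: SEC Inquiry",
    "Fwd: Re: SEC Inquiry",
]

# Grouping on the derived key gives the per-thread message count.
counts = Counter(canonical_subject(s) for s in subjects)
```

Grouping on the derived value rather than the raw subject is what lets
"Re:" and "Fwd:" variants fall into the same bucket.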

On Tue, Jun 30, 2015 at 10:30 PM, Suraj Shetiya surajshet...@gmail.com
wrote:

 Thanks Salih. :)


 The output of the groupby is as below.

 2015-01-14  SEC Inquiry
 2015-01-16   Re: SEC Inquiry
 2015-01-18   Fwd: Re: SEC Inquiry


 And subsequently, we would like to aggregate all messages with a
 particular reference subject.
 For instance the question we are trying to answer could be : Get the count
 of messages with a particular subject.

 Looking forward to any suggestion from you.


 On Tue, Jun 30, 2015 at 8:42 PM, Salih Oztop soz...@yahoo.com wrote:

 Hi Suraj
 What will be your output after group by? Since GroupBy is for
 aggregations like sum, count etc.
 If you want to count the 2015 records, then it is possible.

 Kind Regards
 Salih Oztop


   --
  *From:* Suraj Shetiya surajshet...@gmail.com
 *To:* user@spark.apache.org
 *Sent:* Tuesday, June 30, 2015 3:05 PM
 *Subject:* Spark Dataframe 1.4 (GroupBy partial match)

 I have a dataset (trimmed and simplified) with 2 columns as below.

 Date         Subject
 2015-01-14   SEC Inquiry
 2014-02-12   Happy birthday
 2014-02-13   Re: Happy birthday
 2015-01-16   Re: SEC Inquiry
 2015-01-18   Fwd: Re: SEC Inquiry

 I have imported the same in a Spark Dataframe. What I am looking at is
 groupBy subject field (however, I need a partial match to identify the
 discussion topic).

 For example in the above case.. I would like to group all messages, which
 have subject containing SEC Inquiry which returns following grouped
 frame:

 2015-01-14  SEC Inquiry
 2015-01-16   Re: SEC Inquiry
 2015-01-18   Fwd: Re: SEC Inquiry

 Another usecase for a similar problem could be group by year (in the
 above example), it would mean partial match of the date field, which would
 mean groupBy Date by matching year as 2014 or 2015.

 Keenly Looking forward to reply/solution to the above.

 - Suraj








Re: Issue with parquet write after join (Spark 1.4.0)

2015-07-01 Thread Michael Armbrust
I would still look at your executor logs.  A count() is rewritten by the
optimizer to be much more efficient because you don't actually need any of
the columns.  Also, writing parquet allocates quite a few large buffers.

On Wed, Jul 1, 2015 at 5:42 AM, Pooja Jain pooja.ja...@gmail.com wrote:

 Join is happening successfully as I am able to do count() after the join.

 Error is coming only while trying to write in parquet format on hdfs.

 Thanks,
 Pooja.

 On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It says:

 Caused by: java.net.ConnectException: Connection refused: slave2/...:54845

 Could you look in the executor logs (stderr on slave2) and see what made
 it shut down? Since you are doing a join there's a high possibility of OOM
 etc.


 Thanks
 Best Regards

 On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain pooja.ja...@gmail.com
 wrote:

 Hi,

 We are using Spark 1.4.0 on hadoop using yarn-cluster mode via
 spark-submit. We are facing parquet write issue after doing dataframe joins

 We have a full data set and then an incremental data. We are reading
 them as dataframes, joining them, and then writing the data to the hdfs
 system in parquet format. We are getting the timeout error on the last
 partition.

 But if we do a count on the joined data it is working - which gives us
 the confidence that join is happening properly. Only in case of writing to
 the hdfs it is timing out.

 Code flow:

 // join two data frames - dfBase and dfIncr on primaryKey
 val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) ===
 dfIncr(primaryKey), "outer")

 // applying a reduce function on each row.
 val mergedDF = joinedDF.map(x =>
   reduceFunc(x)
 )

 //converting back to dataframe
 val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

 //writing to parquet file
 newdf.write.parquet(hdfsfilepath)

 Getting following exception:

 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with 
 no recent heartbeats: 255766 ms exceeds timeout 24 ms
 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
 slave2: Executor heartbeat timed out after 255766 ms
 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
 from TaskSet 7.0
 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 
 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 
 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to 
 kill executor(s) 26
 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
 executor 26 from BlockManagerMaster.
 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block 
 manager BlockManagerId(26, slave2, 54845)
 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully 
 in removeExecutor
 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number 
 of 26 executor(s).
 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now 
 unavailable on executor 26 (193/200, false)
 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested 
 to kill executor(s) 26.
 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
 or disconnected! Shutting down. slave2:51849
 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
 slave2: remote Rpc client disassociated
 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
 from TaskSet 7.0
 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
 executor 26 from BlockManagerMaster.
 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully 
 in removeExecutor
 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with 
 remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address 
 is now gated for [5000] ms. Reason is: [Disassociated].
 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
 or disconnected! Shutting down. slave2:51849
 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 
 (TID 310, slave2): org.apache.spark.SparkException: Task failed while 
 writing rows.
 at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
 at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
 at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at 

Re: Custom order by in Spark SQL

2015-07-01 Thread Michael Armbrust
Easiest way to do this today is to define a UDF that maps from string to a
number.
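
A minimal sketch of that idea in plain Python rather than Spark: a function maps each currency code to a rank, and sorting on the rank gives the custom order. The CURRENCY_RANK table, the currency_rank helper and the sample rows are hypothetical; in Spark SQL a function like this could be registered as a UDF and used in the ORDER BY expression.

```python
# Hypothetical rank table: a lower number sorts first.
CURRENCY_RANK = {"USD": 0, "EUR": 1, "JPY": 2, "GBP": 3}


def currency_rank(code):
    """Map a currency code to its sort position; unknown codes sort last."""
    return CURRENCY_RANK.get(code, len(CURRENCY_RANK))


# Made-up sample rows of (currency, amount).
rows = [("GBP", 10.0), ("USD", 5.0), ("CHF", 1.0), ("EUR", 7.5), ("JPY", 900.0)]

# Sort by the numeric rank instead of the alphabetical value of the string.
ordered = sorted(rows, key=lambda row: currency_rank(row[0]))
```

Codes missing from the table all get the same rank, so they land at the end in their original relative order.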

On Wed, Jul 1, 2015 at 10:25 AM, Mick Davies michael.belldav...@gmail.com
wrote:

 Hi,

 Is there a way to specify a custom order by (Ordering) on a column in Spark
 SQL?

 In particular I would like the order by applied to a currency column to be
 not alphabetical, but something like USD, EUR, JPY, GBP, etc.

 I saw an earlier post on UDTs and ordering (which I can't seem to find in
 this archive,

 http://mail-archives.us.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAFGcCdWWCFCwVp7+BCaPQ=6uupmyjcbhqyjn9txeu45hjg4...@mail.gmail.com%3E
 ),
 which is somewhat related to this question.

 Thanks
 Mick



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-order-by-in-Spark-SQL-tp23569.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Check for null in PySpark DataFrame

2015-07-01 Thread Michael Armbrust
There is an isNotNull function on any column.

df._1.isNotNull()

or

from pyspark.sql.functions import *
col("myColumn").isNotNull()
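
As for why comparing against None returns nothing, here is a rough sketch in plain Python, assuming the comparison is evaluated with SQL null semantics (any comparison with NULL yields NULL, and a filter keeps only rows whose predicate is true). The helpers sql_not_equal and sql_filter are hypothetical stand-ins for what the engine does, not Spark APIs.

```python
def sql_not_equal(left, right):
    """SQL-style '!=': the result is unknown (None) if either side is NULL."""
    if left is None or right is None:
        return None
    return left != right


def sql_filter(rows, predicate):
    """Keep only rows whose predicate is exactly True, as a WHERE clause does."""
    return [row for row in rows if predicate(row) is True]


rows = [(1, None), (2, "a"), (3, "b"), (4, None)]

# Comparing to NULL is never True, so every row is dropped.
compared_to_null = sql_filter(rows, lambda r: sql_not_equal(r[1], None))

# An explicit null check is what isNotNull expresses.
not_null = sql_filter(rows, lambda r: r[1] is not None)
```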

On Wed, Jul 1, 2015 at 3:07 AM, Olivier Girardot ssab...@gmail.com wrote:

 I must admit I've been using the same back to SQL strategy for now :p
 So I'd be glad to have insights into that too.

 Le mar. 30 juin 2015 à 23:28, pedro ski.rodrig...@gmail.com a écrit :

 I am trying to find what is the correct way to programmatically check for
 null values for rows in a dataframe. For example, below is the code using
 pyspark and sql:

 df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, "a"), (3,
 "b"), (4, None)]))
 df.where('_2 is not null').count()

 However, this won't work
 df.where(df._2 != None).count()

 It seems there is no native Python way with DataFrames to do this, but I
 find that difficult to believe and more likely that I am missing the
 right
 way to do this.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: BroadcastHashJoin when RDD is not cached

2015-07-01 Thread Michael Armbrust
We don't know that the table is small unless you cache it.  In Spark 1.5
you'll be able to give us a hint though (
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581
)

On Wed, Jul 1, 2015 at 8:30 AM, Srikanth srikanth...@gmail.com wrote:

 Hello,



 I have a straightforward use case of joining a large table with a smaller
 table. The small table is within the limit I set for
 spark.sql.autoBroadcastJoinThreshold.

 I notice that ShuffledHashJoin is used to perform the join.
 BroadcastHashJoin was used only when I pre-fetched and cached the small
 table.

 I understand that for typical broadcast we would have to read and
 collect() the small table in driver before broadcasting.

 Why not do this automatically for joins? That way stage1(read large table)
 and stage2(read small table) can still be run in parallel.





 Sort [emailId#19 ASC,date#0 ASC], true

  Exchange (RangePartitioning 24)

   Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]

Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))

 *ShuffledHashJoin* [ip#7], [ip#18], BuildRight

  Exchange (HashPartitioning 24)

   Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]

PhysicalRDD
 [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6],
 MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25

  Exchange (HashPartitioning 24)

   Project [emailId#19,scalaUDF(date#20) AS
 upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]

PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at
 rddToDataFrameHolder at DataSourceReader.scala:41


 Srikanth



Re: Subsecond queries possible?

2015-06-30 Thread Michael Armbrust

 This brings up another question/issue - there doesn't seem to be a way to
 partition cached tables in the same way you can partition, say a Hive
 table.  For example, we would like to partition the overall dataset (233m
 rows, 9.2Gb) by (product, coupon) so when we run one of these queries
 Spark won't have to scan all the data, just the partition from the query,
 eg, (FNM30, 3.0).


If you order the data on the interesting column before caching, we keep
min/max statistics that let us do similar data skipping automatically.
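
To illustrate the principle only (this is a toy model in plain Python, not Spark's in-memory columnar code): each cached batch keeps the min and max of a column, and a scan skips any batch whose range cannot contain the value being searched for. The names build_batches and scan_equal, the batch size and the sample values are all hypothetical.

```python
def build_batches(values, batch_size):
    """Split values into batches and record (min, max) for each batch."""
    batches = []
    for start in range(0, len(values), batch_size):
        chunk = values[start:start + batch_size]
        batches.append({"min": min(chunk), "max": max(chunk), "rows": chunk})
    return batches


def scan_equal(batches, target):
    """Return the matching rows and how many batches had to be read."""
    matches, batches_read = [], 0
    for batch in batches:
        if target < batch["min"] or target > batch["max"]:
            continue  # the statistics prove this batch cannot contain target
        batches_read += 1
        matches.extend(v for v in batch["rows"] if v == target)
    return matches, batches_read


products = ["FNM30"] * 4 + ["FNM15"] * 4 + ["GNM30"] * 4

# Ordered before "caching": each batch covers a narrow range of values.
sorted_batches = build_batches(sorted(products), batch_size=4)

# Interleaved order: every batch spans the whole range, so nothing is skipped.
interleaved = products[::3] + products[1::3] + products[2::3]
unsorted_batches = build_batches(interleaved, batch_size=4)
```

With the ordered data a lookup for FNM30 reads one batch out of three; with the interleaved data it has to read all three, even though the result is the same.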


Re: sql dataframe internal representation

2015-06-25 Thread Michael Armbrust
In many cases we use more efficient mutable implementations internally
(i.e. mutable undecoded utf8 instead of java.lang.String, or a BigDecimal
implementation that uses a Long when the number is small enough).

On Thu, Jun 25, 2015 at 1:56 PM, Koert Kuipers ko...@tresata.com wrote:

 i noticed in DataFrame that to get the rdd out of it some conversions are
 done:
   val converter = CatalystTypeConverters.createToScalaConverter(schema)
   rows.map(converter(_).asInstanceOf[Row])

 does this mean DataFrame internally does not use the standard scala types?
 why not?



Re: [sparksql] sparse floating point data compression in sparksql cache

2015-06-24 Thread Michael Armbrust
Have you considered instead using the mllib SparseVector type (which is
supported in Spark SQL)?

On Wed, Jun 24, 2015 at 1:31 PM, Nikita Dolgov n...@beckon.com wrote:

 When my 22M Parquet test file ended up taking 3G when cached in-memory I
 looked closer at how column compression works in 1.4.0. My test dataset was
 1,000 columns * 800,000 rows of mostly empty floating point columns with a
 few dense long columns.

 I was surprised to see that no real
 org.apache.spark.sql.columnar.compression.CompressionScheme supports
 floating-point types and so conversion falls back to the no-op
 PassThrough implementation. In addition, the way
 org.apache.spark.sql.columnar.NullableColumnBuilder encodes null values
 (with four bytes for each of them) seems to be heavily biased against
 sparse data.

 It would be interesting to know if sparse floating point datasets were
 neglected for a reason other than some obscure historical accident. Is
 there anything in the Tungsten roadmap which would allow, for example,
 https://drill.apache.org/docs/value-vectors/ -style efficiency for this
 kind of data?



Re: How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

2015-06-24 Thread Michael Armbrust
Starting in Spark 1.4 there is also an explode that you can use directly
from the select clause (much like in HiveQL):

import org.apache.spark.sql.functions._
df.select(explode($"entities.user_mentions").as("mention"))

Unlike standard HiveQL, you can also include other attributes in the select
or even $"*".


On Wed, Jun 24, 2015 at 8:34 AM, Yin Huai yh...@databricks.com wrote:

 The function accepted by explode is f: Row => TraversableOnce[A]. Seems
 user_mentions is an array of structs. So, can you change your
 pattern matching to the following?

 case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map(elem => ...)

 On Wed, Jun 24, 2015 at 5:27 AM, Gustavo Arjones 
 garjo...@socialmetrix.com wrote:

 Hi All,

 I am using the new *Apache Spark version 1.4.0 Data-frames API* to
 extract information from Twitter's Status JSON, mostly focused on the
 Entities Object https://dev.twitter.com/overview/api/entities - the
 relevant part for this question is shown below:

 {
   ...
   ...
   "entities": {
     "hashtags": [],
     "trends": [],
     "urls": [],
     "user_mentions": [
       {
         "screen_name": "linobocchini",
         "name": "Lino Bocchini",
         "id": 187356243,
         "id_str": "187356243",
         "indices": [ 3, 16 ]
       },
       {
         "screen_name": "jeanwyllys_real",
         "name": "Jean Wyllys",
         "id": 23176,
         "id_str": "23176",
         "indices": [ 79, 95 ]
       }
     ],
     "symbols": []
   },
   ...
   ...
 }

 There are several examples on how to extract information from primitive
 types such as string, integer, etc. - but I couldn't find anything on how to
 process this kind of *complex* structure.

 I tried the code below but it still doesn't work; it throws an
 Exception

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 val tweets = sqlContext.read.json("tweets.json")

 // this function is just to filter empty entities.user_mentions[] nodes
 // some tweets don't contain any mentions
 import org.apache.spark.sql.functions.udf
 val isEmpty = udf((value: List[Any]) => value.isEmpty)

 import org.apache.spark.sql._
 import sqlContext.implicits._
 case class UserMention(id: Long, idStr: String, indices: Array[Long],
   name: String, screenName: String)

 val mentions = tweets.select("entities.user_mentions").
   filter(!isEmpty($"user_mentions")).
   explode($"user_mentions") {
     case Row(arr: Array[Row]) => arr.map { elem =>
       UserMention(
         elem.getAs[Long]("id"),
         elem.getAs[String]("is_str"),
         elem.getAs[Array[Long]]("indices"),
         elem.getAs[String]("name"),
         elem.getAs[String]("screen_name"))
     }
   }

 mentions.first

 Exception when I try to call mentions.first:

 scala> mentions.first
 15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
 scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [23176,23176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
 at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
 at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
 at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)

 What is wrong here? I understand it is related to the types but I
 couldn't figure it out yet.

 As additional context, the structure mapped automatically is:

 scala> mentions.printSchema
 root
  |-- user_mentions: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- id: long (nullable = true)
  |    |    |-- id_str: string (nullable = true)
  |    |    |-- indices: array (nullable = true)
  |    |    |    |-- element: long (containsNull = true)
  |    |    |-- name: string (nullable = true)
  |    |    |-- screen_name: string (nullable = true)

 *NOTE 1:* I know it is possible to solve this using HiveQL but I would
 like to use Data-frames since there is so much momentum around them.

 SELECT explode(entities.user_mentions) as mentions
 FROM tweets

 *NOTE 2:* the *UDF* val isEmpty = udf((value: List[Any]) =>
 value.isEmpty) is an ugly hack and I'm missing something here, but it was
 the only way I came up with to avoid an NPE

 I’ve posted the same question on SO:
 http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames

 Thanks all!
 - gustavo





Re: Nested DataFrame(SchemaRDD)

2015-06-23 Thread Michael Armbrust
You can also do this using a sequence of case classes (in the example
stored in a tuple, though the outer container could also be a case class):

case class MyRecord(name: String, location: String)
val df = Seq((1, Seq(MyRecord("Michael", "Berkeley"), MyRecord("Andy",
"Oakland")))).toDF("id", "people")

df.printSchema

root
 |-- id: integer (nullable = false)
 |-- people: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- location: string (nullable = true)

If this dataframe is saved to parquet the nesting will be preserved.

On Tue, Jun 23, 2015 at 4:35 PM, Roberto Congiu roberto.con...@gmail.com
wrote:

 I wrote a brief howto on building nested records in spark and storing them
 in parquet here:
 http://www.congiu.com/creating-nested-data-parquet-in-spark-sql/

 2015-06-23 16:12 GMT-07:00 Richard Catlin richard.m.cat...@gmail.com:

 How do I create a DataFrame(SchemaRDD) with a nested array of Rows in a
 column?  Is there an example?  Will this store as a nested parquet file?

 Thanks.

 Richard Catlin





Re: SparkSQL: leftOuterJoin is VERY slow!

2015-06-19 Thread Michael Armbrust
Broadcast outer joins are on my short list for 1.5.

On Fri, Jun 19, 2015 at 10:48 AM, Piero Cinquegrana 
pcinquegr...@marketshare.com wrote:

 Hello,

 I have two RDDs: tv and sessions. I need to convert these DataFrames into
 RDDs because I need to use the groupByKey function. The reduceByKey
 function would not work here as I am not doing any aggregations, but I am
 grouping using a (K, V) pair. See the snippets of code below.

 The bottleneck is definitely not groupByKey as it runs in 78 seconds.

 I find that the leftOuterJoin takes 1.5 hours using 80 executors with 2g
 of memory each. The Spark driver has 5g of memory. I have tried using
 different numPartitions: such as 20, 50 and 200 and I found that 200 works
 best with 80 executors. I also tried to reduce spark.sql.shuffle.partitions
 to 40 to no avail.
 Any suggestions on how to improve the performance of this outer join?

 I have been told to try to broadcast the smaller tv table and do a
 mapper-side join. Do you think this is feasible with such a large table
 (4.4M rows)?

 Much appreciated,

 Piero Cinquegrana
 Marketing Scientist | MarketShare
 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
 P: 310.914.5677 x242 M: 323.377.9197
 www.marketshare.com
 twitter.com/marketsharep


  val tv_key = tv.rdd.map(line => ((line.getInt(4).toString, line.getString(1)), line)).groupByKey(200)

 tv_key: org.apache.spark.rdd.RDD[((String, String),
 Iterable[org.apache.spark.sql.Row])] = ShuffledRDD[24] at groupByKey at
 <console>:46

  tv_key.count

 res33: Long = 4406433
 Took 370 seconds

  tv_key.take(5).foreach(println)

 ((537,2014-03-30 11:17:00),CompactBuffer([226,2014-03-30
 11:17:00,2014-03-30 11:17:28,2014-03-30
 11:17:28,537,Bangor,EST5EDT,UNKNOWN,AD15,TV Land,Weekend,M,UNKNOWN|15|TV
 Land|Weekend|M,815,0.005836018617973505]))
 ((516,2014-01-01 17:31:00),CompactBuffer([111,2014-01-01
 17:31:00,2014-01-01 17:31:14,2014-01-01
 17:31:14,516,Erie,EST5EDT,UNKNOWN,AD5,Headline News,Early
 Fringe,M,UNKNOWN|5|Headline News|Early Fringe|M,186,0.0011828298887122416],
 [111,2014-01-01 17:31:00,2014-01-01 17:31:19,2014-01-01
 17:31:19,516,Erie,EST5EDT,UNKNOWN,AD10,Headline News,Early
 Fringe,M,UNKNOWN|10|Headline News|Early
 Fringe|M,186,0.0011828298887122416]))
 ((855,2014-08-28 03:32:00),CompactBuffer([113,2014-08-28
 03:32:00,2014-08-28 03:32:50,2014-08-28
 00:32:50,855,SantaBarbra-SanMar-SanLuOb,PST8PDT,UNKNOWN,AD15,History
 Channel,Late Fringe,M,UNKNOWN|15|History Channel|Late
 Fringe|M,410,0.001749818616362938]))
 ((743,2014-07-17 06:57:00),CompactBuffer([35,2014-07-17
 06:57:00,2014-07-17 06:57:18,2014-07-17
 10:57:18,743,Anchorage,YST9YDT,UNKNOWN,AD30,CMT: Country Music
 Television,Daytime,M,UNKNOWN|30|CMT: Country Music
 Television|Daytime|M,227,0.0014356185175815835]))
 ((574,2014-06-03 18:47:00),CompactBuffer([94,2014-06-03
 18:47:00,2014-06-03 18:47:45,2014-06-03 18:47:45,574,Johnstown-Altoona-St
 Colge,EST5EDT,UNKNOWN,AD30,Game Show Network,Early Fringe,M,UNKNOWN|30|Game
 Show Network|Early Fringe|M,1629,0.005588527908333047]))

  val sessions_key = sessions.rdd.map(line =>
 ((line.getString(2), line.getString(1)), line))

 sessions_key: org.apache.spark.rdd.RDD[((String, String),
 org.apache.spark.sql.Row)] = MapPartitionsRDD[30] at map at <console>:38

  sessions_key.count

 res16: Long = 110383440
 Took 78 seconds

  sessions_key.take(5).foreach(println)

 ((737,2014-01-01 02:01:00),[2014-01-01 01:01:00,2014-01-01
 02:01:00,737,Mankato,0.0,CST6CDT,Wednesday,1,1,1,1,2014,Wednesday|1])
 ((569,2014-01-01 02:01:00),[2014-01-01 02:01:00,2014-01-01
 02:01:00,569,Harrisonburg,0.0,EST5EDT,Wednesday,2,1,1,1,2014,Wednesday|2])
 ((753,2014-01-01 02:01:00),[2014-01-01 00:01:00,2014-01-01
 02:01:00,753,Phoenix
 (Prescott),11.0,MST,Wednesday,0,1,1,1,2014,Wednesday|0])
 ((825,2014-01-01 02:01:00),[2013-12-31 23:01:00,2014-01-01
 02:01:00,825,San Diego,0.0,PST8PDT,Tuesday,23,1,1,12,2013,Tuesday|23])
 ((673,2014-01-01 02:01:00),[2014-01-01 01:01:00,2014-01-01
 02:01:00,673,Columbus-Tupelo-W
 Pnt-Hstn,0.0,CST6CDT,Wednesday,1,1,1,1,2014,Wednesday|1])

  val stack = sessions_key.leftOuterJoin(tv_key, 200)

 stack: org.apache.spark.rdd.RDD[((String, String),
 (org.apache.spark.sql.Row, Option[Iterable[org.apache.spark.sql.Row]]))] =
 MapPartitionsRDD[33] at leftOuterJoin at <console>:54

  stack.count

 res18: Long = 110383440
 Took 5283 seconds





