Bizarre diff in behavior between Scala REPL and SparkSQL UDF

2017-06-20 Thread jeff saremi
I have this function which does regex matching in Scala. When I test it in the
REPL I get the expected results.

When I use it as a UDF in SparkSQL I get completely incorrect results.


Function:

import scala.util.matching.Regex

class UrlFilter(filters: Seq[String]) extends Serializable {
  val regexFilters = filters.map(new Regex(_))
  regexFilters.foreach(println)

  def matches(s: String): Boolean = {
    if (s == null || s.isEmpty) return false
    regexFilters.exists { f =>
      print("matching " + f + " against " + s)
      s match {
        case f() => println("; matched! returning true"); true
        case _   => println("; did NOT match. returning false"); false
      }
    }
  }
}

Instantiating it with a pattern like:
^[^:]+://[^.]*\.company[0-9]*9\.com$

(matches a URL that has "company" in the host name followed by a number ending in the digit 9)
Test it in Scala REPL:

scala> val filters = 
Source.fromFile("D:\\cosmos-modules\\testdata\\fakefilters.txt").getLines.toList

scala> val urlFilter = new UrlFilter(filters)

scala>  urlFilter.matches("ftp://ftp.company9.com")
matching ^[^:]+://[^.]*\.company[0-9]*9\.com$ against ftp://ftp.company9.com; 
matched! returning true
res2: Boolean = true


Use it in SparkSQL:

val urlFilter = new UrlFilter(filters)
sqlContext.udf.register("filterListMatch", (url: String) => 
urlFilter.matches(url))

val nonMatchingUrlsDf = sqlContext.sql("SELECT url FROM distinctUrls WHERE NOT 
filterListMatch(url)")

Look at the debug prints in the console:
matching ^[^:]+://[^.]*\.company[0-9]*9\.com$ against ftp://ftp.company9.com ; 
did NOT match. returning false

I have repeated this several times to make sure I'm comparing apples to apples.
I am using Spark 1.6 and Scala 2.10.5 with Java 1.8
thanks
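A debugging sketch, not the poster's code: since the pattern is anchored with "$", trailing
whitespace or invisible characters in the real data (which a hand-typed REPL test never has)
would make the match fail. Logging the exact code points the UDF receives on the executors,
and trimming before matching, is one way to check for that, reusing the urlFilter and
sqlContext from above:

sqlContext.udf.register("filterListMatch", (url: String) => {
  if (url == null) false
  else {
    val cleaned = url.trim
    if (cleaned != url) {
      // dump code points so invisible characters show up in the executor logs
      println("url has extra characters: " + url.map(_.toInt).mkString(","))
    }
    urlFilter.matches(cleaned)
  }
})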




RE: [SparkSQL] Escaping a query for a dataframe query

2017-06-16 Thread mark.jenki...@baesystems.com
Thanks both!

FYI the suggestion to escape the quote does not seem to work. I should have
mentioned I am using Spark 1.6.2 and have tried to escape the double quote with
\\ and .

My gut feel is that escape chars are not considered for UDF parameters in this
version of Spark – I would like to be wrong.

[Thread-18] ERROR QueryService$ - Failed to complete query, will mark job as 
failed
java.lang.RuntimeException: [1.118] failure: ``)'' expected but `end' found

SELECT * FROM mytable WHERE mycolumn BETWEEN 1 AND 2 AND 
(myudfsearchfor(index2, "**", "*start\"end*"))


  ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)

From: Gourav Sengupta [mailto:gourav.sengu...@gmail.com]
Sent: 15 June 2017 19:35
To: Michael Mior
Cc: Jenkins, Mark (UK Guildford); user@spark.apache.org
Subject: Re: [SparkSQL] Escaping a query for a dataframe query


It might be something that I am saying wrong but sometimes it may just make 
sense to see the difference between ” and "


<”> 8221, Hex 201d, Octal 20035

<">  34,  Hex 22,  Octal 042



Regards,


Gourav

On Thu, Jun 15, 2017 at 6:45 PM, Michael Mior 
<mm...@apache.org<mailto:mm...@apache.org>> wrote:
Assuming the parameter to your UDF should be start"end (with a quote in the 
middle) then you need to insert a backslash into the query (which must also be 
escaped in your code). So just add two extra backslashes before the quote 
inside the string.

sqlContext.sql("SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 AND 2) AND 
(myudfsearchfor(\"start\\\"end\"))"

--
Michael Mior
mm...@apache.org<mailto:mm...@apache.org>

2017-06-15 12:05 GMT-04:00 
mark.jenki...@baesystems.com<mailto:mark.jenki...@baesystems.com> 
<mark.jenki...@baesystems.com<mailto:mark.jenki...@baesystems.com>>:
Hi,

I have a query  sqlContext.sql(“SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 
AND 2) AND (myudfsearchfor(\“start\"end\”))”



How should I escape the double quote so that it successfully parses?



I know I can use single quotes but I do not want to since I may need to search 
for a single and double quote.



The exception I get is


[Thread-18] ERROR QueryService$ - Failed to complete query, will mark job as 
failed java.lang.RuntimeException: [1.117] failure: ``)'' expected but "end" 
found

SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 AND 2) AND 
(myudfsearchfor(\“start\"end\”))

^
  at scala.sys.package$.error(package.scala:27)
  at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
  at 
org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
  at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
  at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)

Thankyou


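A sketch of an alternative that sidesteps SQL-string escaping entirely: build the same filter
with the DataFrame API, where the UDF argument is an ordinary Scala string literal. The table,
column and UDF names are taken from the thread; mytable is assumed to be registered as a table:

import org.apache.spark.sql.functions.{callUDF, col, lit}

val df = sqlContext.table("mytable")
val result = df.filter(
  col("mycolumn").between(1, 2) &&
  callUDF("myudfsearchfor", lit("start\"end"))  // only Scala escaping is needed here
)
result.show()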


Re: [SparkSQL] Escaping a query for a dataframe query

2017-06-15 Thread Gourav Sengupta
It might be something that I am saying wrong but sometimes it may just make
sense to see the difference between ” and "

<”> 8221, Hex 201d, Octal 20035

<">  34,  Hex 22,  Octal 042


Regards,

Gourav

On Thu, Jun 15, 2017 at 6:45 PM, Michael Mior  wrote:

> Assuming the parameter to your UDF should be start"end (with a quote in
> the middle) then you need to insert a backslash into the query (which must
> also be escaped in your code). So just add two extra backslashes before the
> quote inside the string.
>
> sqlContext.sql("SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 AND 2) AND
> (myudfsearchfor(\"start\\\"end\"))"
>
> --
> Michael Mior
> mm...@apache.org
>
> 2017-06-15 12:05 GMT-04:00 mark.jenki...@baesystems.com <
> mark.jenki...@baesystems.com>:
>
>> Hi,
>>
>>
>>
>> I have a query  sqlContext.sql(“SELECT * FROM mytable WHERE
>> (mycolumn BETWEEN 1 AND 2) AND (myudfsearchfor(\“start\"end\”))”
>>
>>
>>
>> How should I escape the double quote so that it successfully parses?
>>
>>
>>
>> I know I can use single quotes but I do not want to since I may need to
>> search for a single and double quote.
>>
>>
>>
>> The exception I get is
>>
>>
>>
>> [Thread-18] ERROR QueryService$ - Failed to complete query, will mark
>> job as failed java.lang.RuntimeException: [1.117] failure: ``)'' expected
>> but "end" found
>>
>>
>>
>> SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 AND 2) AND
>> (myudfsearchfor(\“start\"end\”))
>>
>> ^
>>
>>   at scala.sys.package$.error(package.scala:27)
>>
>>   at
>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
>>
>>   at
>> org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
>>
>>   at
>> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>>
>>   at
>> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>>
>>
>>
>> Thankyou
>>
>
>


Re: [SparkSQL] Escaping a query for a dataframe query

2017-06-15 Thread Michael Mior
Assuming the parameter to your UDF should be start"end (with a quote in the
middle) then you need to insert a backslash into the query (which must also
be escaped in your code). So just add two extra backslashes before the
quote inside the string.

sqlContext.sql("SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 AND 2) AND
(myudfsearchfor(\"start\\\"end\"))"

--
Michael Mior
mm...@apache.org

2017-06-15 12:05 GMT-04:00 mark.jenki...@baesystems.com <
mark.jenki...@baesystems.com>:

> Hi,
>
>
>
> I have a query  sqlContext.sql(“SELECT * FROM mytable WHERE
> (mycolumn BETWEEN 1 AND 2) AND (myudfsearchfor(\“start\"end\”))”
>
>
>
> How should I escape the double quote so that it successfully parses?
>
>
>
> I know I can use single quotes but I do not want to since I may need to
> search for a single and double quote.
>
>
>
> The exception I get is
>
>
>
> [Thread-18] ERROR QueryService$ - Failed to complete query, will mark job
> as failed java.lang.RuntimeException: [1.117] failure: ``)'' expected but
> "end" found
>
>
>
> SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 AND 2) AND
> (myudfsearchfor(\“start\"end\”))
>
> ^
>
>   at scala.sys.package$.error(package.scala:27)
>
>   at
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
>
>   at
> org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
>
>   at
> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>
>   at
> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>
>
>
> Thankyou
>


[SparkSQL] Escaping a query for a dataframe query

2017-06-15 Thread mark.jenki...@baesystems.com
Hi,

I have a query  sqlContext.sql("SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 
AND 2) AND (myudfsearchfor(\"start\"end\"))"



How should I escape the double quote so that it successfully parses?



I know I can use single quotes but I do not want to since I may need to search 
for a single and double quote.



The exception I get is


[Thread-18] ERROR QueryService$ - Failed to complete query, will mark job as 
failed java.lang.RuntimeException: [1.117] failure: ``)'' expected but "end" 
found

SELECT * FROM mytable WHERE (mycolumn BETWEEN 1 AND 2) AND 
(myudfsearchfor(\"start\"end\"))

^
  at scala.sys.package$.error(package.scala:27)
  at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
  at 
org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
  at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
  at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)

Thankyou


Re: SparkSQL not able to read an empty table location

2017-05-21 Thread Bajpai, Amit X. -ND
set spark.sql.hive.verifyPartitionPath=true didn’t help. Still getting the same 
error.

I tried copying a file with a _ prefix and I am no longer getting the error, and
the file is also ignored by SparkSQL. But when the job is scheduled in prod and
during one execution there is no data to be processed, the query will fail
again. How should I deal with this scenario?


From: Sea <261810...@qq.com>
Date: Sunday, May 21, 2017 at 8:04 AM
To: Steve Loughran <ste...@hortonworks.com>, "Bajpai, Amit X. -ND" 
<amit.x.bajpai@disney.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL not able to read an empty table location


please try spark.sql.hive.verifyPartitionPath true

-- Original --
From:  "Steve Loughran";<ste...@hortonworks.com>;
Date:  Sat, May 20, 2017 09:19 PM
To:  "Bajpai, Amit X. -ND"<amit.x.bajpai@disney.com>;
Cc:  "user@spark.apache.org"<user@spark.apache.org>;
Subject:  Re: SparkSQL not able to read an empty table location


On 20 May 2017, at 01:44, Bajpai, Amit X. -ND 
<amit.x.bajpai@disney.com<mailto:n...@disney.com>> wrote:

Hi,

I have a hive external table with the S3 location having no files (but the S3
location directory does exist). When I am trying to use Spark SQL to count the
number of records in the table it is throwing error saying “File s3n://data/xyz
does not exist. null/0”.

select * from tablex limit 10

Can someone let me know how we can fix this issue.

Thanks


There isn't really a "directory" in S3, just a set of objects whose paths begin 
with a string. Try creating an empty file with an _ prefix in the directory; it 
should be ignored by Spark SQL but will cause the "directory" to come into being
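A sketch of one way to automate that suggestion before the query runs (the path is taken
from the thread; the "_placeholder" file name is illustrative, and sc is the SparkContext):

import org.apache.hadoop.fs.{FileSystem, Path}

val location = new Path("s3n://data/xyz")
val fs = FileSystem.get(location.toUri, sc.hadoopConfiguration)
if (!fs.exists(location) || fs.listStatus(location).isEmpty) {
  // files whose names start with "_" are skipped by Spark SQL's file listing,
  // so this keeps the "directory" non-empty without adding any data
  fs.create(new Path(location, "_placeholder")).close()
}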


Re: SparkSQL not able to read an empty table location

2017-05-21 Thread Sea
please try spark.sql.hive.verifyPartitionPath true


-- Original --
From:  "Steve Loughran";<ste...@hortonworks.com>;
Date:  Sat, May 20, 2017 09:19 PM
To:  "Bajpai, Amit X. -ND"<amit.x.bajpai@disney.com>; 
Cc:  "user@spark.apache.org"<user@spark.apache.org>; 
Subject:  Re: SparkSQL not able to read an empty table location



 
On 20 May 2017, at 01:44, Bajpai, Amit X. -ND <amit.x.bajpai@disney.com>
wrote:

Hi,

I have a hive external table with the S3 location having no files (but the S3
location directory does exist). When I am trying to use Spark SQL to count the
number of records in the table it is throwing error saying “File
s3n://data/xyz does not exist. null/0”.

select * from tablex limit 10

Can someone let me know how we can fix this issue.

Thanks


There isn't really a "directory" in S3, just a set of objects whose paths
begin with a string. Try creating an empty file with an _ prefix in the
directory; it should be ignored by Spark SQL but will cause the "directory" to
come into being

Re: SparkSQL not able to read an empty table location

2017-05-20 Thread Steve Loughran

On 20 May 2017, at 01:44, Bajpai, Amit X. -ND 
> wrote:

Hi,

I have a hive external table with the S3 location having no files (but the S3
location directory does exist). When I am trying to use Spark SQL to count the
number of records in the table it is throwing error saying “File s3n://data/xyz
does not exist. null/0”.

select * from tablex limit 10

Can someone let me know how we can fix this issue.

Thanks


There isn't really a "directory" in S3, just a set of objects whose paths begin 
with a string. Try creating an empty file with an _ prefix in the directory; it 
should be ignored by Spark SQL but will cause the "directory" to come into being


SparkSQL not able to read an empty table location

2017-05-19 Thread Bajpai, Amit X. -ND
Hi,

I have a hive external table with the S3 location having no files (but the S3
location directory does exist). When I am trying to use Spark SQL to count the
number of records in the table it is throwing error saying “File s3n://data/xyz
does not exist. null/0”.

select * from tablex limit 10

Can someone let me know how we can fix this issue.

Thanks


Re: How can I merge multiple rows into one row in SparkSQL or HiveSQL?

2017-05-15 Thread Edward Capriolo
Here is a similar but not exact way I did something like what you want. I had
two data files in different formats; the different columns needed to become
different features. I wanted to feed them into Spark's:
https://en.wikibooks.org/wiki/Data_Mining_Algorithms_In_R/Frequent_Pattern_Mining/The_FP-Growth_Algorithm

This only works because I have a few named features, and they become fields
in the model object AntecedentUnion. This would be a crappy solution for a
large sparse matrix.

Also my Scala code is crap too so there is probably a better way to do this!


val b = targ.as[TargetingAntecedent]
val b1 = b.map(c => (c.tdid, c)).rdd.groupByKey()
val bgen = b1.map(f =>
  (f._1 , f._2.map
  ( x => AntecedentUnion("targeting", "", x.targetingdataid,
"", "") )
  ) )

val c = imp.as[ImpressionAntecedent]
val c1 = c.map(k => (k.tdid, k)).rdd.groupByKey()
val cgen = c1.map (f =>
  (f._1 , f._2.map
  ( x => AntecedentUnion("impression", "", "", x.campaignid,
x.adgroupid) ).toSet.toIterable
  ) )

val bgen = TargetingUtil.targetingAntecedent(sparkSession, sqlContext,
targ)
val cgen = TargetingUtil.impressionAntecedent(sparkSession, sqlContext,
imp)
val joined = bgen.join(cgen)

val merged = joined.map(f => (f._1, f._2._1++:(f._2._2) ))
val fullResults : RDD[Array[AntecedentUnion]] = merged.map(f =>
f._2).map(_.toArray[audacity.AntecedentUnion])


So essentially converting everything into AntecedentUnion where the first
column is the type of the tuple, and other fields are supplied or not. Then
merge all those and run fpgrowth on them. Hope that helps!



On Mon, May 15, 2017 at 12:06 PM, goun na  wrote:
>
> I stated that the wrong way around: collect_list keeps duplicated results; collect_set eliminates them.
>
> 2017-05-16 0:50 GMT+09:00 goun na :
>>
>> Hi, Jone Zhang
>>
>> 1. Hive UDF
>> You might need collect_set or collect_list (to eliminate duplication),
>> but make sure to reduce its cardinality before applying UDFs, as it can cause
>> problems while handling 1 billion records. Union datasets 1, 2, 3 -> group by
>> user_id1 -> collect_set(feature column) would work.
>>
>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
>>
>> 2.Spark Dataframe Pivot
>>
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html
>>
>> - Goun
>>
>> 2017-05-15 22:15 GMT+09:00 Jone Zhang :
>>>
>>> For example
>>> Data1(has 1 billion records)
>>> user_id1  feature1
>>> user_id1  feature2
>>>
>>> Data2(has 1 billion records)
>>> user_id1  feature3
>>>
>>> Data3(has 1 billion records)
>>> user_id1  feature4
>>> user_id1  feature5
>>> ...
>>> user_id1  feature100
>>>
>>> I want to get the result as follow
>>> user_id1  feature1 feature2 feature3 feature4 feature5...feature100
>>>
>>> Is there a more efficient way except join?
>>>
>>> Thanks!
>>
>>
>


Re: How can I merge multiple rows into one row in SparkSQL or HiveSQL?

2017-05-15 Thread ayan guha
You may consider writing all your data to a nosql datastore such as hbase,
using user id as key.

There is a sql solution using max and inner case and finally union the
results, but that may be expensive
On Tue, 16 May 2017 at 12:13 am, Didac Gil  wrote:

> Or maybe you could also check using the collect_list from the SQL functions
>
> val compacter = Data1.groupBy("UserID")
>   .agg(org.apache.spark.sql.functions.collect_list("feature").as("ListOfFeatures"))
>
>
>
> On 15 May 2017, at 15:15, Jone Zhang  wrote:
>
> For example
> Data1(has 1 billion records)
> user_id1  feature1
> user_id1  feature2
>
> Data2(has 1 billion records)
> user_id1  feature3
>
> Data3(has 1 billion records)
> user_id1  feature4
> user_id1  feature5
> ...
> user_id1  feature100
>
> I want to get the result as follow
> user_id1  feature1 feature2 feature3 feature4 feature5...feature100
>
> Is there a more efficient way except join?
>
> Thanks!
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544
> Sweden: +46 (0)730229737
> Skype: didac.gil.de.la.iglesia
>
> --
Best Regards,
Ayan Guha


Re: How can I merge multiple rows into one row in SparkSQL or HiveSQL?

2017-05-15 Thread Didac Gil
Or maybe you could also check using the collect_list from the SQL functions
val compacter = Data1.groupBy("UserID")
  .agg(org.apache.spark.sql.functions.collect_list("feature").as("ListOfFeatures"))


> On 15 May 2017, at 15:15, Jone Zhang  wrote:
> 
> For example
> Data1(has 1 billion records)
> user_id1  feature1
> user_id1  feature2
> 
> Data2(has 1 billion records)
> user_id1  feature3
> 
> Data3(has 1 billion records)
> user_id1  feature4
> user_id1  feature5
> ...
> user_id1  feature100
> 
> I want to get the result as follow
> user_id1  feature1 feature2 feature3 feature4 feature5...feature100
> 
> Is there a more efficient way except join?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: How can I merge multiple rows into one row in SparkSQL or HiveSQL?

2017-05-15 Thread Didac Gil
I guess that if your user_id field is the key, you could use the 
updateStateByKey function.

I did not test it, but it could be something along these lines:

def yourCombineFunction(input: Seq[String], accumulatedInput: Option[String]): Option[String] = {
    // in case the current key was not found before, the feature list starts empty
    val state = accumulatedInput.getOrElse("")
    // append the feature values of the new entries
    val newFeatures = (state +: input).mkString(" ").trim
    Some(newFeatures) // the new accumulated value for the features is returned
}

val updatedData = Data1.updateStateByKey(yourCombineFunction _) // This would
// "iterate" among all the entries in your Dataset and, for each key, update
// the "accumulatedFeatures"

Good luck

> On 15 May 2017, at 15:15, Jone Zhang  wrote:
> 
> For example
> Data1(has 1 billion records)
> user_id1  feature1
> user_id1  feature2
> 
> Data2(has 1 billion records)
> user_id1  feature3
> 
> Data3(has 1 billion records)
> user_id1  feature4
> user_id1  feature5
> ...
> user_id1  feature100
> 
> I want to get the result as follow
> user_id1  feature1 feature2 feature3 feature4 feature5...feature100
> 
> Is there a more efficient way except join?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





How can I merge multiple rows into one row in SparkSQL or HiveSQL?

2017-05-15 Thread Jone Zhang
For example
Data1(has 1 billion records)
user_id1  feature1
user_id1  feature2

Data2(has 1 billion records)
user_id1  feature3

Data3(has 1 billion records)
user_id1  feature4
user_id1  feature5
...
user_id1  feature100

I want to get the result as follow
user_id1  feature1 feature2 feature3 feature4 feature5...feature100

Is there a more efficient way except join?

Thanks!
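A compact sketch of the collect_list approach suggested in this thread (assumed column names
user_id and feature, with data1/data2/data3 standing for Data1/Data2/Data3; Spark 1.6-style
API, where union replaces unionAll on 2.x):

import org.apache.spark.sql.functions.{collect_list, concat_ws}

// union the three datasets, then collapse each user's features into one row
val all = data1.unionAll(data2).unionAll(data3)
val merged = all.groupBy("user_id")
  .agg(concat_ws(" ", collect_list("feature")).as("features"))
merged.show()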


Feasibility limits of joins in SparkSQL (Why does my driver explode with a large number of joins?)

2017-04-11 Thread Rick Moritz
Hi List,

I'm currently trying to naively implement a Data-Vault-type Data-Warehouse
using SparkSQL, and was wondering whether there's an inherent practical
limit to query complexity, beyond which SparkSQL will stop functioning,
even for relatively small amounts of data.

I'm currently looking at a query, which has 19 joins (including some
cartesian joins) in the main query, and another instance of the same 19
joins in a subquery.
What I'm seeing is, that even with very restrictive filtering, which gets
pushed down the pipeline, I run out of driver memory (36G) after just a few
minutes, into a ~4900-task stage.
In fact, quite often just using SparkUI pushes me into the GC-Overhead
limit, with the job then failing.

Obviously, this way of organizing the data isn't ideal, and we're looking
into moving most of the joins into a relational DB. Nonetheless, the way
the driver explodes with no apparent reason is pretty worrying. The
behaviour is also quite independent of how much memory I give the driver.
I'm currently looking into getting a memory dump of the driver, to figure
out which object is hogging memory in the driver. Given that I don't
consciously collect() any major amount of data, I'm surprised about this
behavior, I even suspect that the large graph might be causing issues in
just the SparkUI - I'll have to retry with it disabled.

If you have any experience with significant amount of joins in single
queries, then I'd love to hear from you - maybe someone has also
experienced exploding driver syndrome with non-obvious causes in this
context.

Thanks for any input,

Rick


[SparkSQL] Project using NamedExpression

2017-03-21 Thread Aviral Agarwal
Hi guys,

I want to transform a Row using NamedExpression.

Below is the code snippet that I am using:


def apply(dataFrame: DataFrame, selectExpressions:
java.util.List[String]): RDD[UnsafeRow] = {

val exprArray = selectExpressions.map(s =>
  Column(SqlParser.parseExpression(s)).named
)

val inputSchema = dataFrame.logicalPlan.output

val transformedRDD = dataFrame.mapPartitions(
  iter => {
val project = UnsafeProjection.create(exprArray,inputSchema)
iter.map{
  row =>
project(InternalRow.fromSeq(row.toSeq))
}
})

transformedRDD
  }


The problem is that the expression becomes unevaluable:

Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
expression: 'a
at
org.apache.spark.sql.catalyst.expressions.Unevaluable$class.genCode(Expression.scala:233)
at
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.genCode(unresolved.scala:53)
at
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106)
at
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:464)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext.generateExpressions(CodeGenerator.scala:464)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:281)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:324)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:317)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:635)
at
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:125)
at
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:135)
at
org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(ScalaTransform.scala:31)
at
org.apache.spark.sql.ScalaTransform$$anonfun$3.apply(ScalaTransform.scala:30)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


This might be because the Expression is unresolved.

Any help would be appreciated.

Thanks and Regards,
Aviral Agarwal
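If raw UnsafeRows are not strictly required, a simpler sketch is to let Catalyst's analyzer
resolve the attributes (which is exactly what the stack trace is complaining about) by going
through selectExpr and then dropping to the RDD; the helper name is illustrative:

import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

def project(dataFrame: DataFrame, selectExpressions: java.util.List[String]): RDD[Row] = {
  // selectExpr parses and, crucially, resolves each expression against the
  // DataFrame's schema, so nothing is left as an UnresolvedAttribute at runtime
  dataFrame.selectExpr(selectExpressions.asScala: _*).rdd
}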


Re: [SparkSQL] too many open files although ulimit set to 1048576

2017-03-13 Thread darin
I think your settings are not taking effect.
Try adding `ulimit -n 10240` in spark-env.sh.







RE: [SparkSQL] pre-check syntax before running spark job?

2017-02-21 Thread Linyuxin
Hi Gurdit Singh
Thanks. It is very helpful.

From: Gurdit Singh [mailto:gurdit.si...@bitwiseglobal.com]
Sent: 22 February 2017 13:31
To: Linyuxin <linyu...@huawei.com>; Irving Duran <irving.du...@gmail.com>; 
Yong Zhang <java8...@hotmail.com>
Cc: Jacek Laskowski <ja...@japila.pl>; user <user@spark.apache.org>
Subject: RE: [SparkSQL] pre-check syntax before running spark job?

Hi, you can use the Spark SQL ANTLR grammar to pre-check your syntax.

https://github.com/apache/spark/blob/acf71c63cdde8dced8d108260cdd35e1cc992248/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4


From: Linyuxin [mailto:linyu...@huawei.com]
Sent: Wednesday, February 22, 2017 7:34 AM
To: Irving Duran <irving.du...@gmail.com<mailto:irving.du...@gmail.com>>; Yong 
Zhang <java8...@hotmail.com<mailto:java8...@hotmail.com>>
Cc: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>; user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: RE: [SparkSQL] pre-check syntax before running spark job?

Actually, I want a standalone jar so I can check the syntax without a Spark
execution environment.

From: Irving Duran [mailto:irving.du...@gmail.com]
Sent: 21 February 2017 23:29
To: Yong Zhang <java8...@hotmail.com<mailto:java8...@hotmail.com>>
Cc: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>; Linyuxin 
<linyu...@huawei.com<mailto:linyu...@huawei.com>>; user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: [SparkSQL] pre-check syntax before running spark job?

You can also run it on REPL and test to see if you are getting the expected 
result.


Thank You,

Irving Duran

On Tue, Feb 21, 2017 at 8:01 AM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

You can always use explain method to validate your DF or SQL, before any action.



Yong


From: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>
Sent: Tuesday, February 21, 2017 4:34 AM
To: Linyuxin
Cc: user
Subject: Re: [SparkSQL] pre-check syntax before running spark job?

Hi,

Never heard about such a tool before. You could use Antlr to parse SQLs (just 
as Spark SQL does while parsing queries). I think it's a one-hour project.

Jacek

On 21 Feb 2017 4:44 a.m., "Linyuxin" 
<linyu...@huawei.com<mailto:linyu...@huawei.com>> wrote:
Hi All,
Is there any tool/api to check the sql syntax without running spark job 
actually?

Like the siddhiQL on storm here:
SiddhiManagerService. validateExecutionPlan
https://github.com/wso2/siddhi/blob/master/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
it can validate the syntax before running the sql on storm

this is very useful for exposing sql string as a DSL of the platform.





RE: [SparkSQL] pre-check syntax before running spark job?

2017-02-21 Thread Gurdit Singh
Hi, you can use the Spark SQL ANTLR grammar to pre-check your syntax.

https://github.com/apache/spark/blob/acf71c63cdde8dced8d108260cdd35e1cc992248/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
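For example, a minimal syntax-only check is possible with the catalyst parser alone (a sketch,
assuming Spark 2.x, where the ANTLR-based parser ships in the spark-catalyst jar); it needs no
SparkContext and catches parse errors, though not analysis errors such as unknown tables or
columns:

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

def checkSyntax(sql: String): Either[String, Unit] =
  Try(CatalystSqlParser.parsePlan(sql)) match {
    case Success(_)                 => Right(())
    case Failure(e: ParseException) => Left(e.getMessage)
    case Failure(e)                 => Left(e.toString)
  }

// checkSyntax("SELECT a FROM t")        -> Right(())
// checkSyntax("SELECT a FROM t WHERE")  -> Left(parse error message)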


From: Linyuxin [mailto:linyu...@huawei.com]
Sent: Wednesday, February 22, 2017 7:34 AM
To: Irving Duran <irving.du...@gmail.com>; Yong Zhang <java8...@hotmail.com>
Cc: Jacek Laskowski <ja...@japila.pl>; user <user@spark.apache.org>
Subject: RE: [SparkSQL] pre-check syntax before running spark job?

Actually, I want a standalone jar so I can check the syntax without a Spark
execution environment.

From: Irving Duran [mailto:irving.du...@gmail.com]
Sent: 21 February 2017 23:29
To: Yong Zhang <java8...@hotmail.com<mailto:java8...@hotmail.com>>
Cc: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>; Linyuxin 
<linyu...@huawei.com<mailto:linyu...@huawei.com>>; user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: [SparkSQL] pre-check syntax before running spark job?

You can also run it on REPL and test to see if you are getting the expected 
result.


Thank You,

Irving Duran

On Tue, Feb 21, 2017 at 8:01 AM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

You can always use explain method to validate your DF or SQL, before any action.



Yong


From: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>
Sent: Tuesday, February 21, 2017 4:34 AM
To: Linyuxin
Cc: user
Subject: Re: [SparkSQL] pre-check syntax before running spark job?

Hi,

Never heard about such a tool before. You could use Antlr to parse SQLs (just 
as Spark SQL does while parsing queries). I think it's a one-hour project.

Jacek

On 21 Feb 2017 4:44 a.m., "Linyuxin" 
<linyu...@huawei.com<mailto:linyu...@huawei.com>> wrote:
Hi All,
Is there any tool/api to check the sql syntax without running spark job 
actually?

Like the siddhiQL on storm here:
SiddhiManagerService. validateExecutionPlan
https://github.com/wso2/siddhi/blob/master/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
it can validate the syntax before running the sql on storm

this is very useful for exposing sql string as a DSL of the platform.





RE: [SparkSQL] pre-check syntax before running spark job?

2017-02-21 Thread Linyuxin
Actually, I want a standalone jar so I can check the syntax without a Spark
execution environment.

From: Irving Duran [mailto:irving.du...@gmail.com]
Sent: 21 February 2017 23:29
To: Yong Zhang <java8...@hotmail.com>
Cc: Jacek Laskowski <ja...@japila.pl>; Linyuxin <linyu...@huawei.com>; user 
<user@spark.apache.org>
Subject: Re: [SparkSQL] pre-check syntax before running spark job?

You can also run it on REPL and test to see if you are getting the expected 
result.


Thank You,

Irving Duran

On Tue, Feb 21, 2017 at 8:01 AM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

You can always use explain method to validate your DF or SQL, before any action.



Yong


From: Jacek Laskowski <ja...@japila.pl<mailto:ja...@japila.pl>>
Sent: Tuesday, February 21, 2017 4:34 AM
To: Linyuxin
Cc: user
Subject: Re: [SparkSQL] pre-check syntax before running spark job?

Hi,

Never heard about such a tool before. You could use Antlr to parse SQLs (just 
as Spark SQL does while parsing queries). I think it's a one-hour project.

Jacek

On 21 Feb 2017 4:44 a.m., "Linyuxin" 
<linyu...@huawei.com<mailto:linyu...@huawei.com>> wrote:
Hi All,
Is there any tool/api to check the sql syntax without running spark job 
actually?

Like the siddhiQL on storm here:
SiddhiManagerService. validateExecutionPlan
https://github.com/wso2/siddhi/blob/master/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
it can validate the syntax before running the sql on storm

this is very useful for exposing sql string as a DSL of the platform.





Re: [SparkSQL] pre-check syntax before running spark job?

2017-02-21 Thread Irving Duran
You can also run it on REPL and test to see if you are getting the expected
result.


Thank You,

Irving Duran

On Tue, Feb 21, 2017 at 8:01 AM, Yong Zhang <java8...@hotmail.com> wrote:

> You can always use explain method to validate your DF or SQL, before any
> action.
>
>
> Yong
>
>
> --
> From: Jacek Laskowski <ja...@japila.pl>
> Sent: Tuesday, February 21, 2017 4:34 AM
> To: Linyuxin
> Cc: user
> Subject: Re: [SparkSQL] pre-check syntax before running spark job?
>
> Hi,
>
> Never heard about such a tool before. You could use Antlr to parse SQLs
> (just as Spark SQL does while parsing queries). I think it's a one-hour
> project.
>
> Jacek
>
> On 21 Feb 2017 4:44 a.m., "Linyuxin" <linyu...@huawei.com> wrote:
>
> Hi All,
> Is there any tool/api to check the sql syntax without running spark job
> actually?
>
> Like the siddhiQL on storm here:
> SiddhiManagerService. validateExecutionPlan
> https://github.com/wso2/siddhi/blob/master/modules/siddhi-
> core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
> it can validate the syntax before running the sql on storm
>
> this is very useful for exposing sql string as a DSL of the platform.
>
>
>
>


Re: [SparkSQL] pre-check syntax before running spark job?

2017-02-21 Thread Yong Zhang
You can always use explain method to validate your DF or SQL, before any action.


Yong



From: Jacek Laskowski <ja...@japila.pl>
Sent: Tuesday, February 21, 2017 4:34 AM
To: Linyuxin
Cc: user
Subject: Re: [SparkSQL] pre-check syntax before running spark job?

Hi,

Never heard about such a tool before. You could use Antlr to parse SQLs (just 
as Spark SQL does while parsing queries). I think it's a one-hour project.

Jacek

On 21 Feb 2017 4:44 a.m., "Linyuxin" 
<linyu...@huawei.com<mailto:linyu...@huawei.com>> wrote:
Hi All,
Is there any tool/api to check the sql syntax without running spark job 
actually?

Like the siddhiQL on storm here:
SiddhiManagerService. validateExecutionPlan
https://github.com/wso2/siddhi/blob/master/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
it can validate the syntax before running the sql on storm

this is very useful for exposing sql string as a DSL of the platform.





Re: [SparkSQL] pre-check syntax before running spark job?

2017-02-21 Thread Jacek Laskowski
Hi,

Never heard about such a tool before. You could use Antlr to parse SQLs
(just as Spark SQL does while parsing queries). I think it's a one-hour
project.

Jacek

On 21 Feb 2017 4:44 a.m., "Linyuxin"  wrote:

Hi All,
Is there any tool/api to check the sql syntax without running spark job
actually?

Like the siddhiQL on storm here:
SiddhiManagerService. validateExecutionPlan
https://github.com/wso2/siddhi/blob/master/modules/
siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
it can validate the syntax before running the sql on storm

this is very useful for exposing sql string as a DSL of the platform.



[SparkSQL] pre-check syntax before running spark job?

2017-02-20 Thread Linyuxin
Hi All,
Is there any tool/api to check the sql syntax without running spark job 
actually?

Like the siddhiQL on storm here:
SiddhiManagerService. validateExecutionPlan
https://github.com/wso2/siddhi/blob/master/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/SiddhiManagerService.java
it can validate the syntax before running the sql on storm 

this is very useful for exposing sql string as a DSL of the platform.




Is RAND() in SparkSQL deterministic when used on MySql data sources?

2017-01-12 Thread Gabriele Del Prete
Hi all,

We need to use the rand() function in Scala Spark SQL in our
application, but we discovered that its behavior was not deterministic, that
is, different invocations with the same seed would result in different
values. This is documented in some bugs, for example:
https://issues.apache.org/jira/browse/SPARK-1 and it has to do with
partitioning.

So we refactored it by moving the rand() function from a query using Parquet
files on S3 as a datasource, to another query that we run on MySQL (still
using the Spark SQL Scala API), assuming that MySQL queries do not get
parallelized. Can we indeed safely assume that now rand() will be
deterministic, or does the source of non-deterministic behavior lie in the
Spark SQL engine rather than the specific datasource?

Gabriele
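For what it's worth, a common workaround (a sketch, not from this thread) is to compute the
random column once and materialize it, so later stages see one fixed set of values regardless
of recomputation or repartitioning; df stands for the source DataFrame:

import org.apache.spark.sql.functions.rand

val withRnd = df.withColumn("rnd", rand(42L))  // seeded rand()
withRnd.cache()   // for a stronger guarantee, write it out and read it back
withRnd.count()   // force materialization before the column is reused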






Re: Nested ifs in sparksql

2017-01-11 Thread Raghavendra Pandey
I am not using case when. It is mostly IF. By slow, I mean 6 minutes even for
10 records with 41 levels of nested ifs.

On Jan 11, 2017 3:31 PM, "Georg Heiler"  wrote:

> I was using the dataframe api not sql. The main problem was that too much
> code was generated.
> Using a UDF turned out to be quicker as well.
> Olivier Girardot  schrieb am Di. 10.
> Jan. 2017 um 21:54:
>
>> Are you using the "case when" functions ? what do you mean by slow ? can
>> you share a snippet ?
>>
>>
>>
>> On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com
>> wrote:
>>
>> Maybe you can create an UDF?
>>
>> Raghavendra Pandey  schrieb am Di., 10.
>> Jan. 2017 um 20:04 Uhr:
>>
>> I have around 41 levels of nested if-else in Spark SQL. I have
>> programmed it using APIs on DataFrame. But it takes too much time.
>> Is there anything I can do to improve the time here?
>>
>>
>>
>> *Olivier Girardot* | Associé
>> o.girar...@lateral-thoughts.com
>> +33 6 24 09 17 94
>>
>


Re: Nested ifs in sparksql

2017-01-11 Thread Georg Heiler
I was using the dataframe api not sql. The main problem was that too much
code was generated.
Using a UDF turned out to be quicker as well.
Olivier Girardot  schrieb am Di. 10. Jan.
2017 um 21:54:

> Are you using the "case when" functions ? what do you mean by slow ? can
> you share a snippet ?
>
>
>
> On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com
> wrote:
>
> Maybe you can create an UDF?
>
> Raghavendra Pandey  schrieb am Di., 10.
> Jan. 2017 um 20:04 Uhr:
>
> I have around 41 levels of nested if-else in Spark SQL. I have
> programmed it using APIs on DataFrame. But it takes too much time.
> Is there anything I can do to improve the time here?
>
>
>
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


Re: Nested ifs in sparksql

2017-01-10 Thread Olivier Girardot
Are you using the "case when" functions ? what do you mean by slow ? can you
share a snippet ?
 





On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com
wrote:
Maybe you can create an UDF?
Raghavendra Pandey  schrieb am Di., 10. Jan. 2017
um 20:04 Uhr:
I have around 41 levels of nested if-else in Spark SQL. I have programmed it
using APIs on DataFrame. But it takes too much time.
Is there anything I can do to improve the time here?



Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Nested ifs in sparksql

2017-01-10 Thread Georg Heiler
Maybe you can create an UDF?

Raghavendra Pandey  schrieb am Di., 10. Jan.
2017 um 20:04 Uhr:

> I have around 41 levels of nested if-else in Spark SQL. I have
> programmed it using APIs on DataFrame. But it takes too much time.
> Is there anything I can do to improve the time here?
>


Nested ifs in sparksql

2017-01-10 Thread Raghavendra Pandey
I have around 41 levels of nested if-else in Spark SQL. I have programmed
it using APIs on DataFrame. But it takes too much time.
Is there anything I can do to improve the time here?
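A sketch of the UDF approach suggested in this thread (column and value names are
illustrative): all the branching lives in one Scala function, so Catalyst only generates code
for a single UDF call instead of 41 nested conditional expressions:

import org.apache.spark.sql.functions.{col, udf}

val decide: Int => String = {
  case x if x < 10  => "low"
  case x if x < 100 => "mid"
  case _            => "high"   // ...the remaining branches would go here
}
val decideUdf = udf(decide)

val result = df.withColumn("bucket", decideUdf(col("someIntColumn")))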


Re: Query in SparkSQL

2016-12-12 Thread vaquar khan
Hi Neeraj,

As per my understanding, Spark SQL doesn't support UPDATE statements.
Why do you need the update command in Spark SQL? You can run that command in Hive.

Regards,
Vaquar khan

On Mon, Dec 12, 2016 at 10:21 PM, Niraj Kumar <nku...@incedoinc.com> wrote:

> Hi
>
>
>
> I am working on SparkSQL using hiveContext (version 1.6.2).
>
> Can I run the following queries directly in SparkSQL? If yes, how?
>
>
>
> update calls set sample = 'Y' where accnt_call_id in (select accnt_call_id
> from samples);
>
>
>
> insert into details (accnt_call_id, prdct_cd, prdct_id, dtl_pstn) select
> accnt_call_id, prdct_cd, prdct_id, 32 from samples where PRDCT_CD = 2114515;
>
>
>
>
>
>
>
> Thanks and Regards,
>
> *Niraj Kumar*
>
>
>
>



-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago
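A sketch of the usual workaround for the UPDATE (not from the thread; assumes calls and
samples are available as tables): recompute the column with a left join and write the result
to a new table, since Spark SQL 1.6 cannot update rows in place:

// emulate: update calls set sample = 'Y'
//          where accnt_call_id in (select accnt_call_id from samples)
val updatedCalls = sqlContext.sql("""
  SELECT c.*,
         CASE WHEN s.accnt_call_id IS NOT NULL THEN 'Y' ELSE c.sample END AS sample_new
  FROM calls c
  LEFT JOIN (SELECT DISTINCT accnt_call_id FROM samples) s
    ON c.accnt_call_id = s.accnt_call_id
""").drop("sample").withColumnRenamed("sample_new", "sample")

updatedCalls.write.mode("overwrite").saveAsTable("calls_updated")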


Query in SparkSQL

2016-12-12 Thread Niraj Kumar
Hi

I am working on SparkSQL using hiveContext (version 1.6.2).
Can I run the following queries directly in SparkSQL? If yes, how?

update calls set sample = 'Y' where accnt_call_id in (select accnt_call_id from 
samples);

insert into details (accnt_call_id, prdct_cd, prdct_id, dtl_pstn) select 
accnt_call_id, prdct_cd, prdct_id, 32 from samples where PRDCT_CD = 2114515;



Thanks and Regards,
Niraj Kumar



SparkSQL

2016-12-09 Thread Niraj Kumar
Hi

I am working on SparkSQL using hiveContext (version 1.6.2).
Can someone help me convert the following queries to SparkSQL?

update calls set sample = 'Y' where accnt_call_id in (select accnt_call_id from 
samples);
insert into details (accnt_call_id, prdct_cd, prdct_id, dtl_pstn) select 
accnt_call_id, prdct_cd, prdct_id, 32 from samples where PRDCT_CD = 2114515;
delete from samples where PRDCT_CD in (2106861, 2114515);

Thanks and Regards,
Niraj Kumar



ClassCastException when using SparkSQL Window function

2016-11-17 Thread Isabelle Phan
Hello,

I have a simple session table, which tracks pages users visited with a
sessionId. I would like to apply a window function by sessionId, but am
hitting a type cast exception. I am using Spark 1.5.0.

Here is sample code:
scala> df.printSchema
root
 |-- sessionid: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- page: string (nullable = true)

scala> df.withColumn("num",
rowNumber.over(Window.partitionBy("sessionid"))).show(10)

Here is the error stacktrace:
Caused by: java.lang.ClassCastException:
org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
at
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
at
org.apache.spark.sql.catalyst.expressions.JoinedRow.getInt(JoinedRow.scala:82)
at
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:45)
at
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:121)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:330)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Has anyone encountered this problem before? Any pointers would be greatly
appreciated.


Thanks!

Isabelle
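For reference, rank-style window functions such as rowNumber generally need an ordered window;
a sketch of the same call with an explicit orderBy (assuming ordering by the timestamp column),
which is usually the first thing to try for this kind of failure:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

val w = Window.partitionBy("sessionid").orderBy("timestamp")
df.withColumn("num", rowNumber().over(w)).show(10)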


SparkSQL: intra-SparkSQL-application table registration

2016-11-14 Thread Mohamed Nadjib Mami

Hello,

I asked the following question [1] on Stack Overflow but didn't get an
answer yet. I am now using this channel to give it more visibility and
hopefully find someone who can help.


"*Context.* I have tens of SQL queries stored in separate files. For 
benchmarking purposes, I created an application that iterates through 
each of those query files and passes it to a standalone Spark 
application. This latter /first/ parses the query, extracts the used 
tables, registers them (using: registerTempTable() in Spark < 2 and 
createOrReplaceTempView() in Spark 2), and executes effectively the 
query (spark.sql()).


*Challenge.* Since registering the tables can sometimes be time 
consuming, I would like to register the tables only once when they are 
first used, and keep that in form of metadata that can readily be used 
in the subsequent queries without the need to re-register the tables 
again. It's a sort of intra-job caching but not any of the caching Spark 
offers (table caching), as far as I know.


Is that possible? if not can anyone suggest another approach to 
accomplish the same goal (i.e., iterating through separate query files 
and run a querying Spark application without registering the tables that 
have already been registered before)."


[1]: 
http://stackoverflow.com/questions/40549924/sparksql-intra-sparksql-application-table-registration


Cheers,
Mohamed
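A sketch of one way to do that inside a single application (hypothetical helper; uses the
Spark 2 createOrReplaceTempView mentioned above): keep a set of already-registered names and
pay the registration cost only the first time a table is seen:

import scala.collection.mutable
import org.apache.spark.sql.{DataFrame, SparkSession}

class TableRegistry(load: String => DataFrame) {
  private val registered = mutable.Set.empty[String]

  // register each table at most once per application run
  def ensureRegistered(tables: Seq[String]): Unit =
    tables.filterNot(registered.contains).foreach { name =>
      load(name).createOrReplaceTempView(name)   // the expensive step runs once
      registered += name
    }
}

// in the benchmark loop, after extracting the tables used by a query:
//   registry.ensureRegistered(usedTables)
//   spark.sql(queryText)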



Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Koert Kuipers
okay i see the partition local sort. got it.

i would expect that pushing the partition local sort into the shuffle would
give a significant boost. but that's just a guess.
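For reference, a minimal sketch of the groupByKey + flatMapGroups pattern referred to below
(Spark 2.x Dataset API; assumes a SparkSession in scope as spark and illustrative data):

import spark.implicits._

val ds = Seq((1, "b"), (2, "2"), (1, "a"), (2, "1")).toDS()

val perKeySorted = ds
  .groupByKey(_._1)
  .flatMapGroups { (key, values) =>
    // values for one key are colocated here; the sort happens per group, in memory
    values.map(_._2).toSeq.sorted.map(v => (key, v))
  }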

On Fri, Nov 4, 2016 at 2:39 PM, Michael Armbrust 
wrote:

> sure, but then my values are not sorted per key, right?
>
>
> It does do a partition local sort. Look at the query plan in my example.
> The code here will also take care of finding the boundaries and is pretty
> careful to spill / avoid materializing unnecessarily.
>
> I think you are correct though that we are not pushing any of the sort
> into the shuffle.  I'm not sure how much that buys you.  If its a lot we
> could extend the planner to look for Exchange->Sort pairs and change the
> exchange.
>
> On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers  wrote:
>
>> i just noticed Sort for Dataset has a global flag. and Dataset also has
>> sortWithinPartitions.
>>
>> how about:
>> repartition + sortWithinPartitions + mapPartitions?
>>
>> the plan looks ok, but it is not clear to me if the sort is done as part
>> of the shuffle (which is the important optimization).
>>
>> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
>> "value")
>>
>> scala> df.repartition(2, 
>> col("key")).sortWithinPartitions("value").as[(String,
>> String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
>> == Physical Plan ==
>> *SerializeFromObject [staticinvoke(class 
>> org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top
>> level non-flat input object)._1, true) AS _1#39, staticinvoke(class
>> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>> assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
>> object)._2, true) AS _2#40]
>> +- MapPartitions , obj#38: scala.Tuple2
>>+- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
>> scala.Tuple2
>>   +- *Sort [value#6 ASC], false, 0
>>  +- Exchange hashpartitioning(key#5, 2)
>> +- LocalTableScan [key#5, value#6]
>>
>>
>>
>>
>> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers  wrote:
>>
>>> sure, but then my values are not sorted per key, right?
>>>
>>> so a group by key with values sorted according to to some ordering is an
>>> operation that can be done efficiently in a single shuffle without first
>>> figuring out range boundaries. and it is needed for quite a few algos,
>>> including Window and lots of timeseries stuff. but it seems there is no way
>>> to express i want to do this yet (at least not in an efficient way).
>>>
>>> which makes me wonder, what does Window do?
>>>
>>>
>>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Thinking out loud is good :)

 You are right in that anytime you ask for a global ordering from Spark
 you will pay the cost of figuring out the range boundaries for partitions.
 If you say orderBy, though, we aren't sure that you aren't expecting a
 global order.

 If you only want to make sure that items are colocated, it is cheaper
 to do a groupByKey followed by a flatMapGroups.



 On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers 
 wrote:

> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
> then do mapPartitions?
>
> sorry thinking out loud a bit here. ok i think that could work. thanks
>
> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers 
> wrote:
>
>> thats an interesting thought about orderBy and mapPartitions. i guess
>> i could emulate a groupBy with secondary sort using those two. however
>> isn't using an orderBy expensive since it is a total sort? i mean a 
>> groupBy
>> with secondary sort is also a total sort under the hood, but its on
>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>> therefore can be implemented more efficiently.
>>
>>
>>
>>
>>
>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> It is still unclear to me why we should remember all these tricks
 (or add lots of extra little functions) when this elegantly can be
 expressed in a reduce operation with a simple one line lamba function.

>>> I think you can do that too.  KeyValueGroupedDataset has a
>>> reduceGroups function.  This probably won't be as fast though because 
>>> you
>>> end up creating objects where as the version I gave will get 

Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Michael Armbrust
>
> sure, but then my values are not sorted per key, right?


It does do a partition local sort. Look at the query plan in my example
.
The code here will also take care of finding the boundaries and is pretty
careful to spill / avoid materializing unnecessarily.

I think you are correct though that we are not pushing any of the sort into
the shuffle.  I'm not sure how much that buys you.  If it's a lot we could
extend the planner to look for Exchange->Sort pairs and change the exchange.

On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers  wrote:

> i just noticed Sort for Dataset has a global flag. and Dataset also has
> sortWithinPartitions.
>
> how about:
> repartition + sortWithinPartitions + mapPartitions?
>
> the plan looks ok, but it is not clear to me if the sort is done as part
> of the shuffle (which is the important optimization).
>
> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
> "value")
>
> scala> df.repartition(2, col("key")).sortWithinPartitions("value").as[(String,
> String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top
> level non-flat input object)._1, true) AS _1#39, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
> object)._2, true) AS _2#40]
> +- MapPartitions , obj#38: scala.Tuple2
>+- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
> scala.Tuple2
>   +- *Sort [value#6 ASC], false, 0
>  +- Exchange hashpartitioning(key#5, 2)
> +- LocalTableScan [key#5, value#6]
>
>
>
>
> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers  wrote:
>
>> sure, but then my values are not sorted per key, right?
>>
>> so a group by key with values sorted according to to some ordering is an
>> operation that can be done efficiently in a single shuffle without first
>> figuring out range boundaries. and it is needed for quite a few algos,
>> including Window and lots of timeseries stuff. but it seems there is no way
>> to express i want to do this yet (at least not in an efficient way).
>>
>> which makes me wonder, what does Window do?
>>
>>
>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust > > wrote:
>>
>>> Thinking out loud is good :)
>>>
>>> You are right in that anytime you ask for a global ordering from Spark
>>> you will pay the cost of figuring out the range boundaries for partitions.
>>> If you say orderBy, though, we aren't sure that you aren't expecting a
>>> global order.
>>>
>>> If you only want to make sure that items are colocated, it is cheaper to
>>> do a groupByKey followed by a flatMapGroups
>>> 
>>> .
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers  wrote:
>>>
 i guess i could sort by (hashcode(key), key, secondarySortColumn) and
 then do mapPartitions?

 sorry thinking out loud a bit here. ok i think that could work. thanks

 On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers 
 wrote:

> thats an interesting thought about orderBy and mapPartitions. i guess
> i could emulate a groupBy with secondary sort using those two. however
> isn't using an orderBy expensive since it is a total sort? i mean a 
> groupBy
> with secondary sort is also a total sort under the hood, but its on
> (hashCode(key), secondarySortColumn) which is easier to distribute and
> therefore can be implemented more efficiently.
>
>
>
>
>
> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> It is still unclear to me why we should remember all these tricks (or
>>> add lots of extra little functions) when this elegantly can be 
>>> expressed in
>>> a reduce operation with a simple one line lamba function.
>>>
>> I think you can do that too.  KeyValueGroupedDataset has a
>> reduceGroups function.  This probably won't be as fast though because you
>> end up creating objects where as the version I gave will get codgened to
>> operate on binary data the whole way though.
>>
>>> The same applies to these Window functions. I had to read it 3 times
>>> to understand what it all means. Maybe it makes sense for someone who 
>>> has
>>> been forced to use such limited tools in sql for many years but that's 
>>> not
>>> necessary what we should aim for. Why 

Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Koert Kuipers
i just noticed Sort for Dataset has a global flag. and Dataset also has
sortWithinPartitions.

how about:
repartition + sortWithinPartitions + mapPartitions?

the plan looks ok, but it is not clear to me if the sort is done as part of
the shuffle (which is the important optimization).

scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
"value")

scala> df.repartition(2,
col("key")).sortWithinPartitions("value").as[(String,
String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
== Physical Plan ==
*SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
object)._1, true) AS _1#39, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
object)._2, true) AS _2#40]
+- MapPartitions , obj#38: scala.Tuple2
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
scala.Tuple2
  +- *Sort [value#6 ASC], false, 0
 +- Exchange hashpartitioning(key#5, 2)
+- LocalTableScan [key#5, value#6]




On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers  wrote:

> sure, but then my values are not sorted per key, right?
>
> so a group by key with values sorted according to to some ordering is an
> operation that can be done efficiently in a single shuffle without first
> figuring out range boundaries. and it is needed for quite a few algos,
> including Window and lots of timeseries stuff. but it seems there is no way
> to express i want to do this yet (at least not in an efficient way).
>
> which makes me wonder, what does Window do?
>
>
> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust 
> wrote:
>
>> Thinking out loud is good :)
>>
>> You are right in that anytime you ask for a global ordering from Spark
>> you will pay the cost of figuring out the range boundaries for partitions.
>> If you say orderBy, though, we aren't sure that you aren't expecting a
>> global order.
>>
>> If you only want to make sure that items are colocated, it is cheaper to
>> do a groupByKey followed by a flatMapGroups
>> 
>> .
>>
>>
>>
>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers  wrote:
>>
>>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>>> then do mapPartitions?
>>>
>>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>>
>>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers 
>>> wrote:
>>>
 thats an interesting thought about orderBy and mapPartitions. i guess i
 could emulate a groupBy with secondary sort using those two. however isn't
 using an orderBy expensive since it is a total sort? i mean a groupBy with
 secondary sort is also a total sort under the hood, but its on
 (hashCode(key), secondarySortColumn) which is easier to distribute and
 therefore can be implemented more efficiently.





 On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> It is still unclear to me why we should remember all these tricks (or
>> add lots of extra little functions) when this elegantly can be expressed 
>> in
>> a reduce operation with a simple one line lamba function.
>>
> I think you can do that too.  KeyValueGroupedDataset has a
> reduceGroups function.  This probably won't be as fast though because you
> end up creating objects where as the version I gave will get codgened to
> operate on binary data the whole way though.
>
>> The same applies to these Window functions. I had to read it 3 times
>> to understand what it all means. Maybe it makes sense for someone who has
>> been forced to use such limited tools in sql for many years but that's 
>> not
>> necessary what we should aim for. Why can I not just have the sortBy and
>> then an Iterator[X] => Iterator[Y] to express what I want to do?
>>
> We also have orderBy and mapPartitions.
>
>> All these functions (rank etc.) can be trivially expressed in this,
>> plus I can add other operations if needed, instead of being locked in 
>> like
>> this Window framework.
>>
>  I agree that window functions would probably not be my first choice
> for many problems, but for people coming from SQL it was a very popular
> feature.  My real goal is to give as many paradigms as possible in a 
> single
> unified framework.  Let people pick the right mode of expression for any
> given job :)
>


>>>
>>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Koert Kuipers
sure, but then my values are not sorted per key, right?

so a group by key with values sorted according to some ordering is an
operation that can be done efficiently in a single shuffle without first
figuring out range boundaries. and it is needed for quite a few algos,
including Window and lots of timeseries stuff. but it seems there is no way
to express i want to do this yet (at least not in an efficient way).

which makes me wonder, what does Window do?


On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust 
wrote:

> Thinking out loud is good :)
>
> You are right in that anytime you ask for a global ordering from Spark you
> will pay the cost of figuring out the range boundaries for partitions.  If
> you say orderBy, though, we aren't sure that you aren't expecting a
> global order.
>
> If you only want to make sure that items are colocated, it is cheaper to
> do a groupByKey followed by a flatMapGroups
> 
> .
>
>
>
> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers  wrote:
>
>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>> then do mapPartitions?
>>
>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>
>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers  wrote:
>>
>>> thats an interesting thought about orderBy and mapPartitions. i guess i
>>> could emulate a groupBy with secondary sort using those two. however isn't
>>> using an orderBy expensive since it is a total sort? i mean a groupBy with
>>> secondary sort is also a total sort under the hood, but its on
>>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>>> therefore can be implemented more efficiently.
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust >> > wrote:
>>>
 It is still unclear to me why we should remember all these tricks (or
> add lots of extra little functions) when this elegantly can be expressed 
> in
> a reduce operation with a simple one line lamba function.
>
 I think you can do that too.  KeyValueGroupedDataset has a
 reduceGroups function.  This probably won't be as fast though because you
 end up creating objects where as the version I gave will get codgened to
 operate on binary data the whole way though.

> The same applies to these Window functions. I had to read it 3 times
> to understand what it all means. Maybe it makes sense for someone who has
> been forced to use such limited tools in sql for many years but that's not
> necessary what we should aim for. Why can I not just have the sortBy and
> then an Iterator[X] => Iterator[Y] to express what I want to do?
>
 We also have orderBy and mapPartitions.

> All these functions (rank etc.) can be trivially expressed in this,
> plus I can add other operations if needed, instead of being locked in like
> this Window framework.
>
  I agree that window functions would probably not be my first choice
 for many problems, but for people coming from SQL it was a very popular
 feature.  My real goal is to give as many paradigms as possible in a single
 unified framework.  Let people pick the right mode of expression for any
 given job :)

>>>
>>>
>>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Michael Armbrust
Thinking out loud is good :)

You are right in that anytime you ask for a global ordering from Spark you
will pay the cost of figuring out the range boundaries for partitions.  If
you say orderBy, though, we aren't sure that you aren't expecting a global
order.

If you only want to make sure that items are colocated, it is cheaper to do
a groupByKey followed by a flatMapGroups

.
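
A minimal sketch of the groupByKey-plus-flatMapGroups approach above, assuming
the Spark 2.x Dataset API; the data and names are illustrative, and the
per-group sort is done in memory, so each group is assumed to fit on one
executor:

import org.apache.spark.sql.SparkSession

object GroupSortSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("groupByKey-flatMapGroups").getOrCreate()
    import spark.implicits._

    val ds = Seq(("a", 3), ("a", 1), ("b", 2), ("a", 2)).toDS()

    // groupByKey colocates all values for a key; flatMapGroups then sees each
    // group as an iterator, which is sorted per group before emitting.
    val sortedPerKey = ds
      .groupByKey(_._1)
      .flatMapGroups { (key, rows) =>
        rows.toSeq.sortBy(_._2).map { case (_, v) => (key, v) }
      }

    sortedPerKey.show()
    spark.stop()
  }
}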



On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers  wrote:

> i guess i could sort by (hashcode(key), key, secondarySortColumn) and then
> do mapPartitions?
>
> sorry thinking out loud a bit here. ok i think that could work. thanks
>
> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers  wrote:
>
>> thats an interesting thought about orderBy and mapPartitions. i guess i
>> could emulate a groupBy with secondary sort using those two. however isn't
>> using an orderBy expensive since it is a total sort? i mean a groupBy with
>> secondary sort is also a total sort under the hood, but its on
>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>> therefore can be implemented more efficiently.
>>
>>
>>
>>
>>
>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust 
>> wrote:
>>
>>> It is still unclear to me why we should remember all these tricks (or
 add lots of extra little functions) when this elegantly can be expressed in
 a reduce operation with a simple one line lamba function.

>>> I think you can do that too.  KeyValueGroupedDataset has a reduceGroups
>>> function.  This probably won't be as fast though because you end up
>>> creating objects where as the version I gave will get codgened to operate
>>> on binary data the whole way though.
>>>
 The same applies to these Window functions. I had to read it 3 times to
 understand what it all means. Maybe it makes sense for someone who has been
 forced to use such limited tools in sql for many years but that's not
 necessary what we should aim for. Why can I not just have the sortBy and
 then an Iterator[X] => Iterator[Y] to express what I want to do?

>>> We also have orderBy and mapPartitions.
>>>
 All these functions (rank etc.) can be trivially expressed in this,
 plus I can add other operations if needed, instead of being locked in like
 this Window framework.

>>>  I agree that window functions would probably not be my first choice for
>>> many problems, but for people coming from SQL it was a very popular
>>> feature.  My real goal is to give as many paradigms as possible in a single
>>> unified framework.  Let people pick the right mode of expression for any
>>> given job :)
>>>
>>
>>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Koert Kuipers
i guess i could sort by (hashcode(key), key, secondarySortColumn) and then
do mapPartitions?

sorry thinking out loud a bit here. ok i think that could work. thanks

On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers  wrote:

> thats an interesting thought about orderBy and mapPartitions. i guess i
> could emulate a groupBy with secondary sort using those two. however isn't
> using an orderBy expensive since it is a total sort? i mean a groupBy with
> secondary sort is also a total sort under the hood, but its on
> (hashCode(key), secondarySortColumn) which is easier to distribute and
> therefore can be implemented more efficiently.
>
>
>
>
>
> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust 
> wrote:
>
>> It is still unclear to me why we should remember all these tricks (or add
>>> lots of extra little functions) when this elegantly can be expressed in a
>>> reduce operation with a simple one line lamba function.
>>>
>> I think you can do that too.  KeyValueGroupedDataset has a reduceGroups
>> function.  This probably won't be as fast though because you end up
>> creating objects where as the version I gave will get codgened to operate
>> on binary data the whole way though.
>>
>>> The same applies to these Window functions. I had to read it 3 times to
>>> understand what it all means. Maybe it makes sense for someone who has been
>>> forced to use such limited tools in sql for many years but that's not
>>> necessary what we should aim for. Why can I not just have the sortBy and
>>> then an Iterator[X] => Iterator[Y] to express what I want to do?
>>>
>> We also have orderBy and mapPartitions.
>>
>>> All these functions (rank etc.) can be trivially expressed in this, plus
>>> I can add other operations if needed, instead of being locked in like this
>>> Window framework.
>>>
>>  I agree that window functions would probably not be my first choice for
>> many problems, but for people coming from SQL it was a very popular
>> feature.  My real goal is to give as many paradigms as possible in a single
>> unified framework.  Let people pick the right mode of expression for any
>> given job :)
>>
>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Koert Kuipers
that's an interesting thought about orderBy and mapPartitions. i guess i
could emulate a groupBy with secondary sort using those two. however isn't
using an orderBy expensive since it is a total sort? i mean a groupBy with
secondary sort is also a total sort under the hood, but it's on
(hashCode(key), secondarySortColumn) which is easier to distribute and
therefore can be implemented more efficiently.





On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust 
wrote:

> It is still unclear to me why we should remember all these tricks (or add
>> lots of extra little functions) when this elegantly can be expressed in a
>> reduce operation with a simple one line lamba function.
>>
> I think you can do that too.  KeyValueGroupedDataset has a reduceGroups
> function.  This probably won't be as fast though because you end up
> creating objects where as the version I gave will get codgened to operate
> on binary data the whole way though.
>
>> The same applies to these Window functions. I had to read it 3 times to
>> understand what it all means. Maybe it makes sense for someone who has been
>> forced to use such limited tools in sql for many years but that's not
>> necessary what we should aim for. Why can I not just have the sortBy and
>> then an Iterator[X] => Iterator[Y] to express what I want to do?
>>
> We also have orderBy and mapPartitions.
>
>> All these functions (rank etc.) can be trivially expressed in this, plus
>> I can add other operations if needed, instead of being locked in like this
>> Window framework.
>>
>  I agree that window functions would probably not be my first choice for
> many problems, but for people coming from SQL it was a very popular
> feature.  My real goal is to give as many paradigms as possible in a single
> unified framework.  Let people pick the right mode of expression for any
> given job :)
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Michael Armbrust
>
> It is still unclear to me why we should remember all these tricks (or add
> lots of extra little functions) when this elegantly can be expressed in a
> reduce operation with a simple one line lamba function.
>
I think you can do that too.  KeyValueGroupedDataset has a reduceGroups
function.  This probably won't be as fast though because you end up
creating objects, whereas the version I gave will get codegened to operate
on binary data the whole way through.

> The same applies to these Window functions. I had to read it 3 times to
> understand what it all means. Maybe it makes sense for someone who has been
> forced to use such limited tools in sql for many years but that's not
> necessary what we should aim for. Why can I not just have the sortBy and
> then an Iterator[X] => Iterator[Y] to express what I want to do?
>
We also have orderBy and mapPartitions.

> All these functions (rank etc.) can be trivially expressed in this, plus I
> can add other operations if needed, instead of being locked in like this
> Window framework.
>
 I agree that window functions would probably not be my first choice for
many problems, but for people coming from SQL it was a very popular
feature.  My real goal is to give as many paradigms as possible in a single
unified framework.  Let people pick the right mode of expression for any
given job :)
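
A minimal reduceGroups sketch for the keep-the-latest-row-per-key case discussed
in this thread, assuming the Spark 2.x Dataset API; the column layout and data
are illustrative, not from the original question:

import org.apache.spark.sql.SparkSession

object ReduceGroupsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("reduceGroups").getOrCreate()
    import spark.implicits._

    // (mobileno, transaction_date, customername)
    val ds = Seq(
      ("111", "2016-11-01", "alice"),
      ("111", "2016-11-03", "alice-updated"),
      ("222", "2016-11-02", "bob")
    ).toDS()

    // reduceGroups keeps one row per key by pairwise comparison on the
    // timestamp column, so no global sort is needed.
    val latest = ds
      .groupByKey(_._1)
      .reduceGroups((a, b) => if (a._2 >= b._2) a else b)
      .map(_._2)   // reduceGroups returns (key, row); keep just the row

    latest.show()
    spark.stop()
  }
}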


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Koert Kuipers
Oh okay that makes sense. The trick is to take max on tuple2 so you carry
the other column along.

It is still unclear to me why we should remember all these tricks (or add
lots of extra little functions) when this elegantly can be expressed in a
reduce operation with a simple one-line lambda function.

The same applies to these Window functions. I had to read it 3 times to
understand what it all means. Maybe it makes sense for someone who has been
forced to use such limited tools in sql for many years but that's not
necessarily what we should aim for. Why can I not just have the sortBy and
then an Iterator[X] => Iterator[Y] to express what I want to do? All these
functions (rank etc.) can be trivially expressed in this, plus I can add
other operations if needed, instead of being locked in like this Window
framework.

On Nov 3, 2016 4:10 PM, "Michael Armbrust"  wrote:

You are looking to perform an *argmax*, which you can do with a single
aggregation.  Here is an example

.

On Thu, Nov 3, 2016 at 4:53 AM, Rabin Banerjee  wrote:

> Hi All ,
>
>   I want to do a dataframe operation to find the rows having the latest
> timestamp in each group using the below operation
>
> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
> .select("customername","service_type","mobileno","cust_addr")
>
>
> *Spark Version :: 1.6.x*
>
> My Question is *"Will Spark guarantee the Order while doing the groupBy , if 
> DF is ordered using OrderBy previously in Spark 1.6.x"??*
>
>
> *I referred a blog here :: 
> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> *
>
> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>
>
> *I need a bit elaboration of how internally spark handles it ? also is it 
> more efficient than using a Window function ?*
>
>
> *Thanks in Advance ,*
>
> *Rabin Banerjee*
>
>
>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Michael Armbrust
You are looking to perform an *argmax*, which you can do with a single
aggregation.  Here is an example

.
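
A minimal sketch of that argmax pattern (the linked notebook itself was stripped
from the archive), assuming a Spark version that supports ordering on struct
columns; the column names are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, struct}

object ArgmaxSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("argmax").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("111", "2016-11-01", "alice", "prepaid"),
      ("111", "2016-11-03", "alice", "postpaid"),
      ("222", "2016-11-02", "bob", "prepaid")
    ).toDF("mobileno", "transaction_date", "customername", "service_type")

    // Struct ordering is field by field, so putting transaction_date first
    // lets max() pick the whole latest row per mobileno in one aggregation.
    val latest = df
      .groupBy($"mobileno")
      .agg(max(struct($"transaction_date", $"customername", $"service_type")).as("r"))
      .select($"mobileno", $"r.transaction_date", $"r.customername", $"r.service_type")

    latest.show()
    spark.stop()
  }
}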

On Thu, Nov 3, 2016 at 4:53 AM, Rabin Banerjee  wrote:

> Hi All ,
>
>   I want to do a dataframe operation to find the rows having the latest
> timestamp in each group using the below operation
>
> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
> .select("customername","service_type","mobileno","cust_addr")
>
>
> *Spark Version :: 1.6.x*
>
> My Question is *"Will Spark guarantee the Order while doing the groupBy , if 
> DF is ordered using OrderBy previously in Spark 1.6.x"??*
>
>
> *I referred a blog here :: 
> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> *
>
> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>
>
> *I need a bit elaboration of how internally spark handles it ? also is it 
> more efficient than using a Window function ?*
>
>
> *Thanks in Advance ,*
>
> *Rabin Banerjee*
>
>
>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Robin East
I agree with Koert. Relying on something because it appears to work when you 
test it can be dangerous if there is nothing in the api guarantee. 

Going back quite a few years it used to be the case that Oracle would always 
return a group by with the rows in the order of the grouping key. This was the 
result of the implementation specifics of GROUP BY. Then at some point Oracle 
introduced a new hashing GROUP BY mechanism that could be chosen by the cost 
based optimizer and all of a sudden lots of people’s applications ‘broke’ 
because they had been relying on functionality that had always worked in the 
past but wasn’t actually guaranteed.

TLDR - don’t rely on functionality that isn’t specified


> On 3 Nov 2016, at 14:37, Koert Kuipers  wrote:
> 
> i did not check the claim in that blog post that the data is ordered, but i 
> wouldnt rely on that behavior since it is not something the api guarantees 
> and could change in future versions
> 
> On Thu, Nov 3, 2016 at 9:59 AM, Rabin Banerjee  > wrote:
> Hi Koert & Robin ,
> 
>   Thanks ! But if you go through the blog 
> https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> 
>  and check the comments under the blog it's actually working, although I am 
> not sure how . And yes I agree a custom aggregate UDAF is a good option . 
> 
> Can anyone share the best way to implement this in Spark .?
> 
> Regards,
> Rabin Banerjee 
> 
> On Thu, Nov 3, 2016 at 6:59 PM, Koert Kuipers  > wrote:
> Just realized you only want to keep first element. You can do this without 
> sorting by doing something similar to min or max operation using a custom 
> aggregator/udaf or reduceGroups on Dataset. This is also more efficient.
> 
> 
> On Nov 3, 2016 7:53 AM, "Rabin Banerjee"  > wrote:
> Hi All ,
> 
>   I want to do a dataframe operation to find the rows having the latest 
> timestamp in each group using the below operation 
> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
> .select("customername","service_type","mobileno","cust_addr")
> 
> Spark Version :: 1.6.x
> My Question is "Will Spark guarantee the Order while doing the groupBy , if 
> DF is ordered using OrderBy previously in Spark 1.6.x"??
> 
> I referred a blog here :: 
> https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> 
> Which claims it will work except in Spark 1.5.1 and 1.5.2 .
> 
> I need a bit elaboration of how internally spark handles it ? also is it more 
> efficient than using a Window function ?
> 
> Thanks in Advance ,
> Rabin Banerjee
> 
> 
> 
> 



Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Koert Kuipers
i did not check the claim in that blog post that the data is ordered, but i
wouldn't rely on that behavior since it is not something the api guarantees
and could change in future versions

On Thu, Nov 3, 2016 at 9:59 AM, Rabin Banerjee  wrote:

> Hi Koert & Robin ,
>
> *  Thanks ! *But if you go through the blog https://bzhangusc.
> wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/ and
> check the comments under the blog it's actually working, although I am not
> sure how . And yes I agree a custom aggregate UDAF is a good option .
>
> Can anyone share the best way to implement this in Spark .?
>
> Regards,
> Rabin Banerjee
>
> On Thu, Nov 3, 2016 at 6:59 PM, Koert Kuipers  wrote:
>
>> Just realized you only want to keep first element. You can do this
>> without sorting by doing something similar to min or max operation using a
>> custom aggregator/udaf or reduceGroups on Dataset. This is also more
>> efficient.
>>
>> On Nov 3, 2016 7:53 AM, "Rabin Banerjee" 
>> wrote:
>>
>>> Hi All ,
>>>
>>>   I want to do a dataframe operation to find the rows having the latest
>>> timestamp in each group using the below operation
>>>
>>> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
>>> .select("customername","service_type","mobileno","cust_addr")
>>>
>>>
>>> *Spark Version :: 1.6.x*
>>>
>>> My Question is *"Will Spark guarantee the Order while doing the groupBy , 
>>> if DF is ordered using OrderBy previously in Spark 1.6.x"??*
>>>
>>>
>>> *I referred a blog here :: 
>>> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>>>  
>>> *
>>>
>>> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>>>
>>>
>>> *I need a bit elaboration of how internally spark handles it ? also is it 
>>> more efficient than using a Window function ?*
>>>
>>>
>>> *Thanks in Advance ,*
>>>
>>> *Rabin Banerjee*
>>>
>>>
>>>
>>>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread ayan guha
I would go for the partition-by option. It seems simple and, yes, SQL-inspired
:)
On 4 Nov 2016 00:59, "Rabin Banerjee"  wrote:

> Hi Koert & Robin ,
>
> *  Thanks ! *But if you go through the blog https://bzhangusc.
> wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/ and
> check the comments under the blog it's actually working, although I am not
> sure how . And yes I agree a custom aggregate UDAF is a good option .
>
> Can anyone share the best way to implement this in Spark .?
>
> Regards,
> Rabin Banerjee
>
> On Thu, Nov 3, 2016 at 6:59 PM, Koert Kuipers  wrote:
>
>> Just realized you only want to keep first element. You can do this
>> without sorting by doing something similar to min or max operation using a
>> custom aggregator/udaf or reduceGroups on Dataset. This is also more
>> efficient.
>>
>> On Nov 3, 2016 7:53 AM, "Rabin Banerjee" 
>> wrote:
>>
>>> Hi All ,
>>>
>>>   I want to do a dataframe operation to find the rows having the latest
>>> timestamp in each group using the below operation
>>>
>>> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
>>> .select("customername","service_type","mobileno","cust_addr")
>>>
>>>
>>> *Spark Version :: 1.6.x*
>>>
>>> My Question is *"Will Spark guarantee the Order while doing the groupBy , 
>>> if DF is ordered using OrderBy previously in Spark 1.6.x"??*
>>>
>>>
>>> *I referred a blog here :: 
>>> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>>>  
>>> *
>>>
>>> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>>>
>>>
>>> *I need a bit elaboration of how internally spark handles it ? also is it 
>>> more efficient than using a Window function ?*
>>>
>>>
>>> *Thanks in Advance ,*
>>>
>>> *Rabin Banerjee*
>>>
>>>
>>>
>>>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Rabin Banerjee
Hi Koert & Robin ,

*  Thanks ! *But if you go through the blog
https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
and check the comments under the blog it's actually working, although I am not
sure how. And yes, I agree a custom aggregate UDAF is a good option.

Can anyone share the best way to implement this in Spark?

Regards,
Rabin Banerjee

On Thu, Nov 3, 2016 at 6:59 PM, Koert Kuipers  wrote:

> Just realized you only want to keep first element. You can do this without
> sorting by doing something similar to min or max operation using a custom
> aggregator/udaf or reduceGroups on Dataset. This is also more efficient.
>
> On Nov 3, 2016 7:53 AM, "Rabin Banerjee" 
> wrote:
>
>> Hi All ,
>>
>>   I want to do a dataframe operation to find the rows having the latest
>> timestamp in each group using the below operation
>>
>> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
>> .select("customername","service_type","mobileno","cust_addr")
>>
>>
>> *Spark Version :: 1.6.x*
>>
>> My Question is *"Will Spark guarantee the Order while doing the groupBy , if 
>> DF is ordered using OrderBy previously in Spark 1.6.x"??*
>>
>>
>> *I referred a blog here :: 
>> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>>  
>> *
>>
>> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>>
>>
>> *I need a bit elaboration of how internally spark handles it ? also is it 
>> more efficient than using a Window function ?*
>>
>>
>> *Thanks in Advance ,*
>>
>> *Rabin Banerjee*
>>
>>
>>
>>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Koert Kuipers
Just realized you only want to keep the first element. You can do this without
sorting by doing something similar to a min or max operation using a custom
aggregator/udaf or reduceGroups on Dataset. This is also more efficient.

On Nov 3, 2016 7:53 AM, "Rabin Banerjee" 
wrote:

> Hi All ,
>
>   I want to do a dataframe operation to find the rows having the latest
> timestamp in each group using the below operation
>
> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
> .select("customername","service_type","mobileno","cust_addr")
>
>
> *Spark Version :: 1.6.x*
>
> My Question is *"Will Spark guarantee the Order while doing the groupBy , if 
> DF is ordered using OrderBy previously in Spark 1.6.x"??*
>
>
> *I referred a blog here :: 
> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> *
>
> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>
>
> *I need a bit elaboration of how internally spark handles it ? also is it 
> more efficient than using a Window function ?*
>
>
> *Thanks in Advance ,*
>
> *Rabin Banerjee*
>
>
>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Koert Kuipers
What you require is secondary sort which is not available as such for a
DataFrame. The Window operator is what comes closest but it is strangely
limited in its abilities (probably because it was inspired by a SQL
construct instead of a more generic programmatic transformation capability).

On Nov 3, 2016 7:53 AM, "Rabin Banerjee" 
wrote:

> Hi All ,
>
>   I want to do a dataframe operation to find the rows having the latest
> timestamp in each group using the below operation
>
> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
> .select("customername","service_type","mobileno","cust_addr")
>
>
> *Spark Version :: 1.6.x*
>
> My Question is *"Will Spark guarantee the Order while doing the groupBy , if 
> DF is ordered using OrderBy previously in Spark 1.6.x"??*
>
>
> *I referred a blog here :: 
> **https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> *
>
> *Which claims it will work except in Spark 1.5.1 and 1.5.2 .*
>
>
> *I need a bit elaboration of how internally spark handles it ? also is it 
> more efficient than using a Window function ?*
>
>
> *Thanks in Advance ,*
>
> *Rabin Banerjee*
>
>
>
>


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Robin East
I don’t think the semantics of groupBy necessarily preserve ordering - whatever 
the implementation details or the observed behaviour. I would use a Window 
operation and order within the group.
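
A minimal sketch of that Window approach, using the column names from the
original question; row_number ranks the rows within each mobileno by descending
timestamp and only rank 1 is kept:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

object WindowLatestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("window-latest").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("111", "2016-11-01", "alice"),
      ("111", "2016-11-03", "alice"),
      ("222", "2016-11-02", "bob")
    ).toDF("mobileno", "transaction_date", "customername")

    // Rank rows within each mobileno by descending timestamp, then keep rank 1.
    val w = Window.partitionBy(col("mobileno")).orderBy(desc("transaction_date"))
    val latest = df
      .withColumn("rn", row_number().over(w))
      .where(col("rn") === 1)
      .drop("rn")

    latest.show()
    spark.stop()
  }
}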




> On 3 Nov 2016, at 11:53, Rabin Banerjee  wrote:
> 
> Hi All ,
> 
>   I want to do a dataframe operation to find the rows having the latest 
> timestamp in each group using the below operation 
> df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
> .select("customername","service_type","mobileno","cust_addr")
> 
> Spark Version :: 1.6.x
> My Question is "Will Spark guarantee the Order while doing the groupBy , if 
> DF is ordered using OrderBy previously in Spark 1.6.x"??
> 
> I referred a blog here :: 
> https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
>  
> 
> Which claims it will work except in Spark 1.5.1 and 1.5.2 .
> 
> I need a bit elaboration of how internally spark handles it ? also is it more 
> efficient than using a Window function ?
> 
> Thanks in Advance ,
> Rabin Banerjee
> 
> 



Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-03 Thread Rabin Banerjee
Hi All ,

  I want to do a dataframe operation to find the rows having the latest
timestamp in each group using the below operation

df.orderBy(desc("transaction_date")).groupBy("mobileno").agg(first("customername").as("customername"),first("service_type").as("service_type"),first("cust_addr").as("cust_abbr"))
.select("customername","service_type","mobileno","cust_addr")


*Spark Version :: 1.6.x*

My Question is *"Will Spark guarantee the Order while doing the
groupBy , if DF is ordered using OrderBy previously in Spark 1.6.x"??*


*I referred a blog here ::
**https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/
*

*Which claims it will work except in Spark 1.5.1 and 1.5.2 .*


*I need a bit elaboration of how internally spark handles it ? also is
it more efficient than using a Window function ?*


*Thanks in Advance ,*

*Rabin Banerjee*


SparkSQL with Hive got "java.lang.NullPointerException"

2016-11-03 Thread lxw
Hi, experts:
 
   I use SparkSQL to query Hive tables. This query throws an NPE, but runs OK with 
Hive.


SELECT city
FROM (
  SELECT city 
  FROM t_ad_fact a 
  WHERE a.pt = '2016-10-10' 
  limit 100
) x 
GROUP BY city;



Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:310)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$hiveResultString$3.apply(QueryExecution.scala:129)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$hiveResultString$3.apply(QueryExecution.scala:128)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at 
org.apache.spark.sql.execution.QueryExecution.hiveResultString(QueryExecution.scala:128)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24

Re: LIMIT issue of SparkSQL

2016-10-29 Thread Asher Krim
We have also found LIMIT to take an unacceptable amount of time when
reading parquet-formatted data from S3.
LIMIT was not strictly needed for our use case, so we worked around it.

-- 
Asher Krim
Senior Software Engineer

On Fri, Oct 28, 2016 at 5:36 AM, Liz Bai  wrote:

> Sorry for the late reply.
> The size of the raw data is 20G and it is composed of two columns. We
> generated it by this
> 
> .
> The test queries are very simple,
> 1). select ColA from Table limit 1
> 2). select ColA from Table
> 3). select ColA from Table where ColB=0
> 4). select ColA from Table where ColB=0 limit 1
> We found that if we use `result.collect()`, it does early stop upon
> getting adequate results for query 1) and 4).
> However, we used to run `result.write.parquet`, and there is no early stop
> and scans much more data than `result.collect()`.
>
> Below are the detailed testing summary,
> Query                                        | Method of Saving Results | Run Time
> select ColA from Table limit 1               | result.write.Parquet     | 1m 56s
> select ColA from Table                       | result.write.Parquet     | 1m 40s
> select ColA from Table where ColB=0 limit 1  | result.write.Parquet     | 1m 32s
> select ColA from Table where ColB=0          | result.write.Parquet     | 1m 21s
> select ColA from Table limit 1               | result.collect()         | 18s
> select ColA from Table where ColB=0 limit 1  | result.collect()         | 18s
>
> Thanks.
>
> Best,
> Liz
>
> On 27 Oct 2016, at 2:16 AM, Michael Armbrust 
> wrote:
>
> That is surprising then, you may have found a bug.  What timings are you
> seeing?  Can you reproduce it with data you can share? I'd open a JIRA if
> so.
>
> On Tue, Oct 25, 2016 at 4:32 AM, Liz Bai  wrote:
>
>> We used Parquet as data source. The query is like “select ColA from table
>> limit 1”. Attached is the query plan of it. (However its run time is just
>> the same as “select ColA from table”.)
>> We expected an early stop upon getting 1 result, rather than scanning all
>> records and finally collect it with limit in the final phase.
>> Btw, I agree with Mich’s concerning. `Limit push down` is impossible when
>> involving table joins. But some cases such as “Filter + Projection + Limit”
>>  will benefit from `limit push down`.
>> May I know if there is any detailed solutions for this?
>>
>> Thanks so much.
>>
>> Best,
>> Liz
>>
>> 
>>
>> On 25 Oct 2016, at 5:54 AM, Michael Armbrust 
>> wrote:
>>
>> It is not about limits on specific tables.  We do support that.  The case
>> I'm describing involves pushing limits across system boundaries.  It is
>> certainly possible to do this, but the current datasource API does provide
>> this information (other than the implicit limit that is pushed down to the
>> consumed iterator of the data source).
>>
>> On Mon, Oct 24, 2016 at 9:11 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> This is an interesting point.
>>>
>>> As far as I know in any database (practically all RDBMS Oracle, SAP
>>> etc), the LIMIT affects the collection part of the result set.
>>>
>>> The result set is carried out fully on the query that may involve
>>> multiple joins on multiple underlying tables.
>>>
>>> To limit the actual query by LIMIT on each underlying table does not
>>> make sense and will not be industry standard AFAK.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 24 October 2016 at 06:48, Michael Armbrust 
>>> wrote:
>>>
 - dev + user

 Can you give more info about the query?  Maybe a full explain()?  Are
 you using a datasource like JDBC?  The API does not currently push down
 limits, but the documentation talks about how you can use a query instead
 of a table if that is what you are looking to do.

 On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai  wrote:

> Hi all,
>
> Let me clarify the problem:
>
> Suppose we have a simple table `A` with 100 000 000 records
>
> Problem:
> When we execute sql query ‘select * from A Limit 500`,
> It scan through all 100 000 000 records.
> Normal behaviour should be that once 500 records is found, engine stop
> scanning.
>
> Detailed observation:
> We found that there are “GlobalLimit / LocalLimit” physical operators
> https://github.com/apache/spark/blob/branch-2.0/sql/core/src
> 

Re: LIMIT issue of SparkSQL

2016-10-28 Thread Liz Bai
Sorry for the late reply.
The size of the raw data is 20G and it is composed of two columns. We generated 
it by this 
.
The test queries are very simple,
1). select ColA from Table limit 1
2). select ColA from Table
3). select ColA from Table where ColB=0
4). select ColA from Table where ColB=0 limit 1
We found that if we use `result.collect()`, it does stop early upon getting 
adequate results for queries 1) and 4).
However, when we run `result.write.parquet` instead, there is no early stop and 
it scans much more data than `result.collect()`.

Below are the detailed testing summary,
Query                                        | Method of Saving Results | Run Time
select ColA from Table limit 1               | result.write.Parquet     | 1m 56s
select ColA from Table                       | result.write.Parquet     | 1m 40s
select ColA from Table where ColB=0 limit 1  | result.write.Parquet     | 1m 32s
select ColA from Table where ColB=0          | result.write.Parquet     | 1m 21s
select ColA from Table limit 1               | result.collect()         | 18s
select ColA from Table where ColB=0 limit 1  | result.collect()         | 18s

Thanks.

Best,
Liz
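
A hedged reproduction sketch of the comparison in the table above; the table
name, output path, and the `spark` session are placeholders:

// assumes a SparkSession `spark` and a registered table named Table
val limited = spark.sql("SELECT ColA FROM Table LIMIT 1")

limited.collect()                         // stops early once one row is found
limited.write.parquet("/tmp/limit-test")  // observed above to scan far more data
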
> On 27 Oct 2016, at 2:16 AM, Michael Armbrust  wrote:
> 
> That is surprising then, you may have found a bug.  What timings are you 
> seeing?  Can you reproduce it with data you can share? I'd open a JIRA if so.
> 
> On Tue, Oct 25, 2016 at 4:32 AM, Liz Bai  > wrote:
> We used Parquet as data source. The query is like “select ColA from table 
> limit 1”. Attached is the query plan of it. (However its run time is just the 
> same as “select ColA from table”.)
> We expected an early stop upon getting 1 result, rather than scanning all 
> records and finally collect it with limit in the final phase. 
> Btw, I agree with Mich’s concerning. `Limit push down` is impossible when 
> involving table joins. But some cases such as “Filter + Projection + Limit”  
> will benefit from `limit push down`.
> May I know if there is any detailed solutions for this?
> 
> Thanks so much.
> 
> Best,
> Liz
> 
> 
>> On 25 Oct 2016, at 5:54 AM, Michael Armbrust > > wrote:
>> 
>> It is not about limits on specific tables.  We do support that.  The case 
>> I'm describing involves pushing limits across system boundaries.  It is 
>> certainly possible to do this, but the current datasource API does provide 
>> this information (other than the implicit limit that is pushed down to the 
>> consumed iterator of the data source).
>> 
>> On Mon, Oct 24, 2016 at 9:11 AM, Mich Talebzadeh > > wrote:
>> This is an interesting point.
>> 
>> As far as I know in any database (practically all RDBMS Oracle, SAP etc), 
>> the LIMIT affects the collection part of the result set.
>> 
>> The result set is carried out fully on the query that may involve multiple 
>> joins on multiple underlying tables.
>> 
>> To limit the actual query by LIMIT on each underlying table does not make 
>> sense and will not be industry standard AFAK.
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>>  
>> http://talebzadehmich.wordpress.com 
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> On 24 October 2016 at 06:48, Michael Armbrust > > wrote:
>> - dev + user
>> 
>> Can you give more info about the query?  Maybe a full explain()?  Are you 
>> using a datasource like JDBC?  The API does not currently push down limits, 
>> but the documentation talks about how you can use a query instead of a table 
>> if that is what you are looking to do.
>> 
>> On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai > > wrote:
>> Hi all,
>> 
>> Let me clarify the problem: 
>> 
>> Suppose we have a simple table `A` with 100 000 000 records
>> 
>> Problem:
>> When we execute sql query ‘select * from A Limit 500`,
>> It scan through all 100 000 000 records. 
>> Normal behaviour should be that once 500 records is found, engine stop 
>> scanning.
>> 
>> Detailed observation:
>> We found that there are “GlobalLimit / LocalLimit” physical operators
>> https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
>>  
>> 
>> But during query plan generation, GlobalLimit / LocalLimit is not applied to 
>> the query plan.
>> 
>> Could 

Re: Using Hive UDTF in SparkSQL

2016-10-27 Thread Davies Liu
Could you file a JIRA for this bug?

On Thu, Oct 27, 2016 at 3:05 AM, Lokesh Yadav
 wrote:
> Hello
>
> I am trying to use a Hive UDTF function in spark SQL. But somehow its not
> working for me as intended and I am not able to understand the behavior.
>
> When I try to register a function like this:
> create temporary function SampleUDTF_01 as
> 'com.fl.experiments.sparkHive.SampleUDTF' using JAR
> 'hdfs:///user/root/sparkHive-1.0.0.jar';
> It successfully registers the function, but gives me a 'not a registered
> function' error when I try to run that function. Also it doesn't show up in
> the list when I do a 'show functions'.
>
> Another case:
> When I try to register the same function as a temporary function using a
> local jar (the hdfs path doesn't work with temporary function, that is weird
> too), it registers, and I am able to successfully run that function as well.
> Another weird thing is that I am not able to drop that function using the
> 'drop function ...' statement. This the functions shows up in the function
> registry.
>
> I am stuck with this, any help would be really appreciated.
> Thanks
>
> Regards,
> Lokesh Yadav

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using Hive UDTF in SparkSQL

2016-10-27 Thread Lokesh Yadav
Hello

I am trying to use a Hive UDTF function in Spark SQL. But somehow it's not
working for me as intended and I am not able to understand the behavior.

When I try to register a function like this:
create temporary function SampleUDTF_01 as
'com.fl.experiments.sparkHive.SampleUDTF' using JAR
'hdfs:///user/root/sparkHive-1.0.0.jar';
It successfully registers the function, but gives me a 'not a registered
function' error when I try to run that function. Also it doesn't show up in
the list when I do a 'show functions'.

Another case:
When I try to register the same function as a temporary function using a
local jar (the hdfs path doesn't work with temporary function, that is
weird too), it registers, and I am able to successfully run that function
as well. Another weird thing is that I am not able to drop that function
using the 'drop function ...' statement. Thus the function still shows up in the
function registry.

I am stuck with this, any help would be really appreciated.
Thanks

Regards,
Lokesh Yadav
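
A minimal reproduction sketch of the registration path described above, assuming
a Hive-enabled Spark 2.x session; the queried column and table are placeholders:

// assumes: val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
spark.sql(
  """CREATE TEMPORARY FUNCTION SampleUDTF_01
    |AS 'com.fl.experiments.sparkHive.SampleUDTF'
    |USING JAR 'hdfs:///user/root/sparkHive-1.0.0.jar'""".stripMargin)

spark.sql("SHOW FUNCTIONS").show(1000, truncate = false)  // check registration
spark.sql("SELECT SampleUDTF_01(some_column) FROM some_table").show()  // placeholder names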


Is there length limit for sparksql/hivesql?

2016-10-26 Thread Jone Zhang
Is there a length limit for SparkSQL/HiveSQL queries?
Can ANTLR work well if the SQL is very long?

Thanks.


Re: LIMIT issue of SparkSQL

2016-10-24 Thread Michael Armbrust
It is not about limits on specific tables.  We do support that.  The case
I'm describing involves pushing limits across system boundaries.  It is
certainly possible to do this, but the current datasource API does not provide
this information (other than the implicit limit that is pushed down to the
consumed iterator of the data source).
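
A hedged sketch of the query-instead-of-a-table workaround for a JDBC source;
the connection details and table name are placeholders:

// The limit runs on the database side because the "dbtable" option is given a
// subquery instead of a table name, since the data source API does not push
// limits down.
val limited = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")
  .option("dbtable", "(SELECT * FROM A LIMIT 500) AS t")
  .option("user", "user")
  .option("password", "password")
  .load()
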

On Mon, Oct 24, 2016 at 9:11 AM, Mich Talebzadeh 
wrote:

> This is an interesting point.
>
> As far as I know in any database (practically all RDBMS Oracle, SAP etc),
> the LIMIT affects the collection part of the result set.
>
> The result set is carried out fully on the query that may involve multiple
> joins on multiple underlying tables.
>
> To limit the actual query by LIMIT on each underlying table does not make
> sense and will not be industry standard AFAK.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 24 October 2016 at 06:48, Michael Armbrust 
> wrote:
>
>> - dev + user
>>
>> Can you give more info about the query?  Maybe a full explain()?  Are you
>> using a datasource like JDBC?  The API does not currently push down limits,
>> but the documentation talks about how you can use a query instead of a
>> table if that is what you are looking to do.
>>
>> On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai  wrote:
>>
>>> Hi all,
>>>
>>> Let me clarify the problem:
>>>
>>> Suppose we have a simple table `A` with 100 000 000 records
>>>
>>> Problem:
>>> When we execute sql query ‘select * from A Limit 500`,
>>> It scan through all 100 000 000 records.
>>> Normal behaviour should be that once 500 records is found, engine stop
>>> scanning.
>>>
>>> Detailed observation:
>>> We found that there are “GlobalLimit / LocalLimit” physical operators
>>> https://github.com/apache/spark/blob/branch-2.0/sql/core/src
>>> /main/scala/org/apache/spark/sql/execution/limit.scala
>>> But during query plan generation, GlobalLimit / LocalLimit is not
>>> applied to the query plan.
>>>
>>> Could you please help us to inspect LIMIT problem?
>>> Thanks.
>>>
>>> Best,
>>> Liz
>>>
>>> On 23 Oct 2016, at 10:11 PM, Xiao Li  wrote:
>>>
>>> Hi, Liz,
>>>
>>> CollectLimit means `Take the first `limit` elements and collect them to
>>> a single partition.`
>>>
>>> Thanks,
>>>
>>> Xiao
>>>
>>> 2016-10-23 5:21 GMT-07:00 Ran Bai :
>>>
 Hi all,

 I found the runtime for query with or without “LIMIT” keyword is the
 same. We looked into it and found actually there is “GlobalLimit /
 LocalLimit” in logical plan, however no relevant physical plan there. Is
 this a bug or something else? Attached are the logical and physical plans
 when running "SELECT * FROM seq LIMIT 1".


 More specifically, We expected a early stop upon getting adequate
 results.
 Thanks so much.

 Best,
 Liz




 -
 To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

>>>
>>>
>>>
>>
>


Re: LIMIT issue of SparkSQL

2016-10-24 Thread Mich Talebzadeh
This is an interesting point.

As far as I know in any database (practically all RDBMS Oracle, SAP etc),
the LIMIT affects the collection part of the result set.

The result set is carried out fully on the query that may involve multiple
joins on multiple underlying tables.

Limiting the actual query by applying LIMIT to each underlying table does not
make sense and is not the industry standard, AFAIK.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 24 October 2016 at 06:48, Michael Armbrust 
wrote:

> - dev + user
>
> Can you give more info about the query?  Maybe a full explain()?  Are you
> using a datasource like JDBC?  The API does not currently push down limits,
> but the documentation talks about how you can use a query instead of a
> table if that is what you are looking to do.
>
> On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai  wrote:
>
>> Hi all,
>>
>> Let me clarify the problem:
>>
>> Suppose we have a simple table `A` with 100 000 000 records
>>
>> Problem:
>> When we execute sql query ‘select * from A Limit 500`,
>> It scan through all 100 000 000 records.
>> Normal behaviour should be that once 500 records is found, engine stop
>> scanning.
>>
>> Detailed observation:
>> We found that there are “GlobalLimit / LocalLimit” physical operators
>> https://github.com/apache/spark/blob/branch-2.0/sql/core/
>> src/main/scala/org/apache/spark/sql/execution/limit.scala
>> But during query plan generation, GlobalLimit / LocalLimit is not applied
>> to the query plan.
>>
>> Could you please help us to inspect LIMIT problem?
>> Thanks.
>>
>> Best,
>> Liz
>>
>> On 23 Oct 2016, at 10:11 PM, Xiao Li  wrote:
>>
>> Hi, Liz,
>>
>> CollectLimit means `Take the first `limit` elements and collect them to a
>> single partition.`
>>
>> Thanks,
>>
>> Xiao
>>
>> 2016-10-23 5:21 GMT-07:00 Ran Bai :
>>
>>> Hi all,
>>>
>>> I found the runtime for query with or without “LIMIT” keyword is the
>>> same. We looked into it and found actually there is “GlobalLimit /
>>> LocalLimit” in logical plan, however no relevant physical plan there. Is
>>> this a bug or something else? Attached are the logical and physical plans
>>> when running "SELECT * FROM seq LIMIT 1".
>>>
>>>
>>> More specifically, We expected a early stop upon getting adequate
>>> results.
>>> Thanks so much.
>>>
>>> Best,
>>> Liz
>>>
>>>
>>>
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>
>>
>>
>


Re: LIMIT issue of SparkSQL

2016-10-23 Thread Michael Armbrust
- dev + user

Can you give more info about the query?  Maybe a full explain()?  Are you
using a datasource like JDBC?  The API does not currently push down limits,
but the documentation talks about how you can use a query instead of a
table if that is what you are looking to do.

On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai  wrote:

> Hi all,
>
> Let me clarify the problem:
>
> Suppose we have a simple table `A` with 100 000 000 records
>
> Problem:
> When we execute sql query ‘select * from A Limit 500`,
> It scan through all 100 000 000 records.
> Normal behaviour should be that once 500 records is found, engine stop
> scanning.
>
> Detailed observation:
> We found that there are “GlobalLimit / LocalLimit” physical operators
> https://github.com/apache/spark/blob/branch-2.0/sql/
> core/src/main/scala/org/apache/spark/sql/execution/limit.scala
> But during query plan generation, GlobalLimit / LocalLimit is not applied
> to the query plan.
>
> Could you please help us to inspect LIMIT problem?
> Thanks.
>
> Best,
> Liz
>
> On 23 Oct 2016, at 10:11 PM, Xiao Li  wrote:
>
> Hi, Liz,
>
> CollectLimit means `Take the first `limit` elements and collect them to a
> single partition.`
>
> Thanks,
>
> Xiao
>
> 2016-10-23 5:21 GMT-07:00 Ran Bai :
>
>> Hi all,
>>
>> I found the runtime for query with or without “LIMIT” keyword is the
>> same. We looked into it and found actually there is “GlobalLimit /
>> LocalLimit” in logical plan, however no relevant physical plan there. Is
>> this a bug or something else? Attached are the logical and physical plans
>> when running "SELECT * FROM seq LIMIT 1".
>>
>>
>> More specifically, We expected a early stop upon getting adequate results.
>> Thanks so much.
>>
>> Best,
>> Liz
>>
>>
>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
>


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Praseetha
Hi Mich,

Even I'm getting similar output.
The dates that are passed as input are different from the ones in the output.
Since it's an inner join, the expected result is
[2015-12-31,2015-12-31,1,105]
[2016-01-27,2016-01-27,5,101]

Thanks & Regds,
--Praseetha
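
The format pattern in the code quoted below has been mangled by the archive, but
results where every row collapses to the same late-December date are what
SimpleDateFormat typically produces when week-year (YYYY) or day-of-year (DD) pattern
letters are combined with lenient parsing. A small sketch of building the same test
dates with an explicit yyyy-MM-dd pattern and strict parsing:

import java.text.SimpleDateFormat

// Sketch: lowercase 'yyyy' is the calendar year and 'dd' the day of month; uppercase
// 'YYYY' (week-year) and 'DD' (day-of-year) mean something else entirely.
val fmt = new SimpleDateFormat("yyyy-MM-dd")
fmt.setLenient(false) // reject malformed input instead of silently shifting the date

val login = new java.sql.Date(fmt.parse("2016-01-31").getTime)
println(login) // 2016-01-31

(One of the test rows in the quoted code also uses the year 2106 rather than 2016,
which is presumably a typo.)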

On Tue, Sep 13, 2016 at 11:21 PM, Mich Talebzadeh  wrote:

> Hi  Praseetha,
>
> This is how I have written this.
>
> case class TestDate (id: String, loginTime: java.sql.Date)
> val formate = new SimpleDateFormat("-MM-DD")
> val TestDateData = sc.parallelize(List(
> ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
> ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
> ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
> ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
> ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
> ))
>  val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
>  val fp = firstPair.toDF
> case class TestDate2 (id2: String, loginTime2: java.sql.Date)
> val TestDateData1 = sc.parallelize(List(
> ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
> ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
> ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
> ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
> ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
> ))
> val secondPair = TestDateData1.map(x => ( new TestDate2(x._1, x._2)))
> val sp = secondPair.toDF
> val rs = 
> fp.join(sp,fp("loginTime")===sp("loginTime2"),"inner").select('loginTime,'loginTime2,
> 'id,'id2).show
>
> This is what I get
>
> [2015-12-27,2015-12-27,1,101]
> [2015-12-27,2015-12-27,1,102]
> [2015-12-27,2015-12-27,1,103]
> [2015-12-27,2015-12-27,1,104]
> [2015-12-27,2015-12-27,1,105]
> [2015-12-27,2015-12-27,3,101]
> [2015-12-27,2015-12-27,3,102]
> [2015-12-27,2015-12-27,3,103]
> [2015-12-27,2015-12-27,3,104]
> [2015-12-27,2015-12-27,3,105]
> [2015-12-27,2015-12-27,4,101]
> [2015-12-27,2015-12-27,4,102]
> [2015-12-27,2015-12-27,4,103]
> [2015-12-27,2015-12-27,4,104]
> [2015-12-27,2015-12-27,4,105]
> [2015-12-27,2015-12-27,5,101]
> [2015-12-27,2015-12-27,5,102]
> [2015-12-27,2015-12-27,5,103]
> [2015-12-27,2015-12-27,5,104]
> [2015-12-27,2015-12-27,5,105]
> rs: Unit = ()
>
>
> Is this what you are expecting?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 16:46, Praseetha  wrote:
>
>> Hi Mich,
>>
>> val formate = new SimpleDateFormat("-MM-DD")
>>
>> Thanks & Regds,
>> --Praseetha
>>
>> On Tue, Sep 13, 2016 at 8:50 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Praseetha.
>>>
>>> :32: error: not found: value formate
>>> Error occurred in an application involving default arguments.
>>>("1",  new java.sql.Date(formate.parse("2
>>> 016-01-31").getTime)),
>>>
>>> What is that formate?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 13 September 2016 at 16:12, Praseetha  wrote:
>>>
 Hi Mich,

 Thanks a lot for your reply.

 Here is the sample

 case class TestDate (id: String, loginTime: java.sql.Date)

 val formate = new SimpleDateFormat("-MM-DD")

 val TestDateData = sc.parallelize(List(
 ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
 ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
 ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
 ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
 ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
   

Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Mich Talebzadeh
Hi  Praseetha,

This is how I have written this.

case class TestDate (id: String, loginTime: java.sql.Date)
val formate = new SimpleDateFormat("-MM-DD")
val TestDateData = sc.parallelize(List(
("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
))
 val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
 val fp = firstPair.toDF
case class TestDate2 (id2: String, loginTime2: java.sql.Date)
val TestDateData1 = sc.parallelize(List(
("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
))
val secondPair = TestDateData1.map(x => ( new TestDate2(x._1, x._2)))
val sp = secondPair.toDF
val rs = fp.join(sp,fp("loginTime")===
sp("loginTime2"),"inner").select('loginTime,'loginTime2, 'id,'id2).show

This is what I get

[2015-12-27,2015-12-27,1,101]
[2015-12-27,2015-12-27,1,102]
[2015-12-27,2015-12-27,1,103]
[2015-12-27,2015-12-27,1,104]
[2015-12-27,2015-12-27,1,105]
[2015-12-27,2015-12-27,3,101]
[2015-12-27,2015-12-27,3,102]
[2015-12-27,2015-12-27,3,103]
[2015-12-27,2015-12-27,3,104]
[2015-12-27,2015-12-27,3,105]
[2015-12-27,2015-12-27,4,101]
[2015-12-27,2015-12-27,4,102]
[2015-12-27,2015-12-27,4,103]
[2015-12-27,2015-12-27,4,104]
[2015-12-27,2015-12-27,4,105]
[2015-12-27,2015-12-27,5,101]
[2015-12-27,2015-12-27,5,102]
[2015-12-27,2015-12-27,5,103]
[2015-12-27,2015-12-27,5,104]
[2015-12-27,2015-12-27,5,105]
rs: Unit = ()


Is this what you are expecting?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 16:46, Praseetha  wrote:

> Hi Mich,
>
> val formate = new SimpleDateFormat("-MM-DD")
>
> Thanks & Regds,
> --Praseetha
>
> On Tue, Sep 13, 2016 at 8:50 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Praseetha.
>>
>> :32: error: not found: value formate
>> Error occurred in an application involving default arguments.
>>("1",  new java.sql.Date(formate.parse("2
>> 016-01-31").getTime)),
>>
>> What is that formate?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 September 2016 at 16:12, Praseetha  wrote:
>>
>>> Hi Mich,
>>>
>>> Thanks a lot for your reply.
>>>
>>> Here is the sample
>>>
>>> case class TestDate (id: String, loginTime: java.sql.Date)
>>>
>>> val formate = new SimpleDateFormat("-MM-DD")
>>>
>>> val TestDateData = sc.parallelize(List(
>>> ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
>>> ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
>>> ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
>>> ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
>>> ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
>>> ))
>>> val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
>>>
>>> val TestDateData1 = sc.parallelize(List(
>>> ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
>>> ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
>>> ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
>>> ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
>>> ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
>>> ))
>>> val 

Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Mich Talebzadeh
Hi Praseetha.

:32: error: not found: value formate
Error occurred in an application involving default arguments.
   ("1",  new
java.sql.Date(formate.parse("2016-01-31").getTime)),

What is that formate?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 16:12, Praseetha  wrote:

> Hi Mich,
>
> Thanks a lot for your reply.
>
> Here is the sample
>
> case class TestDate (id: String, loginTime: java.sql.Date)
>
> val formate = new SimpleDateFormat("-MM-DD")
>
> val TestDateData = sc.parallelize(List(
> ("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
> ("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
> ("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
> ("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
> ("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
> ))
> val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))
>
> val TestDateData1 = sc.parallelize(List(
> ("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
> ("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
> ("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
> ("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
> ("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
> ))
> val secondPair = TestDateData1.map(x => ( new TestDate(x._1, x._2)))
>
>firstPair.toDF.registerTempTable("firstTable")
>secondPair.toDF.registerTempTable("secondTable")
>
>val res = sqlContext.sql("select * from firstTable INNER JOIN
> secondTable on firstTable.loginTime = secondTable.loginTime")
>
>
> I tried the following query,
> sqlContext.sql("select loginTime from firstTable")
> Even this query gives the wrong dates.
>
> Regds,
> --Praseetha
>
> On Tue, Sep 13, 2016 at 6:33 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Can you send the rdds that just creates those two dates?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 September 2016 at 13:54, Praseetha  wrote:
>>
>>>
>>> Hi All,
>>>
>>> I have a case class in scala case class TestDate (id: String, loginTime:
>>> java.sql.Date)
>>>
>>> I created 2 RDD's of type TestDate
>>>
>>> I wanted to do an inner join on two rdd's where the values of loginTime
>>> column is equal. Please find the code snippet below,
>>>
>>> firstRDD.toDF.registerTempTable("firstTable")
>>> secondRDD.toDF.registerTempTable("secondTable")
>>> val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable 
>>> on to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")
>>>
>>> I'm not getting any exception. But i'm not getting correct answer too.
>>> It does a cartesian and some random dates are generated in the result.
>>>
>>>
>>> Regds,
>>> --Praseetha
>>>
>>
>>
>


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Praseetha
Hi Mich,

Thanks a lot for your reply.

Here is the sample

case class TestDate (id: String, loginTime: java.sql.Date)

val formate = new SimpleDateFormat("-MM-DD")

val TestDateData = sc.parallelize(List(
("1",  new java.sql.Date(formate.parse("2016-01-31").getTime)),
("2", new java.sql.Date(formate.parse("2106-01-30").getTime)),
("3", new java.sql.Date(formate.parse("2016-01-29").getTime)),
("4", new java.sql.Date(formate.parse("2016-01-28").getTime)),
("5", new java.sql.Date(formate.parse("2016-01-27").getTime))
))
val firstPair = TestDateData.map(x => ( new TestDate(x._1, x._2)))

val TestDateData1 = sc.parallelize(List(
("101", new java.sql.Date(formate.parse("2016-01-27").getTime)),
("102", new java.sql.Date(formate.parse("2016-01-26").getTime)),
("103", new java.sql.Date(formate.parse("2016-01-25").getTime)),
("104", new java.sql.Date(formate.parse("2016-01-24").getTime)),
("105", new java.sql.Date(formate.parse("2016-01-31").getTime))
))
val secondPair = TestDateData1.map(x => ( new TestDate(x._1, x._2)))

   firstPair.toDF.registerTempTable("firstTable")
   secondPair.toDF.registerTempTable("secondTable")

   val res = sqlContext.sql("select * from firstTable INNER JOIN
secondTable on firstTable.loginTime = secondTable.loginTime")


I tried the following query,
sqlContext.sql("select loginTime from firstTable")
Even this query gives the wrong dates.

Regds,
--Praseetha

On Tue, Sep 13, 2016 at 6:33 PM, Mich Talebzadeh 
wrote:

> Can you send the rdds that just creates those two dates?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 September 2016 at 13:54, Praseetha  wrote:
>
>>
>> Hi All,
>>
>> I have a case class in scala case class TestDate (id: String, loginTime:
>> java.sql.Date)
>>
>> I created 2 RDD's of type TestDate
>>
>> I wanted to do an inner join on two rdd's where the values of loginTime
>> column is equal. Please find the code snippet below,
>>
>> firstRDD.toDF.registerTempTable("firstTable")
>> secondRDD.toDF.registerTempTable("secondTable")
>> val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable on 
>> to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")
>>
>> I'm not getting any exception. But i'm not getting correct answer too. It
>> does a cartesian and some random dates are generated in the result.
>>
>>
>> Regds,
>> --Praseetha
>>
>
>


Re: Unable to compare SparkSQL Date columns

2016-09-13 Thread Mich Talebzadeh
Can you send the RDDs that just create those two dates?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 September 2016 at 13:54, Praseetha  wrote:

>
> Hi All,
>
> I have a case class in scala case class TestDate (id: String, loginTime:
> java.sql.Date)
>
> I created 2 RDD's of type TestDate
>
> I wanted to do an inner join on two rdd's where the values of loginTime
> column is equal. Please find the code snippet below,
>
> firstRDD.toDF.registerTempTable("firstTable")
> secondRDD.toDF.registerTempTable("secondTable")
> val res = sqlContext.sql("select * from firstTable INNER JOIN secondTable on 
> to_date(firstTable.loginTime) = to_date(secondTable.loginTime)")
>
> I'm not getting any exception. But i'm not getting correct answer too. It
> does a cartesian and some random dates are generated in the result.
>
>
> Regds,
> --Praseetha
>


Unable to compare SparkSQL Date columns

2016-09-13 Thread Praseetha
Hi All,

I have a case class in scala case class TestDate (id: String, loginTime:
java.sql.Date)

I created 2 RDD's of type TestDate

I wanted to do an inner join on two rdd's where the values of loginTime
column is equal. Please find the code snippet below,

firstRDD.toDF.registerTempTable("firstTable")
secondRDD.toDF.registerTempTable("secondTable")
val res = sqlContext.sql("select * from firstTable INNER JOIN
secondTable on to_date(firstTable.loginTime) =
to_date(secondTable.loginTime)")

I'm not getting any exception, but I'm not getting the correct answer either. It
does a Cartesian join, and some random dates are generated in the result.


Regds,
--Praseetha


Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Mich Talebzadeh
Right, let us simplify this.

Can you run the whole thing *once* only and send the DAG execution output from
the UI?

You can use a snipping tool to take the image.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 September 2016 at 09:59, Rabin Banerjee 
wrote:

> Hi ,
>
>
>1. You are doing some analytics I guess?  *YES*
>2. It is almost impossible to guess what is happening except that you
>are looping 50 times over the same set of sql?  *I am Not Looping any
>SQL, All SQLs are called exactly once , which requires output from prev
>SQL.*
>3. Your sql step n depends on step n-1. So spark cannot get rid of 1
>- n steps, *TRUE, But If I have N SQL and all i th SQL is
>dependent upon i-1 , how spark optimize the memory ? is it like for each i
>th sql it will start execution from STAGE 0 *
>4. you are not storing anything in  memory(no cache, no persist), so
>all memory is used for the execution , IF Spark is not storing anything in
>memory , then when it is executing *i th sql it will start execution
>from STAGE 0 i.e starting from file read *
>5. What happens when you run it only once? How much memory is used
>(look at UI page, 4040 by default) , ? *I checked Spark UI DAG , so
>many file reads , Why ?*
>6.  What Spark mode is being used (Local, Standalone, Yarn) ? *Yarn*
>7. OOM could be anything depending on how much you are allocating to
>your driver memory in spark-submit ? *Driver and executor memory is
>set as 4gb , input data size is less than 1 GB, NO of executor is 5.*
>
> *I am still bit confused about spark's execution plan on multiple SQL with
> only one action .*
>
> *Is it executing each SQL separately and trying to store intermediate
> result in memory which is causing OOM/GC overhead ?*
> *And Still my questions are ...*
>
> *1. Will Spark optimize multiple SQL queries into one single plysical plan
> Which will at least will not execute same stage twice , read file once... ?*
> *2. In DAG I can see a lot of file read and lot of stages , Why ? I only
> called action once ? Why in multiple stage Spark is again starting from
> file reading ?*
> *3. Is every SQL will execute and its intermediate result will be stored
> in memory ?*
> *4. What is something that causing OOM and GC overhead here ?*
> *5. What is optimization that could be taken care of ? *
>
>
> On Sat, Sep 10, 2016 at 11:35 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi
>>
>>1. You are doing some analytics I guess?
>>2. It is almost impossible to guess what is happening except that you
>>are looping 50 times over the same set of sql?
>>3. Your sql step n depends on step n-1. So spark cannot get rid of 1
>>-n steps
>>4. you are not storing anything in  memory(no cache, no persist), so
>>all memory is used for the execution
>>5. What happens when you run it only once? How much memory is used
>>(look at UI page, 4040 by default)
>>6.  What Spark mode is being used (Local, Standalone, Yarn)
>>7. OOM could be anything depending on how much you are allocating to
>>your driver memory in spark-submit
>>
>> HTH
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 10 September 2016 at 06:21, Rabin Banerjee <
>> dev.rabin.baner...@gmail.com> wrote:
>>
>>> HI All,
>>>
>>>  I am writing and executing a Spark Batch program which only use
>>> SPARK-SQL , But it is taking lot of time and finally giving GC overhead .
>>>
>>> Here is the program ,
>>>
>>> 1.Read 3 files ,one medium size and 2 small files, and register them as
>>> DF.
>>> 2.
>>>  fire sql with complex aggregation and windowing .
>>>  register result as DF.
>>>
>>> 3.  .Repeat step 2 almost 50 times .so 50 sql .
>>>
>>> 4. All SQLs are sequential , i.e next step requires prev step 

Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Rabin Banerjee
Hi ,


   1. You are doing some analytics I guess?  *YES*
   2. It is almost impossible to guess what is happening except that you
   are looping 50 times over the same set of sql?  *I am Not Looping any
   SQL, All SQLs are called exactly once , which requires output from prev
   SQL.*
   3. Your sql step n depends on step n-1. So spark cannot get rid of 1
   - n steps, *TRUE, But If I have N SQL and all i th SQL is dependent
   upon i-1 , how spark optimize the memory ? is it like for each i th sql it
   will start execution from STAGE 0 *
   4. you are not storing anything in  memory(no cache, no persist), so all
   memory is used for the execution , IF Spark is not storing anything in
   memory , then when it is executing *i th sql it will start execution
   from STAGE 0 i.e starting from file read *
   5. What happens when you run it only once? How much memory is used (look
   at UI page, 4040 by default) , ? *I checked Spark UI DAG , so many file
   reads , Why ?*
   6.  What Spark mode is being used (Local, Standalone, Yarn) ? *Yarn*
   7. OOM could be anything depending on how much you are allocating to
   your driver memory in spark-submit ? *Driver and executor memory is set
   as 4gb , input data size is less than 1 GB, NO of executor is 5.*

*I am still a bit confused about Spark's execution plan for multiple SQL
statements with only one action.*

*Is it executing each SQL separately and trying to store the intermediate
result in memory, which is causing the OOM/GC overhead?*
*And still my questions are ...*

*1. Will Spark optimize multiple SQL queries into one single physical plan,
which will at least not execute the same stage twice and will read each file once?*
*2. In the DAG I can see a lot of file reads and a lot of stages. Why? I only
called an action once. Why is Spark starting again from the file read in
multiple stages?*
*3. Will every SQL execute and have its intermediate result stored in
memory?*
*4. What is causing the OOM and GC overhead here?*
*5. What optimizations could be applied?*
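
For illustration, a minimal sketch of the materialise-every-few-steps pattern that is
often suggested for long chains of dependent SQL. It assumes a spark-shell style
sqlContext; the input path, column names and the per-step query are hypothetical
stand-ins for the real 50 statements, and whether it helps depends on the actual plan:

import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// 'transform' stands in for one of the ~50 dependent SQL steps.
def transform(ctx: org.apache.spark.sql.SQLContext, i: Int): DataFrame =
  ctx.sql(s"SELECT key, SUM(value) AS value FROM step${i - 1} GROUP BY key")

var df: DataFrame = sqlContext.read.json("hdfs:///data/input.json") // hypothetical input
  .selectExpr("key", "value")
df.registerTempTable("step0")

for (i <- 1 to 50) {
  df = transform(sqlContext, i)
  if (i % 10 == 0) {              // every few steps, cut the lineage short
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.count()                    // materialise so later steps reuse this result
  }
  df.registerTempTable(s"step$i")
}

df.write.parquet("hdfs:///data/output") // the single "real" action

Persisting bounds how far back each step's lineage and logical plan reach, at the cost
of holding intermediate data; it is a mitigation to try, not a guaranteed fix for the
GC overhead.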


On Sat, Sep 10, 2016 at 11:35 AM, Mich Talebzadeh  wrote:

> Hi
>
>1. You are doing some analytics I guess?
>2. It is almost impossible to guess what is happening except that you
>are looping 50 times over the same set of sql?
>3. Your sql step n depends on step n-1. So spark cannot get rid of 1
>-n steps
>4. you are not storing anything in  memory(no cache, no persist), so
>all memory is used for the execution
>5. What happens when you run it only once? How much memory is used
>(look at UI page, 4040 by default)
>6.  What Spark mode is being used (Local, Standalone, Yarn)
>7. OOM could be anything depending on how much you are allocating to
>your driver memory in spark-submit
>
> HTH
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 September 2016 at 06:21, Rabin Banerjee <
> dev.rabin.baner...@gmail.com> wrote:
>
>> HI All,
>>
>>  I am writing and executing a Spark Batch program which only use
>> SPARK-SQL , But it is taking lot of time and finally giving GC overhead .
>>
>> Here is the program ,
>>
>> 1.Read 3 files ,one medium size and 2 small files, and register them as
>> DF.
>> 2.
>>  fire sql with complex aggregation and windowing .
>>  register result as DF.
>>
>> 3.  .Repeat step 2 almost 50 times .so 50 sql .
>>
>> 4. All SQLs are sequential , i.e next step requires prev step result .
>>
>> 5. Finally save the final DF .(This is the only action called).
>>
>> Note ::
>>
>> 1. I haven't persists the intermediate DF , as I think Spark will
>> optimize multiple SQL into one physical execution plan .
>> 2. Executor memory and Driver memory is set as 4gb which is too high as
>> data size is in MB.
>>
>> Questions ::
>>
>> 1. Will Spark optimize multiple SQL queries into one single plysical plan
>> ?
>> 2. In DAG I can see a lot of file read and lot of stages , Why ? I only
>> called action once ?
>> 3. Is every SQL will execute and its intermediate result will be stored
>> in memory ?
>> 4. What is something that causing OOM and GC overhead here ?
>> 5. What is optimization that could be taken care of ?
>>
>> Spark Version 1.5.x
>>
>>
>> Thanks in advance .
>> Rabin
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Mich Talebzadeh
Hi

   1. You are doing some analytics I guess?
   2. It is almost impossible to guess what is happening except that you
   are looping 50 times over the same set of sql?
   3. Your sql step n depends on step n-1. So spark cannot get rid of 1 -n
   steps
   4. you are not storing anything in  memory(no cache, no persist), so all
   memory is used for the execution
   5. What happens when you run it only once? How much memory is used (look
   at UI page, 4040 by default)
   6.  What Spark mode is being used (Local, Standalone, Yarn)
   7. OOM could be anything depending on how much you are allocating to
   your driver memory in spark-submit

HTH
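
Related to point 5, a quick way to see how many scans and stages the final plan
actually contains before looking at the UI; the temp table name "step50" is a
hypothetical stand-in for whatever the final DataFrame is built from:

// Sketch: inspect what Catalyst built for the DataFrame whose save is the single action.
val finalDf = sqlContext.sql("SELECT * FROM step50")
finalDf.explain(true)                // parsed, analyzed, optimized and physical plans
println(finalDf.rdd.toDebugString)   // RDD lineage as the scheduler will see it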





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 September 2016 at 06:21, Rabin Banerjee 
wrote:

> HI All,
>
>  I am writing and executing a Spark Batch program which only use SPARK-SQL
> , But it is taking lot of time and finally giving GC overhead .
>
> Here is the program ,
>
> 1.Read 3 files ,one medium size and 2 small files, and register them as DF.
> 2.
>  fire sql with complex aggregation and windowing .
>  register result as DF.
>
> 3.  .Repeat step 2 almost 50 times .so 50 sql .
>
> 4. All SQLs are sequential , i.e next step requires prev step result .
>
> 5. Finally save the final DF .(This is the only action called).
>
> Note ::
>
> 1. I haven't persists the intermediate DF , as I think Spark will optimize
> multiple SQL into one physical execution plan .
> 2. Executor memory and Driver memory is set as 4gb which is too high as
> data size is in MB.
>
> Questions ::
>
> 1. Will Spark optimize multiple SQL queries into one single plysical plan ?
> 2. In DAG I can see a lot of file read and lot of stages , Why ? I only
> called action once ?
> 3. Is every SQL will execute and its intermediate result will be stored in
> memory ?
> 4. What is something that causing OOM and GC overhead here ?
> 5. What is optimization that could be taken care of ?
>
> Spark Version 1.5.x
>
>
> Thanks in advance .
> Rabin
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-09 Thread Rabin Banerjee
HI All,

 I am writing and executing a Spark batch program which only uses Spark SQL,
but it is taking a lot of time and finally failing with a GC overhead error.

Here is the program ,

1. Read 3 files, one medium-sized and 2 small, and register them as DFs.
2. Fire SQL with complex aggregation and windowing, and register the result as a DF.

3. Repeat step 2 almost 50 times, so 50 SQLs.

4. All SQLs are sequential, i.e. the next step requires the previous step's result.

5. Finally save the final DF (this is the only action called).

Note ::

1. I haven't persisted the intermediate DFs, as I think Spark will optimize
multiple SQL statements into one physical execution plan.
2. Executor memory and driver memory are set to 4 GB, which is already generous
as the data size is in MB.

Questions ::

1. Will Spark optimize multiple SQL queries into one single physical plan?
2. In the DAG I can see a lot of file reads and a lot of stages. Why? I only
called an action once.
3. Will every SQL execute and have its intermediate result stored in memory?
4. What is causing the OOM and GC overhead here?
5. What optimizations could be applied?

Spark Version 1.5.x


Thanks in advance .
Rabin


Re: [SparkSQL+SparkStreaming]SparkStreaming APP can not load data into SparkSQL table

2016-09-05 Thread luohui20001

The data can be written as Parquet into HDFS, but the data loading process is
not working as expected.



 

Thanks & Best regards!
San.Luo

- Original Message -
From: <luohui20...@sina.com>
To: "user" <user@spark.apache.org>
Subject: [SparkSQL+SparkStreaming]SparkStreaming APP can not load data into SparkSQL 
table
Date: 2016-09-05 18:55

hi guys: I got a question that  my SparkStreaming APP can not loading data 
into SparkSQL table in. Here is my code:
val conf = new SparkConf().setAppName("KafkaStreaming for " + 
topics).setMaster("spark://master60:7077")
val storageLevel = StorageLevel.DISK_ONLY
val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//Receiver-based 
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 
storageLevel)

kafkaStream.foreachRDD { rdd =>
  val x = rdd.count()
  println(s"processing $x records=")
  rdd.collect().foreach(println)
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val logRDD = 
sqlContext.read.json(rdd.values).select("payload").map(_.mkString)
  val logRDD2 = logRDD.map(_.split(',')).map { x =>
NginxLog(x(0).trim().toFloat.toInt,
  x(1).trim(),
  x(2).trim(),
  x(3).trim(),
  x(4).trim(),
  x(5).trim(),
  x(6).trim(),
  x(7).trim(),
  x(8).trim(),
  x(9).trim(),
  x(10).trim())
  }
  val recDF = logRDD2.toDF
  recDF.printSchema()

  val hc = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
  val index = rdd.id
  recDF.write.parquet(s"/etl/tables/nginxlog/${topicNO}/${index}")
  hc.sql("CREATE TABLE IF NOT EXISTS nginxlog(msec Int,remote_addr 
String,u_domain String,u_url String,u_title String,u_referrer String,u_sh 
String,u_sw String,u_cd String,u_lang String,u_utrace String) STORED AS 
PARQUET")  hc.sql(s"LOAD DATA INPATH 
'/etl/tables/nginxlog/${topicNO}/${index}' INTO TABLE nginxlog")}

There isn't any exception during running my APP. however, except the data in 
the first batch could be loaded into table nginxlog, all other batches can not 
be successfully loaded.I can not understand the reason of this kind of 
behavior. Is that my (hc)hivecontext issue?
PS.my spark cluster version: 1.6.1






 

Thanks & Best regards!
San.Luo


[SparkSQL+SparkStreaming]SparkStreaming APP can not load data into SparkSQL table

2016-09-05 Thread luohui20001
Hi guys: I have a question: my SparkStreaming app cannot load data
into a SparkSQL table. Here is my code:
val conf = new SparkConf().setAppName("KafkaStreaming for " + 
topics).setMaster("spark://master60:7077")
val storageLevel = StorageLevel.DISK_ONLY
val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//Receiver-based 
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 
storageLevel)

kafkaStream.foreachRDD { rdd =>
  val x = rdd.count()
  println(s"processing $x records=")
  rdd.collect().foreach(println)
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val logRDD = 
sqlContext.read.json(rdd.values).select("payload").map(_.mkString)
  val logRDD2 = logRDD.map(_.split(',')).map { x =>
NginxLog(x(0).trim().toFloat.toInt,
  x(1).trim(),
  x(2).trim(),
  x(3).trim(),
  x(4).trim(),
  x(5).trim(),
  x(6).trim(),
  x(7).trim(),
  x(8).trim(),
  x(9).trim(),
  x(10).trim())
  }
  val recDF = logRDD2.toDF
  recDF.printSchema()

  val hc = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
  val index = rdd.id
  recDF.write.parquet(s"/etl/tables/nginxlog/${topicNO}/${index}")
  hc.sql("CREATE TABLE IF NOT EXISTS nginxlog(msec Int,remote_addr 
String,u_domain String,u_url String,u_title String,u_referrer String,u_sh 
String,u_sw String,u_cd String,u_lang String,u_utrace String) STORED AS 
PARQUET")  hc.sql(s"LOAD DATA INPATH 
'/etl/tables/nginxlog/${topicNO}/${index}' INTO TABLE nginxlog")}

There isn't any exception while running my app. However, apart from the first
batch, whose data is loaded into the table nginxlog, no other batch is loaded
successfully. I cannot understand the reason for this behavior. Is it an issue
with my (hc) HiveContext?
PS. My Spark cluster version: 1.6.1
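
For comparison, an untested sketch of appending each micro-batch straight into the
Hive table instead of staging Parquet files and issuing LOAD DATA per batch. It reuses
recDF and hc from the code above, creates the table once per run (the statement is
idempotent), and relies on insertInto matching columns by position, so the DataFrame
column order must line up with the table definition:

// Inside the same foreachRDD block, reusing recDF and hc defined above.
hc.sql(
  """CREATE TABLE IF NOT EXISTS nginxlog(
    |  msec Int, remote_addr String, u_domain String, u_url String, u_title String,
    |  u_referrer String, u_sh String, u_sw String, u_cd String, u_lang String,
    |  u_utrace String)
    |STORED AS PARQUET""".stripMargin)

recDF.write.mode("append").insertInto("nginxlog")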






 

Thanks & Best regards!
San.Luo


Re: Controlling access to hive/db-tables while using SparkSQL

2016-08-30 Thread ayan guha
Given that Record Service is yet to be added to the main distributions, I believe
the only available solution now is to use HDFS ACLs to restrict access for
Spark.
On 31 Aug 2016 03:07, "Mich Talebzadeh" <mich.talebza...@gmail.com> wrote:

> Have you checked using views in Hive to restrict user access to certain
> tables and columns only.
>
> Have a look at this link
> <http://blog.cloudera.com/blog/2015/09/recordservice-for-fine-grained-security-enforcement-across-the-hadoop-ecosystem/>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 30 August 2016 at 16:26, Deepak Sharma <deepakmc...@gmail.com> wrote:
>
>> Is it possible to execute any query using SQLContext even if the DB is
>> secured using roles or tools such as Sentry?
>>
>> Thanks
>> Deepak
>>
>> On Tue, Aug 30, 2016 at 7:52 PM, Rajani, Arpan <arpan.raj...@worldpay.com
>> > wrote:
>>
>>> Hi All,
>>>
>>> In our YARN cluster, we have setup spark 1.6.1 , we plan to give access
>>> to all the end users/developers/BI users, etc. But we learnt any valid user
>>> after getting their own user kerb TGT, can get hold of sqlContext (in
>>> program or in shell) and can run any query against any secure databases.
>>>
>>> This puts us in a critical condition as we do not want to give blanket
>>> permission to everyone.
>>>
>>>
>>>
>>> We are looking forward to:
>>>
>>> 1)  A *solution or a work around, by which we can give secure
>>> access only to the selected users to sensitive tables/database.*
>>>
>>> 2)  *Failing to do so, we would like to remove/disable the SparkSQL
>>> context/feature for everyone.  *
>>>
>>>
>>>
>>> Any pointers in this direction will be very valuable.
>>>
>>> Thank you,
>>>
>>> Arpan
>>>
>>>
>>> This e-mail and any attachments are confidential, intended only for the 
>>> addressee and may be privileged. If you have received this e-mail in error, 
>>> please notify the sender immediately and delete it. Any content that does 
>>> not relate to the business of Worldpay is personal to the sender and not 
>>> authorised or endorsed by Worldpay. Worldpay does not accept responsibility 
>>> for viruses or any loss or damage arising from transmission or access.
>>>
>>> Worldpay (UK) Limited (Company No: 07316500/ Financial Conduct Authority 
>>> No: 530923), Worldpay Limited (Company No:03424752 / Financial Conduct 
>>> Authority No: 504504), Worldpay AP Limited (Company No: 05593466 / 
>>> Financial Conduct Authority No: 502597). Registered Office: The Walbrook 
>>> Building, 25 Walbrook, London EC4N 8AF and authorised by the Financial 
>>> Conduct Authority under the Payment Service Regulations 2009 for the 
>>> provision of payment services. Worldpay (UK) Limited is authorised and 
>>> regulated by the Financial Conduct Authority for consumer credit 
>>> activities. Worldpay B.V. (WPBV) has its registered office in Amsterdam, 
>>> the Netherlands (Handelsregister KvK no. 60494344). WPBV holds a licence 
>>> from and is included in the register kept by De Nederlandsche Bank, which 
>>> registration can be consulted through www.dnb.nl. Worldpay, the logo and 
>>> any associated brand names are trade marks of the Worldpay group.
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>


Re: Controlling access to hive/db-tables while using SparkSQL

2016-08-30 Thread Mich Talebzadeh
Have you checked using views in Hive to restrict user access to certain
tables and columns only?

Have a look at this link
<http://blog.cloudera.com/blog/2015/09/recordservice-for-fine-grained-security-enforcement-across-the-hadoop-ecosystem/>

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 30 August 2016 at 16:26, Deepak Sharma <deepakmc...@gmail.com> wrote:

> Is it possible to execute any query using SQLContext even if the DB is
> secured using roles or tools such as Sentry?
>
> Thanks
> Deepak
>
> On Tue, Aug 30, 2016 at 7:52 PM, Rajani, Arpan <arpan.raj...@worldpay.com>
> wrote:
>
>> Hi All,
>>
>> In our YARN cluster, we have setup spark 1.6.1 , we plan to give access
>> to all the end users/developers/BI users, etc. But we learnt any valid user
>> after getting their own user kerb TGT, can get hold of sqlContext (in
>> program or in shell) and can run any query against any secure databases.
>>
>> This puts us in a critical condition as we do not want to give blanket
>> permission to everyone.
>>
>>
>>
>> We are looking forward to:
>>
>> 1)  A *solution or a work around, by which we can give secure access
>> only to the selected users to sensitive tables/database.*
>>
>> 2)  *Failing to do so, we would like to remove/disable the SparkSQL
>> context/feature for everyone.  *
>>
>>
>>
>> Any pointers in this direction will be very valuable.
>>
>> Thank you,
>>
>> Arpan
>>
>>
>> This e-mail and any attachments are confidential, intended only for the 
>> addressee and may be privileged. If you have received this e-mail in error, 
>> please notify the sender immediately and delete it. Any content that does 
>> not relate to the business of Worldpay is personal to the sender and not 
>> authorised or endorsed by Worldpay. Worldpay does not accept responsibility 
>> for viruses or any loss or damage arising from transmission or access.
>>
>> Worldpay (UK) Limited (Company No: 07316500/ Financial Conduct Authority No: 
>> 530923), Worldpay Limited (Company No:03424752 / Financial Conduct Authority 
>> No: 504504), Worldpay AP Limited (Company No: 05593466 / Financial Conduct 
>> Authority No: 502597). Registered Office: The Walbrook Building, 25 
>> Walbrook, London EC4N 8AF and authorised by the Financial Conduct Authority 
>> under the Payment Service Regulations 2009 for the provision of payment 
>> services. Worldpay (UK) Limited is authorised and regulated by the Financial 
>> Conduct Authority for consumer credit activities. Worldpay B.V. (WPBV) has 
>> its registered office in Amsterdam, the Netherlands (Handelsregister KvK no. 
>> 60494344). WPBV holds a licence from and is included in the register kept by 
>> De Nederlandsche Bank, which registration can be consulted through 
>> www.dnb.nl. Worldpay, the logo and any associated brand names are trade 
>> marks of the Worldpay group.
>>
>>
>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: Controlling access to hive/db-tables while using SparkSQL

2016-08-30 Thread Deepak Sharma
Is it possible to execute any query using SQLContext even if the DB is
secured using roles or tools such as Sentry?

Thanks
Deepak

On Tue, Aug 30, 2016 at 7:52 PM, Rajani, Arpan <arpan.raj...@worldpay.com>
wrote:

> Hi All,
>
> In our YARN cluster, we have setup spark 1.6.1 , we plan to give access to
> all the end users/developers/BI users, etc. But we learnt any valid user
> after getting their own user kerb TGT, can get hold of sqlContext (in
> program or in shell) and can run any query against any secure databases.
>
> This puts us in a critical condition as we do not want to give blanket
> permission to everyone.
>
>
>
> We are looking forward to:
>
> 1)  A *solution or a work around, by which we can give secure access
> only to the selected users to sensitive tables/database.*
>
> 2)  *Failing to do so, we would like to remove/disable the SparkSQL
> context/feature for everyone.  *
>
>
>
> Any pointers in this direction will be very valuable.
>
> Thank you,
>
> Arpan
>
>
> This e-mail and any attachments are confidential, intended only for the 
> addressee and may be privileged. If you have received this e-mail in error, 
> please notify the sender immediately and delete it. Any content that does not 
> relate to the business of Worldpay is personal to the sender and not 
> authorised or endorsed by Worldpay. Worldpay does not accept responsibility 
> for viruses or any loss or damage arising from transmission or access.
>
> Worldpay (UK) Limited (Company No: 07316500/ Financial Conduct Authority No: 
> 530923), Worldpay Limited (Company No:03424752 / Financial Conduct Authority 
> No: 504504), Worldpay AP Limited (Company No: 05593466 / Financial Conduct 
> Authority No: 502597). Registered Office: The Walbrook Building, 25 Walbrook, 
> London EC4N 8AF and authorised by the Financial Conduct Authority under the 
> Payment Service Regulations 2009 for the provision of payment services. 
> Worldpay (UK) Limited is authorised and regulated by the Financial Conduct 
> Authority for consumer credit activities. Worldpay B.V. (WPBV) has its 
> registered office in Amsterdam, the Netherlands (Handelsregister KvK no. 
> 60494344). WPBV holds a licence from and is included in the register kept by 
> De Nederlandsche Bank, which registration can be consulted through 
> www.dnb.nl. Worldpay, the logo and any associated brand names are trade marks 
> of the Worldpay group.
>
>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Controlling access to hive/db-tables while using SparkSQL

2016-08-30 Thread Rajani, Arpan
Hi All,
In our YARN cluster we have set up Spark 1.6.1 and plan to give access to all 
the end users/developers/BI users, etc. But we have learnt that any valid user, 
after getting their own Kerberos TGT, can get hold of a sqlContext (in a program 
or in the shell) and run any query against any secured database.
This puts us in a critical position, as we do not want to give blanket 
permission to everyone.

We are looking forward to:

1)  A solution or a work around, by which we can give secure access only to 
the selected users to sensitive tables/database.

2)  Failing to do so, we would like to remove/disable the SparkSQL 
context/feature for everyone.

Any pointers in this direction will be very valuable.
Thank you,
Arpan
This e-mail and any attachments are confidential, intended only for the 
addressee and may be privileged. If you have received this e-mail in error, 
please notify the sender immediately and delete it. Any content that does not 
relate to the business of Worldpay is personal to the sender and not authorised 
or endorsed by Worldpay. Worldpay does not accept responsibility for viruses or 
any loss or damage arising from transmission or access.

Worldpay (UK) Limited (Company No: 07316500/ Financial Conduct Authority No: 
530923), Worldpay Limited (Company No:03424752 / Financial Conduct Authority 
No: 504504), Worldpay AP Limited (Company No: 05593466 / Financial Conduct 
Authority No: 502597). Registered Office: The Walbrook Building, 25 Walbrook, 
London EC4N 8AF and authorised by the Financial Conduct Authority under the 
Payment Service Regulations 2009 for the provision of payment services. 
Worldpay (UK) Limited is authorised and regulated by the Financial Conduct 
Authority for consumer credit activities. Worldpay B.V. (WPBV) has its 
registered office in Amsterdam, the Netherlands (Handelsregister KvK no. 
60494344). WPBV holds a licence from and is included in the register kept by De 
Nederlandsche Bank, which registration can be consulted through www.dnb.nl. 
Worldpay, the logo and any associated brand names are trade marks of the 
Worldpay group.


Re: using matrix as column datatype in SparkSQL Dataframe

2016-08-10 Thread Yanbo Liang
A good way is to implement your own data source to load data in matrix
format. You can refer to the LibSVM data source (
https://github.com/apache/spark/tree/master/mllib/src/main/scala/org/apache/spark/ml/source/libsvm),
which produces one column of vector type, which is very similar to matrix.

Thanks
Yanbo
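
For the original question, a small sketch of putting matrices directly into a DataFrame
column on Spark 2.x, relying on the fact that a MatrixUDT is registered for
ml.linalg.Matrix. It assumes a spark-shell style SparkSession named spark; the image ids
and pixel values are made up, and I have not exercised this on every Spark version:

import org.apache.spark.ml.linalg.{Matrices, Matrix}

// Each row pairs an image id with a small dense matrix of pixel values.
val images: Seq[(String, Matrix)] = Seq(
  ("img-001", Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
  ("img-002", Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)))
)

val df = spark.createDataFrame(images).toDF("id", "pixels")
df.printSchema()            // "pixels" shows up as a matrix (user-defined) type
df.show(truncate = false)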

2016-08-08 11:06 GMT-07:00 Vadla, Karthik :

> Hello all,
>
>
>
> I'm trying to load set of medical images(dicom) into spark SQL dataframe.
> Here each image is loaded into matrix column of dataframe. I see spark
> recently added MatrixUDT to support this kind of cases, but i don't find a
> sample for using matrix as column in dataframe.
>
> https://github.com/apache/spark/blob/master/mllib/src/
> main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
>
> Can anyone help me with this.
>
> Really appreciate your help.
>
> Thanks
>
> Karthik Vadla
>
>
>


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-09 Thread Zoltan Fedor
Thanks, that makes sense.
So it must be that this queue - which is kept because of the UDF - is the one
running out of memory, because without the UDF field there is no out-of-memory
error, and the UDF field is pretty small, so it is unlikely that it alone would
take us above the memory limit.

In either case, thanks for your help. I think I understand now how the UDFs
and the fields, together with the number of rows, can result in our
out-of-memory scenario.

On Tue, Aug 9, 2016 at 5:06 PM, Davies Liu <dav...@databricks.com> wrote:

> When you have a Python UDF, only the input to UDF are passed into
> Python process,
> but all other fields that are used together with the result of UDF are
> kept in a queue
> then join with the result from Python. The length of this queue is depend
> on the
> number of rows is under processing by Python (or in the buffer of
> Python process).
> The amount of memory required also depend on how many fields are used in
> the
> results.
>
> On Tue, Aug 9, 2016 at 11:09 AM, Zoltan Fedor <zoltan.1.fe...@gmail.com>
> wrote:
> >> Does this mean you only have 1.6G memory for executor (others left for
> >> Python) ?
> >> The cached table could take 1.5G, it means almost nothing left for other
> >> things.
> > True. I have also tried with memoryOverhead being set to 800 (10% of the
> 8Gb
> > memory), but no difference. The "GC overhead limit exceeded" is still the
> > same.
> >
> >> Python UDF do requires some buffering in JVM, the size of buffering
> >> depends on how much rows are under processing by Python process.
> > I did some more testing in the meantime.
> > Leaving the UDFs as-is, but removing some other, static columns from the
> > above SELECT FROM command has stopped the memoryOverhead error from
> > occurring. I have plenty enough memory to store the results with all
> static
> > columns, plus when the UDFs are not there only the rest of the static
> > columns are, then it runs fine. This makes me believe that having UDFs
> and
> > many columns causes the issue together. Maybe when you have UDFs then
> > somehow the memory usage depends on the amount of data in that record
> (the
> > whole row), which includes other fields too, which are actually not used
> by
> > the UDF. Maybe the UDF serialization to Python serializes the whole row
> > instead of just the attributes of the UDF?
> >
> > On Mon, Aug 8, 2016 at 5:59 PM, Davies Liu <dav...@databricks.com>
> wrote:
> >>
> >> On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com>
> >> wrote:
> >> > Hi all,
> >> >
> >> > I have an interesting issue trying to use UDFs from SparkSQL in Spark
> >> > 2.0.0
> >> > using pyspark.
> >> >
> >> > There is a big table (5.6 Billion rows, 450Gb in memory) loaded into
> 300
> >> > executors's memory in SparkSQL, on which we would do some calculation
> >> > using
> >> > UDFs in pyspark.
> >> > If I run my SQL on only a portion of the data (filtering by one of the
> >> > attributes), let's say 800 million records, then all works well. But
> >> > when I
> >> > run the same SQL on all the data, then I receive
> >> > "java.lang.OutOfMemoryError: GC overhead limit exceeded" from
> basically
> >> > all
> >> > of the executors.
> >> >
> >> > It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
> >> > causing this "GC overhead limit being exceeded".
> >> >
> >> > Details:
> >> >
> >> > - using Spark 2.0.0 on a Hadoop YARN cluster
> >> >
> >> > - 300 executors, each with 2 CPU cores and 8Gb memory (
> >> > spark.yarn.executor.memoryOverhead=6400 )
> >>
> >> Does this mean you only have 1.6G memory for executor (others left for
> >> Python) ?
> >> The cached table could take 1.5G, it means almost nothing left for other
> >> things.
> >>
> >> Python UDF do requires some buffering in JVM, the size of buffering
> >> depends on
> >> how much rows are under processing by Python process.
> >>
> >> > - a table of 5.6 Billions rows loaded into the memory of the executors
> >> > (taking up 450Gb of memory), partitioned evenly across the executors
> >> >
> >> > - creating even the simplest UDF in SparkSQL causes 'GC overhead limit
> >> > exceeded' error if running on all records. Running the same on a
> smaller
> >&

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-09 Thread Davies Liu
When you have a Python UDF, only the inputs to the UDF are passed into the
Python process, but all other fields that are used together with the result
of the UDF are kept in a queue and then joined with the result from Python.
The length of this queue depends on the number of rows under processing by
Python (or in the buffer of the Python process). The amount of memory
required also depends on how many fields are used in the results.
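
Going by that, one way to keep the queue small is to make the projection that goes
through the Python UDF as narrow as possible and join the static columns back
afterwards. A rough sketch against the table and UDF from this thread (ROW_KEY is an
invented unique key, and the statements are written for sqlContext.sql, so the same
two SQL strings can be issued from the pyspark job as well):

// Stage 1: only the key and the UDF input travel through the Python stage, so the
// JVM-side queue holds narrow rows while Python is working.
val udfOnly = sqlContext.sql(
  """SELECT ROW_KEY, test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP
    |FROM ma""".stripMargin)
udfOnly.registerTempTable("ma_udf")

// Stage 2: join the small UDF result back onto the wide table to pick up the static
// columns without involving Python again. Requires ROW_KEY to be unique per row.
val results = sqlContext.sql(
  """SELECT m.*, u.TEST_UDF_OP
    |FROM ma m JOIN ma_udf u ON m.ROW_KEY = u.ROW_KEY""".stripMargin)

This is only a mitigation sketch, not a guarantee; the join has its own cost, but it
keeps the rows buffered around the Python process down to two fields.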

On Tue, Aug 9, 2016 at 11:09 AM, Zoltan Fedor <zoltan.1.fe...@gmail.com> wrote:
>> Does this mean you only have 1.6G memory for executor (others left for
>> Python) ?
>> The cached table could take 1.5G, it means almost nothing left for other
>> things.
> True. I have also tried with memoryOverhead being set to 800 (10% of the 8Gb
> memory), but no difference. The "GC overhead limit exceeded" is still the
> same.
>
>> Python UDF do requires some buffering in JVM, the size of buffering
>> depends on how much rows are under processing by Python process.
> I did some more testing in the meantime.
> Leaving the UDFs as-is, but removing some other, static columns from the
> above SELECT FROM command has stopped the memoryOverhead error from
> occurring. I have plenty enough memory to store the results with all static
> columns, plus when the UDFs are not there only the rest of the static
> columns are, then it runs fine. This makes me believe that having UDFs and
> many columns causes the issue together. Maybe when you have UDFs then
> somehow the memory usage depends on the amount of data in that record (the
> whole row), which includes other fields too, which are actually not used by
> the UDF. Maybe the UDF serialization to Python serializes the whole row
> instead of just the attributes of the UDF?
>
> On Mon, Aug 8, 2016 at 5:59 PM, Davies Liu <dav...@databricks.com> wrote:
>>
>> On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com>
>> wrote:
>> > Hi all,
>> >
>> > I have an interesting issue trying to use UDFs from SparkSQL in Spark
>> > 2.0.0
>> > using pyspark.
>> >
>> > There is a big table (5.6 Billion rows, 450Gb in memory) loaded into 300
>> > executors's memory in SparkSQL, on which we would do some calculation
>> > using
>> > UDFs in pyspark.
>> > If I run my SQL on only a portion of the data (filtering by one of the
>> > attributes), let's say 800 million records, then all works well. But
>> > when I
>> > run the same SQL on all the data, then I receive
>> > "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically
>> > all
>> > of the executors.
>> >
>> > It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
>> > causing this "GC overhead limit being exceeded".
>> >
>> > Details:
>> >
>> > - using Spark 2.0.0 on a Hadoop YARN cluster
>> >
>> > - 300 executors, each with 2 CPU cores and 8Gb memory (
>> > spark.yarn.executor.memoryOverhead=6400 )
>>
>> Does this mean you only have 1.6G memory for executor (others left for
>> Python) ?
>> The cached table could take 1.5G, it means almost nothing left for other
>> things.
>>
>> Python UDF do requires some buffering in JVM, the size of buffering
>> depends on
>> how much rows are under processing by Python process.
>>
>> > - a table of 5.6 Billions rows loaded into the memory of the executors
>> > (taking up 450Gb of memory), partitioned evenly across the executors
>> >
>> > - creating even the simplest UDF in SparkSQL causes 'GC overhead limit
>> > exceeded' error if running on all records. Running the same on a smaller
>> > dataset (~800 million rows) does succeed. If no UDF, the query succeed
>> > on
>> > the whole dataset.
>> >
>> > - simplified pyspark code:
>> >
>> > from pyspark.sql.types import StringType
>> >
>> > def test_udf(var):
>> > """test udf that will always return a"""
>> > return "a"
>> > sqlContext.registerFunction("test_udf", test_udf, StringType())
>> >
>> > sqlContext.sql("""CACHE TABLE ma""")
>> >
>> > results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
>> > test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
>> > ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
>> > STANDARD_ACCOUNT_CITY_SRC)
>> >  /
>> > CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-09 Thread Zoltan Fedor
> Does this mean you only have 1.6G memory for executor (others left for
Python) ?
> The cached table could take 1.5G, it means almost nothing left for other
things.
True. I have also tried with memoryOverhead being set to 800 (10% of the
8Gb memory), but no difference. The "GC overhead limit exceeded" is still
the same.

> Python UDF do requires some buffering in JVM, the size of buffering
depends on how much rows are under processing by Python process.
I did some more testing in the meantime.
Leaving the UDFs as-is, but removing some other, static columns from the
above SELECT FROM command has stopped the memoryOverhead error
from occurring. I have plenty of memory to store the results with all the
static columns, and when the UDFs are not there and only the static columns
remain, it runs fine. This makes me believe that having UDFs and many
columns together causes the issue. Maybe when you have UDFs, the memory
usage somehow depends on the amount of data in that record (the whole row),
which includes other fields too that are not actually used by the UDF.
Maybe the UDF serialization to Python serializes the whole row instead of
just the attributes of the UDF?

On Mon, Aug 8, 2016 at 5:59 PM, Davies Liu <dav...@databricks.com> wrote:

> On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com>
> wrote:
> > Hi all,
> >
> > I have an interesting issue trying to use UDFs from SparkSQL in Spark
> 2.0.0
> > using pyspark.
> >
> > There is a big table (5.6 Billion rows, 450Gb in memory) loaded into 300
> > executors's memory in SparkSQL, on which we would do some calculation
> using
> > UDFs in pyspark.
> > If I run my SQL on only a portion of the data (filtering by one of the
> > attributes), let's say 800 million records, then all works well. But
> when I
> > run the same SQL on all the data, then I receive
> > "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically
> all
> > of the executors.
> >
> > It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
> > causing this "GC overhead limit being exceeded".
> >
> > Details:
> >
> > - using Spark 2.0.0 on a Hadoop YARN cluster
> >
> > - 300 executors, each with 2 CPU cores and 8Gb memory (
> > spark.yarn.executor.memoryOverhead=6400 )
>
> Does this mean you only have 1.6G memory for executor (others left for
> Python) ?
> The cached table could take 1.5G, it means almost nothing left for other
> things.
>
> Python UDF do requires some buffering in JVM, the size of buffering
> depends on
> how much rows are under processing by Python process.
>
> > - a table of 5.6 Billions rows loaded into the memory of the executors
> > (taking up 450Gb of memory), partitioned evenly across the executors
> >
> > - creating even the simplest UDF in SparkSQL causes 'GC overhead limit
> > exceeded' error if running on all records. Running the same on a smaller
> > dataset (~800 million rows) does succeed. If no UDF, the query succeed on
> > the whole dataset.
> >
> > - simplified pyspark code:
> >
> > from pyspark.sql.types import StringType
> >
> > def test_udf(var):
> > """test udf that will always return a"""
> > return "a"
> > sqlContext.registerFunction("test_udf", test_udf, StringType())
> >
> > sqlContext.sql("""CACHE TABLE ma""")
> >
> > results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
> > test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
> > ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
> > STANDARD_ACCOUNT_CITY_SRC)
> >  /
> > CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)>LENGTH
> > (STANDARD_ACCOUNT_CITY_SRC)
> > THEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)
> > ELSE LENGTH (STANDARD_ACCOUNT_CITY_SRC)
> >END),2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
> > STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
> > FROM ma""")
> >
> > results_df.registerTempTable("m")
> > sqlContext.cacheTable("m")
> >
> > results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
> > print(results_df.take(1))
> >
> >
> > - the error thrown on the executors:
> >
> > 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout
> > writer for /hadoop/cloudera/parcels/Anaconda/bin/python
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at
> > org.apache.spark.sql.catalyst.expre

Re: saving DF to HDFS in parquet format very slow in SparkSQL app

2016-08-09 Thread luohui20001
maybe this problem is not so easy to understand, so I attached my full code.
Hope this could help in solving the problem.



 

Thanks
Best regards!
San.Luo

- Original Message -
From: <luohui20...@sina.com>
To: "user" <user@spark.apache.org>
Subject: saving DF to HDFS in parquet format very slow in SparkSQL app
Date: 2016-08-09 15:34

hi there: I got a problem saving a DF to HDFS in parquet format; it is very
slow. I attached a pic which shows that a lot of time is spent in getting the
result. The code is:
streamingData.write.mode(SaveMode.Overwrite).parquet("/data/streamingData")
I don't quite understand why my app is so slow in getting the result. I tried
to access my HDFS while the app is running slow, and HDFS is ok.
Any idea will be appreciated.




 

Thanks
Best regards!
San.Luo


DataExtractor.scala
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

saving DF to HDFS in parquet format very slow in SparkSQL app

2016-08-09 Thread luohui20001
hi there: I got a problem saving a DF to HDFS in parquet format; it is very
slow. I attached a pic which shows that a lot of time is spent in getting the
result. The code is:
streamingData.write.mode(SaveMode.Overwrite).parquet("/data/streamingData")
I don't quite understand why my app is so slow in getting the result. I tried
to access my HDFS while the app is running slow, and HDFS is ok.
Any idea will be appreciated.




 

Thanks
Best regards!
San.Luo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-08 Thread Davies Liu
On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com> wrote:
> Hi all,
>
> I have an interesting issue trying to use UDFs from SparkSQL in Spark 2.0.0
> using pyspark.
>
> There is a big table (5.6 Billion rows, 450Gb in memory) loaded into 300
> executors's memory in SparkSQL, on which we would do some calculation using
> UDFs in pyspark.
> If I run my SQL on only a portion of the data (filtering by one of the
> attributes), let's say 800 million records, then all works well. But when I
> run the same SQL on all the data, then I receive
> "java.lang.OutOfMemoryError: GC overhead limit exceeded" from basically all
> of the executors.
>
> It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
> causing this "GC overhead limit being exceeded".
>
> Details:
>
> - using Spark 2.0.0 on a Hadoop YARN cluster
>
> - 300 executors, each with 2 CPU cores and 8Gb memory (
> spark.yarn.executor.memoryOverhead=6400 )

Does this mean you only have 1.6G of memory for the executor (the rest left for Python)?
The cached table could take 1.5G, which means almost nothing is left for other things.

Python UDFs do require some buffering in the JVM; the size of the buffering depends on
how many rows are under processing by the Python process.

> - a table of 5.6 Billions rows loaded into the memory of the executors
> (taking up 450Gb of memory), partitioned evenly across the executors
>
> - creating even the simplest UDF in SparkSQL causes 'GC overhead limit
> exceeded' error if running on all records. Running the same on a smaller
> dataset (~800 million rows) does succeed. If no UDF, the query succeed on
> the whole dataset.
>
> - simplified pyspark code:
>
> from pyspark.sql.types import StringType
>
> def test_udf(var):
> """test udf that will always return a"""
> return "a"
> sqlContext.registerFunction("test_udf", test_udf, StringType())
>
> sqlContext.sql("""CACHE TABLE ma""")
>
> results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
> test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
> ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC,
> STANDARD_ACCOUNT_CITY_SRC)
>  /
> CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)>LENGTH
> (STANDARD_ACCOUNT_CITY_SRC)
> THEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)
> ELSE LENGTH (STANDARD_ACCOUNT_CITY_SRC)
>END),2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
> STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
> FROM ma""")
>
> results_df.registerTempTable("m")
> sqlContext.cacheTable("m")
>
> results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
> print(results_df.take(1))
>
>
> - the error thrown on the executors:
>
> 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout
> writer for /hadoop/cloudera/parcels/Anaconda/bin/python
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
> at
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
> at
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> at
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at
> scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
> at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
> at
> org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
> at
> org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> 16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
> SIGNAL TERM
>
>
> Has anybody experienced these "GC overhead limit exceeded" errors with
> pyspark UDFs before?
>
> Thanks,
> Zoltan
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



java.lang.OutOfMemoryError: GC overhead limit exceeded when using UDFs in SparkSQL (Spark 2.0.0)

2016-08-08 Thread Zoltan Fedor
Hi all,

I have an interesting issue trying to use UDFs from SparkSQL in Spark 2.0.0
using pyspark.

There is a big table (5.6 billion rows, 450GB in memory) loaded into 300
executors' memory in SparkSQL, on which we would do some calculation using
UDFs in pyspark.
If I run my SQL on only a portion of the data (filtering by one of the
attributes), let's say 800 million records, then all works well. But when I
run the same SQL on all the data, then I receive "java.lang.OutOfMemoryError:
GC overhead limit exceeded" from basically all of the executors.

It seems to me that pyspark UDFs in SparkSQL might have a memory leak,
causing this "GC overhead limit being exceeded".

Details:

- using Spark 2.0.0 on a Hadoop YARN cluster

- 300 executors, each with 2 CPU cores and 8Gb memory (
spark.yarn.executor.memoryOverhead=6400 )

- a table of 5.6 billion rows loaded into the memory of the executors
(taking up 450GB of memory), partitioned evenly across the executors

- creating even the simplest UDF in SparkSQL causes a 'GC overhead limit
exceeded' error if running on all records. Running the same on a smaller
dataset (~800 million rows) does succeed. With no UDF, the query succeeds on
the whole dataset.

- simplified pyspark code:

from pyspark.sql.types import StringType

def test_udf(var):
    """test udf that will always return a"""
    return "a"

sqlContext.registerFunction("test_udf", test_udf, StringType())

sqlContext.sql("""CACHE TABLE ma""")

results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
    test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
    ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC, STANDARD_ACCOUNT_CITY_SRC)
                 /
                 CASE WHEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)>LENGTH (STANDARD_ACCOUNT_CITY_SRC)
                      THEN LENGTH (STANDARD_ACCOUNT_CITY_SRC)
                      ELSE LENGTH (STANDARD_ACCOUNT_CITY_SRC)
                 END),2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
    STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
    FROM ma""")

results_df.registerTempTable("m")
sqlContext.cacheTable("m")

results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
print(results_df.take(1))


- the error thrown on the executors:

16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout
writer for /hadoop/cloudera/parcels/Anaconda/bin/python
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM


Has anybody experienced these "GC overhead limit exceeded" errors with
pyspark UDFs before?

Thanks,
Zoltan


using matrix as column datatype in SparkSQL Dataframe

2016-08-08 Thread Vadla, Karthik
Hello all,


I'm trying to load a set of medical images (DICOM) into a Spark SQL dataframe. Here
each image is loaded into a matrix column of the dataframe. I see Spark recently
added MatrixUDT to support this kind of case, but I don't find a sample for
using a matrix as a column in a dataframe.

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala

Can anyone help me with this.

Really appreciate your help.

Thanks

Karthik Vadla



How to avoid sql injection on SparkSQL?

2016-08-04 Thread Linyuxin
Hi All,
I want to know how to avoid SQL injection in SparkSQL.
Is there any common pattern for this?
e.g. some useful tool or code segment,

or do I just have to create a “wheel” for SparkSQL myself.

Thanks.
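
One common pattern (a hedged sketch rather than an official SparkSQL feature; the
people table and userInput are made up for illustration) is to keep untrusted values
out of the SQL text entirely and pass them through the DataFrame API as literals:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object NoSqlInjectionSketch {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("no-sql-injection").master("local[*]").getOrCreate()
    import spark.implicits._

    val people = Seq(Person("Alice", 30), Person("O'Brien", 41)).toDF()

    // Untrusted value coming from a user; never spliced into a SQL string.
    val userInput = "O'Brien'; DROP TABLE people; --"

    // Risky: string concatenation lets the input change the query text itself.
    // spark.sql(s"SELECT * FROM people WHERE name = '$userInput'")

    // Safer: the value only ever enters the plan as a Column literal, never as SQL text.
    people.filter(col("name") === lit(userInput)).show()

    spark.stop()
  }
}

The same filter works on Spark 1.x through an SQLContext-created DataFrame; the point
is simply that user input never reaches the SQL parser.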


Re: SPARKSQL with HiveContext My job fails

2016-08-04 Thread Mich Talebzadeh
Well the error states


Exception in thread thread_name: java.lang.OutOfMemoryError: GC Overhead
limit exceeded

Cause: The detail message "GC overhead limit exceeded" indicates that the
garbage collector is running all the time and the Java program is making very
slow progress. After a garbage collection, if the Java process is spending
more than approximately 98% of its time doing garbage collection, and if it
is recovering less than 2% of the heap and has been doing so for the last 5
(compile-time constant) consecutive garbage collections, then a
java.lang.OutOfMemoryError is thrown. This exception is typically thrown
because the amount of live data barely fits into the Java heap, leaving
little free space for new allocations.
Action: Increase the heap size. The java.lang.OutOfMemoryError exception
for "GC overhead limit exceeded" can be turned off with the command line
flag -XX:-UseGCOverheadLimit.
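
For reference, a hedged sketch of how that advice maps onto Spark settings (the 8g
values are placeholders, not a recommendation for this particular job):

import org.apache.spark.SparkConf

// Heap sizes and the GC-overhead flag expressed as Spark properties; the same keys are
// usually passed to spark-submit with --conf / --executor-memory instead.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")
  .set("spark.executor.extraJavaOptions", "-XX:-UseGCOverheadLimit")
  .set("spark.driver.extraJavaOptions", "-XX:-UseGCOverheadLimit")
// spark.driver.memory has to be given to spark-submit (--driver-memory 8g) rather than
// set here, because the driver JVM is already running by the time this code executes.
// Disabling the GC-overhead check only changes the failure mode; reducing memory
// pressure is the real fix.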

We still don't know what the code is doing. You have not provided that
info. Are you running Spark on YARN? Have you checked the YARN logs?


HTH




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 4 August 2016 at 10:49, Vasu Devan  wrote:

> Hi Team,
>
> My Spark job fails with below error :
>
> Could you please advice me what is the problem with my job.
>
> Below is my error stack:
>
> 16/08/04 05:11:06 ERROR ActorSystemImpl: Uncaught fatal error from thread
> [sparkDriver-akka.actor.default-dispatcher-14] shutting down ActorSystem
> [sparkDriver]
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at sun.reflect.ByteVectorImpl.trim(ByteVectorImpl.java:70)
> at
> sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:388)
> at
> sun.reflect.MethodAccessorGenerator.generateMethod(MethodAccessorGenerator.java:77)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:46)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
> at
> akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
> at scala.util.Try$.apply(Try.scala:161)
> 16/08/04 05:11:06 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
> down remote daemon.
> 16/08/04 05:11:07 INFO 

SPARKSQL with HiveContext My job fails

2016-08-04 Thread Vasu Devan
Hi Team,

My Spark job fails with the error below:

Could you please advise me on what the problem with my job is.

Below is my error stack:

16/08/04 05:11:06 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-akka.actor.default-dispatcher-14] shutting down ActorSystem
[sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at sun.reflect.ByteVectorImpl.trim(ByteVectorImpl.java:70)
at
sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:388)
at
sun.reflect.MethodAccessorGenerator.generateMethod(MethodAccessorGenerator.java:77)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:46)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
16/08/04 05:11:06 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
16/08/04 05:11:07 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
16/08/04 05:11:07 INFO TaskSetManager: Finished task 18540.0 in stage 148.0
(TID 153058) in 190291 ms on lhrrhegapq005.enterprisenet.org (18536/32768)
16/08/04 05:11:07 INFO TaskSetManager: Finished task 18529.0 in stage 148.0
(TID 153044) in 190300 ms on lhrrhegapq008.enterprisenet.org (18537/32768)
16/08/04 05:11:07 INFO TaskSetManager: Finished task 18530.0 in stage 148.0
(TID 153049) in 190297 ms on lhrrhegapq005.enterprisenet.org (18538/32768)
16/08/04 05:11:07 INFO TaskSetManager: Finished task 18541.0 in stage 148.0
(TID 153062) in 190291 ms on lhrrhegapq006.enterprisenet.org (18539/32768)
16/08/04 05:11:09 INFO TaskSetManager: Finished task 18537.0 in stage 148.0
(TID 153057) in 191648 ms on lhrrhegapq003.enterprisenet.org (18540/32768)
16/08/04 05:11:10 INFO TaskSetManager: Finished task 18557.0 in stage 148.0
(TID 153073) in 193193 ms on lhrrhegapq003.enterprisenet.org (18541/32768)
16/08/04 05:11:10 INFO TaskSetManager: Finished task 18528.0 in stage 148.0
(TID 153045) in 193206 ms on lhrrhegapq007.enterprisenet.org (18542/32768)
16/08/04 05:11:10 INFO TaskSetManager: Finished task 18555.0 in stage 148.0
(TID 153072) in 193195 ms on lhrrhegapq002.enterprisenet.org (18543/32768)
16/08/04 05:11:10 ERROR YarnClientSchedulerBackend: Yarn application has
already exited with state FINISHED!
16/08/04 05:11:13 WARN QueuedThreadPool: 9 threads could not be stopped
16/08/04 05:11:13 INFO SparkUI: Stopped Spark web UI at
http://10.90.50.64:4043
16/08/04 05:11:15 INFO DAGScheduler: Stopping DAGScheduler
16/08/04 05:11:16 INFO DAGScheduler: Job 94 failed: save at
ndx_scala_util.scala:1264, took 232.788303 s
16/08/04 05:11:16 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was
shut down
at

Re: Any reference of performance tuning on SparkSQL?

2016-07-28 Thread Sonal Goyal
I found some references at

http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-tuning-in-Spark-SQL-td21871.html

HTH

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Reifier at Strata Hadoop World <https://www.youtube.com/watch?v=eD3LkpPQIgM>
Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>

<http://in.linkedin.com/in/sonalgoyal>



On Thu, Jul 28, 2016 at 12:40 PM, Linyuxin <linyu...@huawei.com> wrote:

> Hi ALL
>
>  Is there any  reference of performance tuning on SparkSQL?
>
> I can only find about turning on spark core on http://spark.apache.org/
>


Any reference of performance tuning on SparkSQL?

2016-07-28 Thread Linyuxin
Hi All,
 Is there any reference on performance tuning for SparkSQL?
I can only find tuning guidance for Spark core on http://spark.apache.org/


WrappedArray in SparkSQL DF

2016-07-22 Thread KhajaAsmath Mohammed
Hi,

I am reading a JSON file and I am facing difficulties trying to get
individual elements of this array. Does anyone know how to get the
elements from a WrappedArray(WrappedArray(String))?

Schema:
+--------------------+
|                rows|
+--------------------+
|[WrappedArray(Bon...|
+--------------------+



WrappedArray(WrappedArray(Bondnotnotseven, 146517120, 0.0, 0.0, 0.0,
0.0, 0.0, 0.0, 0.0, 0.0), WrappedArray(Bondnotnotseven, 146577600, 0.0,
0.0, 2.0, 2.0, 0.0, 0.0, 2.0, 2.0), WrappedArray(Bondnotnotseven,
146638080, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 10.0, 8.0),
WrappedArray(Bondnotnotseven, 146698560, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0,
20.0, 10.0))

Thanks,
Asmath
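
In case it helps, a minimal sketch of pulling elements out of such a nested array
column (the input path, the column aliases and the assumption that the inner elements
were inferred as strings are all guesses based on the sample above; sqlContext is the
one already in the job):

import org.apache.spark.sql.functions.explode

// rows is an array of arrays, so explode() yields one inner array per output row;
// getItem(i) then picks positional elements out of that inner array.
val df = sqlContext.read.json("hdfs:///path/to/input.json")
val inner = df.select(explode(df("rows")).as("r"))
inner.select(inner("r").getItem(0).as("name"),
             inner("r").getItem(1).as("ts")).show(truncate = false)

// On the Scala side a collected value is just a Seq (WrappedArray implements Seq),
// so it can be read without ever naming WrappedArray:
val first = df.select("rows").head.getAs[Seq[Seq[Any]]](0)
first.foreach(innerArr => println(innerArr.mkString(", ")))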


Re: Where is the SparkSQL Specification?

2016-07-21 Thread Mich Talebzadeh
Spark SQL is a subset of Hive SQL, which by and large supports ANSI-92 SQL,
including search predicates like the one above.

scala> sqlContext.sql("select count(1) from oraclehadoop.channels where
channel_desc like ' %b_xx%'").show
+---+
|_c0|
+---+
|  0|
+---+

So check the Hive QL language support.
HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 July 2016 at 08:20, Linyuxin  wrote:

> Hi All
>
> Newbee here.
>
> My spark version is 1.5.1
>
>
>
> And I want to know how can I find the Specification of Spark SQL to find
> out that if it is supported ‘a like %b_xx’ or other sql syntax
>


Where is the SparkSQL Specification?

2016-07-21 Thread Linyuxin
Hi All
Newbie here.
My Spark version is 1.5.1.

And I want to know how I can find the specification of Spark SQL, to find out
whether syntax such as ‘a like %b_xx’ is supported.


Re: How to Register Permanent User-Defined-Functions (UDFs) in SparkSQL

2016-07-12 Thread Daniel Darabos
Hi Lokesh,
There is no way to do that. The SQLContext.newSession documentation says:

Returns a SQLContext as new session, with separated SQL configurations,
temporary tables, registered functions, but sharing the same SparkContext,
CacheManager, SQLListener and SQLTab.

You have two options: either use the same SQLContext instead of creating
new SQLContexts, or have a function for creating SQLContexts, and this
function can also register the UDFs in every created SQLContext.
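
A minimal sketch of that second option (the object name and the sample UDF are made up
for illustration):

import org.apache.spark.sql.SQLContext

object SessionFactory {
  // Stand-in for whatever UDF needs to be available everywhere.
  private def sampleFn(s: String): Int = if (s == null) 0 else s.length

  private def registerUdfs(ctx: SQLContext): Unit = {
    ctx.udf.register("sample_fn", sampleFn _)
    // ...register any other shared UDFs here...
  }

  // Always go through this instead of calling newSession() directly, so every session
  // starts with the same set of registered functions.
  def newSession(base: SQLContext): SQLContext = {
    val session = base.newSession()
    registerUdfs(session)
    session
  }
}

Calling SessionFactory.newSession(existingSqlContext) then gives a session where
SELECT sample_fn(...) works, without the registration having to be repeated at each
call site.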

On Sun, Jul 10, 2016 at 6:14 PM, Lokesh Yadav <lokeshyadav.c...@gmail.com>
wrote:

> Hi
> with sqlContext we can register a UDF like
> this: sqlContext.udf.register("sample_fn", sample_fn _ )
> But this UDF is limited to that particular sqlContext only. I wish to make
> the registration persistent, so that I can access the same UDF in any
> subsequent sqlcontext.
> Or is there any other way to register UDFs in sparkSQL so that they remain
> persistent?
>
> Regards
> Lokesh
>


How to Register Permanent User-Defined-Functions (UDFs) in SparkSQL

2016-07-10 Thread Lokesh Yadav
Hi
with sqlContext we can register a UDF like
this: sqlContext.udf.register("sample_fn", sample_fn _ )
But this UDF is limited to that particular sqlContext only. I wish to make
the registration persistent, so that I can access the same UDF in any
subsequent sqlcontext.
Or is there any other way to register UDFs in sparkSQL so that they remain
persistent?

Regards
Lokesh


SparkSQL Added file get Exception: is a directory and recursive is not turned on

2016-07-07 Thread linxi zeng
Hi, all:
   As recorded in https://issues.apache.org/jira/browse/SPARK-16408, when
using spark-sql to execute SQL like:
   add file hdfs://xxx/user/test;
   If the HDFS path (hdfs://xxx/user/test) is a directory, then we will get
an exception like:

org.apache.spark.SparkException: Added file hdfs://xxx/user/test is a
directory and recursive is not turned on.
   at org.apache.spark.SparkContext.addFile(SparkContext.scala:1372)
   at org.apache.spark.SparkContext.addFile(SparkContext.scala:1340)
   at
org.apache.spark.sql.hive.execution.AddFile.run(commands.scala:117)
   at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
   at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
   at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)


   I think we should add a parameter (spark.input.dir.recursive) to
control the value of recursive, and make this parameter work by modifying
some code, like:

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6b16d59..3be8553 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -113,8 +113,9 @@ case class AddFile(path: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val recursive = sqlContext.sparkContext.getConf.getBoolean("spark.input.dir.recursive", false)
     hiveContext.runSqlHive(s"ADD FILE $path")
-    hiveContext.sparkContext.addFile(path)
+    hiveContext.sparkContext.addFile(path, recursive)
     Seq.empty[Row]
   }
 }
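
Until something like that is merged, the recursive flag is already reachable from
application code (a sketch assuming an existing SparkContext sc; the HDFS path is just
the one from the report):

// SparkContext.addFile has an overload with a recursive flag (the one the patch above
// calls), so a directory can be distributed today without going through "add file":
sc.addFile("hdfs://xxx/user/test", recursive = true)

// With the proposed spark.input.dir.recursive setting (hypothetical until the patch is
// merged), the same behaviour would become reachable from SQL:
//   spark-sql --conf spark.input.dir.recursive=true
//   spark-sql> add file hdfs://xxx/user/test;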


Re: SparkSQL issue: Spark 1.3.1 + hadoop 2.6 on CDH5.3 with parquet

2016-06-20 Thread Satya
Hello,
We are also experiencing the same error.  Can you please provide the steps
that resolved the issue.
Thanks
Satya



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-issue-Spark-1-3-1-hadoop-2-6-on-CDH5-3-with-parquet-tp22808p27197.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSql Catalyst extending Analyzer, Error with CatalystConf

2016-05-13 Thread sib
Hello,

I am trying to write a basic analyzer, by extending the catalyst analyzer
with a few extra rules.

I am getting the following error:
""" trait CatalystConf in package catalyst cannot be accessed in package
org.apache.spark.sql.catalyst """


In my attempt I am doing the following:

class CustomSQLContext(sc: SparkContext) extends SQLContext(sc) {
  val an = new CustomAnalyzer(Map("testRule" -> testRule), catalog, functionRegistry, conf)
  override lazy val analyzer: Analyzer = an
}

class CustomAnalyzer(rules: Map[String, Rule[LogicalPlan]], catalog: Catalog,
                     registery: FunctionRegistry, conf: CatalystConf)
  extends Analyzer(catalog, registery, conf) {
  ..
  override lazy val batches = my_batch.toSeq ++ default_batches ++ Nil
}

Any ideas how I can pass the conf to the customAnalyzer without this error?

I tried passing it as SQLConf but get a not found error and importing
doesn't seem to work.


Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSql-Catalyst-extending-Analyzer-Error-with-CatalystConf-tp26950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


