Re: SparkML algos limitations question.

2015-12-27 Thread Yanbo Liang
Hi Eugene,

AFAIK, the current implementation of MultilayerPerceptronClassifier has
some scalability problems if the model is very large (such as >10M),
although I think it already covers many use cases.
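For a rough sense of scale (assuming the >10M figure refers to the number of
weights): a network with layers 10000 -> 1000 -> 10 already has about
10000*1000 + 1000*10 ≈ 10M weights. A minimal sketch with purely illustrative
layer sizes:

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// hypothetical layer sizes, chosen only to illustrate the ~10M-weight range
val mlp = new MultilayerPerceptronClassifier()
  .setLayers(Array(10000, 1000, 10))
  .setMaxIter(100)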

Yanbo

2015-12-16 6:00 GMT+08:00 Joseph Bradley :

> Hi Eugene,
>
> The maxDepth parameter exists because the implementation uses Integer node
> IDs which correspond to positions in the binary tree.  This simplified the
> implementation.  I'd like to eventually modify it to avoid depending on
> tree node IDs, but that is not yet on the roadmap.
>
> There is not an analogous limit for the GLMs you listed, but I'm not very
> familiar with the perceptron implementation.
>
> Joseph
>
> On Mon, Dec 14, 2015 at 10:52 AM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Hello!
>>
>> I'm currently working on a POC and trying to use Random Forest (classification
>> and regression). I also have to check SVM and Multiclass perceptron (other
>> algos are less important at the moment). So far I've discovered that Random
>> Forest has a limitation of maxDepth for trees and just out of curiosity I
>> wonder why such a limitation has been introduced?
>>
>> My actual question is that I'm going to use Spark ML in production next
>> year and would like to know if there are other limitations like maxDepth in
>> RF for other algorithms: Logistic Regression, Perceptron, SVM, etc.
>>
>> Thanks in advance for your time.
>> --
>> Be well!
>> Jean Morozov
>>
>
>


Re: partitioning json data in spark

2015-12-27 Thread Igor Berman
Have you tried to specify the format of your output? Parquet might be the
default format.
df.write().format("json").mode(SaveMode.Overwrite).save("/tmp/path");
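If you also need to partition by a column, a sketch combining the two (assuming
Spark 1.5+, where partitionBy is no longer limited to Parquet, and a partition
column named "colName"):

import org.apache.spark.sql.SaveMode

df.write.format("json").partitionBy("colName").mode(SaveMode.Overwrite).save("/tmp/path")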

On 27 December 2015 at 15:18, Նարեկ Գալստեան  wrote:

> Hey all!
> I am willing to partition *json *data by a column name and store the
> result as a collection of json files to be loaded to another database.
>
> I could use spark's built in *partitonBy *function but it only outputs in
> parquet format which is not desirable for me.
>
> Could you suggest me a way to deal with this problem?
> Narek Galstyan
>
> Նարեկ Գալստյան
>


Re: Pattern type is incompatible with expected type

2015-12-27 Thread Ted Yu
Have you tried declaring RDD[ChildTypeOne] and writing separate functions
for each sub-type ?
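A minimal sketch of that approach, reusing the class names from your example
(the functions need distinct names, since the parameter types erase to the same
signature):

import org.apache.spark.rdd.RDD

def handleOne(rdd: RDD[ChildTypeOne]): Unit = println("ChildTypeOne")
def handleTwo(rdd: RDD[ChildTypeTwo]): Unit = println("ChildTypeTwo")

// keep the concrete element type instead of widening to SuperType
val rdd1: RDD[ChildTypeOne] = sc./*some code*/.map(r => ChildTypeOne(r))
handleOne(rdd1)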

Cheers

On Sun, Dec 27, 2015 at 10:08 AM, pkhamutou  wrote:

> Hello,
>
> I have a such situation:
>
> abstract class SuperType {...}
> case class ChildTypeOne(x: String) extends SuperType {.}
> case class ChildTypeTwo(x: String) extends SuperType {}
>
> than I have:
>
> val rdd1: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeOne(r))
> val rdd2: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeTwo(r))
>
> but when i try to:
> def someFunction(rdd: RDD[SuperType]) = rdd match {
>   case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
>   case rdd: RDD[ChildTypeTwo] => println("ChildTypeTwo")
> }
>
>
> i get:
>
> Error:(60, 15) pattern type is incompatible with expected type;
>  found   : org.apache.spark.rdd.RDD[ChildTypeOne]
>  required: org.apache.spark.rdd.RDD[SuperType]
> Note: ChildTypeOne <: SuperType, but class RDD is invariant in type T.
> You may wish to define T as +T instead. (SLS 4.5)
>   case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
>   ^
>
> So how to work around it? Because in some situations I need to distinguish
> them.
>
> Best regards,
> Pavel Khamutou
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Pattern-type-is-incompatible-with-expected-type-tp25805.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Passing parameters to spark SQL

2015-12-27 Thread Ajaxx
Given a SQLContext (or HiveContext), is it possible to pass parameters in to a
query?  There are several reasons why this makes sense, including loss of
data type during conversion to string, SQL injection, etc.

But currently, it appears that SQLContext.sql() only takes a single
parameter which is a string.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-parameters-to-spark-SQL-tp25806.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pattern type is incompatible with expected type

2015-12-27 Thread pkhamutou
Hello,

I have the following situation:

abstract class SuperType {...}
case class ChildTypeOne(x: String) extends SuperType {.}
case class ChildTypeTwo(x: String) extends SuperType {}

then I have:

val rdd1: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeOne(r))
val rdd2: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeTwo(r))

but when I try:
def someFunction(rdd: RDD[SuperType]) = rdd match {
  case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
  case rdd: RDD[ChildTypeTwo] => println("ChildTypeTwo")
}


I get:

Error:(60, 15) pattern type is incompatible with expected type;
 found   : org.apache.spark.rdd.RDD[ChildTypeOne]
 required: org.apache.spark.rdd.RDD[SuperType]
Note: ChildTypeOne <: SuperType, but class RDD is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
  case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
  ^
 
So how can I work around this? In some situations I need to distinguish
between them.

Best regards,
Pavel Khamutou



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pattern-type-is-incompatible-with-expected-type-tp25805.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: partitioning json data in spark

2015-12-27 Thread Ted Yu
Is upgrading to 1.5.x a possibility for you ?

Cheers

On Sun, Dec 27, 2015 at 9:28 AM, Նարեկ Գալստեան 
wrote:

>
> http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>  I did try but it all was in vain.
> It is also explicitly written in api docs that it only supports Parquet.
>
>
> Narek Galstyan
>
> Նարեկ Գալստյան
>
> On 27 December 2015 at 17:52, Igor Berman  wrote:
>
>> have you tried to specify format of your output, might be parquet is
>> default format?
>> df.write().format("json").mode(SaveMode.Overwrite).save("/tmp/path");
>>
>> On 27 December 2015 at 15:18, Նարեկ Գալստեան 
>> wrote:
>>
>>> Hey all!
>>> I am willing to partition *json *data by a column name and store the
>>> result as a collection of json files to be loaded to another database.
>>>
>>> I could use spark's built in *partitonBy *function but it only outputs
>>> in parquet format which is not desirable for me.
>>>
>>> Could you suggest me a way to deal with this problem?
>>> Narek Galstyan
>>>
>>> Նարեկ Գալստյան
>>>
>>
>>
>


Re: partitioning json data in spark

2015-12-27 Thread Նարեկ Գալստեան
http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
I did try, but it was all in vain.
It is also explicitly written in the API docs that it only supports Parquet.


Narek Galstyan

Նարեկ Գալստյան

On 27 December 2015 at 17:52, Igor Berman  wrote:

> have you tried to specify format of your output, might be parquet is
> default format?
> df.write().format("json").mode(SaveMode.Overwrite).save("/tmp/path");
>
> On 27 December 2015 at 15:18, Նարեկ Գալստեան  wrote:
>
>> Hey all!
>> I am willing to partition *json *data by a column name and store the
>> result as a collection of json files to be loaded to another database.
>>
>> I could use spark's built in *partitonBy *function but it only outputs
>> in parquet format which is not desirable for me.
>>
>> Could you suggest me a way to deal with this problem?
>> Narek Galstyan
>>
>> Նարեկ Գալստյան
>>
>
>


Re: Pattern type is incompatible with expected type

2015-12-27 Thread Pavel Khamutou
But the idea is to keep it as RDD[SuperType], since I have an
implicit conversion to add custom functionality to RDD, like here:
http://blog.madhukaraphatak.com/extending-spark-api/

Cheers

On 27 December 2015 at 19:13, Ted Yu  wrote:

> Have you tried declaring RDD[ChildTypeOne] and writing separate functions
> for each sub-type ?
>
> Cheers
>
> On Sun, Dec 27, 2015 at 10:08 AM, pkhamutou  wrote:
>
>> Hello,
>>
>> I have a such situation:
>>
>> abstract class SuperType {...}
>> case class ChildTypeOne(x: String) extends SuperType {.}
>> case class ChildTypeTwo(x: String) extends SuperType {}
>>
>> than I have:
>>
>> val rdd1: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeOne(r))
>> val rdd2: RDD[SuperType] = sc./*some code*/.map(r => ChildTypeTwo(r))
>>
>> but when i try to:
>> def someFunction(rdd: RDD[SuperType]) = rdd match {
>>   case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
>>   case rdd: RDD[ChildTypeTwo] => println("ChildTypeTwo")
>> }
>>
>>
>> i get:
>>
>> Error:(60, 15) pattern type is incompatible with expected type;
>>  found   : org.apache.spark.rdd.RDD[ChildTypeOne]
>>  required: org.apache.spark.rdd.RDD[SuperType]
>> Note: ChildTypeOne <: SuperType, but class RDD is invariant in type T.
>> You may wish to define T as +T instead. (SLS 4.5)
>>   case rdd: RDD[ChildTypeOne] => println("ChildTypeOne")
>>   ^
>>
>> So how to work around it? Because in some situations I need to distinguish
>> them.
>>
>> Best regards,
>> Pavel Khamutou
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Pattern-type-is-incompatible-with-expected-type-tp25805.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Passing parameters to spark SQL

2015-12-27 Thread Jeff Zhang
You can do it using Scala string interpolation:

http://docs.scala-lang.org/overviews/core/string-interpolation.html
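For example, a sketch (assuming a SQLContext named sqlContext, a table already
registered as "people", and an integer variable minAge):

val minAge = 21
// the interpolated value is spliced into the SQL text before parsing; note that
// this does not protect against SQL injection if the value comes from user input
val adults = sqlContext.sql(s"SELECT name, age FROM people WHERE age >= $minAge")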

On Mon, Dec 28, 2015 at 5:11 AM, Ajaxx  wrote:

> Given a SQLContext (or HiveContext) is it possible to pass in parameters
> to a
> query.  There are several reasons why this makes sense, including loss of
> data type during conversion to string, SQL injection, etc.
>
> But currently, it appears that SQLContext.sql() only takes a single
> parameter which is a string.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Passing-parameters-to-spark-SQL-tp25806.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: How to contribute by picking up starter bugs

2015-12-27 Thread lokeshkumar
Thanks a lot Jim, looking forward to picking up some bugs.

On Mon, Dec 28, 2015 at 8:42 AM, jiml [via Apache Spark User List] <
ml-node+s1001560n25813...@n3.nabble.com> wrote:

> You probably want to start on the dev list:
> http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> I have seen bugs where someone does just what you suggest, great volunteer
> spirit! I think you have the right idea, just jump in there, create the
> pull request and ask them to assign the bug to you :)
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-contribute-by-picking-up-starter-bugs-tp25795p25815.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: DataFrame Save is writing just column names while saving

2015-12-27 Thread Divya Gehlot
yes
Sharing the execution flow

15/12/28 00:19:15 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/12/28 00:19:15 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql.hive.orc._

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
15/12/28 00:20:15 WARN SparkConf: The configuration key
'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
1.3 and and may be removed in the future. Please use the new key
'spark.yarn.am.waitTime' instead.
15/12/28 00:20:15 INFO HiveContext: Initializing execution hive, version
0.13.1
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@9046f81

scala> import org.apache.spark.sql.types.{StructType, StructField,
StringType, IntegerType,FloatType ,LongType ,TimestampType };
import org.apache.spark.sql.types.{StructType, StructField, StringType,
IntegerType, FloatType, LongType, TimestampType}

scala> val carsSchema = StructType(Seq(StructField("year", IntegerType,
true),StructField("make", StringType, true),StructField("model",
StringType, true),StructField("comment", StringType,
true),StructField("blank", StringType, true)))
carsSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(year,IntegerType,true),
StructField(make,StringType,true), StructField(model,StringType,true),
StructField(comment,StringType,true), StructField(blank,StringType,true))

scala> val carsdf =
hiveContext.read.format("com.databricks.spark.csv").option("header",
"true").schema(carsSchema).load("/tmp/TestDivya/cars.csv")
15/12/28 00:20:45 INFO HiveContext: Initializing HiveMetastoreConnection
version 0.13.1 using Spark classes.
carsdf: org.apache.spark.sql.DataFrame = [year: int, make: string, model:
string, comment: string, blank: string]

scala> val carUsersSchema = StructType(Seq(StructField("Name", StringType,
true),StructField("Car_Model", StringType  , true)))
carUsersSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(Name,StringType,true),
StructField(Car_Model,StringType,true))

scala> val carUsersdf =
hiveContext.read.format("com.databricks.spark.csv").option("header",
"false").schema(carUsersSchema).load("/tmp/TestDivya/CarUsers.csv")
carUsersdf: org.apache.spark.sql.DataFrame = [Name: string, Car_Model:
string]

scala> val joineddf = (carsdf.join(carUsersdf, carsdf("model") ===
carUsersdf("Car_Model"))).select(carUsersdf("Name"),carsdf("make"),carUsersdf("Car_Model"))
joineddf: org.apache.spark.sql.DataFrame = [Name: string, make: string,
Car_Model: string]

scala> joineddf.collect.foreach(println)

..

15/12/28 00:21:35 INFO DAGScheduler: ResultStage 3 (collect at
:39) finished in 2.261 s
15/12/28 00:21:35 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
all completed, from pool
15/12/28 00:21:35 INFO DAGScheduler: Job 1 finished: collect at
:39, took 5.323441 s
[Name3,Chevy,Volt]
[Name6,Chevy,Volt]
[Name1,Tesla,S]
[Name4,Tesla,S]
[Name2,Ford,E350]
[Name5,Ford,E350]

scala>


scala> joineddf.write.format("com.databricks.spark.csv").option("header",
"true").save("/tmp/TestDivya/CarUserData.csv")
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
..
..
15/12/28 00:25:40 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have
all completed, from pool
15/12/28 00:25:40 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
package.scala:157, took 9.293578 s

P.S. : Attaching the output file

On 28 December 2015 at 12:52, Ted Yu  wrote:

> Can you confirm that file1df("COLUMN2") and file2df("COLUMN10") appeared
> in the output of joineddf.collect.foreach(println)
>  ?
>
> Thanks
>
> On Sun, Dec 27, 2015 at 6:32 PM, Divya Gehlot 
> wrote:
>
>> Hi,
>> I am trying to join two dataframes and able to display the results in the
>> console ater join. I am saving that data and and saving in the joined data
>> in CSV format using spark-csv api . Its just saving the column names not
>> data at all.
>>
>> Below is the sample code for the reference:
>>
>> spark-shell   --packages com.databricks:spark-csv_2.10:1.1.0  --master
>>> yarn-client --driver-memory 512m --executor-memory 512m
>>>
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.hive.orc._
>>> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>>> IntegerType,FloatType ,LongType ,TimestampType };
>>>
>>> val firstSchema = 

Re: Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread Jeff Zhang
See
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
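A minimal sketch of the settings involved (illustrative values only; the
external shuffle service must also be running on each NodeManager):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "50")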



On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮  wrote:

> Hi all,
>
>
>
> SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a useful
> feature to save resources on yarn.
>
> We want to open this feature on our yarn cluster.
>
> I have a question about the version of shuffle service.
>
>
>
> I’m now using spark-1.5.1 (shuffle service).
>
> If I want to upgrade to spark-1.6.0, should I replace the shuffle service
> jar and restart all the namenode on yarn ?
>
>
>
> Thanks a lot.
>
>
>
> Mars
>
>
>



-- 
Best Regards

Jeff Zhang


DataFrame Save is writing just column names while saving

2015-12-27 Thread Divya Gehlot
Hi,
I am trying to join two dataframes and am able to display the results in the
console after the join. I am then saving the joined data in CSV format using
the spark-csv API. It's just saving the column names, not the data at all.

Below is the sample code for the reference:

spark-shell   --packages com.databricks:spark-csv_2.10:1.1.0  --master
> yarn-client --driver-memory 512m --executor-memory 512m
>
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.orc._
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType,FloatType ,LongType ,TimestampType };
>
> val firstSchema = StructType(Seq(StructField("COLUMN1", StringType,
> true),StructField("COLUMN2", StringType, true),StructField("COLUMN2",
> StringType, true),StructField("COLUMN3", StringType, true),
> StructField("COLUMN4", StringType, true),StructField("COLUMN5",
> StringType, true)))
> val file1df =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(firstSchema).load("/tmp/File1.csv")
>
>
> val secondSchema = StructType(Seq(
> StructField("COLUMN1", StringType, true),
> StructField("COLUMN2", NullType  , true),
> StructField("COLUMN3", TimestampType , true),
> StructField("COLUMN4", TimestampType , true),
> StructField("COLUMN5", NullType , true),
> StructField("COLUMN6", StringType, true),
> StructField("COLUMN7", IntegerType, true),
> StructField("COLUMN8", IntegerType, true),
> StructField("COLUMN9", StringType, true),
> StructField("COLUMN10", IntegerType, true),
> StructField("COLUMN11", IntegerType, true),
> StructField("COLUMN12", IntegerType, true)))
>
>
> val file2df =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "false").schema(secondSchema).load("/tmp/file2.csv")
> val joineddf = file1df.join(file2df, file1df("COLUMN1") ===
> file2df("COLUMN6"))
> val selecteddata = joineddf.select(file1df("COLUMN2"),file2df("COLUMN10"))
>
//the below statement is printing the joined data

> joineddf.collect.foreach(println)
>


> //this statement saves the CSV file, but only the column names mentioned above
> in the select are being saved
> selecteddata.write.format("com.databricks.spark.csv").option("header",
> "true").save("/tmp/JoinedData.csv")
>


Would really appreciate the pointers /help.

Thanks,
Divya


Re: Stuck with DataFrame df.select("select * from table");

2015-12-27 Thread Gourav Sengupta
Shouldn't df.select just take the column names?
And sqlC.sql take the select statement?

Therefore perhaps we could use df.select("COLUMN1", "COLUMN2") and
sqlC.sql("select COLUMN1, COLUMN2 from tablename").

Why would someone want to do a select on a dataframe after registering it
as a table? I think we should be using a HiveContext or SQLContext to run
queries on a registered table.
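A minimal sketch of the two styles (assuming a DataFrame df with those columns
and a SQLContext named sqlContext):

// DataFrame API: pass column names, not a SQL statement
df.select("COLUMN1", "COLUMN2").show()

// SQL: register the DataFrame and query it through the SQL/Hive context
df.registerTempTable("tablename")
sqlContext.sql("select COLUMN1, COLUMN2 from tablename").show()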


Regards,
Gourav Sengupta


On Sat, Dec 26, 2015 at 6:27 PM, Eugene Morozov 
wrote:

> Chris, thanks. That'd be great to try =)
>
> --
> Be well!
> Jean Morozov
>
> On Fri, Dec 25, 2015 at 10:50 PM, Chris Fregly  wrote:
>
>> oh, and it's worth noting that - starting with Spark 1.6 - you'll be able
>> to just do the following:
>>
>> SELECT * FROM json.`/path/to/json/file`
>>
>> (note the back ticks)
>>
>> instead of calling registerTempTable() for the sole purpose of using SQL.
>>
>> https://issues.apache.org/jira/browse/SPARK-11197
>>
>> On Fri, Dec 25, 2015 at 2:17 PM, Chris Fregly  wrote:
>>
>>> I assume by "The same code perfectly works through Zeppelin 0.5.5" that
>>> you're using the %sql interpreter with your regular SQL SELECT statement,
>>> correct?
>>>
>>> If so, the Zeppelin interpreter is converting the  that
>>> follows
>>>
>>> %sql
>>>
>>> to
>>>
>>> sqlContext.sql()
>>>
>>> per the following code:
>>>
>>>
>>> https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L125
>>>
>>>
>>> https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L140-L141
>>>
>>>
>>> Also, keep in mind that you can do something like this if you want to
>>> stay in DataFrame land:
>>>
>>> df.selectExpr("*").limit(5).show()
>>>
>>>
>>>
>>> On Fri, Dec 25, 2015 at 12:53 PM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
 Ted, Igor,

 Oh my... thanks a lot to both of you!
 Igor was absolutely right, but I missed that I have to use sqlContext =(

 Everything's perfect.
 Thank you.

 --
 Be well!
 Jean Morozov

 On Fri, Dec 25, 2015 at 8:31 PM, Ted Yu  wrote:

> DataFrame uses different syntax from SQL query.
> I searched unit tests but didn't find any in the form of df.select("select
> ...")
>
> Looks like you should use sqlContext as other people suggested.
>
> On Fri, Dec 25, 2015 at 8:29 AM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Thanks for the comments, although the issue is not in limit()
>> predicate.
>> It's something with spark being unable to resolve the expression.
>>
>> I can do smth like this. It works as it suppose to:
>>  df.select(df.col("*")).where(df.col("x1").equalTo(3.0)).show(5);
>>
>> But I think old fashioned sql style have to work also. I have
>> df.registeredTempTable("tmptable") and then
>>
>> df.select("select * from tmptable where x1 = '3.0'").show();
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve 'select * from
>> tmp where x1 = '1.0'' given input columns x1, x4, x5, x3, x2;
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.sca
>>
>>
>> From the first statement I conclude that my custom datasource is
>> perfectly fine.
>> Just wonder how to fix / workaround that.
>> --
>> Be well!
>> Jean Morozov
>>
>> On Fri, Dec 25, 2015 at 6:13 PM, Igor Berman 
>> wrote:
>>
>>> sqlContext.sql("select * from table limit 5").show() (not sure if
>>> limit 5 supported)
>>>
>>> or use Dmitriy's solution. select() defines your projection when
>>> you've specified entire query
>>>
>>> On 25 December 2015 at 15:42, Василец Дмитрий <
>>> pronix.serv...@gmail.com> wrote:
>>>
 hello
 you can try to use df.limit(5).show()
 just trick :)

 On Fri, Dec 25, 2015 at 2:34 PM, Eugene Morozov <
 evgeny.a.moro...@gmail.com> wrote:

> Hello, I'm basically stuck as I have no idea where to look;
>
> Following simple code, given that my Datasource is working gives
> me an exception.
>
> DataFrame df = sqlc.load(filename, 
> "com.epam.parso.spark.ds.DefaultSource");
> df.cache();
> df.printSchema();   <-- prints the schema perfectly fine!
>
> df.show();  

Help: Driver OOM when shuffle large amount of data

2015-12-27 Thread kendal
My driver is running OOM with my 4T data set... I don't collect any data to
the driver. All the program does is map - reduce - saveAsTextFile. But the
number of partitions to be shuffled is quite large - 20K+.

The symptom I'm seeing is a timeout when fetching GetMapOutputStatuses from
the driver.
15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Don't have map outputs
for shuffle 0, fetching them
15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Doing the fetch;
tracker endpoint =
AkkaRpcEndpointRef(Actor[akka.tcp://sparkDriver@10.115.58.55:52077/user/MapOutputTracker#-1937024516])
15/12/24 02:06:21 WARN akka.AkkaRpcEndpointRef: Error sending message
[message = GetMapOutputStatuses(0)] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
seconds]. This timeout is controlled by spark.rpc.askTimeout
at
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)

But the root cause is OOM:
15/12/24 02:05:36 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.remote.default-remote-dispatcher-24] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at
java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:131)
at
akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
at
akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)

I've already allocated 16G of memory for my driver - which is the hard
maximum of my Yarn cluster. And I have also applied Kryo serialization... Any
ideas to reduce the memory footprint?
What confuses me is that, even though I have 20K+ partitions to shuffle, why
do I need so much memory?!

Thank you so much for any help!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-Driver-OOM-when-shuffle-large-amount-of-data-tp25818.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why one of Stage is into Skipped section instead of Completed

2015-12-27 Thread Prem Spark
Thank you Silvio for the update.

On Sat, Dec 26, 2015 at 1:14 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Skipped stages result from existing shuffle output of a stage when
> re-running a transformation. The executors will have the output of the
> stage in their local dirs and Spark recognizes that, so rather than
> re-computing, it will start from the following stage. So, this is a good
> thing in that you’re not re-computing a stage. In your case, it looks like
> there’s already the output of the userreqs RDD (reduceByKey) so it doesn’t
> re-compute it.
>
> From: Prem Spark 
> Date: Friday, December 25, 2015 at 11:41 PM
> To: "user@spark.apache.org" 
> Subject: why one of Stage is into Skipped section instead of Completed
>
>
> What does the below Skipped Stage mean? Can anyone help in clarifying?
> I was expecting 3 stages to get Succeeded, but only 2 of them are getting
> completed while one is skipped.
> Status: SUCCEEDED
> Completed Stages: 2
> Skipped Stages: 1
>
> Scala REPL Code Used:
>
> accounts is a basic RDD contains weblog text data.
>
> var accountsByID = accounts.
>
> map(line => line.split(',')).
>
> map(values => (values(0),values(4)+','+values(3)));
>
> var userreqs = sc.
>
> textFile("/loudacre/weblogs/*6").
>
> map(line => line.split(' ')).
>
> map(words => (words(2),1)).
>
> reduceByKey((v1,v2) => v1 + v2);
>
> var accounthits =
>
> accountsByID.join(userreqs).map(pair => pair._2)
>
> accounthits.
>
> saveAsTextFile("/loudacre/userreqs")
>
> scala> accounthits.toDebugString
> res15: String =
> (32) MapPartitionsRDD[24] at map at :28 []
>  |   MapPartitionsRDD[23] at join at :28 []
>  |   MapPartitionsRDD[22] at join at :28 []
>  |   CoGroupedRDD[21] at join at :28 []
>  +-(15) MapPartitionsRDD[15] at map at :25 []
>  |  |   MapPartitionsRDD[14] at map at :24 []
>  |  |   /loudacre/accounts/* MapPartitionsRDD[13] at textFile at
> :21 []
>  |  |   /loudacre/accounts/* HadoopRDD[12] at textFile at :21 []
>  |   ShuffledRDD[20] at reduceByKey at :25 []
>  +-(32) MapPartitionsRDD[19] at map at :24 []
> |   MapPartitionsRDD[18] at map at :23 []
> |   /loudacre/weblogs/*6 MapPartitionsRDD[17] at textFile at
> :22 []
> |   /loudacre/weblogs/*6 HadoopRDD[16] at textFile at 
>
>
>
>
>
>
>


DataFrame Vs RDDs ... Which one to use When ?

2015-12-27 Thread Divya Gehlot
Hi,
I am a newbie to Spark and a bit confused about RDDs and DataFrames in Spark.
Can somebody explain to me, with use cases, which one to use when?

Would really appreciate the clarification .

Thanks,
Divya


Re: Passing parameters to spark SQL

2015-12-27 Thread Michael Armbrust
The only way to do this for SQL is through the JDBC driver.

However, you can use literal values without lossy/unsafe string conversions
by using the DataFrame API.  For example, to filter:

import org.apache.spark.sql.functions._
df.filter($"columnName" === lit(value))

On Sun, Dec 27, 2015 at 1:11 PM, Ajaxx  wrote:

> Given a SQLContext (or HiveContext) is it possible to pass in parameters
> to a
> query.  There are several reasons why this makes sense, including loss of
> data type during conversion to string, SQL injection, etc.
>
> But currently, it appears that SQLContext.sql() only takes a single
> parameter which is a string.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Passing-parameters-to-spark-SQL-tp25806.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Inconsistent behavior of randomSplit in YARN mode

2015-12-27 Thread Gaurav Kumar
Hi,

I noticed an inconsistent behavior when using rdd.randomSplit when the
source rdd is repartitioned, but only in YARN mode. It works fine in local
mode though.

*Code:*
val rdd = sc.parallelize(1 to 100)
val rdd2 = rdd.repartition(64)
rdd.partitions.size
rdd2.partitions.size
val Array(train, test) = *rdd2*.randomSplit(Array(70, 30), 1)
train.takeOrdered(10)
test.takeOrdered(10)

*Master: local*
Both the take statements produce consistent results and have no overlap in
numbers being outputted.


*Master: YARN*
However, when these are run in YARN mode, they produce random results every
time, and train and test also overlap in the numbers being outputted.
If I use *rdd*.randomSplit, then it works fine even on YARN.

So it appears that the repartition is being re-evaluated every time the
splitting occurs.

Interestingly, if I cache the rdd2 before splitting it, then we can expect
consistent behavior since repartition is not evaluated again and again.
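A minimal sketch of that workaround, reusing the code above:

// cache so both splits are computed from one materialization of the repartition
val rdd2 = rdd.repartition(64).cache()
val Array(train, test) = rdd2.randomSplit(Array(70, 30), 1)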

Best Regards,
Gaurav Kumar
Big Data • Data Science • Photography • Music
+91 9953294125


Can anyone explain Spark behavior for below? Kudos in Advance

2015-12-27 Thread Prem Spark
Scenario1:
val z = sc.parallelize(List("12","23","345",""),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x
+ y)
res143: String = 10

Scenario2:
val z = sc.parallelize(List("12","23","","345"),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x
+ y)
res144: String = 11

Why is the result different? I was expecting 10 for both, also for the
first partition.


Re: Can anyone explain Spark behavior for below? Kudos in Advance

2015-12-27 Thread Jeff Zhang
Not sure what you are trying to do, but the result is correct.

Scenario 2:

Partition 1 ("12", "23")
("","12") => "0"
("0","23") => "1"

Partition 2 ("","345")
("","") => "0"
("0","345") => "1"

Final merge:
("1","1") => "11"




On Mon, Dec 28, 2015 at 7:14 AM, Prem Spark  wrote:

> Scenario1:
> val z = sc.parallelize(List("12","23","345",""),2)
> z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x
> + y)
> res143: String = 10
>
> Scenario2:
> val z = sc.parallelize(List("12","23","","345"),2)
> z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x
> + y)
> res144: String = 11
>
> why the result is different . I was expecting 10 for both. also for the
> first Partition
>



-- 
Best Regards

Jeff Zhang


Re: DataFrame Save is writing just column names while saving

2015-12-27 Thread Ted Yu
Can you confirm that file1df("COLUMN2") and file2df("COLUMN10") appeared in
the output of joineddf.collect.foreach(println)
 ?

Thanks

On Sun, Dec 27, 2015 at 6:32 PM, Divya Gehlot 
wrote:

> Hi,
> I am trying to join two dataframes and able to display the results in the
> console ater join. I am saving that data and and saving in the joined data
> in CSV format using spark-csv api . Its just saving the column names not
> data at all.
>
> Below is the sample code for the reference:
>
> spark-shell   --packages com.databricks:spark-csv_2.10:1.1.0  --master
>> yarn-client --driver-memory 512m --executor-memory 512m
>>
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.sql.hive.orc._
>> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>> IntegerType,FloatType ,LongType ,TimestampType };
>>
>> val firstSchema = StructType(Seq(StructField("COLUMN1", StringType,
>> true),StructField("COLUMN2", StringType, true),StructField("COLUMN2",
>> StringType, true),StructField("COLUMN3", StringType, true)
>> StructField("COLUMN4", StringType, true),StructField("COLUMN5",
>> StringType, true)))
>> val file1df =
>> hiveContext.read.format("com.databricks.spark.csv").option("header",
>> "true").schema(firstSchema).load("/tmp/File1.csv")
>>
>>
>> val secondSchema = StructType(Seq(
>> StructField("COLUMN1", StringType, true),
>> StructField("COLUMN2", NullType  , true),
>> StructField("COLUMN3", TimestampType , true),
>> StructField("COLUMN4", TimestampType , true),
>> StructField("COLUMN5", NullType , true),
>> StructField("COLUMN6", StringType, true),
>> StructField("COLUMN7", IntegerType, true),
>> StructField("COLUMN8", IntegerType, true),
>> StructField("COLUMN9", StringType, true),
>> StructField("COLUMN10", IntegerType, true),
>> StructField("COLUMN11", IntegerType, true),
>> StructField("COLUMN12", IntegerType, true)))
>>
>>
>> val file2df =
>> hiveContext.read.format("com.databricks.spark.csv").option("header",
>> "false").schema(secondSchema).load("/tmp/file2.csv")
>> val joineddf = file1df.join(file2df, file1df("COLUMN1") ===
>> file2df("COLUMN6"))
>> val selecteddata = joineddf.select(file1df("COLUMN2"),file2df("COLUMN10"))
>>
> //the below statement is printing the joined data
>
>> joineddf.collect.foreach(println)
>>
>
>
>> //this statement saves the CSVfile but only columns names mentioned above
>> on the select are being saved
>> selecteddata.write.format("com.databricks.spark.csv").option("header",
>> "true").save("/tmp/JoinedData.csv")
>>
>
>
> Would really appreciate the pointers /help.
>
> Thanks,
> Divya
>
>
>
>
>


Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread 顾亮亮
Hi all,

SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a useful 
feature to save resources on yarn.
We want to open this feature on our yarn cluster.
I have a question about the version of shuffle service.

I’m now using spark-1.5.1 (shuffle service).
If I want to upgrade to spark-1.6.0, should I replace the shuffle service jar 
and restart all the namenode on yarn ?

Thanks a lot.

Mars



Re: Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread Saisai Shao
The external shuffle service is backward compatible, so if you deploy the 1.6
shuffle service on the NodeManagers, it can serve both 1.5 and 1.6 Spark
applications.

Thanks
Saisai

On Mon, Dec 28, 2015 at 2:33 PM, 顾亮亮  wrote:

> Is it possible to support both spark-1.5.1 and spark-1.6.0 on one yarn
> cluster?
>
>
>
> *From:* Saisai Shao [mailto:sai.sai.s...@gmail.com]
> *Sent:* Monday, December 28, 2015 2:29 PM
> *To:* Jeff Zhang
> *Cc:* 顾亮亮; user@spark.apache.org; 刘骋昺
> *Subject:* Re: Opening Dynamic Scaling Executors on Yarn
>
>
>
> Replace all the shuffle jars and restart the NodeManager is enough, no
> need to restart NN.
>
>
>
> On Mon, Dec 28, 2015 at 2:05 PM, Jeff Zhang  wrote:
>
> See
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>
>
>
>
>
>
> On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮  wrote:
>
> Hi all,
>
>
>
> SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a useful
> feature to save resources on yarn.
>
> We want to open this feature on our yarn cluster.
>
> I have a question about the version of shuffle service.
>
>
>
> I’m now using spark-1.5.1 (shuffle service).
>
> If I want to upgrade to spark-1.6.0, should I replace the shuffle service
> jar and restart all the namenode on yarn ?
>
>
>
> Thanks a lot.
>
>
>
> Mars
>
>
>
>
>
>
>
> --
>
> Best Regards
>
> Jeff Zhang
>
>
>


RE: Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread 顾亮亮
Is it possible to support both spark-1.5.1 and spark-1.6.0 on one yarn cluster?

From: Saisai Shao [mailto:sai.sai.s...@gmail.com]
Sent: Monday, December 28, 2015 2:29 PM
To: Jeff Zhang
Cc: 顾亮亮; user@spark.apache.org; 刘骋昺
Subject: Re: Opening Dynamic Scaling Executors on Yarn

Replace all the shuffle jars and restart the NodeManager is enough, no need to 
restart NN.

On Mon, Dec 28, 2015 at 2:05 PM, Jeff Zhang 
> wrote:
See 
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation



On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮 
> wrote:
Hi all,

SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a useful 
feature to save resources on yarn.
We want to open this feature on our yarn cluster.
I have a question about the version of shuffle service.

I’m now using spark-1.5.1 (shuffle service).
If I want to upgrade to spark-1.6.0, should I replace the shuffle service jar 
and restart all the namenode on yarn ?

Thanks a lot.

Mars




--
Best Regards

Jeff Zhang



Re: Pattern type is incompatible with expected type

2015-12-27 Thread pkhamutou
Thank you for your response!
But this approach did not help.

This one works:
def check[T: ClassTag](rdd: List[T]) = rdd match {
  case rdd: List[Int] if classTag[T] == classTag[Int] => println("Int")
  case rdd: List[String] if classTag[T] == classTag[String] =>
    println("String")
  case rdd: List[ChildTypeOne] if classTag[T] == classTag[ChildTypeOne] =>
    println("ChildTypeOne")
}


This one doesn't (Both times I got 1st case):

def check2[T: ClassTag](rdd: RDD[T]) = rdd match {
  case rdd: RDD[ChildTypeOne] if classTag[T] == classTag[ChildTypeOne] =>
    println("ChildTypeOne")
  case rdd: RDD[ChildTypeTwo] if classTag[T] == classTag[ChildTypeTwo] =>
    println("ChildTypeTwo")
}
   
I found this https://issues.apache.org/jira/browse/SPARK-1296 
Does it mean that there is no way to solve this problem? 


Pavel



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pattern-type-is-incompatible-with-expected-type-tp25805p25819.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame Save is writing just column names while saving

2015-12-27 Thread Divya Gehlot
Finally I was able to resolve the issue.
For the sample example with a small dataset, it's creating some 200 files. I
was just doing a random check of files in the output directory and, alas, kept
hitting files that contained only the column names.
Attaching the output files now.
Now another question arises: why are so many output files (200) getting
created for just a small data set?
Attaching the dataset files too.
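The 200 comes from the default number of shuffle partitions that Spark SQL uses
for joins (spark.sql.shuffle.partitions, 200 by default); each partition becomes
one output part file. A sketch of two ways to get fewer files for a small
dataset (illustrative only):

// option 1: lower the shuffle parallelism before running the join
hiveContext.setConf("spark.sql.shuffle.partitions", "4")

// option 2: coalesce the result just before writing it out
joineddf.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/tmp/TestDivya/CarUserData.csv")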

On 28 December 2015 at 13:29, Divya Gehlot  wrote:

> yes
> Sharing the execution flow
>
> 15/12/28 00:19:15 INFO SessionState: No Tez session required at this
> point. hive.execution.engine=mr.
> 15/12/28 00:19:15 INFO SparkILoop: Created sql context (with Hive
> support)..
> SQL context available as sqlContext.
>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.sql.hive.orc._
> import org.apache.spark.sql.hive.orc._
>
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 15/12/28 00:20:15 WARN SparkConf: The configuration key
> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
> 1.3 and and may be removed in the future. Please use the new key
> 'spark.yarn.am.waitTime' instead.
> 15/12/28 00:20:15 INFO HiveContext: Initializing execution hive, version
> 0.13.1
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@9046f81
>
> scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType, IntegerType,FloatType ,LongType ,TimestampType };
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, FloatType, LongType, TimestampType}
>
> scala> val carsSchema = StructType(Seq(StructField("year", IntegerType,
> true),StructField("make", StringType, true),StructField("model",
> StringType, true),StructField("comment", StringType,
> true),StructField("blank", StringType, true)))
> carsSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(year,IntegerType,true),
> StructField(make,StringType,true), StructField(model,StringType,true),
> StructField(comment,StringType,true), StructField(blank,StringType,true))
>
> scala> val carsdf =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(carsSchema).load("/tmp/TestDivya/cars.csv")
> 15/12/28 00:20:45 INFO HiveContext: Initializing HiveMetastoreConnection
> version 0.13.1 using Spark classes.
> carsdf: org.apache.spark.sql.DataFrame = [year: int, make: string, model:
> string, comment: string, blank: string]
>
> scala> val carUsersSchema = StructType(Seq(StructField("Name", StringType,
> true),StructField("Car_Model", StringType  , true)))
> carUsersSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(Name,StringType,true),
> StructField(Car_Model,StringType,true))
>
> scala> val carUsersdf =
> hiveContext.read.format("com.databricks.spark.csv").option("header",
> "false").schema(carUsersSchema).load("/tmp/TestDivya/CarUsers.csv")
> carUsersdf: org.apache.spark.sql.DataFrame = [Name: string, Car_Model:
> string]
>
> scala> val joineddf = (carsdf.join(carUsersdf, carsdf("model") ===
> carUsersdf("Car_Model"))).select(carUsersdf("Name"),carsdf("make"),carUsersdf("Car_Model"))
> joineddf: org.apache.spark.sql.DataFrame = [Name: string, make: string,
> Car_Model: string]
>
> scala> joineddf.collect.foreach(println)
> 
> ..
>
> 15/12/28 00:21:35 INFO DAGScheduler: ResultStage 3 (collect at
> :39) finished in 2.261 s
> 15/12/28 00:21:35 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks
> have all completed, from pool
> 15/12/28 00:21:35 INFO DAGScheduler: Job 1 finished: collect at
> :39, took 5.323441 s
> [Name3,Chevy,Volt]
> [Name6,Chevy,Volt]
> [Name1,Tesla,S]
> [Name4,Tesla,S]
> [Name2,Ford,E350]
> [Name5,Ford,E350]
>
> scala>
>
>
> scala> joineddf.write.format("com.databricks.spark.csv").option("header",
> "true").save("/tmp/TestDivya/CarUserData.csv")
> 15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
> 15/12/28 00:25:31 INFO Exchange: Using SparkSqlSerializer2.
> ..
> ..
> 15/12/28 00:25:40 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks
> have all completed, from pool
> 15/12/28 00:25:40 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
> package.scala:157, took 9.293578 s
>
> P.S. : Attaching the output file
>
> On 28 December 2015 at 12:52, Ted Yu  wrote:
>
>> Can you confirm that file1df("COLUMN2") and file2df("COLUMN10") appeared
>> in the output of joineddf.collect.foreach(println)
>>  ?
>>
>> Thanks
>>
>> On Sun, Dec 27, 2015 at 6:32 PM, Divya Gehlot 
>> wrote:
>>
>>> Hi,
>>> I am trying to join two dataframes and able to display the results in
>>> the console ater join. I am saving that data 

Re: Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread Saisai Shao
Replacing all the shuffle jars and restarting the NodeManagers is enough; no
need to restart the NN.
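For reference, the service being replaced is the one registered as a YARN
auxiliary service in yarn-site.xml, along the lines of this sketch:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>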

On Mon, Dec 28, 2015 at 2:05 PM, Jeff Zhang  wrote:

> See
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>
>
> On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮  wrote:
>
>> Hi all,
>>
>>
>>
>> SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a
>> useful feature to save resources on yarn.
>>
>> We want to open this feature on our yarn cluster.
>>
>> I have a question about the version of shuffle service.
>>
>>
>>
>> I’m now using spark-1.5.1 (shuffle service).
>>
>> If I want to upgrade to spark-1.6.0, should I replace the shuffle service
>> jar and restart all the namenode on yarn ?
>>
>>
>>
>> Thanks a lot.
>>
>>
>>
>> Mars
>>
>>
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


partitioning json data in spark

2015-12-27 Thread Նարեկ Գալստեան
Hey all!
I want to partition *json* data by a column name and store the result
as a collection of json files to be loaded into another database.

I could use Spark's built-in *partitionBy* function, but it only outputs in
parquet format, which is not desirable for me.

Could you suggest me a way to deal with this problem?
Narek Galstyan

Նարեկ Գալստյան