Re: How this unit test passed on master trunk?

2016-04-23 Thread Zhan Zhang
There are multiple records for the DF

scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).show
+---+-----------------------------+
|  a|min(struct(unresolvedstar()))|
+---+-----------------------------+
|  1|                        [1,1]|
|  3|                        [3,1]|
|  2|                        [2,1]|
+---+-----------------------------+

The meaning of .groupBy($"a").agg(min(struct($"record.*"))) is to get the min
for all the records with the same $"a".

For example, for TestData2(1,1) :: TestData2(1,2), the result would be (1, [1,1]),
since struct(1, 1) is less than struct(1, 2). Please check how the Ordering is
implemented in InterpretedOrdering.

The output itself does not have any guaranteed ordering. I am not sure why the
unit test and the real environment produce different results.
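
For what it's worth, a minimal sketch (run in spark-shell against the structDF
defined further down in this thread) of how to make first() deterministic by
adding an explicit ordering on top of the aggregation; this is illustrative only,
not the actual test code:

import org.apache.spark.sql.functions.{min, struct}

// The grouped output has no guaranteed row order, so first() may return any group.
val minPerGroup = structDF
  .groupBy($"a")
  .agg(min(struct($"record.*")).as("min_record"))

minPerGroup.orderBy($"a").first()        // deterministically the a = 1 group
minPerGroup.orderBy($"a".desc).first()   // deterministically the a = 3 group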

Xiao,

I do see the difference between unit test and local cluster run. Do you know 
the reason?

Thanks.

Zhan Zhang




On Apr 22, 2016, at 11:23 AM, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:

Hi,

I was trying to find out why this unit test can pass in Spark code.

in
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

for this unit test:

  test("Star Expansion - CreateStruct and CreateArray") {
val structDf = testData2.select("a", "b").as("record")
// CreateStruct and CreateArray in aggregateExpressions
assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == 
Row(3, Row(3, 1)))
assert(structDf.groupBy($"a").agg(min(array($"record.*"))).first() == 
Row(3, Seq(3, 1)))

// CreateStruct and CreateArray in project list (unresolved alias)
assert(structDf.select(struct($"record.*")).first() == Row(Row(1, 1)))
assert(structDf.select(array($"record.*")).first().getAs[Seq[Int]](0) === 
Seq(1, 1))

// CreateStruct and CreateArray in project list (alias)
assert(structDf.select(struct($"record.*").as("a")).first() == Row(Row(1, 
1)))

assert(structDf.select(array($"record.*").as("a")).first().getAs[Seq[Int]](0) 
=== Seq(1, 1))
  }

From my understanding, the data returned in this case should be Row(1, Row(1, 1)),
as that will be the min of the struct.

In fact, if I run the spark-shell on my laptop, and I got the result I expected:


./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class TestData2(a: Int, b: Int)
defined class TestData2

scala> val testData2DF = sqlContext.sparkContext.parallelize(TestData2(1,1) :: 
TestData2(1,2) :: TestData2(2,1) :: TestData2(2,2) :: TestData2(3,1) :: 
TestData2(3,2) :: Nil, 2).toDF()

scala> val structDF = testData2DF.select("a","b").as("record")

scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).first()
res0: org.apache.spark.sql.Row = [1,[1,1]]

scala> structDF.show
+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  1|  2|
|  2|  1|
|  2|  2|
|  3|  1|
|  3|  2|
+---+---+

So from my Spark, which I built from master, I cannot get Row[3,[3,1]] back in
this case. Why does the unit test assert that Row[3,[3,1]] should be the first
row, and why does it pass, while I cannot reproduce that in my spark-shell? I am
trying to understand how to interpret the meaning of "agg(min(struct($"record.*")))".


Thanks

Yong



Re: Save DataFrame to HBase

2016-04-22 Thread Zhan Zhang
You can try this

https://github.com/hortonworks/shc.git

or here
http://spark-packages.org/package/zhzhan/shc

Currently it is in the process of merging into HBase.
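
For reference, a rough Scala sketch of how shc is typically used to write a
DataFrame (the table name, column family, and catalog layout below are invented
examples; check the shc README for the exact API and options):

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Catalog mapping DataFrame columns to an HBase row key and column family.
val catalog = s"""{
  "table":{"namespace":"default", "name":"people"},
  "rowkey":"key",
  "columns":{
    "id":{"cf":"rowkey", "col":"key", "type":"string"},
    "name":{"cf":"cf1", "col":"name", "type":"string"}
  }
}"""

df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog,
               HBaseTableCatalog.newTable -> "5"))   // number of regions if the table is new
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()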

Thanks.

Zhan Zhang

On Apr 21, 2016, at 8:44 AM, Benjamin Kim 
<bbuil...@gmail.com<mailto:bbuil...@gmail.com>> wrote:

Hi Ted,

Can this module be used with an older version of HBase, such as 1.0 or 1.1? 
Where can I get the module from?

Thanks,
Ben

On Apr 21, 2016, at 6:56 AM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can do 
this.

On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim 
<bbuil...@gmail.com<mailto:bbuil...@gmail.com>> wrote:
Has anyone found an easy way to save a DataFrame into HBase?

Thanks,
Ben








Re: Spark SQL insert overwrite table not showing all the partition.

2016-04-22 Thread Zhan Zhang
INSERT OVERWRITE will overwrite any existing data in the table or partition

  * unless IF NOT EXISTS is provided for the partition (as of Hive 0.9.0, see
    https://issues.apache.org/jira/browse/HIVE-2612).
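
For illustration only (table and partition values are made up): in Hive's syntax
the clause goes right after the PARTITION spec, and as far as I know it requires
a fully static partition spec, so the dynamic date_2 column is pinned here:

hive_c.sql("""INSERT OVERWRITE TABLE base_table
              PARTITION (date='2016-04-21', date_2='a') IF NOT EXISTS
              SELECT * FROM temp_table""")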


Thanks.

Zhan Zhang

On Apr 21, 2016, at 3:20 PM, Bijay Kumar Pathak 
<bkpat...@mtu.edu<mailto:bkpat...@mtu.edu>> wrote:

Hi,

I have a job which writes to a Hive table with dynamic partitions. Inside the
job, I write into the table twice, but I only see the partitions from the last
write, although I can see in the Spark UI that it is processing data for both
partitions.

Below is the query I am using to write to the table.

hive_c.sql("""INSERT OVERWRITE TABLE base_table PARTITION (date='{1}', date_2)
  SELECT * from temp_table
""".format(date_val)
 )


Thanks,
Bijay



Re: Spark DataFrame sum of multiple columns

2016-04-22 Thread Zhan Zhang
You can define your own udf, following is one example

Thanks

Zhan Zhang


val foo = udf((a: Int, b: String) => a.toString + b)

checkAnswer(
  // SELECT *, foo(key, value) FROM testData
  testData.select($"*", foo('key, 'value)).limit(3),



On Apr 21, 2016, at 8:51 PM, Naveen Kumar Pokala 
<npok...@spcapitaliq.com<mailto:npok...@spcapitaliq.com>> wrote:

Hi,

Do we have any way to perform row-level operations in Spark DataFrames?


For example,

I have a dataframe with columns A, B, C, ... Z. I want to add one more column,
"New Column", with the sum of all the column values.

| A | B | C | D | ... | Z  | New Column |
| 1 | 2 | 4 | 3 | ... | 26 | 351        |



Can somebody help me on this?


Thanks,
Naveen



Re: Why Spark having OutOfMemory Exception?

2016-04-21 Thread Zhan Zhang
The data may not be large, but the driver needs to do a lot of bookkeeping. In
your case, it is possible the driver control plane takes too much memory.

I think you can find a Java developer to look at the core dump. Otherwise, it is
hard to tell exactly which part is using all the memory.

Thanks.

Zhan Zhang


On Apr 20, 2016, at 1:38 AM, 李明伟 
<kramer2...@126.com<mailto:kramer2...@126.com>> wrote:

Hi

The input data size is less than 10 MB. The task result size should be even
smaller, I think, because I am doing aggregation on the data.





At 2016-04-20 16:18:31, "Jeff Zhang" 
<zjf...@gmail.com<mailto:zjf...@gmail.com>> wrote:
Do you mean the input data size as 10M or the task result size ?

>>> But my way is to setup a forever loop to handle continued income data. Not 
>>> sure if it is the right way to use spark
Not sure what this means; do you use Spark Streaming, or are you doing batch jobs
in the forever loop?



On Wed, Apr 20, 2016 at 3:55 PM, 李明伟 
<kramer2...@126.com<mailto:kramer2...@126.com>> wrote:
Hi Jeff

The total size of my data is less than 10M. I already set the driver memory to 
4GB.







On 2016-04-20 13:42:25, "Jeff Zhang" <zjf...@gmail.com<mailto:zjf...@gmail.com>>
wrote:
Seems it is OOM in driver side when fetching task result.

You can try to increase spark.driver.memory and spark.driver.maxResultSize

On Tue, Apr 19, 2016 at 4:06 PM, 李明伟 
<kramer2...@126.com<mailto:kramer2...@126.com>> wrote:
Hi Zhan Zhang


Please see the exception trace below. It is reporting a GC overhead limit error.
I am not a Java or Scala developer, so it is hard for me to understand this
information. Also, reading a core dump is too difficult for me.

I am not sure if the way I am using Spark is correct. I understand that Spark
can do batch or stream calculation, but my way is to set up a forever loop to
handle continually incoming data.
Not sure if that is the right way to use Spark.


16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
at 
scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at 
org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: GC 
overhead limit exceeded
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
at 
scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethod

Re: Read Parquet in Java Spark

2016-04-18 Thread Zhan Zhang
You can try something like below, if you only have one column.

val rdd = parquetFile.javaRDD().map(row => row.getAs[String](0))
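
A slightly fuller Scala sketch (Spark 1.x; the path is a made-up example), in
case the parquet file stores one JSON string per row:

// Read the parquet file and pull the single string column out of each Row.
val df = sqlContext.read.parquet("hdfs:///path/to/logs.parquet")
val strings = df.map(row => row.getString(0))   // RDD[String]
val javaStrings = strings.toJavaRDD()           // JavaRDD[String] for the Java API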

Thanks.

Zhan Zhang

On Apr 18, 2016, at 3:44 AM, Ramkumar V 
<ramkumar.c...@gmail.com<mailto:ramkumar.c...@gmail.com>> wrote:

HI,

Any idea on this ?

Thanks,


On Mon, Apr 4, 2016 at 2:47 PM, Akhil Das 
<ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>> wrote:
I didn't know you have a parquet file containing JSON data.

Thanks
Best Regards

On Mon, Apr 4, 2016 at 2:44 PM, Ramkumar V 
<ramkumar.c...@gmail.com<mailto:ramkumar.c...@gmail.com>> wrote:
Hi Akhil,

Thanks for your help. Why did you use "," as the separator?

I have a parquet file which contains only json in each line.

Thanks,


On Mon, Apr 4, 2016 at 2:34 PM, Akhil Das 
<ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>> wrote:
Something like this (in scala):

val rdd = parquetFile.javaRDD().map(row => row.mkString(","))

You can create a map operation over your JavaRDD to convert each
org.apache.spark.sql.Row<https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/Row.html>
to a String (the Row.mkString() operation).

Thanks
Best Regards

On Mon, Apr 4, 2016 at 12:02 PM, Ramkumar V 
<ramkumar.c...@gmail.com<mailto:ramkumar.c...@gmail.com>> wrote:
Any idea on this ? How to convert parquet file into JavaRDD ?

Thanks,


On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V 
<ramkumar.c...@gmail.com<mailto:ramkumar.c...@gmail.com>> wrote:
Hi,

Thanks for the reply. I tried this. It's returning JavaRDD<Row> instead of
JavaRDD<String>. How do I get JavaRDD<String>?

Error :
incompatible types: org.apache.spark.api.java.JavaRDD<Row>
cannot be converted to org.apache.spark.api.java.JavaRDD<String>





Thanks,


On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY 
<umesh9...@gmail.com<mailto:umesh9...@gmail.com>> wrote:
From the Spark documentation:


DataFrame parquetFile = sqlContext.read().parquet("people.parquet");


JavaRDD<Row> jRDD = parquetFile.javaRDD();

The javaRDD() method will convert the DataFrame to a JavaRDD<Row>.

On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V 
<ramkumar.c...@gmail.com<mailto:ramkumar.c...@gmail.com>> wrote:
Hi,

I'm trying to read parquet log files in Java Spark. The parquet log files are
stored in HDFS. I want to read and convert that parquet file into a JavaRDD. I
was only able to find the SQLContext DataFrame API. How can I read it if I have a
SparkContext and RDDs? What is the best way to read it?

Thanks,











Re: Why Spark having OutOfMemory Exception?

2016-04-18 Thread Zhan Zhang
What kind of OOM? Driver or executor side? You can use the core dump to find what
caused the OOM.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 9:44 PM, 李明伟 
<kramer2...@126.com<mailto:kramer2...@126.com>> wrote:

Hi Samaga

Thanks very much for your reply, and sorry for the delayed reply.

Cassandra or Hive is a good suggestion.
However in my situation I am not sure if it will make sense.

My requirement is to use the most recent 24 hours of data to generate a report.
The frequency is every 5 minutes.
So if I use Cassandra or Hive, it means Spark will have to read 24 hours of data
every 5 minutes, and among that data a big part (like 23 hours or more) will be
read repeatedly.

Windowing in Spark is for stream computing. I have not used it, but I will
consider it.
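
For reference, a rough Scala sketch of the timestamp-filter approach suggested
above (the input path, format, and column name are invented); the idea is to
re-read the small input and filter to the last 24 hours each cycle instead of
holding 24 hours of DataFrames in a dict:

import org.apache.spark.sql.functions.col

val cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000
val last24h = sqlContext.read.json("hdfs:///incoming/")   // or ORC/Hive, as suggested
  .filter(col("event_ts") >= cutoff)
last24h.registerTempTable("recent")
// run the report SQL against "recent" every 5 minutes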


Thanks again

Regards
Mingwei





At 2016-04-11 19:09:48, "Lohith Samaga M" 
<lohith.sam...@mphasis.com<mailto:lohith.sam...@mphasis.com>> wrote:
>Hi Kramer,
>   Some options:
>   1. Store in Cassandra with TTL = 24 hours. When you read the full 
> table, you get the latest 24 hours data.
>   2. Store in Hive as ORC file and use timestamp field to filter out the 
> old data.
>   3. Try windowing in spark or flink (have not used either).
>
>
>Best regards / Mit freundlichen Grüßen / Sincères salutations
>M. Lohith Samaga
>
>
>-Original Message-
>From: kramer2...@126.com<mailto:kramer2...@126.com> [mailto:kramer2...@126.com]
>Sent: Monday, April 11, 2016 16.18
>To: user@spark.apache.org<mailto:user@spark.apache.org>
>Subject: Why Spark having OutOfMemory Exception?
>
>I use spark to do some very simple calculation. The description is like below 
>(pseudo code):
>
>
>While timestamp == 5 minutes
>
>df = read_hdf() # Read hdfs to get a dataframe every 5 minutes
>
>my_dict[timestamp] = df # Put the data frame into a dict
>
>delete_old_dataframe( my_dict ) # Delete old dataframe (timestamp is one
>24 hour before)
>
>big_df = merge(my_dict) # Merge the recent 24 hours data frame
>
>To explain..
>
>I have new files comes in every 5 minutes. But I need to generate report on 
>recent 24 hours data.
>The concept of 24 hours means I need to delete the oldest data frame every 
>time I put a new one into it.
>So I maintain a dict (my_dict in above code), the dict contains map like
>timestamp: dataframe. Everytime I put dataframe into the dict, I will go 
>through the dict to delete those old data frame whose timestamp is 24 hour ago.
>After delete and input. I merge the data frames in the dict to a big one and 
>run SQL on it to get my report.
>
>*
>I want to know if any thing wrong about this model? Because it is very slow 
>after started for a while and hit OutOfMemory. I know that my memory is 
>enough. Also size of file is very small for test purpose. So should not have 
>memory problem.
>
>I am wondering if there is lineage issue, but I am not sure.
>
>*
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-having-OutOfMemory-Exception-tp26743.html
>Sent from the Apache Spark User List mailing list archive at 
>Nabble.com<http://Nabble.com>.
>
>-
>To unsubscribe, e-mail: 
>user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org> 
>For additional commands, e-mail: 
>user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
>







Re: Problem using limit clause in spark sql

2015-12-23 Thread Zhan Zhang
There has to be a central point collaboratively collecting exactly 10,000
records; currently the approach is to use one single partition, which is easy
to implement.
Otherwise, the driver has to count the number of records in each partition and
then decide how many records to materialize from each partition, because some
partitions may not have enough records; sometimes a partition is even empty.

I don't see any straightforward workaround for this.
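
A sketch of the simple (if wasteful) route mentioned further down in this thread:
take the limit into its single partition, then spread the rows back out before
the expensive map. The partition count and the map body are placeholders:

val limited = sqlContext.sql("select * from table limit 10000")
val spread  = limited.repartition(32)              // arbitrary number of partitions
val result  = spread.map(row => /* expensive work per row */ row).collect()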

Thanks.

Zhan Zhang



On Dec 23, 2015, at 5:32 PM, 汪洋 
<tiandiwo...@icloud.com<mailto:tiandiwo...@icloud.com>> wrote:

It is an application running as an http server. So I collect the data as the 
response.

On Dec 24, 2015, at 8:22 AM, Hudong Wang
<justupl...@hotmail.com<mailto:justupl...@hotmail.com>> wrote:

When you call collect() it will bring all the data to the driver. Do you mean 
to call persist() instead?


From: tiandiwo...@icloud.com<mailto:tiandiwo...@icloud.com>
Subject: Problem using limit clause in spark sql
Date: Wed, 23 Dec 2015 21:26:51 +0800
To: user@spark.apache.org<mailto:user@spark.apache.org>

Hi,
I am using spark sql in a way like this:

sqlContext.sql("select * from table limit 10000").map(...).collect()

The problem is that the limit clause will collect all 10,000 records into a
single partition, resulting in the map afterwards running in only one partition
and being really slow. I tried to use repartition, but it is kind of a waste to
collect all those records into one partition, then shuffle them around, and then
collect them again.

Is there a way to work around this?
BTW, there is no order by clause, and I do not care which 10,000 records I get as
long as the total number is less than or equal to 10,000.




Re: Unable to create hive table using HiveContext

2015-12-23 Thread Zhan Zhang
You are using embedded mode, which will create the metastore db locally (in your
case, maybe the db has already been created, but you do not have the right
permissions?).

To connect to a remote metastore, hive-site.xml has to be correctly configured.

Thanks.

Zhan Zhang


On Dec 23, 2015, at 7:24 AM, Soni spark 
<soni2015.sp...@gmail.com<mailto:soni2015.sp...@gmail.com>> wrote:

Hi friends,

I am trying to create hive table through spark with Java code in Eclipse using 
below code.

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");


but i am getting error

ERROR XBM0J: Directory /home/workspace4/Test/metastore_db already exists.

I am not sure why metastore creating in workspace. Please help me.

Thanks
Soniya



Re: DataFrameWriter.format(String) is there a list of options?

2015-12-23 Thread Zhan Zhang
Currently json, parquet, orc (in HiveContext), and text are natively supported. If
you use avro or other formats, you have to include the corresponding package,
which is not built into the Spark assembly jar.
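
A minimal sketch of the built-in formats (the paths and the avro package version
are illustrative examples only):

df.write.format("json").save("hdfs:///out/json")
df.write.format("parquet").save("hdfs:///out/parquet")
df.write.format("orc").save("hdfs:///out/orc")     // needs HiveContext in 1.x
df.write.format("text").save("hdfs:///out/text")   // expects a single string column
// External formats need their package on the classpath, e.g.
//   spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 ...
// and then df.write.format("com.databricks.spark.avro").save(...)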

Thanks.

Zhan Zhang

On Dec 23, 2015, at 8:57 AM, Christopher Brady 
<christopher.br...@oracle.com<mailto:christopher.br...@oracle.com>> wrote:

DataFrameWriter.format



Re: Can SqlContext be used inside mapPartitions

2015-12-22 Thread Zhan Zhang
SQLContext lives on the driver side, and I don't think you can use it inside
executors. How to provide lookup functionality in executors really depends on how
you would use it.
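
One common pattern, sketched below with made-up table and column names: collect
the (small) lookup data on the driver, broadcast it, and read the broadcast value
inside mapPartitions:

val lookup = sqlContext.table("lookup_table")
  .map(r => r.getString(0) -> r.getString(1))
  .collectAsMap()
val bcLookup = sc.broadcast(lookup)

val enriched = someRdd.mapPartitions { iter =>      // someRdd: RDD[String], hypothetical
  val m = bcLookup.value
  iter.map(key => key -> m.getOrElse(key, "unknown"))
}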

Thanks.

Zhan Zhang

On Dec 22, 2015, at 4:44 PM, SRK <swethakasire...@gmail.com> wrote:

> Hi,
> 
> Can SQL Context be used inside mapPartitions? My requirement is to register
> a set of data from hdfs as a temp table and to be able to lookup from inside
> MapPartitions based on a key. If it is not supported, is there a different
> way of doing this?
> 
> Thanks!
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-SqlContext-be-used-inside-mapPartitions-tp25771.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 





Re: number limit of map for spark

2015-12-21 Thread Zhan Zhang
In what situation do you have such cases? If there is no shuffle, you can
collapse all these functions into one, right? In the meantime, it is not
recommended to collect all the data to the driver.
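
A Scala sketch of what collapsing the maps could look like (dataSet stands in for
the poster's RDD of integers; the per-iteration logic is copied from the code
quoted below). Chaining ~350 separate map() calls builds a very deep
lineage/closure chain, which is what eventually overflows the stack:

val m = 350

def step(v: Int): Int = {            // per-iteration logic from the quoted code
  val x = math.random * 2 - 1
  val y = math.random * 2 - 1
  if (x * x + y * y < 1) 1 else 0
}

// One map that applies the step m times, instead of m chained maps.
val collapsed = dataSet.map { v =>
  var acc = v
  var i = 0
  while (i < m) { acc = step(acc); i += 1 }
  acc
}

// If the iterations must stay as separate transformations, periodically
// truncating the lineage also avoids StackOverflowError:
//   sc.setCheckpointDir("hdfs:///tmp/checkpoints")
//   if (j % 50 == 0) { r.checkpoint(); r.count() }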

Thanks.

Zhan Zhang

On Dec 21, 2015, at 3:44 AM, Zhiliang Zhu 
<zchl.j...@yahoo.com.INVALID<mailto:zchl.j...@yahoo.com.INVALID>> wrote:

Dear All,

I need to iterate some job / RDD quite a lot of times, but I am stuck on the
problem that Spark only accepts around 350 calls to map before it needs one
action function; besides, dozens of actions will obviously increase the run time.
Is there any proper way ...

As tested, there is piece of codes as follows:

..
 83 int count = 0;
 84 JavaRDD<Integer> dataSet = jsc.parallelize(list, 1).cache(); //with only 1 partition
 85 int m = 350;
 86 JavaRDD<Integer> r = dataSet.cache();
 87 JavaRDD<Integer> t = null;
 88
 89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the rdd 
r to t
 90   if(null != t) {
 91 r = t;
 92   }
//inner loop to call map 350 times , if m is much more than 350 
(for instance, around 400), then the job will throw exception message
  "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw 
exception: java.lang.StackOverflowError java.lang.StackOverflowError")
 93   for(int i=0; i < m; ++i) {
 94 r = r.map(new Function<Integer, Integer>() {
 95   @Override
 96   public Integer call(Integer integer) {
 97 double x = Math.random() * 2 - 1;
 98 double y = Math.random() * 2 - 1;
 99 return (x * x + y * y < 1) ? 1 : 0;
100   }
101 });

104   }
105
 106   List<Integer> lt = r.collect(); //then collect this rdd to get
another rdd; however, dozens of actions such as collect are VERY COSTLY
107   t = jsc.parallelize(lt, 1).cache();
108
109 }
110
..

Thanks very much in advance!
Zhiliang




Re: number limit of map for spark

2015-12-21 Thread Zhan Zhang
What I mean is to combine multiple map functions into one. I don't know exactly
how your algorithm works. Does one iteration's result depend on the previous
iteration? If so, how do they depend on each other?
I think either you can optimize your implementation, or Spark is not the right
tool for your specific application.

Thanks.

Zhan Zhang

On Dec 21, 2015, at 10:43 AM, Zhiliang Zhu 
<zchl.j...@yahoo.com.INVALID<mailto:zchl.j...@yahoo.com.INVALID>> wrote:

What is the difference between repartition / collect and collapse ...
Is collapse as costly as collect or repartition?

Thanks in advance ~


On Tuesday, December 22, 2015 2:24 AM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:


In what situation, you have such cases? If there is no shuffle, you can 
collapse all these functions into one, right? In the meantime, it is not 
recommended to collect
all data to driver.

Thanks.

Zhan Zhang

On Dec 21, 2015, at 3:44 AM, Zhiliang Zhu 
<zchl.j...@yahoo.com.INVALID<mailto:zchl.j...@yahoo.com.INVALID>> wrote:

Dear All,

I need to iterator some job / rdd quite a lot of times, but just lost in the 
problem of
spark only accept to call around 350 number of map before it meets one action 
Function ,
besides, dozens of action will obviously increase the run time.
Is there any proper way ...

As tested, there is piece of codes as follows:

..
 83 int count = 0;
 84 JavaRDD<Integer> dataSet = jsc.parallelize(list, 1).cache(); //with only 1 partition
 85 int m = 350;
 86 JavaRDD<Integer> r = dataSet.cache();
 87 JavaRDD<Integer> t = null;
 88
 89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the rdd 
r to t
 90   if(null != t) {
 91 r = t;
 92   }
//inner loop to call map 350 times , if m is much more than 350 
(for instance, around 400), then the job will throw exception message
  "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw 
exception: java.lang.StackOverflowError java.lang.StackOverflowError")
 93   for(int i=0; i < m; ++i) {
 94 r = r.map(new Function<Integer, Integer>() {
 95   @Override
 96   public Integer call(Integer integer) {
 97 double x = Math.random() * 2 - 1;
 98 double y = Math.random() * 2 - 1;
 99 return (x * x + y * y < 1) ? 1 : 0;
100   }
101 });

104   }
105
 106   List<Integer> lt = r.collect(); //then collect this rdd to get
another rdd; however, dozens of actions such as collect are VERY COSTLY
107   t = jsc.parallelize(lt, 1).cache();
108
109 }
110
..

Thanks very much in advance!
Zhiliang







Re: Spark with log4j

2015-12-21 Thread Zhan Zhang
Hi Kalpesh,

If you are using Spark on YARN, it may not work, because you are writing logs to
files other than stdout/stderr, which YARN log aggregation may not pick up. As I
understand it, YARN only aggregates logs written to stdout/stderr, and the local
cache will be deleted (within a configured timeframe).

To check it, at application run time you can log into the container's box and
check the container's local cache to find out whether the log file exists or not
(after the app terminates, these local cache files will be deleted as well).

Thanks.

Zhan Zhang

On Dec 18, 2015, at 7:23 AM, Kalpesh Jadhav 
<kalpesh.jad...@citiustech.com<mailto:kalpesh.jad...@citiustech.com>> wrote:

Hi all,

I am new to Spark, and I am trying to use log4j for logging my application.
But the logs are not getting written to the specified file.

I have created the application using Maven and kept a log.properties file in the
resources folder.
The application is written in Scala.

If there is any alternative to log4j, that will also work, but I want to see the
logs in a file.

If any changes need to be done in Hortonworks for the Spark configuration, please
mention that as well.

If anyone has done this before, or any source is available on GitHub, please respond.


Thanks,
Kalpesh Jadhav




Re: spark-submit is ignoring "--executor-cores"

2015-12-21 Thread Zhan Zhang
BTW: It is not only a YARN web-UI issue. In the capacity scheduler, vcore requests
are ignored. If you want YARN to honor vcore requests, you have to use the
DominantResourceCalculator, as Saisai suggested.

Thanks.

Zhan Zhang

On Dec 21, 2015, at 5:30 PM, Saisai Shao 
<sai.sai.s...@gmail.com<mailto:sai.sai.s...@gmail.com>> wrote:

 and you'll see the right vcores y



Re: About Spark On Hbase

2015-12-15 Thread Zhan Zhang
If you want dataframe support, you can refer to https://github.com/zhzhan/shc, 
which I am working on to integrate to HBase upstream with existing support.

Thanks.

Zhan Zhang
On Dec 15, 2015, at 4:34 AM, censj 
<ce...@lotuseed.com<mailto:ce...@lotuseed.com>> wrote:


Hi, fight fate
In the bulkPut() function, can I first Get a value and then put that value to
HBase?


On Dec 9, 2015, at 16:02, censj <ce...@lotuseed.com<mailto:ce...@lotuseed.com>> wrote:

Thank you! I know.
On Dec 9, 2015, at 15:59, fightf...@163.com<mailto:fightf...@163.com> wrote:

If you are using Maven, you can add the Cloudera Maven repo to the repositories
in pom.xml
and add the spark-hbase dependency.
I just found this:
http://spark-packages.org/package/nerdammer/spark-hbase-connector
As Feng Dongyu recommended, you can also try this, but I have no experience of
using it.



fightf...@163.com<mailto:fightf...@163.com>

From: censj<mailto:ce...@lotuseed.com>
Sent: 2015-12-09 15:44
To: fightf...@163.com<mailto:fightf...@163.com>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: About Spark On Hbase
So, how do I get this jar? I use an sbt package project; I did not find it in the
sbt libraries.
On Dec 9, 2015, at 15:42, fightf...@163.com<mailto:fightf...@163.com> wrote:

I don't think it really needs the CDH components. Just use the API.


fightf...@163.com<mailto:fightf...@163.com>

From: censj<mailto:ce...@lotuseed.com>
Sent: 2015-12-09 15:31
To: fightf...@163.com<mailto:fightf...@163.com>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: About Spark On Hbase
But this is dependent on CDH. I did not install CDH.
On Dec 9, 2015, at 15:18, fightf...@163.com<mailto:fightf...@163.com> wrote:

Actually you can refer to https://github.com/cloudera-labs/SparkOnHBase
Also, HBASE-13992<https://issues.apache.org/jira/browse/HBASE-13992>  already 
integrates that feature into the hbase side, but
that feature has not been released.

Best,
Sun.


fightf...@163.com<mailto:fightf...@163.com>

From: censj<mailto:ce...@lotuseed.com>
Date: 2015-12-09 15:04
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: About Spark On Hbase
Hi all,
 I am now using Spark, but I have not found an open-source project for operating
HBase from Spark. Can anyone tell me of one?





Re: Spark big rdd problem

2015-12-15 Thread Zhan Zhang
You should be able to get the logs from YARN with "yarn logs -applicationId xxx",
where you can possibly find the cause.

Thanks.

Zhan Zhang

On Dec 15, 2015, at 11:50 AM, Eran Witkon <eranwit...@gmail.com> wrote:

> When running 
> val data = sc.wholeTextFiles("someDir/*"); data.count()
> 
> I get numerous warnings from YARN until I get an Akka association exception.
> Can someone explain what happen when spark loads this rdd and can't fit it 
> all in memory?
> Based on the exception it looks like the server is disconnecting from yarn 
> and failing... Any idea why? The code is simple but still failing...
> Eran





Re: Spark big rdd problem

2015-12-15 Thread Zhan Zhang
There are two cases here. If the container is killed by YARN, you can increase
the JVM overhead (spark.yarn.executor.memoryOverhead). Otherwise, you have to
increase the executor memory, if there is no memory leak happening.

Thanks.

Zhan Zhang

On Dec 15, 2015, at 9:58 PM, Eran Witkon 
<eranwit...@gmail.com<mailto:eranwit...@gmail.com>> wrote:

If the problem is containers trying to use more memory than they are allowed, how
do I limit them? I already have executor-memory set to 5G.
Eran
On Tue, 15 Dec 2015 at 23:10 Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:
You should be able to get the logs from yarn by “yarn logs -applicationId xxx”, 
where you can possible find the cause.

Thanks.

Zhan Zhang

On Dec 15, 2015, at 11:50 AM, Eran Witkon 
<eranwit...@gmail.com<mailto:eranwit...@gmail.com>> wrote:

> When running
> val data = sc.wholeTextFile("someDir/*") data.count()
>
> I get numerous warning from yarn till I get aka association exception.
> Can someone explain what happen when spark loads this rdd and can't fit it 
> all in memory?
> Based on the exception it looks like the server is disconnecting from yarn 
> and failing... Any idea why? The code is simple but still failing...
> Eran




Re: Multi-core support per task in Spark

2015-12-11 Thread Zhan Zhang
I noticed that it is configurable at the job level via spark.task.cpus. Is there
any way to support it at the task level?
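
For reference, a minimal sketch of the job-level setting (the value 2 is an
arbitrary example); it applies to every task of the application, since there is
no per-task override:

val conf = new org.apache.spark.SparkConf()
  .setAppName("multi-core-tasks")
  .set("spark.task.cpus", "2")
// or equivalently: spark-submit --conf spark.task.cpus=2 ...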

Thanks.

Zhan Zhang


On Dec 11, 2015, at 10:46 AM, Zhan Zhang <zzh...@hortonworks.com> wrote:

> Hi Folks,
> 
> Is it possible to assign multiple core per task and how? Suppose we have some 
> scenario, in which some tasks are really heavy processing each record and 
> require multi-threading, and we want to avoid similar tasks assigned to the 
> same executors/hosts. 
> 
> If it is not supported, does it make sense to add this feature. It may seems 
> make user worry about more configuration, but by default we can still do 1 
> core per task and only advanced users need to be aware of this feature.
> 
> Thanks.
> 
> Zhan Zhang
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 
> 





Re: What is the relationship between reduceByKey and spark.driver.maxResultSize?

2015-12-11 Thread Zhan Zhang
I think you are fetching too many results to the driver. Typically, it is not
recommended to collect much data to the driver. But if you have to, you can
increase the driver memory when submitting jobs.
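
A minimal sketch of the two settings discussed in this thread (values are
arbitrary examples). spark.driver.memory only takes effect if it is set before
the driver JVM starts, so pass it at submit time:

//   spark-submit --driver-memory 8g --conf spark.driver.maxResultSize=4g ...
// spark.driver.maxResultSize can also be set in code:
val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.maxResultSize", "4g")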

Thanks.

Zhan Zhang

On Dec 11, 2015, at 6:14 AM, Tom Seddon 
<mr.tom.sed...@gmail.com<mailto:mr.tom.sed...@gmail.com>> wrote:

I have a job that is running into intermittent errors with [SparkDriver]
java.lang.OutOfMemoryError: Java heap space. Before I was getting this error, I
was getting errors saying the result size exceeded spark.driver.maxResultSize.
This does not make any sense to me, as there are no actions in my job that send
data to the driver: just a pull of data from S3, a map and reduceByKey, and then
conversion to a dataframe and a saveAsTable action that puts the results back on
S3.

I've found a few references to reduceByKey and spark.driver.maxResultSize 
having some importance, but cannot fathom how this setting could be related.

Would greatly appreciated any advice.

Thanks in advance,

Tom



Re: Performance does not increase as the number of workers increasing in cluster mode

2015-12-11 Thread Zhan Zhang
Not sure about your data and model size. But intuitively, there is a tradeoff
between parallelism and network overhead. With the same data set and model, there
is an optimum cluster size (performance may degrade at some point as the cluster
size increases). You may want to test a larger data set if you want to do a
performance benchmark.

Thanks.

Zhan Zhang



On Dec 11, 2015, at 9:34 AM, Wei Da <xwd0...@qq.com<mailto:xwd0...@qq.com>> 
wrote:

Hi, all

I have done a test with different HW configurations of Spark 1.5.0. A KMeans
algorithm was run in four different Spark environments: the first ran in local
mode, the other three ran in cluster mode, and all the nodes have the same CPU
(6 cores) and memory (8G). The running times are recorded below. I thought the
performance should increase as the number of workers increases, but the result
shows no obvious improvement. Does anybody know the reason? Thanks a lot in
advance!

The number of rows in the test data is about 2.6 million; the input file is about
810 MB and is stored in HDFS.
[image: table of running times]


Following is snapshot of the Spark WebUI.
[image: snapshot of the Spark WebUI]

Wei Da
xwd0...@qq.com<mailto:xwd0...@qq.com>






Multi-core support per task in Spark

2015-12-11 Thread Zhan Zhang
Hi Folks,

Is it possible to assign multiple cores per task, and how? Suppose we have a
scenario in which some tasks do really heavy processing of each record and
require multi-threading, and we want to avoid similar tasks being assigned to the
same executors/hosts.

If it is not supported, does it make sense to add this feature? It may seem to
make users worry about more configuration, but by default we can still do 1 core
per task, and only advanced users need to be aware of this feature.

Thanks.

Zhan Zhang




Re: how to access local file from Spark sc.textFile("file:///path to/myfile")

2015-12-11 Thread Zhan Zhang
As Sean mentioned, you cannot refer to a local file on your remote machines
(executors). One workaround is to copy the file to all machines, under the same
directory.
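
Another option, sketched below (the file path and contents are made-up examples):
ship the local file with the job via sc.addFile and open the per-node copy inside
tasks with SparkFiles.get:

import org.apache.spark.SparkFiles
import scala.io.Source

sc.addFile("/home/me/lookup.txt")                 // local file on the driver machine

val filtered = sc.parallelize(1 to 10).mapPartitions { iter =>
  // SparkFiles.get resolves to the local copy on whichever node the task runs.
  val lookup = Source.fromFile(SparkFiles.get("lookup.txt")).getLines().toSet
  iter.filter(i => lookup.contains(i.toString))
}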

Thanks.

Zhan Zhang

On Dec 11, 2015, at 10:26 AM, Lin, Hao 
<hao@finra.org<mailto:hao@finra.org>> wrote:

 of the master node



Re: DataFrames initial jdbc loading - will it be utilizing a filter predicate?

2015-11-18 Thread Zhan Zhang
When you have the following query, 'account === "acct1" will be pushed down to
generate a new query with "where account = 'acct1'".
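
A sketch of how the per-partition clause and the pushed-down filter combine (the
URL, table, and predicates are placeholders). The predicates array gives one WHERE
fragment per partition, and the DataFrame filter is ANDed into the query that each
partition runs:

import java.util.Properties

val props = new Properties()
val partitionPredicates = Array("mod(id, 2) = 0", "mod(id, 2) = 1")   // hypothetical split
val df = sqlContext.read.jdbc("jdbc:postgresql://host/db", "accounts",
                              partitionPredicates, props)
df.filter($"account" === "acct1")
// roughly: SELECT ... FROM accounts WHERE account = 'acct1' AND (partition predicate)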

Thanks.

Zhan Zhang

On Nov 18, 2015, at 11:36 AM, Eran Medan 
<eran.me...@gmail.com<mailto:eran.me...@gmail.com>> wrote:

I understand that the following are equivalent

df.filter('account === "acct1")

sql("select * from tempTableName where account = 'acct1'")


But is Spark SQL "smart" enough to also push filter predicates down for the
initial load?

e.g.
sqlContext.read.jdbc(…).filter('account=== "acct1")

Is Spark "smart enough" to this for each partition?

   ‘select … where account= ‘acc1’ AND (partition where clause here)?

Or do I have to put it on each partition where clause otherwise it will load 
the entire set and only then filter it in memory?




Re: Spark Thrift doesn't start

2015-11-11 Thread Zhan Zhang
In the hive-site.xml, you can remove all configuration related to tez and give 
it a try again.

Thanks.

Zhan Zhang

On Nov 10, 2015, at 10:47 PM, DaeHyun Ryu 
<ry...@kr.ibm.com<mailto:ry...@kr.ibm.com>> wrote:

Hi folks,

I configured Tez as the execution engine of Hive. After doing that, whenever I
start the Spark thrift server, it just stops automatically.
I checked the log and saw the following messages. My Spark version is 1.4.1 and
Tez version is 0.7.0 (IBM BigInsights 4.1).
Does anyone have any idea about this?

java.lang.NoClassDefFoundError: org/apache/tez/dag/api/SessionNotRunning
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:353)
at 
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:116)
at 
org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:163)
at 
org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:161)
at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:168)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at 
org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
at $iwC$$iwC.(:9)
at $iwC.(:18)
at (:20)
at .(:24)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at 
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:130)
at 
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
at 
org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
at 
org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:157)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
at 
org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
at 
org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubm

Re: Anybody hit this issue in spark shell?

2015-11-09 Thread Zhan Zhang
Thanks, Ted. I am using the latest master branch. I will give your build command
a try.

Thanks.

Zhan Zhang

On Nov 9, 2015, at 10:46 AM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

Which branch did you perform the build with ?

I used the following command yesterday:
mvn -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4 -Dhadoop.version=2.7.0 
package -DskipTests

Spark shell was working.

Building with latest master branch.

On Mon, Nov 9, 2015 at 10:37 AM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:
Hi Folks,

Does anybody meet the following issue? I use "mvn package -Phive -DskipTests” 
to build the package.

Thanks.

Zhan Zhang



bin/spark-shell
...
Spark context available as sc.
error: error while loading QueryExecution, Missing dependency 'bad symbolic 
reference. A signature in QueryExecution.class refers to term annotations
in package com.google.common which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling 
QueryExecution.class.', required by 
/Users/zzhang/repo/spark/assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.2.0.jar(org/apache/spark/sql/execution/QueryExecution.class)
:10: error: not found: value sqlContext
   import sqlContext.implicits._
  ^
:10: error: not found: value sqlContext
   import sqlContext.sql
  ^




Anybody hit this issue in spark shell?

2015-11-09 Thread Zhan Zhang
Hi Folks,

Does anybody meet the following issue? I use "mvn package -Phive -DskipTests” 
to build the package.

Thanks.

Zhan Zhang



bin/spark-shell
...
Spark context available as sc.
error: error while loading QueryExecution, Missing dependency 'bad symbolic 
reference. A signature in QueryExecution.class refers to term annotations
in package com.google.common which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling 
QueryExecution.class.', required by 
/Users/zzhang/repo/spark/assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.2.0.jar(org/apache/spark/sql/execution/QueryExecution.class)
:10: error: not found: value sqlContext
   import sqlContext.implicits._
  ^
:10: error: not found: value sqlContext
   import sqlContext.sql
  ^


Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Zhan Zhang
Hi Jerry,

OK. Here is an ugly workaround.

Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get a
bunch of exceptions because the HiveContext initialization fails, but you can
initialize your SQLContext on your own.

scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = 
org.apache.spark.sql.SQLContext@4a5cc2e8

scala> import sqlContext.implicits._
import sqlContext.implicits._


for example

HW11188:spark zzhang$ more conf/hive-site.xml
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://zzhang-yarn11:9083</value>
  </property>
</configuration>
HW11188:spark zzhang$

By the way, I don't know whether there are any caveats for this workaround.

Thanks.

Zhan Zhang





On Nov 6, 2015, at 2:40 PM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

Hi Zhan,

I don’t use HiveContext features at all. I use mostly DataFrame API. It is 
sexier and much less typo. :)
Also, HiveContext requires metastore database setup (derby by default). The 
problem is that I cannot have 2 spark-shell sessions running at the same time 
in the same host (e.g. /home/jerry directory). It will give me an exception 
like below.

Since I don’t use HiveContext, I don’t see the need to maintain a database.

What is interesting is that pyspark shell is able to start more than 1 session 
at the same time. I wonder what pyspark has done better than spark-shell?

Best Regards,

Jerry

On Nov 6, 2015, at 5:28 PM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:

If you assembly jar have hive jar included, the HiveContext will be used. 
Typically, HiveContext has more functionality than SQLContext. In what case you 
have to use SQLContext that cannot be done by HiveContext?

Thanks.

Zhan Zhang

On Nov 6, 2015, at 10:43 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

What is interesting is that pyspark shell works fine with multiple session in 
the same host even though multiple HiveContext has been created. What does 
pyspark does differently in terms of starting up the shell?

On Nov 6, 2015, at 12:12 PM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

In SQLContext.scala :
// After we have populated SQLConf, we call setConf to populate other confs 
in the subclass
// (e.g. hiveconf in HiveContext).
properties.foreach {
  case (key, value) => setConf(key, value)
}

I don't see config of skipping the above call.

FYI

On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:
Hi spark users and developers,

Is it possible to disable HiveContext from being instantiated when using 
spark-shell? I got the following errors when I have more than one session 
starts. Since I don't use HiveContext, it would be great if I can have more 
than 1 spark-shell start at the same time.

Exception in thread "main" java.lang.RuntimeException: 
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
toreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at 
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
at 
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
at 
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.Delegati

Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Zhan Zhang
I agree, with a minor change: add a config to provide the option to initialize
either SQLContext or HiveContext, with HiveContext as the default, instead of
bypassing it when hitting the exception.

Thanks.

Zhan Zhang

On Nov 6, 2015, at 2:53 PM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

I would suggest adding a config parameter that allows bypassing initialization 
of HiveContext in case of SQLException

Cheers

On Fri, Nov 6, 2015 at 2:50 PM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:
Hi Jerry,

OK. Here is an ugly walk around.

Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get a 
bunch of exceptions because hive context initialization failure, but you can 
initialize your SQLContext on your own.

scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = 
org.apache.spark.sql.SQLContext@4a5cc2e8

scala> import sqlContext.implicits._
import sqlContext.implicits._


for example

HW11188:spark zzhang$ more conf/hive-site.xml
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://zzhang-yarn11:9083</value>
  </property>
</configuration>
HW11188:spark zzhang$

By the way, I don’t know whether there is any caveat for this walk around.

Thanks.

Zhan Zhang





On Nov 6, 2015, at 2:40 PM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

Hi Zhan,

I don’t use HiveContext features at all. I use mostly DataFrame API. It is 
sexier and much less typo. :)
Also, HiveContext requires metastore database setup (derby by default). The 
problem is that I cannot have 2 spark-shell sessions running at the same time 
in the same host (e.g. /home/jerry directory). It will give me an exception 
like below.

Since I don’t use HiveContext, I don’t see the need to maintain a database.

What is interesting is that pyspark shell is able to start more than 1 session 
at the same time. I wonder what pyspark has done better than spark-shell?

Best Regards,

Jerry

On Nov 6, 2015, at 5:28 PM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:

If you assembly jar have hive jar included, the HiveContext will be used. 
Typically, HiveContext has more functionality than SQLContext. In what case you 
have to use SQLContext that cannot be done by HiveContext?

Thanks.

Zhan Zhang

On Nov 6, 2015, at 10:43 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

What is interesting is that pyspark shell works fine with multiple session in 
the same host even though multiple HiveContext has been created. What does 
pyspark does differently in terms of starting up the shell?

On Nov 6, 2015, at 12:12 PM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

In SQLContext.scala :
// After we have populated SQLConf, we call setConf to populate other confs 
in the subclass
// (e.g. hiveconf in HiveContext).
properties.foreach {
  case (key, value) => setConf(key, value)
}

I don't see config of skipping the above call.

FYI

On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:
Hi spark users and developers,

Is it possible to disable HiveContext from being instantiated when using 
spark-shell? I got the following errors when I have more than one session 
starts. Since I don't use HiveContext, it would be great if I can have more 
than 1 spark-shell start at the same time.

Exception in thread "main" java.lang.RuntimeException: 
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
toreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at 
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
at 
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
at 
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.fo

Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Zhan Zhang
If your assembly jar has the Hive jars included, HiveContext will be used.
Typically, HiveContext has more functionality than SQLContext. In what case do
you have to use SQLContext for something that cannot be done by HiveContext?

Thanks.

Zhan Zhang

On Nov 6, 2015, at 10:43 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

What is interesting is that the pyspark shell works fine with multiple sessions on
the same host even though multiple HiveContexts have been created. What does
pyspark do differently in terms of starting up the shell?

On Nov 6, 2015, at 12:12 PM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

In SQLContext.scala :
// After we have populated SQLConf, we call setConf to populate other confs 
in the subclass
// (e.g. hiveconf in HiveContext).
properties.foreach {
  case (key, value) => setConf(key, value)
}

I don't see a config for skipping the above call.

FYI

On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:
Hi spark users and developers,

Is it possible to disable HiveContext from being instantiated when using
spark-shell? I get the following errors when more than one session starts. Since I
don't use HiveContext, it would be great if I could have more than one spark-shell
running at the same time.

Exception in thread "main" java.lang.RuntimeException: 
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
toreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at 
org.apache.spark.sql.hive.client.ClientWrapper.&lt;init&gt;(ClientWrapper.scala:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.&lt;init&gt;(IsolatedClientLoader.scala:179)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
at 
org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
at 
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
at 
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.sql.SQLContext.&lt;init&gt;(SQLContext.scala:234)
at org.apache.spark.sql.hive.HiveContext.&lt;init&gt;(HiveContext.scala:72)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
at 
org.apache.spark.repl.SparkILoopExt.importSpark(SparkILoopExt.scala:154)
at 
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply$mcZ$sp(SparkILoopExt.scala:127)
at 
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)
at 
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)

Best Regards,

Jerry





Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Zhan Zhang
Hi Jerry,

https://issues.apache.org/jira/browse/SPARK-11562 is created for the issue.

Thanks.

Zhan Zhang

On Nov 6, 2015, at 3:01 PM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

Hi Zhan,

Thank you for providing a workaround!
I will try this out, but I agree with Ted: there should be a better way to
capture the exception and handle it by just initializing SQLContext instead of
HiveContext, and WARN the user that something is wrong with their Hive setup.

Having a spark.sql.hive.enabled=false configuration would be lovely too. :)
An additional bonus is that it requires less memory (~100-200MB, from a rough
observation) if we don’t use HiveContext on the driver side.

Thanks and have a nice weekend!

Jerry


On Nov 6, 2015, at 5:53 PM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

I would suggest adding a config parameter that allows bypassing initialization 
of HiveContext in case of SQLException

Cheers

On Fri, Nov 6, 2015 at 2:50 PM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:
Hi Jerry,

OK. Here is an ugly workaround.

Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get a
bunch of exceptions because the HiveContext initialization fails, but you can then
initialize your own SQLContext.

scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = 
org.apache.spark.sql.SQLContext@4a5cc2e8

scala> import sqlContext.implicits._
import sqlContext.implicits._


for example

HW11188:spark zzhang$ more conf/hive-site.xml
&lt;configuration&gt;
  &lt;property&gt;
    &lt;name&gt;hive.metastore.uris&lt;/name&gt;
    &lt;value&gt;thrift://zzhang-yarn11:9083&lt;/value&gt;
  &lt;/property&gt;
&lt;/configuration&gt;
HW11188:spark zzhang$

By the way, I don’t know whether there is any caveat to this workaround.

Thanks.

Zhan Zhang





On Nov 6, 2015, at 2:40 PM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

Hi Zhan,

I don’t use HiveContext features at all. I mostly use the DataFrame API. It is
sexier and much less typo-prone. :)
Also, HiveContext requires a metastore database setup (Derby by default). The
problem is that I cannot have two spark-shell sessions running at the same time
on the same host (e.g. from the /home/jerry directory). It gives me an exception
like the one below.

Since I don’t use HiveContext, I don’t see the need to maintain a database.

What is interesting is that the pyspark shell is able to start more than one
session at the same time. I wonder what pyspark does better than spark-shell?

Best Regards,

Jerry

On Nov 6, 2015, at 5:28 PM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:

If your assembly jar has the Hive jar included, the HiveContext will be used.
Typically, HiveContext has more functionality than SQLContext. In what case do you
have to use SQLContext for something that cannot be done by HiveContext?

Thanks.

Zhan Zhang

On Nov 6, 2015, at 10:43 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:

What is interesting is that the pyspark shell works fine with multiple sessions on
the same host even though multiple HiveContexts have been created. What does
pyspark do differently in terms of starting up the shell?

On Nov 6, 2015, at 12:12 PM, Ted Yu 
<yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> wrote:

In SQLContext.scala :
// After we have populated SQLConf, we call setConf to populate other confs 
in the subclass
// (e.g. hiveconf in HiveContext).
properties.foreach {
  case (key, value) => setConf(key, value)
}

I don't see a config for skipping the above call.

FYI

On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam 
<chiling...@gmail.com<mailto:chiling...@gmail.com>> wrote:
Hi spark users and developers,

Is it possible to disable HiveContext from being instantiated when using
spark-shell? I get the following errors when more than one session starts. Since I
don't use HiveContext, it would be great if I could have more than one spark-shell
running at the same time.

Exception in thread "main" java.lang.RuntimeException: 
java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
toreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at 
org.apache.spark.sql.hive.client.ClientWrapper.&lt;init&gt;(ClientWrapper.scala:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClie

Re: Vague Spark SQL error message with saveAsParquetFile

2015-11-03 Thread Zhan Zhang
Looks like some JVM got killed or hit an OOM. You can check the logs to see the
real cause.

Thanks.

Zhan Zhang

On Nov 3, 2015, at 9:23 AM, YaoPau 
<jonrgr...@gmail.com<mailto:jonrgr...@gmail.com>> wrote:

java.io.FileNotFoun



Re: Upgrade spark cluster to latest version

2015-11-03 Thread Zhan Zhang
Spark is a client library. You can just download the latest release or build it on
your own, and replace your existing one without changing your existing cluster.

Thanks.

Zhan Zhang

On Nov 3, 2015, at 3:58 PM, roni 
<roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>> wrote:

Hi  Spark experts,
  This may be a very naive question, but can you please point me to the proper way
to upgrade the Spark version on an existing cluster?
Thanks
Roni
Hi,
 I have a cluster currently running Spark 1.4 and want to upgrade to the latest
version.
 How can I do it without creating a new cluster, so that none of my other settings
get erased?
Thanks
_R




Re: sql query orc slow

2015-10-13 Thread Zhan Zhang
Hi Patcharee,

I am not sure which side is wrong, driver or executor. If it is executor side, 
the reason you mentioned may be possible. But if the driver side didn’t set the 
predicate at all, then somewhere else is broken.

Can you please file a JIRA with a simple reproduce step, and let me know the 
JIRA number?

Thanks.

Zhan Zhang

On Oct 13, 2015, at 1:01 AM, Patcharee Thongtra 
<patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote:

Hi Zhan Zhang,

Can my problem (the ORC predicate is not generated from the WHERE clause even
though spark.sql.orc.filterPushdown=true) be related to any of the factors below?

- orc file version (File Version: 0.12 with HIVE_8732)
- hive version (using Hive 1.2.1.2.3.0.0-2557)
- orc table is not sorted / indexed
- the split strategy hive.exec.orc.split.strategy

BR,
Patcharee


On 10/09/2015 08:01 PM, Zhan Zhang wrote:
That is weird. Unfortunately, there is no debug info available on this part. 
Can you please open a JIRA to add some debug information on the driver side?

Thanks.

Zhan Zhang

On Oct 9, 2015, at 10:22 AM, patcharee 
<<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>>
 wrote:

I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"), but the log
shows "No ORC pushdown predicate" for my query with a WHERE clause.

15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate

I do not understand what is wrong with this.

BR,
Patcharee

On 09. okt. 2015 19:10, Zhan Zhang wrote:
In your case, you manually set an AND pushdown, and the predicate is right
based on your setting: leaf-0 = (EQUALS x 320)

The right way is to enable the predicate pushdown as follows.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true”)

Thanks.

Zhan Zhang







On Oct 9, 2015, at 9:58 AM, patcharee 
<<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>>
 wrote:

Hi Zhan Zhang

Actually my query has a WHERE clause: "select date, month, year, hh, (u*0.9122461
- v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y
= 117 and zone == 2 and year=2009 and z >= 2 and z <= 8". Columns "x" and "y" are
not partition columns; the others are partition columns. I expected the system
to use predicate pushdown. I turned on debug logging and found that the pushdown
predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate").

Then I tried to set the search argument explicitly (on the column "x", which is
not a partition column):

   val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 
320).end().build()
   hiveContext.setConf("hive.io.file.readcolumn.names", "x")
   hiveContext.setConf("sarg.pushdown", xs.toKryo())

This time the pushdown predicate was generated in the log, but the results were
wrong (no results at all):

15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS 
x 320)
expr = leaf-0

Any ideas what is wrong with this? Why is the ORC pushdown predicate not applied
by the system?

BR,
Patcharee

On 09. okt. 2015 18:31, Zhan Zhang wrote:
Hi Patcharee,

From the query, it looks like only column pruning will be applied. Partition
pruning and predicate pushdown do not have any effect. Do you see a big IO
difference between the two methods?

The potential reason for the speed difference that I can think of is the
different versions of OrcInputFormat: the Hive path may use NewOrcInputFormat,
but the Spark path uses OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee 
<<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>>
 wrote:

Yes, the predicate pushdown is enabled, but it still takes longer than the
first method.

BR,
Patcharee

On 08. okt. 2015 18:43, Zhan Zhang wrote:
Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee 
<<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>>
 wrote:

Hi,

I am using Spark SQL 1.5 to query a Hive table stored as partitioned ORC files.
We have about 6000 files in total, and each file is about 245MB.

What is the difference between these two query methods below:

1. Using query on hive table directly

hiveContext.sql("select col1, col2 from table1")

2. Reading from orc file, register temp table and query from the temp table

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")
hiveContext.sql("select col1, col2 from regTable")

When the number of files is large (querying all of the 6000 files), the second
case is much slower than the first one. Any ideas why?

Re: sql query orc slow

2015-10-09 Thread Zhan Zhang
Hi Patcharee,

From the query, it looks like only column pruning will be applied. Partition
pruning and predicate pushdown do not have any effect. Do you see a big IO
difference between the two methods?

The potential reason for the speed difference that I can think of is the
different versions of OrcInputFormat: the Hive path may use NewOrcInputFormat,
but the Spark path uses OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee <patcharee.thong...@uni.no> wrote:

> Yes, the predicate pushdown is enabled, but it still takes longer than the
> first method.
> 
> BR,
> Patcharee
> 
> On 08. okt. 2015 18:43, Zhan Zhang wrote:
>> Hi Patcharee,
>> 
>> Did you enable the predicate pushdown in the second method?
>> 
>> Thanks.
>> 
>> Zhan Zhang
>> 
>> On Oct 8, 2015, at 1:43 AM, patcharee <patcharee.thong...@uni.no> wrote:
>> 
>>> Hi,
>>> 
>>> I am using Spark SQL 1.5 to query a Hive table stored as partitioned ORC
>>> files. We have about 6000 files in total, and each file is about 245MB.
>>> 
>>> What is the difference between these two query methods below:
>>> 
>>> 1. Using query on hive table directly
>>> 
>>> hiveContext.sql("select col1, col2 from table1")
>>> 
>>> 2. Reading from orc file, register temp table and query from the temp table
>>> 
>>> val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
>>> c.registerTempTable("regTable")
>>> hiveContext.sql("select col1, col2 from regTable")
>>> 
>>> When the number of files is large (querying all of the 6000 files), the
>>> second case is much slower than the first one. Any ideas why?
>>> 
>>> BR,
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
> 
> 





Re: sql query orc slow

2015-10-09 Thread Zhan Zhang
In your case, you manually set an AND pushdown, and the predicate is right
based on your setting: leaf-0 = (EQUALS x 320)

The right way is to enable the predicate pushdown as follows.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true”)

Thanks.

Zhan Zhang
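
For reference, a minimal sketch of the suggested flow, reusing the ORC path and columns
that appear later in this thread (the literal value in the WHERE clause is only
illustrative):

sqlContext.setConf("spark.sql.orc.filterPushdown", "true")   // enable pushdown before reading

// Read the ORC data, register it, and query with a WHERE clause; with the flag on,
// the OrcInputFormat debug log should report an "ORC pushdown predicate" instead of
// "No ORC pushdown predicate".
val orcDF = sqlContext.read.format("orc").load("/apps/hive/warehouse/table1")
orcDF.registerTempTable("regTable")
sqlContext.sql("select col1, col2 from regTable where col1 = 320").show()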







On Oct 9, 2015, at 9:58 AM, patcharee 
<patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote:

Hi Zhan Zhang

Actually my query has a WHERE clause: "select date, month, year, hh, (u*0.9122461
- v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y
= 117 and zone == 2 and year=2009 and z >= 2 and z <= 8". Columns "x" and "y" are
not partition columns; the others are partition columns. I expected the system
to use predicate pushdown. I turned on debug logging and found that the pushdown
predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate").

Then I tried to set the search argument explicitly (on the column "x", which is
not a partition column):

   val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 
320).end().build()
   hiveContext.setConf("hive.io.file.readcolumn.names", "x")
   hiveContext.setConf("sarg.pushdown", xs.toKryo())

This time the pushdown predicate was generated in the log, but the results were
wrong (no results at all):

15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS 
x 320)
expr = leaf-0

Any ideas what is wrong with this? Why is the ORC pushdown predicate not applied
by the system?

BR,
Patcharee

On 09. okt. 2015 18:31, Zhan Zhang wrote:
Hi Patcharee,

From the query, it looks like only column pruning will be applied. Partition
pruning and predicate pushdown do not have any effect. Do you see a big IO
difference between the two methods?

The potential reason for the speed difference that I can think of is the
different versions of OrcInputFormat: the Hive path may use NewOrcInputFormat,
but the Spark path uses OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee 
<patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote:

Yes, the predicate pushdown is enabled, but it still takes longer than the
first method.

BR,
Patcharee

On 08. okt. 2015 18:43, Zhan Zhang wrote:
Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee 
<patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote:

Hi,

I am using Spark SQL 1.5 to query a Hive table stored as partitioned ORC files.
We have about 6000 files in total, and each file is about 245MB.

What is the difference between these two query methods below:

1. Using query on hive table directly

hiveContext.sql("select col1, col2 from table1")

2. Reading from orc file, register temp table and query from the temp table

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")
hiveContext.sql("select col1, col2 from regTable")

When the number of files is large (querying all of the 6000 files), the second
case is much slower than the first one. Any ideas why?

BR,












Re: sql query orc slow

2015-10-09 Thread Zhan Zhang
That is weird. Unfortunately, there is no debug info available on this part. 
Can you please open a JIRA to add some debug information on the driver side?

Thanks.

Zhan Zhang

On Oct 9, 2015, at 10:22 AM, patcharee 
<patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>> wrote:

I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"), but the log
shows "No ORC pushdown predicate" for my query with a WHERE clause.

15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate

I do not understand what is wrong with this.

BR,
Patcharee

On 09. okt. 2015 19:10, Zhan Zhang wrote:
In your case, you manually set an AND pushdown, and the predicate is right
based on your setting: leaf-0 = (EQUALS x 320)

The right way is to enable the predicate pushdown as follows.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true”)

Thanks.

Zhan Zhang







On Oct 9, 2015, at 9:58 AM, patcharee 
<<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>>
 wrote:

Hi Zhan Zhang

Actually my query has a WHERE clause: "select date, month, year, hh, (u*0.9122461
- v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y
= 117 and zone == 2 and year=2009 and z >= 2 and z <= 8". Columns "x" and "y" are
not partition columns; the others are partition columns. I expected the system
to use predicate pushdown. I turned on debug logging and found that the pushdown
predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate").

Then I tried to set the search argument explicitly (on the column "x", which is
not a partition column):

   val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 
320).end().build()
   hiveContext.setConf("hive.io.file.readcolumn.names", "x")
   hiveContext.setConf("sarg.pushdown", xs.toKryo())

This time the pushdown predicate was generated in the log, but the results were
wrong (no results at all):

15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS 
x 320)
expr = leaf-0

Any ideas what is wrong with this? Why is the ORC pushdown predicate not applied
by the system?

BR,
Patcharee

On 09. okt. 2015 18:31, Zhan Zhang wrote:
Hi Patcharee,

From the query, it looks like only column pruning will be applied. Partition
pruning and predicate pushdown do not have any effect. Do you see a big IO
difference between the two methods?

The potential reason for the speed difference that I can think of is the
different versions of OrcInputFormat: the Hive path may use NewOrcInputFormat,
but the Spark path uses OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee 
<<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>>
 wrote:

Yes, the predicate pushdown is enabled, but it still takes longer than the
first method.

BR,
Patcharee

On 08. okt. 2015 18:43, Zhan Zhang wrote:
Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee 
<<mailto:patcharee.thong...@uni.no>patcharee.thong...@uni.no<mailto:patcharee.thong...@uni.no>>
 wrote:

Hi,

I am using Spark SQL 1.5 to query a Hive table stored as partitioned ORC files.
We have about 6000 files in total, and each file is about 245MB.

What is the difference between these two query methods below:

1. Using query on hive table directly

hiveContext.sql("select col1, col2 from table1")

2. Reading from orc file, register temp table and query from the temp table

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")
hiveContext.sql("select col1, col2 from regTable")

When the number of files is large (querying all of the 6000 files), the second
case is much slower than the first one. Any ideas why?

BR,














Re: sql query orc slow

2015-10-08 Thread Zhan Zhang
Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee <patcharee.thong...@uni.no> wrote:

> Hi,
> 
> I am using Spark SQL 1.5 to query a Hive table stored as partitioned ORC
> files. We have about 6000 files in total, and each file is about 245MB.
> 
> What is the difference between these two query methods below:
> 
> 1. Using query on hive table directly
> 
> hiveContext.sql("select col1, col2 from table1")
> 
> 2. Reading from orc file, register temp table and query from the temp table
> 
> val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
> c.registerTempTable("regTable")
> hiveContext.sql("select col1, col2 from regTable")
> 
> When the number of files is large (querying all of the 6000 files), the
> second case is much slower than the first one. Any ideas why?
> 
> BR,
> 
> 
> 
> 
> 
> 





Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and you point HADOOP_CONF_DIR in Spark to that
configuration.

Thanks

Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu 
<zchl.j...@yahoo.com.INVALID<mailto:zchl.j...@yahoo.com.INVALID>> wrote:

Dear Experts,

The Spark job runs on the cluster via YARN. The job can be submitted from a
machine inside the cluster; however, I would like to submit the job from another
machine which does not belong to the cluster.
I know that for Hadoop, this can be done from another machine on which a Hadoop
gateway is installed, which is used to connect to the cluster.

What would be the equivalent for Spark? Is it the same as with Hadoop? And where
is the instruction doc for installing this gateway?

Thank you very much~~
Zhiliang




Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
Hi Zhiliang,

I cannot find a specific doc. But as far as I remember, you can log in to one of
your cluster machines, find the Hadoop configuration location (for example
/etc/hadoop/conf), and copy that directory to your local machine.
Typically it has hdfs-site.xml, yarn-site.xml, etc. In Spark, the former is used
to access HDFS, and the latter is used to launch applications on top of YARN.

Then, in spark-env.sh, you add: export HADOOP_CONF_DIR=/etc/hadoop/conf

Thanks.

Zhan Zhang


On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu 
<zchl.j...@yahoo.com<mailto:zchl.j...@yahoo.com>> wrote:

Hi Zhan,

Yes, I get it now.
I have never deployed the Hadoop configuration locally and cannot find the
specific doc. Would you help by providing the doc on how to do that?

Thank you,
Zhiliang

On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:


There is no difference between running the client inside or outside of the
cluster (assuming there is no firewall or network connectivity issue), as long as
you have the Hadoop configuration locally. Here is the doc for running on YARN:

http://spark.apache.org/docs/latest/running-on-yarn.html

Thanks.

Zhan Zhang

On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu 
<zchl.j...@yahoo.com<mailto:zchl.j...@yahoo.com>> wrote:

Hi Zhan,

Thanks very much for your helpful comment.
I also assumed it would be similar to submitting a Hadoop job; however, I was not
sure whether that is the case for Spark.

Have you ever tried that for Spark?
Would you give me the deployment doc for the Hadoop and Spark gateway? Since this
is the first time for me to do that, I cannot find the specific doc for it.

Best Regards,
Zhiliang





On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:


It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and you point HADOOP_CONF_DIR in Spark to that
configuration.

Thanks

Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu 
<zchl.j...@yahoo.com.INVALID<mailto:zchl.j...@yahoo.com.INVALID>> wrote:

Dear Experts,

The Spark job runs on the cluster via YARN. The job can be submitted from a
machine inside the cluster; however, I would like to submit the job from another
machine which does not belong to the cluster.
I know that for Hadoop, this can be done from another machine on which a Hadoop
gateway is installed, which is used to connect to the cluster.

What would be the equivalent for Spark? Is it the same as with Hadoop? And where
is the instruction doc for installing this gateway?

Thank you very much~~
Zhiliang










Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
There is no difference between running the client inside or outside of the
cluster (assuming there is no firewall or network connectivity issue), as long as
you have the Hadoop configuration locally. Here is the doc for running on YARN:

http://spark.apache.org/docs/latest/running-on-yarn.html

Thanks.

Zhan Zhang

On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu 
<zchl.j...@yahoo.com<mailto:zchl.j...@yahoo.com>> wrote:

Hi Zhan,

Thanks very much for your helpful comment.
I also assumed it would be similar to submitting a Hadoop job; however, I was not
sure whether that is the case for Spark.

Have you ever tried that for Spark?
Would you give me the deployment doc for the Hadoop and Spark gateway? Since this
is the first time for me to do that, I cannot find the specific doc for it.

Best Regards,
Zhiliang





On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang 
<zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:


It should be similar to other Hadoop jobs. You need the Hadoop configuration on
your client machine, and you point HADOOP_CONF_DIR in Spark to that
configuration.

Thanks

Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu 
<zchl.j...@yahoo.com.INVALID<mailto:zchl.j...@yahoo.com.INVALID>> wrote:

Dear Experts,

Spark job is running on the cluster by yarn. Since the job can be submited at 
the place on the machine from the cluster,
however, I would like to submit the job from another machine which does not 
belong to the cluster.
I know for this, hadoop job could be done by way of another machine which is 
installed hadoop gateway which is used
to connect the cluster.

Then what would go for spark, is it same as hadoop... And where is the 
instruction doc for installing this gateway...

Thank you very much~~
Zhiliang







Re: HDP 2.3 support for Spark 1.5.x

2015-09-22 Thread Zhan Zhang
Hi Krishna,

For the time being, you can download it from upstream, and it should run OK on
HDP 2.3. For HDP-specific problems, you can ask in the Hortonworks forum.

Thanks.

Zhan Zhang

On Sep 22, 2015, at 3:42 PM, Krishna Sankar 
<ksanka...@gmail.com<mailto:ksanka...@gmail.com>> wrote:

Guys,

  *   We have HDP 2.3 installed just now. It comes with Spark 1.3.x. The 
current wisdom is that it will support the 1.4.x train (which is good, need 
DataFrame et al).
  *   What is the plan to support Spark 1.5.x ? Can we install 1.5.0 on HDP 2.3 
? Or will Spark 1.5.x support be in HDP 2.3.x and if so ~when ?

Cheers & Thanks




Re: PrunedFilteredScan does not work for UDTs and Struct fields

2015-09-19 Thread Zhan Zhang
Hi Richard,


I am not sure how to support user-defined types. But regarding your second
question, you can use a workaround like the following.


Suppose you have a struct a and want to filter a.c with a.c > X. You can define an
alias C for a.c and add the extra column C to the schema of the relation, so your
query becomes C > X instead of a.c > X. In this way, in buildScan you would receive
GreaterThan(C, X), and you can then programmatically convert C back to a.c. Note
that the required columns passed to buildScan would also include the extra column
C, which you need to return in the buildScan RDD.


It looks complicated, but I think it would work.


Thanks.


Zhan Zhang
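
A hedged sketch of the aliasing idea for the Spark 1.x data sources API; every
identifier below is made up for illustration, and this is not a complete relation
implementation:

import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.types._

// Original layout: column "a" is a struct with an Int field "c".
val structA = StructType(Seq(StructField("c", IntegerType)))

// Schema exposed by the relation: add a flat alias column "C" mirroring a.c, so
// that filters written against C are passed to PrunedFilteredScan.buildScan.
val relationSchema = StructType(Seq(
  StructField("a", structA),
  StructField("C", IntegerType)
))

// Inside buildScan(requiredColumns, filters), translate filters on the alias back
// to the nested field before applying them in your own scan logic.
def translateFilters(filters: Array[Filter]): Array[Filter] = filters.map {
  case GreaterThan("C", value) => GreaterThan("a.c", value)
  case other                   => other
}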


From: Richard Eggert <richard.egg...@gmail.com>
Sent: Saturday, September 19, 2015 3:59 PM
To: User
Subject: PrunedFilteredScan does not work for UDTs and Struct fields

I defined my own relation (extending BaseRelation) and implemented the 
PrunedFilteredScan interface, but discovered that if the column referenced in a 
WHERE = clause is a user-defined type or a field of a struct column, then Spark 
SQL passes NO filters to the PrunedFilteredScan.buildScan method, rendering the 
interface useless. Is there really no way to implement a relation to optimize 
on such fields?

--
Rich


Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Zhan Zhang
If you are using spark-1.4.0, it is probably caused by SPARK-8458
(https://issues.apache.org/jira/browse/SPARK-8458).

Thanks.

Zhan Zhang

On Aug 23, 2015, at 12:49 PM, lostrain A 
donotlikeworkingh...@gmail.commailto:donotlikeworkingh...@gmail.com wrote:

Ted,
  Thanks for the suggestions. Actually I tried both s3n and s3 and the result 
remains the same.


On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu 
yuzhih...@gmail.commailto:yuzhih...@gmail.com wrote:
In your case, I would specify fs.s3.awsAccessKeyId / 
fs.s3.awsSecretAccessKey since you use s3 protocol.

On Sun, Aug 23, 2015 at 11:03 AM, lostrain A 
donotlikeworkingh...@gmail.commailto:donotlikeworkingh...@gmail.com wrote:
Hi Ted,
  Thanks for the reply. I tried setting both of the keyid and accesskey via

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")

However, the error still occurs for ORC format.

If I change the format to JSON, although the error does not go away, the JSON
files can be saved successfully.




On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu 
yuzhih...@gmail.commailto:yuzhih...@gmail.com wrote:
You may have seen this:
http://search-hadoop.com/m/q3RTtdSyM52urAyI



On Aug 23, 2015, at 1:01 AM, lostrain A 
donotlikeworkingh...@gmail.commailto:donotlikeworkingh...@gmail.com wrote:

Hi,
  I'm trying to save a simple dataframe to S3 in ORC format. The code is as 
follows:


 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
  import sqlContext.implicits._
  val df=sc.parallelize(1 to 1000).toDF()
  df.write.format("orc").save("s3://logs/dummy")

I ran the above code in spark-shell and only the _SUCCESS file was saved under 
the directory.
The last part of the spark-shell log said:

15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 
in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)

15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 
(save at console:29) finished in 0.834 s

15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, 
whose tasks have all completed, from pool

15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at console:29, 
took 0.895912 s

15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: 
Returning directory: /media/ephemeral0/s3/output-

15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is 
[-44, 29, -128, -39, -113, 0, -78,
 4, -23, -103, 9, -104, -20, -8, 66, 126]

15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed.

Has anyone experienced this before?
Thanks!







Re: Authentication Support with spark-submit cluster mode

2015-07-29 Thread Zhan Zhang
If you run it on YARN with a Kerberos setup, you authenticate yourself via kinit
before launching the job.

Thanks.

Zhan Zhang

On Jul 28, 2015, at 8:51 PM, Anh Hong 
hongnhat...@yahoo.com.INVALIDmailto:hongnhat...@yahoo.com.INVALID wrote:

Hi,
I'd like to remotely run spark-submit from a local machine to submit a job to a
Spark cluster (cluster mode).
What method do I use to authenticate myself to the cluster? For example, how do I
pass a user id, password, or private key to the cluster?

Any help is appreciated.





Re: [SPAM] Customized Aggregation Query on Spark SQL

2015-04-30 Thread Zhan Zhang
One optimization is to reduce the shuffle by first aggregating locally (only
keeping the max for each name) and then calling reduceByKey.

Thanks.

Zhan Zhang
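
A minimal sketch of that idea (a sketch only, assuming personRDD: RDD[Person] with
the name and age fields used in the messages below); note that reduceByKey already
performs map-side combining, so the explicit per-partition pass here mainly makes
the local aggregation visible:

val maxPerName = personRDD
  .mapPartitions { iter =>
    // keep only the record with the max age for each name within this partition
    val local = scala.collection.mutable.Map[String, Person]()
    iter.foreach { p =>
      if (!local.contains(p.name) || local(p.name).age < p.age) local(p.name) = p
    }
    local.iterator                                       // (name, record) pairs
  }
  .reduceByKey((a, b) => if (a.age > b.age) a else b)    // merge the partial maxima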

On Apr 24, 2015, at 10:03 PM, ayan guha 
guha.a...@gmail.commailto:guha.a...@gmail.com wrote:

Here you go

t = [["A",10,"A10"],["A",20,"A20"],["A",30,"A30"],["B",15,"B15"],["C",10,"C10"],["C",20,"C200"]]
TRDD = sc.parallelize(t).map(lambda t: Row(name=str(t[0]), age=int(t[1]), other=str(t[2])))
TDF = ssc.createDataFrame(TRDD)
print TDF.printSchema()
TDF.registerTempTable("tab")
JN = ssc.sql("select t.name, t.age, t.other from tab t inner join (select name, max(age) age from tab group by name) t1 on t.name = t1.name and t.age = t1.age")
for i in JN.collect():
    print i

Result:
Row(name=u'A', age=30, other=u'A30')
Row(name=u'B', age=15, other=u'B15')
Row(name=u'C', age=20, other=u'C200')

On Sat, Apr 25, 2015 at 2:48 PM, Wenlei Xie 
wenlei@gmail.commailto:wenlei@gmail.com wrote:
Sure. A simple example of data would be (there might be many other columns)

Name  Age  Other
A     10   A10
A     20   A20
A     30   A30
B     15   B15
C     10   C10
C     20   C20

The desired output would be
Name  Age  Other
A     30   A30
B     15   B15
C     20   C20

Thank you so much for the help!

On Sat, Apr 25, 2015 at 12:41 AM, ayan guha 
guha.a...@gmail.commailto:guha.a...@gmail.com wrote:
can you give an example set of data and desired output

On Sat, Apr 25, 2015 at 2:32 PM, Wenlei Xie 
wenlei@gmail.commailto:wenlei@gmail.com wrote:
Hi,

I would like to answer the following customized aggregation query on Spark SQL
1. Group the table by the value of Name
2. For each group, choose the tuple with the max value of Age (the ages are 
distinct for every name)

I am wondering what's the best way to do it in Spark SQL? Should I use a UDAF?
Previously I was doing something like the following with Spark:

personRDD.map(t => (t.name, t))
  .reduceByKey((a, b) => if (a.age > b.age) a else b)

Thank you!

Best,
Wenlei



--
Best Regards,
Ayan Guha



--
Wenlei Xie (谢文磊)

Ph.D. Candidate
Department of Computer Science
456 Gates Hall, Cornell University
Ithaca, NY 14853, USA
Email: wenlei@gmail.commailto:wenlei@gmail.com



--
Best Regards,
Ayan Guha



Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-04-17 Thread Zhan Zhang
Hi Udit,

By the way, would you mind sharing the whole log trace?

Thanks.

Zhan Zhang

On Apr 17, 2015, at 2:26 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

I am just trying to launch a Spark shell and not do anything fancy. I got the
binary distribution from Apache and put the Spark assembly on HDFS. I then
specified the yarn.jars option in the Spark defaults to point to the assembly in
HDFS. I still got the same error, so I thought I had to build it for HDP. I am
using HDP 2.2 with Hadoop 2.6.

On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:
Thanks. Would that distribution work for hdp 2.2?

On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
You don’t need to put any yarn assembly in hdfs. The spark assembly jar will 
include everything. It looks like your package does not include yarn module, 
although I didn’t find anything wrong in your mvn command. Can you check 
whether the ExecutorLauncher class is in your jar file or not?

BTW: For spark-1.3, you can use the binary distribution from apache.

Thanks.

Zhan Zhang



On Apr 17, 2015, at 2:01 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

I followed the steps described above and I still get this error:

Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher


I am trying to build spark 1.3 on hdp 2.2.
I built spark from source using:
build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
-DskipTests package

Maybe I am not putting the correct yarn assembly on hdfs or some other issue?

Thanks,
Udit

On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Hi Folks,

Just to summarize how to run Spark on the HDP distribution:

1. The Spark version has to be 1.3.0 or above if you are using the upstream
distribution. This configuration is mainly for HDP rolling-upgrade purposes, and
the patch only went into Spark upstream from 1.3.0.

2. In $SPARK_HOME/conf/spark-defaults.conf, add the following settings:
spark.driver.extraJavaOptions -Dhdp.version=x

   spark.yarn.am.extraJavaOptions -Dhdp.version=x

3. In $SPARK_HOME/java-opts, add the following option:
   -Dhdp.version=x

Thanks.

Zhan Zhang



On Mar 30, 2015, at 6:56 AM, Doug Balog 
doug.sparku...@dugos.commailto:doug.sparku...@dugos.com wrote:

The “best” solution to spark-shell’s  problem is creating a file 
$SPARK_HOME/conf/java-opts
with “-Dhdp.version=2.2.0.0-2014”

Cheers,

Doug

On Mar 28, 2015, at 1:25 PM, Michael Stone 
mst...@mathom.usmailto:mst...@mathom.us wrote:

I've also been having trouble running 1.3.0 on HDP. The 
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041
configuration directive seems to work with pyspark, but does not propagate when
using spark-shell. (That is, everything works fine with pyspark, and
spark-shell fails with the bad substitution message.)

Mike Stone













Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-04-17 Thread Zhan Zhang
You don’t need to put any yarn assembly in hdfs. The spark assembly jar will 
include everything. It looks like your package does not include yarn module, 
although I didn’t find anything wrong in your mvn command. Can you check 
whether the ExecutorLauncher class is in your jar file or not?

BTW: For spark-1.3, you can use the binary distribution from apache.

Thanks.

Zhan Zhang



On Apr 17, 2015, at 2:01 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

I followed the steps described above and I still get this error:

Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher


I am trying to build spark 1.3 on hdp 2.2.
I built spark from source using:
build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
-DskipTests package

Maybe I am not putting the correct yarn assembly on hdfs or some other issue?

Thanks,
Udit

On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Hi Folks,

Just to summarize how to run Spark on the HDP distribution:

1. The Spark version has to be 1.3.0 or above if you are using the upstream
distribution. This configuration is mainly for HDP rolling-upgrade purposes, and
the patch only went into Spark upstream from 1.3.0.

2. In $SPARK_HOME/conf/spark-defaults.conf, add the following settings:
spark.driver.extraJavaOptions -Dhdp.version=x

   spark.yarn.am.extraJavaOptions -Dhdp.version=x

3. In $SPARK_HOME/java-opts, add the following option:
   -Dhdp.version=x

Thanks.

Zhan Zhang



On Mar 30, 2015, at 6:56 AM, Doug Balog 
doug.sparku...@dugos.commailto:doug.sparku...@dugos.com wrote:

The “best” solution to spark-shell’s  problem is creating a file 
$SPARK_HOME/conf/java-opts
with “-Dhdp.version=2.2.0.0-2014”

Cheers,

Doug

On Mar 28, 2015, at 1:25 PM, Michael Stone 
mst...@mathom.usmailto:mst...@mathom.us wrote:

I've also been having trouble running 1.3.0 on HDP. The 
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041
configuration directive seems to work with pyspark, but does not propagate when
using spark-shell. (That is, everything works fine with pyspark, and
spark-shell fails with the bad substitution message.)

Mike Stone










Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-04-17 Thread Zhan Zhang
You probably want to first try the basic configuration to see whether it works,
instead of setting SPARK_JAR to point to the HDFS location. This error is caused
by ExecutorLauncher not being found on the classpath, and it is not HDP-specific,
I think.

Thanks.

Zhan Zhang

On Apr 17, 2015, at 2:26 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

I am just trying to launch a Spark shell and not do anything fancy. I got the
binary distribution from Apache and put the Spark assembly on HDFS. I then
specified the yarn.jars option in the Spark defaults to point to the assembly in
HDFS. I still got the same error, so I thought I had to build it for HDP. I am
using HDP 2.2 with Hadoop 2.6.

On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:
Thanks. Would that distribution work for hdp 2.2?

On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
You don’t need to put any yarn assembly in hdfs. The spark assembly jar will 
include everything. It looks like your package does not include yarn module, 
although I didn’t find anything wrong in your mvn command. Can you check 
whether the ExecutorLauncher class is in your jar file or not?

BTW: For spark-1.3, you can use the binary distribution from apache.

Thanks.

Zhan Zhang



On Apr 17, 2015, at 2:01 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

I followed the steps described above and I still get this error:

Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher


I am trying to build spark 1.3 on hdp 2.2.
I built spark from source using:
build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
-DskipTests package

Maybe I am not putting the correct yarn assembly on hdfs or some other issue?

Thanks,
Udit

On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Hi Folks,

Just to summarize how to run Spark on the HDP distribution:

1. The Spark version has to be 1.3.0 or above if you are using the upstream
distribution. This configuration is mainly for HDP rolling-upgrade purposes, and
the patch only went into Spark upstream from 1.3.0.

2. In $SPARK_HOME/conf/spark-defaults.conf, add the following settings:
spark.driver.extraJavaOptions -Dhdp.version=x

   spark.yarn.am.extraJavaOptions -Dhdp.version=x

3. In $SPARK_HOME/java-opts, add the following option:
   -Dhdp.version=x

Thanks.

Zhan Zhang



On Mar 30, 2015, at 6:56 AM, Doug Balog 
doug.sparku...@dugos.commailto:doug.sparku...@dugos.com wrote:

The “best” solution to spark-shell’s  problem is creating a file 
$SPARK_HOME/conf/java-opts
with “-Dhdp.version=2.2.0.0-2014”

Cheers,

Doug

On Mar 28, 2015, at 1:25 PM, Michael Stone 
mst...@mathom.usmailto:mst...@mathom.us wrote:

I've also been having trouble running 1.3.0 on HDP. The 
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041
configuration directive seems to work with pyspark, but does not propagate when
using spark-shell. (That is, everything works fine with pyspark, and
spark-shell fails with the bad substitution message.)

Mike Stone













Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-04-17 Thread Zhan Zhang
Besides the hdp.version in spark-defaults.conf, I think you probably forgot to
put the file java-opts under $SPARK_HOME/conf with the following contents.

[root@c6402 conf]# pwd
/usr/hdp/current/spark-client/conf
[root@c6402 conf]# ls
fairscheduler.xml.template  java-opts log4j.properties.template  
metrics.properties.template  spark-defaults.conf   spark-env.sh
hive-site.xml   log4j.properties  metrics.properties 
slaves.template  spark-defaults.conf.template  spark-env.sh.template
[root@c6402 conf]# more java-opts
  -Dhdp.version=2.2.0.0-2041
[root@c6402 conf]#


Thanks.

Zhan Zhang


On Apr 17, 2015, at 3:09 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

Hi,

This is the log trace:
https://gist.github.com/uditmehta27/511eac0b76e6d61f8b47

On the yarn RM UI, I see :

Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher


The command I run is: bin/spark-shell --master yarn-client

The spark defaults I use is:
spark.yarn.jar 
hdfs://namenode1-dev.snc1:8020/spark/spark-assembly-1.3.0-hadoop2.4.0.jar
spark.yarn.access.namenodes hdfs://namenode1-dev.snc1:8032
spark.dynamicAllocation.enabled false
spark.scheduler.mode FAIR
spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041

Is there anything wrong in what I am trying to do?

thanks again!


On Fri, Apr 17, 2015 at 2:56 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Hi Udit,

By the way, would you mind sharing the whole log trace?

Thanks.

Zhan Zhang

On Apr 17, 2015, at 2:26 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

I am just trying to launch a Spark shell and not do anything fancy. I got the
binary distribution from Apache and put the Spark assembly on HDFS. I then
specified the yarn.jars option in the Spark defaults to point to the assembly in
HDFS. I still got the same error, so I thought I had to build it for HDP. I am
using HDP 2.2 with Hadoop 2.6.

On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:
Thanks. Would that distribution work for hdp 2.2?

On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
You don’t need to put any yarn assembly in hdfs. The spark assembly jar will 
include everything. It looks like your package does not include yarn module, 
although I didn’t find anything wrong in your mvn command. Can you check 
whether the ExecutorLauncher class is in your jar file or not?

BTW: For spark-1.3, you can use the binary distribution from apache.

Thanks.

Zhan Zhang



On Apr 17, 2015, at 2:01 PM, Udit Mehta 
ume...@groupon.commailto:ume...@groupon.com wrote:

I followed the steps described above and I still get this error:

Error: Could not find or load main class 
org.apache.spark.deploy.yarn.ExecutorLauncher


I am trying to build spark 1.3 on hdp 2.2.
I built spark from source using:
build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
-DskipTests package

Maybe I am not putting the correct yarn assembly on hdfs or some other issue?

Thanks,
Udit

On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Hi Folks,

Just to summarize how to run Spark on the HDP distribution:

1. The Spark version has to be 1.3.0 or above if you are using the upstream
distribution. This configuration is mainly for HDP rolling-upgrade purposes, and
the patch only went into Spark upstream from 1.3.0.

2. In $SPARK_HOME/conf/spark-defaults.conf, add the following settings:
spark.driver.extraJavaOptions -Dhdp.version=x

   spark.yarn.am.extraJavaOptions -Dhdp.version=x

3. In $SPARK_HOME/java-opts, add the following option:
   -Dhdp.version=x

Thanks.

Zhan Zhang



On Mar 30, 2015, at 6:56 AM, Doug Balog 
doug.sparku...@dugos.commailto:doug.sparku...@dugos.com wrote:

The “best” solution to spark-shell’s  problem is creating a file 
$SPARK_HOME/conf/java-opts
with “-Dhdp.version=2.2.0.0-2014”

Cheers,

Doug

On Mar 28, 2015, at 1:25 PM, Michael Stone 
mst...@mathom.usmailto:mst...@mathom.us wrote:

I've also been having trouble running 1.3.0 on HDP. The 
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041
configuration directive seems to work with pyspark, but does not propagate when
using spark-shell. (That is, everything works fine with pyspark, and
spark-shell fails with the bad substitution message.)

Mike Stone





Re: Spark 1.3.0: Running Pi example on YARN fails

2015-04-13 Thread Zhan Zhang
Hi Zork,

From the exception, it is still caused by hdp.version not being propagated 
correctly.  Can you check whether there is any typo?

[root@c6402 conf]# more java-opts
-Dhdp.version=2.2.0.0-2041

[root@c6402 conf]# more spark-defaults.conf
spark.driver.extraJavaOptions  -Dhdp.version=2.2.0.0-2041
spark.yarn.am.extraJavaOptions  -Dhdp.version=2.2.0.0-2041

This is an HDP-specific question, and you can move the topic to the HDP forum.


Thanks.

Zhan Zhang


On Apr 13, 2015, at 3:00 AM, Zork Sail 
zorks...@gmail.commailto:zorks...@gmail.com wrote:

Hi Zhan,
Alas setting:

-Dhdp.version=2.2.0.0-2041

Does not help. Still get the same error:
15/04/13 09:53:59 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: UNDEFINED
 tracking URL: 
http://foo.bar.site:8088/proxy/application_1427875242006_0037/
 user: test
15/04/13 09:54:00 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:01 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:02 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:03 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: FAILED)
15/04/13 09:54:03 INFO yarn.Client:
 client token: N/A
 diagnostics: Application application_1427875242006_0037 failed 2 times due 
to AM Container for appattempt_1427875242006_0037_02 exited with  exitCode: 
1
For more detailed output, check application tracking 
page:http://foo.bar.site:8088/proxy/application_1427875242006_0037/Then, click 
on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1427875242006_0037_02_01
Exit code: 1
Exception message: 
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
 line 27: 
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
 bad substitution

Stack trace: ExitCodeException exitCode=1: 
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
 line 27: 
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
 bad substitution

at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: FAILED
 tracking URL: 
http://foo.bar.site:8088/cluster/app

Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class

2015-03-30 Thread Zhan Zhang
Hi Folks,

Just to summarize what it takes to run Spark on the HDP distribution.

1. The Spark version has to be 1.3.0 or above if you are using the upstream 
distribution. This configuration is mainly for HDP rolling-upgrade purposes, 
and the patch only went into upstream Spark from 1.3.0.

2. In $SPARK_HOME/conf/spark-defaults.conf, add the following settings.
   spark.driver.extraJavaOptions -Dhdp.version=x
   spark.yarn.am.extraJavaOptions -Dhdp.version=x

3. In $SPARK_HOME/conf/java-opts, add the following option.
   -Dhdp.version=x
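
For illustration only (a sketch; 2.2.0.0-2041 is just an example build number, 
substitute the HDP build actually installed on your cluster), the two files from 
steps 2 and 3 would end up containing:

$SPARK_HOME/conf/spark-defaults.conf:
    spark.driver.extraJavaOptions  -Dhdp.version=2.2.0.0-2041
    spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041

$SPARK_HOME/conf/java-opts:
    -Dhdp.version=2.2.0.0-2041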

Thanks.

Zhan Zhang



On Mar 30, 2015, at 6:56 AM, Doug Balog 
doug.sparku...@dugos.commailto:doug.sparku...@dugos.com wrote:

The “best” solution to spark-shell’s  problem is creating a file 
$SPARK_HOME/conf/java-opts
with “-Dhdp.version=2.2.0.0-2014”

Cheers,

Doug

On Mar 28, 2015, at 1:25 PM, Michael Stone 
mst...@mathom.usmailto:mst...@mathom.us wrote:

I've also been having trouble running 1.3.0 on HDP. The 
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041
configuration directive seems to work with pyspark, but not propagate when 
using spark-shell. (That is, everything works fine with pyspark, and 
spark-shell fails with the bad substitution message.)

Mike Stone








Re: 2 input paths generate 3 partitions

2015-03-27 Thread Zhan Zhang
Hi Rares,

The number of partitions is controlled by the HDFS input format, and one file may 
produce multiple partitions if it consists of multiple blocks. In your case, I think 
there is one file with 2 splits.
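
If you want more control over the split count, sc.textFile also accepts a 
minPartitions hint. A minimal sketch (the path is a placeholder for your output 
directory):

val foo = sc.textFile("/path/to/output-dir")       // split count decided by the input format
println(foo.partitions.length)                     // can exceed the number of files

val bar = sc.textFile("/path/to/output-dir", 4)    // ask for at least 4 splits
println(bar.partitions.length)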

Thanks.

Zhan Zhang
On Mar 27, 2015, at 3:12 PM, Rares Vernica 
rvern...@gmail.commailto:rvern...@gmail.com wrote:

Hello,

I am using the Spark shell in Scala on the localhost. I am using sc.textFile to 
read a directory. The directory looks like this (generated by another Spark 
script):

part-0
part-1
_SUCCESS

The part-0 has four short lines of text while part-1 has two short 
lines of text. The _SUCCESS file is empty. When I check the number of 
partitions on the RDD I get:

scala foo.partitions.length
15/03/27 14:57:31 INFO FileInputFormat: Total input paths to process : 2
res68: Int = 3

I wonder why do the two input files generate three partitions. Does Spark check 
the number of lines in each file and try to generate three balanced partitions?

Thanks!
Rares



Re: Can't access file in spark, but can in hadoop

2015-03-27 Thread Zhan Zhang
Probably a Guava version conflict issue. Which Spark version did you use, and 
which Hadoop version was it compiled against?

Thanks.

Zhan Zhang

On Mar 27, 2015, at 12:13 PM, Johnson, Dale 
daljohn...@ebay.commailto:daljohn...@ebay.com wrote:

Yes, I could recompile the hdfs client with more logging, but I don’t have the 
day or two to spare right this week.

One more thing about this, the cluster is Horton Works 2.1.3 [.0]

They seem to have a claim of supporting spark on Horton Works 2.2

Dale.

From: Ted Yu yuzhih...@gmail.commailto:yuzhih...@gmail.com
Date: Thursday, March 26, 2015 at 4:54 PM
To: Johnson, Dale daljohn...@ebay.commailto:daljohn...@ebay.com
Cc: user user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Can't access file in spark, but can in hadoop

Looks like the following assertion failed:
  Preconditions.checkState(storageIDsCount == locs.size());

locs is ListDatanodeInfoProto
Can you enhance the assertion to log more information ?

Cheers

On Thu, Mar 26, 2015 at 3:06 PM, Dale Johnson 
daljohn...@ebay.commailto:daljohn...@ebay.com wrote:
There seems to be a special kind of "corrupted according to Spark" state of
file in HDFS.  I have isolated a set of files (maybe 1% of all files I need
to work with) which are producing the following stack dump when I try to
sc.textFile() open them.  When I try to open directories, most large
directories contain at least one file of this type.  Curiously, the
following two lines fail inside of a Spark job, but not inside of a Scoobi
job:

val conf = new org.apache.hadoop.conf.Configuration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)

The stack trace follows:

15/03/26 14:22:43 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: null)
Exception in thread Driver java.lang.IllegalStateException
at
org.spark-project.guava.common.base.Preconditions.checkState(Preconditions.java:133)
at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:673)
at
org.apache.hadoop.hdfs.protocolPB.PBHelper.convertLocatedBlock(PBHelper.java:1100)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1118)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1251)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1354)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1363)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:518)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743)
at
org.apache.hadoop.hdfs.DistributedFileSystem$15.init(DistributedFileSystem.java:738)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:727)
at 
org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1662)
at org.apache.hadoop.fs.FileSystem$5.init(FileSystem.java:1724)
at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1721)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1125)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1123)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$.main(SpellQuery.scala:1123)
at
com.ebay.ss.niffler.miner.speller.SpellQueryLaunch.main(SpellQuery.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
15/03/26 14:22:43 INFO yarn.ApplicationMaster: Invoking sc stop from
shutdown hook

It appears to have found the three copies of the given HDFS block, but is
performing some sort of validation with them before giving them back to
spark to schedule the job.  But there is an assert failing.

I've tried this with 1.2.0, 1.2.1 and 1.3.0, and I get the exact same error,
but I've seen the line numbers change on the HDFS libraries, but not the
function names.  I've tried recompiling myself with different hadoop
versions, and it's the same.  We're running hadoop 2.4.1 on our cluster.

A google search turns up absolutely nothing on this.

Any insight at all would be appreciated.

Dale Johnson
Applied Researcher
eBay.comhttp://eBay.com




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-file-in-spark-but-can-in-hadoop-tp22251.html
Sent from the Apache Spark User List

RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Hi Folks,

Does anybody know the reason for not allowing preservesPartitioning in 
RDD.map? Am I missing something here?

The following example involves two shuffles. I think if preservesPartitioning were 
allowed, we could avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x => (x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at <console>:29 []
 +-(8) MapPartitionsRDD[3] at map at <console>:27 []
    |  ShuffledRDD[2] at reduceByKey at <console>:25 []
    +-(8) MapPartitionsRDD[1] at map at <console>:23 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

Thanks.

Zhan Zhang
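
(For reference, one way to keep the partitioner with the existing API is 
mapPartitions, which does expose a preservesPartitioning flag; the following is 
only a sketch:

val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
val r3 = r1.map((_, 1)).reduceByKey(_ + _)
// promise Spark that the keys are untouched, so r3's partitioner is kept
val r4 = r3.mapPartitions(
  iter => iter.map { case (k, v) => (k, v + 1) },
  preservesPartitioning = true)
val r5 = r4.reduceByKey(_ + _)   // reuses the partitioner, no second shuffle

With the partitioner preserved, the second reduceByKey becomes a narrow 
dependency in the DAG.)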




Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks Jonathan. You are right regarding rewriting the example.

I mean providing such an option to the developer so that it is controllable. The 
example may seem silly, and I don’t know the use cases.

But, for example, if I also want to operate on both the key and the value to 
generate some new value while keeping the key untouched, then mapValues may not 
be able to do this.

Changing the code to allow this is trivial, but I don’t know whether there is 
some special reason behind this.

Thanks.

Zhan Zhang



On Mar 26, 2015, at 2:49 PM, Jonathan Coveney 
jcove...@gmail.commailto:jcove...@gmail.com wrote:

I believe if you do the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at console:23 []
 |  MapPartitionsRDD[33] at mapValues at console:23 []
 |  ShuffledRDD[32] at reduceByKey at console:23 []
 +-(8) MapPartitionsRDD[31] at map at console:23 []
|  ParallelCollectionRDD[30] at parallelize at console:23 []

The difference is that spark has no way to know that your map closure doesn't 
change the key. if you only use mapValues, it does. Pretty cool that they 
optimized that :)

2015-03-26 17:44 GMT-04:00 Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com:
Hi Folks,

Does anybody know what is the reason not allowing preserverPartitioning in 
RDD.map? Do I miss something here?

Following example involves two shuffles. I think if preservePartitioning is 
allowed, we can avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)

scala r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at console:29 []
 +-(8) MapPartitionsRDD[3] at map at console:27 []
|  ShuffledRDD[2] at reduceByKey at console:25 []
+-(8) MapPartitionsRDD[1] at map at console:23 []
   |  ParallelCollectionRDD[0] at parallelize at console:21 []

Thanks.

Zhan Zhang






Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks all for the quick response.

Thanks.

Zhan Zhang

On Mar 26, 2015, at 3:14 PM, Patrick Wendell pwend...@gmail.com wrote:

 I think we have a version of mapPartitions that allows you to tell
 Spark the partitioning is preserved:
 
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639
 
 We could also add a map function that does same. Or you can just write
 your map using an iterator.
 
 - Patrick
 
 On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney jcove...@gmail.com wrote:
 This is just a deficiency of the api, imo. I agree: mapValues could
 definitely be a function (K, V)=V1. The option isn't set by the function,
 it's on the RDD. So you could look at the code and do this.
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
 
 def mapValues[U](f: V => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
      preservesPartitioning = true)
  }
 
 What you want:
 
 def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
      preservesPartitioning = true)
  }
 
 One of the nice things about spark is that making such new operators is very
 easy :)
 
 2015-03-26 17:54 GMT-04:00 Zhan Zhang zzh...@hortonworks.com:
 
 Thanks Jonathan. You are right regarding rewrite the example.
 
 I mean providing such option to developer so that it is controllable. The
 example may seems silly, and I don't know the use cases.
 
 But for example, if I also want to operate both the key and value part to
 generate some new value with keeping key part untouched. Then mapValues may
 not be able to  do this.
 
 Changing the code to allow this is trivial, but I don't know whether there
 is some special reason behind this.
 
 Thanks.
 
 Zhan Zhang
 
 
 
 
 On Mar 26, 2015, at 2:49 PM, Jonathan Coveney jcove...@gmail.com wrote:
 
 I believe if you do the following:
 
 
 sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
 
 (8) MapPartitionsRDD[34] at reduceByKey at console:23 []
 |  MapPartitionsRDD[33] at mapValues at console:23 []
 |  ShuffledRDD[32] at reduceByKey at console:23 []
 +-(8) MapPartitionsRDD[31] at map at console:23 []
|  ParallelCollectionRDD[30] at parallelize at console:23 []
 
 The difference is that spark has no way to know that your map closure
 doesn't change the key. if you only use mapValues, it does. Pretty cool that
 they optimized that :)
 
 2015-03-26 17:44 GMT-04:00 Zhan Zhang zzh...@hortonworks.com:
 
 Hi Folks,
 
 Does anybody know what is the reason not allowing preserverPartitioning
 in RDD.map? Do I miss something here?
 
 Following example involves two shuffles. I think if preservePartitioning
 is allowed, we can avoid the second one, right?
 
 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)
 
 scala r5.toDebugString
 res2: String =
 (8) ShuffledRDD[4] at reduceByKey at console:29 []
 +-(8) MapPartitionsRDD[3] at map at console:27 []
|  ShuffledRDD[2] at reduceByKey at console:25 []
+-(8) MapPartitionsRDD[1] at map at console:23 []
   |  ParallelCollectionRDD[0] at parallelize at console:21 []
 
 Thanks.
 
 Zhan Zhang
 
 
 
 
 





Re: OOM for HiveFromSpark example

2015-03-25 Thread Zhan Zhang
I solved this by increasing the PermGen memory size in the driver.

-XX:MaxPermSize=512m

Thanks.

Zhan Zhang

On Mar 25, 2015, at 10:54 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:

I am facing same issue, posted a new thread. Please respond.

On Wed, Jan 14, 2015 at 4:38 AM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Hi Folks,

I am trying to run hive context in yarn-cluster mode, but met some error. Does 
anybody know what cause the issue.

I use following cmd to build the distribution:

 ./make-distribution.sh -Phive -Phive-thriftserver  -Pyarn  -Phadoop-2.4

15/01/13 17:59:42 INFO cluster.YarnClusterScheduler: 
YarnClusterScheduler.postStartHook done
15/01/13 17:59:42 INFO storage.BlockManagerMasterActor: Registering block 
manager 
cn122-10.l42scl.hortonworks.com:56157http://cn122-10.l42scl.hortonworks.com:56157/
 with 1589.8 MB RAM, BlockManagerId(2, 
cn122-10.l42scl.hortonworks.comhttp://cn122-10.l42scl.hortonworks.com/, 56157)
15/01/13 17:59:43 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT 
EXISTS src (key INT, value STRING)
15/01/13 17:59:43 INFO parse.ParseDriver: Parse Completed
15/01/13 17:59:44 INFO metastore.HiveMetaStore: 0: Opening raw store with 
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/01/13 17:59:44 INFO metastore.ObjectStore: ObjectStore, initialize called
15/01/13 17:59:44 INFO DataNucleus.Persistence: Property 
datanucleus.cache.level2 unknown - will be ignored
15/01/13 17:59:44 INFO DataNucleus.Persistence: Property 
hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present 
in CLASSPATH (or one of dependencies)
15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present 
in CLASSPATH (or one of dependencies)
15/01/13 17:59:52 INFO metastore.ObjectStore: Setting MetaStore object pin 
classes with 
hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
15/01/13 17:59:52 INFO metastore.MetaStoreDirectSql: MySQL check failed, 
assuming we are not on mysql: Lexical error at line 1, column 5.  Encountered: 
@ (64), after : .
15/01/13 17:59:53 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
15/01/13 17:59:53 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
15/01/13 17:59:59 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
15/01/13 17:59:59 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
15/01/13 18:00:00 INFO metastore.ObjectStore: Initialized ObjectStore
15/01/13 18:00:00 WARN metastore.ObjectStore: Version information not found in 
metastore. hive.metastore.schema.verification is not enabled so recording the 
schema version 0.13.1aa
15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added admin role in metastore
15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added public role in metastore
15/01/13 18:00:01 INFO metastore.HiveMetaStore: No user is added in admin role, 
since config is empty
15/01/13 18:00:01 INFO session.SessionState: No Tez session required at this 
point. hive.execution.engine=mr.
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=Driver.run 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=TimeToSubmit 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:02 INFO ql.Driver: Concurrency mode is disabled, not creating a 
lock manager
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=compile 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO log.PerfLogger: PERFLOG method=parse 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT 
EXISTS src (key INT, value STRING)
15/01/13 18:00:03 INFO parse.ParseDriver: Parse Completed
15/01/13 18:00:03 INFO log.PerfLogger: /PERFLOG method=parse 
start=1421190003030 end=1421190003031 duration=1 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO log.PerfLogger: PERFLOG method=semanticAnalyze 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Starting Semantic Analysis
15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Creating table src position=27
15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_table : db=default 
tbl=src
15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang  ip=unknown-ip-addr  
cmd=get_table : db=default tbl=src
15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_database: default
15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang  ip=unknown-ip-addr

Re: OOM for HiveFromSpark example

2015-03-25 Thread Zhan Zhang
You can do it in $SPARK_HOME/conf/spark-defaults.conf

spark.driver.extraJavaOptions -XX:MaxPermSize=512m
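
If you prefer not to edit the conf file, the same setting can usually be passed at 
submit time instead (a sketch; the class and jar are placeholders):

./bin/spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=512m" \
  --class your.main.Class your-app.jar

In yarn-cluster mode the driver runs inside the application master container, so 
the option has to travel with the submission (spark-defaults.conf or --conf); 
exporting it only in your local shell environment would not reach that JVM.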

Thanks.

Zhan Zhang


On Mar 25, 2015, at 7:25 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:

Where and how do i pass this or other JVM argument ?
-XX:MaxPermSize=512m

On Wed, Mar 25, 2015 at 11:36 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
I solve this by  increase the PermGen memory size in driver.

-XX:MaxPermSize=512m

Thanks.

Zhan Zhang

On Mar 25, 2015, at 10:54 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:

I am facing same issue, posted a new thread. Please respond.

On Wed, Jan 14, 2015 at 4:38 AM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Hi Folks,

I am trying to run hive context in yarn-cluster mode, but met some error. Does 
anybody know what cause the issue.

I use following cmd to build the distribution:

 ./make-distribution.sh -Phive -Phive-thriftserver  -Pyarn  -Phadoop-2.4

15/01/13 17:59:42 INFO cluster.YarnClusterScheduler: 
YarnClusterScheduler.postStartHook done
15/01/13 17:59:42 INFO storage.BlockManagerMasterActor: Registering block 
manager 
cn122-10.l42scl.hortonworks.com:56157http://cn122-10.l42scl.hortonworks.com:56157/
 with 1589.8 MB RAM, BlockManagerId(2, 
cn122-10.l42scl.hortonworks.comhttp://cn122-10.l42scl.hortonworks.com/, 56157)
15/01/13 17:59:43 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT 
EXISTS src (key INT, value STRING)
15/01/13 17:59:43 INFO parse.ParseDriver: Parse Completed
15/01/13 17:59:44 INFO metastore.HiveMetaStore: 0: Opening raw store with 
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/01/13 17:59:44 INFO metastore.ObjectStore: ObjectStore, initialize called
15/01/13 17:59:44 INFO DataNucleus.Persistence: Property 
datanucleus.cache.level2 unknown - will be ignored
15/01/13 17:59:44 INFO DataNucleus.Persistence: Property 
hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present 
in CLASSPATH (or one of dependencies)
15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present 
in CLASSPATH (or one of dependencies)
15/01/13 17:59:52 INFO metastore.ObjectStore: Setting MetaStore object pin 
classes with 
hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
15/01/13 17:59:52 INFO metastore.MetaStoreDirectSql: MySQL check failed, 
assuming we are not on mysql: Lexical error at line 1, column 5.  Encountered: 
@ (64), after : .
15/01/13 17:59:53 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
15/01/13 17:59:53 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
15/01/13 17:59:59 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
15/01/13 17:59:59 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
15/01/13 18:00:00 INFO metastore.ObjectStore: Initialized ObjectStore
15/01/13 18:00:00 WARN metastore.ObjectStore: Version information not found in 
metastore. hive.metastore.schema.verification is not enabled so recording the 
schema version 0.13.1aa
15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added admin role in metastore
15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added public role in metastore
15/01/13 18:00:01 INFO metastore.HiveMetaStore: No user is added in admin role, 
since config is empty
15/01/13 18:00:01 INFO session.SessionState: No Tez session required at this 
point. hive.execution.engine=mr.
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=Driver.run 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=TimeToSubmit 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:02 INFO ql.Driver: Concurrency mode is disabled, not creating a 
lock manager
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=compile 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO log.PerfLogger: PERFLOG method=parse 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT 
EXISTS src (key INT, value STRING)
15/01/13 18:00:03 INFO parse.ParseDriver: Parse Completed
15/01/13 18:00:03 INFO log.PerfLogger: /PERFLOG method=parse 
start=1421190003030 end=1421190003031 duration=1 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO log.PerfLogger: PERFLOG method=semanticAnalyze 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Starting Semantic Analysis
15/01/13 18:00:03 INFO

Re: Spark-thriftserver Issue

2015-03-24 Thread Zhan Zhang
You can try to set it in spark-env.sh.

# - SPARK_LOG_DIR   Where log files are stored.  (Default: 
${SPARK_HOME}/logs)
# - SPARK_PID_DIR   Where the pid file is stored. (Default: /tmp)
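
For example (a sketch; the paths are placeholders):

# in $SPARK_HOME/conf/spark-env.sh
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=/var/run/spark

Note that the "Log directory /tmp/spark-events does not exist" error below is 
governed by a different setting, spark.eventLog.dir (or just create that 
directory), not by SPARK_LOG_DIR.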

Thanks.

Zhan Zhang

On Mar 24, 2015, at 12:10 PM, Anubhav Agarwal 
anubha...@gmail.commailto:anubha...@gmail.com wrote:

Zhan specifying port fixed the port issue.

Is it possible to specify the log directory while starting the spark 
thriftserver?
Still getting this error even through the folder exists and everyone has 
permission to use that directory.
drwxr-xr-x  2 root root  4096 Mar 24 19:04 spark-events


Exception in thread main java.lang.IllegalArgumentException: Log directory 
/tmp/spark-events does not exist.
at 
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99)
at org.apache.spark.SparkContext.init(SparkContext.scala:399)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:49)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:58)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



On Mon, Mar 23, 2015 at 6:51 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
Probably the port is already used by others, e.g., hive. You can change the 
port similar to below


 ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf 
hive.server2.thrift.port=10001

Thanks.

Zhan Zhang

On Mar 23, 2015, at 12:01 PM, Neil Dev 
neilk...@gmail.commailto:neilk...@gmail.com wrote:

Hi,

I am having issue starting spark-thriftserver. I'm running spark 1.3.with
Hadoop 2.4.0. I would like to be able to change its port too so, I can hive
hive-thriftserver as well as spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but still getting the following error
Exception in thread main java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address0.0.0.0/0.0.0.0:1http://0.0.0.0:1/.
   at
org.apache.thrift.transport.TServerSocket.init(TServerSocket.java:93)
   at
org.apache.thrift.transport.TServerSocket.init(TServerSocket.java:79)
   at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
   at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
   at java.lang.Thread.run(Thread.java:745)

Thanks
Neil





Re: Spark-thriftserver Issue

2015-03-23 Thread Zhan Zhang
Probably the port is already used by another process, e.g., Hive. You can change the 
port as shown below:


 ./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf 
hive.server2.thrift.port=10001

Thanks.

Zhan Zhang

On Mar 23, 2015, at 12:01 PM, Neil Dev 
neilk...@gmail.commailto:neilk...@gmail.com wrote:

Hi,

I am having issue starting spark-thriftserver. I'm running spark 1.3.with
Hadoop 2.4.0. I would like to be able to change its port too so, I can hive
hive-thriftserver as well as spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but still getting the following error
Exception in thread main java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address0.0.0.0/0.0.0.0:1.
   at
org.apache.thrift.transport.TServerSocket.init(TServerSocket.java:93)
   at
org.apache.thrift.transport.TServerSocket.init(TServerSocket.java:79)
   at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
   at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
   at java.lang.Thread.run(Thread.java:745)

Thanks
Neil



Re: Spark Job History Server

2015-03-20 Thread Zhan Zhang
Hi Patcharee,

It is an alpha feature in the HDP distribution, integrating ATS with the Spark history 
server. If you are using upstream Spark, you can configure it as usual without 
these settings. But other related configuration is still mandatory, such as the 
hdp.version settings.

Thanks.

Zhan Zhang
 
On Mar 18, 2015, at 3:30 AM, patcharee patcharee.thong...@uni.no wrote:

 Hi,
 
 I am using spark 1.3. I would like to use Spark Job History Server. I added 
 the following line into conf/spark-defaults.conf
 
 spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
 spark.history.provider 
 org.apache.spark.deploy.yarn.history.YarnHistoryProvider
 spark.yarn.historyServer.address  sandbox.hortonworks.com:19888
 
 But got Exception in thread main java.lang.ClassNotFoundException: 
 org.apache.spark.deploy.yarn.history.YarnHistoryProvider
 
 What class is really needed? How to fix it?
 
 Br,
 Patcharee
 
 





Re: Saving Dstream into a single file

2015-03-16 Thread Zhan Zhang
Each RDD has multiple partitions, and each of them will produce one HDFS file when 
saving output. I don’t think you are allowed to have multiple file handles 
writing to the same HDFS file. You can still load the multiple files into Hive 
tables, right?
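
If you want a single file per batch rather than one per partition, a common 
workaround (a sketch only, assuming each batch is small enough for a single task; 
the stream name and path are placeholders) is to coalesce before saving:

wordCounts.foreachRDD { (rdd, time) =>
  rdd.coalesce(1)
     .saveAsTextFile(s"/user/hive/warehouse/wordcount/batch-${time.milliseconds}")
}

This still produces one output directory per batch interval, but each contains a 
single part file.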

Thanks..

Zhan Zhang

On Mar 15, 2015, at 7:31 AM, tarek_abouzeid tarek.abouzei...@yahoo.com wrote:

 i am doing word count example on flume stream and trying to save output as
 text files in HDFS , but in the save directory i got multiple sub
 directories each having files with small size , i wonder if there is a way
 to append in a large file instead of saving in multiple files , as i intend
 to save the output in hive hdfs directory so i can query the result using
 hive 
 
 hope anyone have a workaround for this issue , Thanks in advance 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Dstream-into-a-single-file-tp22058.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 





Re: LogisticRegressionWithLBFGS shows ERRORs

2015-03-13 Thread Zhan Zhang
This happens during function evaluation in the line search: the value is either 
infinite or NaN, which may be caused by a too-large step size. In the code, the 
step size is then halved.

Thanks.

Zhan Zhang

On Mar 13, 2015, at 2:41 PM, cjwang c...@cjwang.us wrote:

 I am running LogisticRegressionWithLBFGS.  I got these lines on my console:
 
 2015-03-12 17:38:03,897 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to 0.5
 2015-03-12 17:38:03,967 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to 0.25
 2015-03-12 17:38:04,036 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to 0.125
 2015-03-12 17:38:04,105 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to
 0.0625
 2015-03-12 17:38:04,176 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to
 0.03125
 2015-03-12 17:38:04,247 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to
 0.015625
 2015-03-12 17:38:04,317 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to
 0.0078125
 2015-03-12 17:38:04,461 ERROR breeze.optimize.StrongWolfeLineSearch |
 Encountered bad values in function evaluation. Decreasing step size to
 0.005859375
 2015-03-12 17:38:04,605 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:04,672 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:04,747 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:04,818 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:04,890 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:04,962 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:05,038 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:05,107 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:05,186 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:05,256 INFO breeze.optimize.StrongWolfeLineSearch | Line
 search t: NaN fval: NaN rhs: NaN cdd: NaN
 2015-03-12 17:38:05,257 ERROR breeze.optimize.LBFGS | Failure! Resetting
 history: breeze.optimize.FirstOrderException: Line search zoom failed
 
 
 What causes them and how do I fix them?  
 
 I checked my data and there seemed nothing out of the ordinary.  The
 resulting prediction model seemed acceptable to me.  So, are these ERRORs
 actually WARNINGs?  Could we or should we tune the level of these messages
 down one notch? 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/LogisticRegressionWithLBFGS-shows-ERRORs-tp22042.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 





Re: Process time series RDD after sortByKey

2015-03-09 Thread Zhan Zhang
Does a code flow similar to the following work for you? It processes each 
partition of an RDD sequentially.

var iterPartition = 0
while (iterPartition < dayRDD.partitions.length) {
  val res = sc.runJob(dayRDD, (it: Iterator[T]) => someFunc(it), Seq(iterPartition),
    allowLocal = true)
  // some other work after processing one partition
  iterPartition += 1
}

You can refer to RDD.take for an example of this pattern.
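
A slightly fuller sketch of the same idea, processing the partitions batch by 
batch (assuming dayRDD is, say, an RDD[(Long, Double)] after sortByKey; the batch 
size of 42 and the per-partition work are placeholders):

dayRDD.partitions.indices.grouped(42).foreach { batch =>
  // one job over a group of consecutive partitions; returns one result per partition
  val counts = sc.runJob(dayRDD, (it: Iterator[(Long, Double)]) => it.size, batch,
    allowLocal = false)
  println(counts.sum)
}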

Thanks.

Zhan Zhang

On Mar 9, 2015, at 3:41 PM, Shuai Zheng 
szheng.c...@gmail.commailto:szheng.c...@gmail.com wrote:

Hi All,

I am processing some time series data. For one day, it might has 500GB, then 
for each hour, it is around 20GB data.

I need to sort the data before I start process. Assume I can sort them 
successfully

dayRDD.sortByKey

but after that, I might have thousands of partitions (to make the sort 
successfully), might be 1000 partitions. And then I try to process the data by 
hour (not need exactly one hour, but some kind of similar time frame). And I 
can’t just re-partition size to 24 because then one partition might be too big 
to fit into memory (if it is 20GB). So is there any way for me to just can 
process underlying partitions by certain order? Basically I want to call 
mapPartitionsWithIndex with a range of index?

Anyway to do it? Hope I describe my issue clear… :)

Regards,

Shuai



Re: [SPARK-SQL] How to pass parameter when running hql script using cli?

2015-03-06 Thread Zhan Zhang
Do you mean "--hiveconf" (two dashes) instead of "-hiveconf" (one dash)?
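
That is, something along the lines of (a sketch):

spark-sql -f script.hql --hiveconf CUR_DATE=20150119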

Thanks.

Zhan Zhang

On Mar 6, 2015, at 4:20 AM, James alcaid1...@gmail.com wrote:

 Hello, 
 
 I want to execute a hql script through `spark-sql` command, my script 
 contains: 
 
 ```
 ALTER TABLE xxx 
 DROP PARTITION (date_key = ${hiveconf:CUR_DATE});
 ```
 
 when I execute 
 
 ```
 spark-sql -f script.hql -hiveconf CUR_DATE=20150119
 ```
 
 It throws an error like 
 ```
 cannot recognize input near '$' '{' 'hiveconf' in constant
 ```
 
 I have try on hive and it works. Thus how could I pass a parameter like date 
 to a hql script?
 
 Alcaid





Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-06 Thread Zhan Zhang
Hi Todd,

Looks like the thrift server can connect to the metastore, but something is wrong in 
the executors. You can try to get the logs with "yarn logs -applicationId xxx" 
to check why it failed. If there is no log (the application master or executors were 
not started at all), you can go to the RM web page and click the application link to 
see why it failed in the first place.

Thanks.

Zhan Zhang

On Mar 6, 2015, at 9:59 AM, Todd Nist 
tsind...@gmail.commailto:tsind...@gmail.com wrote:

First, thanks to everyone for their assistance and recommendations.

@Marcelo

I applied the patch that you recommended and am now able to get into the shell, 
thank you worked great after I realized that the pom was pointing to the 
1.3.0-SNAPSHOT for parent, need to bump that down to 1.2.1.

@Zhan

Need to apply this patch next.  I tried to start the spark-thriftserver but and 
it starts, then fails with like this:  I have the entries in my 
spark-default.conf, but not the patch applied.


./sbin/start-thriftserver.sh --master yarn --executor-memory 1024m --hiveconf 
hive.server2.thrift.port=10001

5/03/06 12:34:17 INFO ui.SparkUI: Started SparkUI at 
http://hadoopdev01http://hadoopdev01/.opsdatastore.com:4040
15/03/06 12:34:18 INFO impl.TimelineClientImpl: Timeline service address: 
http://hadoopdev02http://hadoopdev02/.opsdatastore.com:8188/ws/v1/timeline/
15/03/06 12:34:18 INFO client.RMProxy: Connecting to ResourceManager at 
hadoopdev02.opsdatastore.com/192.168.15.154:8050
15/03/06 12:34:18 INFO yarn.Client: Requesting a new application from cluster 
with 4 NodeManagers
15/03/06 12:34:18 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (8192 MB per container)
15/03/06 12:34:18 INFO yarn.Client: Will allocate AM container, with 896 MB 
memory including 384 MB overhead
15/03/06 12:34:18 INFO yarn.Client: Setting up container launch context for our 
AM
15/03/06 12:34:18 INFO yarn.Client: Preparing resources for our AM container
15/03/06 12:34:19 WARN shortcircuit.DomainSocketFactory: The short-circuit 
local reads feature cannot be used because libhadoop cannot be loaded.
15/03/06 12:34:19 INFO yarn.Client: Uploading resource 
file:/root/spark-1.2.1-bin-hadoop2.6/lib/spark-assembly-1.2.1-hadoop2.6.0.jar 
- 
hdfs://hadoopdev01.opsdatastore.com:8020/user/root/.sparkStaging/application_1425078697953_0018/spark-assembly-1.2.1-hadoop2.6.0.jar
15/03/06 12:34:21 INFO yarn.Client: Setting up the launch environment for our 
AM container
15/03/06 12:34:21 INFO spark.SecurityManager: Changing view acls to: root
15/03/06 12:34:21 INFO spark.SecurityManager: Changing modify acls to: root
15/03/06 12:34:21 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
15/03/06 12:34:21 INFO yarn.Client: Submitting application 18 to ResourceManager
15/03/06 12:34:21 INFO impl.YarnClientImpl: Submitted application 
application_1425078697953_0018
15/03/06 12:34:22 INFO yarn.Client: Application report for 
application_1425078697953_0018 (state: ACCEPTED)
15/03/06 12:34:22 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1425663261755
 final status: UNDEFINED
 tracking URL: 
http://hadoopdev02http://hadoopdev02/.opsdatastore.com:8088/proxy/application_1425078697953_0018/
 user: root
15/03/06 12:34:23 INFO yarn.Client: Application report for 
application_1425078697953_0018 (state: ACCEPTED)
15/03/06 12:34:24 INFO yarn.Client: Application report for 
application_1425078697953_0018 (state: ACCEPTED)
15/03/06 12:34:25 INFO yarn.Client: Application report for 
application_1425078697953_0018 (state: ACCEPTED)
15/03/06 12:34:26 INFO yarn.Client: Application report for 
application_1425078697953_0018 (state: ACCEPTED)
15/03/06 12:34:27 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster 
registered as 
Actor[akka.tcp://sparkyar...@hadoopdev08.opsdatastore.com:40201/user/YarnAM#-557112763]
15/03/06 12:34:27 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS - 
hadoopdev02.opsdatastore.com, PROXY_URI_BASES - 
http://hadoopdev02http://hadoopdev02/.opsdatastore.com:8088/proxy/application_1425078697953_0018),
 /proxy/application_1425078697953_0018
15/03/06 12:34:27 INFO ui.JettyUtils: Adding filter: 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/03/06 12:34:27 INFO yarn.Client: Application report for 
application_1425078697953_0018 (state: RUNNING)
15/03/06 12:34:27 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: hadoopdev08.opsdatastore.com
 ApplicationMaster RPC port: 0
 queue: default
 start time: 1425663261755
 final status: UNDEFINED
 tracking URL: 
http://hadoopdev02http://hadoopdev02/.opsdatastore.com

Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-06 Thread Zhan Zhang
Sorry, my misunderstanding. Looks like it already worked. If you still hit some 
hdp.version problem, you can try it :)

Thanks.

Zhan Zhang

On Mar 6, 2015, at 11:40 AM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:

You are using 1.2.1, right? If so, please add a java-opts file in the conf directory and 
give it a try.

[root@c6401 conf]# more java-opts
  -Dhdp.version=2.2.2.0-2041

Thanks.

Zhan Zhang

On Mar 6, 2015, at 11:35 AM, Todd Nist 
tsind...@gmail.commailto:tsind...@gmail.com wrote:

 -Dhdp.version=2.2.0.0-2041




Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-06 Thread Zhan Zhang
You are using 1.2.1, right? If so, please add a java-opts file in the conf directory and 
give it a try.

[root@c6401 conf]# more java-opts
  -Dhdp.version=2.2.2.0-2041

Thanks.

Zhan Zhang

On Mar 6, 2015, at 11:35 AM, Todd Nist 
tsind...@gmail.commailto:tsind...@gmail.com wrote:

 -Dhdp.version=2.2.0.0-2041



Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Zhan Zhang
In addition, you may need the following patch, if it is not already in 1.2.1, to solve a 
system property issue when you use HDP 2.2.

https://github.com/apache/spark/pull/3409

You can follow the following link to set hdp.version for java options.

http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/

Thanks.

Zhan Zhang

On Mar 5, 2015, at 11:09 AM, Marcelo Vanzin 
van...@cloudera.commailto:van...@cloudera.com wrote:

It seems from the excerpt below that your cluster is set up to use the
Yarn ATS, and the code is failing in that path. I think you'll need to
apply the following patch to your Spark sources if you want this to
work:

https://github.com/apache/spark/pull/3938

On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist tsind...@gmail.com wrote:
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:166)
   at
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
   at
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:65)
   at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
   at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
   at org.apache.spark.SparkContext.init(SparkContext.scala:348)

--
Marcelo





Re: RDD coalesce or repartition by #records or #bytes?

2015-03-04 Thread Zhan Zhang
It uses a HashPartitioner to distribute the records to different partitions, but 
the key is just an integer assigned evenly across the output partitions.

From the code, each resulting partition will get a very similar number of 
records.
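
If you want to affect the placement yourself, one sketch (not the built-in 
behavior) is to key every record with a bucket id you compute, for example from 
the record's size, and route it with a custom Partitioner. bucketFor below is a 
placeholder for your own bucketing logic and must return a non-negative Int:

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._   // pair-RDD functions on Spark < 1.3

class BucketPartitioner(n: Int) extends Partitioner {
  def numPartitions: Int = n
  def getPartition(key: Any): Int = key.asInstanceOf[Int] % n   // key is the bucket id
}

val balanced = rdd
  .map(rec => (bucketFor(rec), rec))
  .partitionBy(new BucketPartitioner(48))
  .values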

Thanks.

Zhan Zhang


On Mar 4, 2015, at 3:47 PM, Du Li 
l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID wrote:

Hi,

My RDD's are created from kafka stream. After receiving a RDD, I want to do 
coalesce/repartition it so that the data will be processed in a set of machines 
in parallel as even as possible. The number of processing nodes is larger than 
the receiving nodes.

My question is how the coalesce/repartition works. Does it distribute by the 
number of records or number of bytes? In my app, my observation is that the 
distribution seems by number of records. The consequence is, however, some 
executors have to process x1000 as much as data when the sizes of records are 
very skewed. Then we have to allocate memory by the worst case.

Is there a way to programmatically affect the coalesce /repartition scheme?

Thanks,
Du



Re: TreeNodeException: Unresolved attributes

2015-03-04 Thread Zhan Zhang
Which Spark version did you use? I tried spark-1.2.1 and didn’t hit this 
problem.

scala> val m = hiveContext.sql("select * from testtable where value like '%Restaurant%'")
15/03/05 02:02:30 INFO ParseDriver: Parsing command: select * from  testtable 
where value like '%Restaurant%'
15/03/05 02:02:30 INFO ParseDriver: Parse Completed
15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(462299) called with 
curMem=1087888, maxMem=280248975
15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 451.5 KB, free 265.8 MB)
15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(81645) called with 
curMem=1550187, maxMem=280248975
15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in 
memory (estimated size 79.7 KB, free 265.7 MB)
15/03/05 02:02:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
c6402.ambari.apache.orghttp://c6402.ambari.apache.org:33696 (size: 79.7 KB, 
free: 267.0 MB)
15/03/05 02:02:30 INFO BlockManagerMaster: Updated info of block 
broadcast_2_piece0
15/03/05 02:02:30 INFO DefaultExecutionContext: Created broadcast 2 from 
broadcast at TableReader.scala:68
m: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Filter Contains(value#5, Restaurant)
 HiveTableScan [key#4,value#5], (MetastoreRelation default, testtable, None), 
None

scala


Thanks.

Zhan Zhang

On Mar 4, 2015, at 9:09 AM, Anusha Shamanur 
anushas...@gmail.commailto:anushas...@gmail.com wrote:

I tried. I still get the same error.

15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from 
TableName where value like '%Restaurant%'

15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed.

15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default 
tbl=TableName

15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339 ip=unknown-ip-addr 
cmd=get_table : db=default tbl=TableName
results: org.apache.spark.sql.SchemaRDD =

SchemaRDD[86] at RDD at SchemaRDD.scala:108
== Query Plan ==

== Physical Plan ==

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: *, tree:

'Project [*]

'Filter ('value LIKE Restaurant)
  MetastoreRelation default, TableName, None



On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda 
ar...@sigmoidanalytics.commailto:ar...@sigmoidanalytics.com wrote:
Why don't you formulate a string before you pass it to the hql function 
(appending strings), and hql function is deprecated. You should use sql.

http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
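
That is, roughly (a sketch; the keyword is a placeholder):

val keyword = "Restaurant"
val restaurants = hiveCtx.sql(s"select * from TableName where value like '%$keyword%'")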

On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur 
anushas...@gmail.commailto:anushas...@gmail.com wrote:
Hi,

I am trying to run a simple select query on a table.

val restaurants = hiveCtx.hql("select * from TableName where column like '%SomeString%'")
This gives an error as below:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: *, tree:

How do I solve this?


--
Regards,
Anusha



--

[Sigmoid Analytics]http://htmlsig.com/www.sigmoidanalytics.com

Arush Kharbanda || Technical Teamlead

ar...@sigmoidanalytics.commailto:ar...@sigmoidanalytics.com || 
www.sigmoidanalytics.comhttp://www.sigmoidanalytics.com/



--
Regards,
Anusha



Re: Issue with yarn cluster - hangs in accepted state.

2015-03-03 Thread Zhan Zhang
Do you have enough resources in your cluster? You can check your resource 
manager UI to see the usage.

Thanks.

Zhan Zhang

On Mar 3, 2015, at 8:51 AM, abhi 
abhishek...@gmail.commailto:abhishek...@gmail.com wrote:



I am trying to run below java class with yarn cluster, but it hangs in accepted 
state . i don't see any error . Below is the class and command . Any help is 
appreciated .


Thanks,

Abhi





bin/spark-submit --class com.mycompany.app.SimpleApp --master yarn-cluster 
/home/hduser/my-app-1.0.jar


{code}

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/home/hduser/testspark.txt"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

{code}


15/03/03 11:47:40 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:41 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:42 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:43 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:44 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:45 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:46 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:47 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:48 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:49 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:50 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:51 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:52 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:53 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:54 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:55 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:56 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:57 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:58 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:47:59 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:48:00 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:48:01 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:48:02 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:48:03 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED)

15/03/03 11:48:04 INFO yarn.Client: Application report for 
application_1425398386987_0002 (state: ACCEPTED



Re: Resource manager UI for Spark applications

2015-03-03 Thread Zhan Zhang
In YARN (cluster or client mode), you can access the Spark UI while the app is 
running. After the app is done, you can still access it, but you need some extra 
setup for the history server.
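
The extra setup is roughly the following (a sketch; the host name and HDFS path 
are placeholders):

# spark-defaults.conf on the submitting side
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///spark-history
spark.yarn.historyServer.address  historyhost:18080

# on historyhost: point spark.history.fs.logDirectory at the same
# hdfs:///spark-history path, then start the daemon
./sbin/start-history-server.sh

With that in place, the tracking/history link for a finished application should 
redirect to the Spark history server instead of a dead application UI.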

Thanks.

Zhan Zhang

On Mar 3, 2015, at 10:08 AM, Ted Yu 
yuzhih...@gmail.commailto:yuzhih...@gmail.com wrote:

bq. changing the address with internal to the external one , but still does not 
work.
Not sure what happened.
For the time being, you can use yarn command line to pull container log (put in 
your appId and container Id):
yarn logs -applicationId application_1386639398517_0007 -containerId 
container_1386639398517_0007_01_19

Cheers

On Tue, Mar 3, 2015 at 9:50 AM, roni 
roni.epi...@gmail.commailto:roni.epi...@gmail.com wrote:
Hi Ted,
 I  used s3://support.elasticmapreduce/spark/install-spark to install spark on 
my EMR cluster. It is 1.2.0.
 When I click on the link for history or logs it takes me to
http://ip-172-31-43-116.us-west-2.compute.internal:9035/node/containerlogs/container_1424105590052_0070_01_01/hadoop
 and I get -

The server at 
ip-172-31-43-116.ushttp://ip-172-31-43-116.us-west-2.compute.internal can't 
be found, because the DNS lookup failed. DNS is the network service that 
translates a website's name to its Internet address. This error is most often 
caused by having no connection to the Internet or a misconfigured network. It 
can also be caused by an unresponsive DNS server or a firewall preventing 
Google Chrome from accessing the network.
I tried  changing the address with internal to the external one , but still 
does not work.
Thanks
_roni


On Tue, Mar 3, 2015 at 9:05 AM, Ted Yu 
yuzhih...@gmail.commailto:yuzhih...@gmail.com wrote:
bq. spark UI does not work for Yarn-cluster.

Can you be a bit more specific on the error(s) you saw ?

What Spark release are you using ?

Cheers

On Tue, Mar 3, 2015 at 8:53 AM, Rohini joshi 
roni.epi...@gmail.commailto:roni.epi...@gmail.com wrote:
Sorry , for half email - here it is again in full
Hi ,
I have 2 questions -

 1. I was trying to use Resource Manager UI for my SPARK application using yarn 
cluster mode as I observed that spark UI does not work for Yarn-cluster.
IS that correct or am I missing some setup?

2. when I click on Application Monitoring or history , i get re-directed to 
some linked with internal Ip address. Even if I replace that address with the 
public IP , it still does not work.  What kind of setup changes are needed for 
that?

Thanks
-roni

On Tue, Mar 3, 2015 at 8:45 AM, Rohini joshi 
roni.epi...@gmail.commailto:roni.epi...@gmail.com wrote:
Hi ,
I have 2 questions -

 1. I was trying to use Resource Manager UI for my SPARK application using yarn 
cluster mode as I observed that spark UI does not work for Yarn-cluster.
IS that correct or am I missing some setup?











Re: How to tell if one RDD depends on another

2015-02-26 Thread Zhan Zhang
You don't need to know the RDD dependencies to maximize concurrency. Internally the scheduler will construct the DAG and trigger the execution when there are no shuffle dependencies between the RDDs.

Thanks.

Zhan Zhang 
On Feb 26, 2015, at 1:28 PM, Corey Nolet cjno...@gmail.com wrote:

 Let's say I'm given 2 RDDs and told to store them in a sequence file and they 
 have the following dependency:
 
 val rdd1 = sparkContext.sequenceFile().cache()
 val rdd2 = rdd1.map()
 
 
 How would I tell programmatically without being the one who built rdd1 and 
 rdd2 whether or not rdd2 depends on rdd1?
 
 I'm working on a concurrency model for my application and I won't necessarily 
 know how the two rdds are constructed. What I will know is whether or not 
 rdd1 is cached but i want to maximum concurrency and run rdd1 and rdd2 
 together if rdd2 does not depend on rdd1.
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to tell if one RDD depends on another

2015-02-26 Thread Zhan Zhang
What confused me is the statement “The final result is that rdd1 is calculated twice.” Is that the expected behavior?

Thanks.

Zhan Zhang

On Feb 26, 2015, at 3:03 PM, Sean Owen 
so...@cloudera.commailto:so...@cloudera.com wrote:

To distill this a bit further, I don't think you actually want rdd2 to
wait on rdd1 in this case. What you want is for a request for
partition X to wait if partition X is already being calculated in a
persisted RDD. Otherwise the first partition of rdd2 waits on the
final partition of rdd1 even when the rest is ready.

That is probably usually a good idea in almost all cases. That much, I
don't know how hard it is to implement. But I speculate that it's
easier to deal with it at that level than as a function of the
dependency graph.

On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:
I'm trying to do the scheduling myself now- to determine that rdd2 depends
on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can
do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure
this out so I don't need to think about all this.



Re: How to tell if one RDD depends on another

2015-02-26 Thread Zhan Zhang
In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to finish, probably due to writing to HDFS. A workaround for this particular case may be as follows.

val rdd1 = (...).cache()

val rdd2 = rdd1.map(...)
rdd1.count()
future { rdd1.saveAsHadoopFile(...) }
future { rdd2.saveAsHadoopFile(...) }

In this way, rdd1 will be calculated once, and two saveAsHadoopFile will happen 
concurrently.
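
For completeness, a self-contained sketch of that pattern (the paths, the transformation, and the SparkContext name sc are placeholders, and saveAsTextFile is used for simplicity instead of saveAsHadoopFile):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val rdd1 = sc.textFile("/input").cache()
val rdd2 = rdd1.map(_.toUpperCase)

rdd1.count()                                        // materialize the cache first
val f1 = Future { rdd1.saveAsTextFile("/out/rdd1") }
val f2 = Future { rdd2.saveAsTextFile("/out/rdd2") }
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)   // both jobs run concurrently, reusing the cache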

Thanks.

Zhan Zhang



On Feb 26, 2015, at 3:28 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:

 What confused me is the statement “The final result is that rdd1 is calculated twice.” Is that the expected behavior?

To be perfectly honest, performing an action on a cached RDD in two different 
threads and having them (at the partition level) block until the parent are 
cached would be the behavior and myself and all my coworkers expected.

On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:
I should probably mention that my example case is much over simplified- Let's 
say I've got a tree, a fairly complex one where I begin a series of jobs at the 
root which calculates a bunch of really really complex joins and as I move down 
the tree, I'm creating reports from the data that's already been joined (i've 
implemented logic to determine when cached items can be cleaned up, e.g. the 
last report has been done in a subtree).

My issue is that the 'actions' on the rdds are currently being implemented in a 
single thread- even if I'm waiting on a cache to complete fully before I run 
the children jobs, I'm still in a better place than I was because I'm able 
to run those jobs concurrently- right now this is not the case.

 What you want is for a request for partition X to wait if partition X is 
 already being calculated in a persisted RDD.

I totally agree and if I could get it so that it's waiting at the granularity 
of the partition, I'd be in a much much better place. I feel like I'm going 
down a rabbit hole and working against the Spark API.


On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen 
so...@cloudera.commailto:so...@cloudera.com wrote:
To distill this a bit further, I don't think you actually want rdd2 to
wait on rdd1 in this case. What you want is for a request for
partition X to wait if partition X is already being calculated in a
persisted RDD. Otherwise the first partition of rdd2 waits on the
final partition of rdd1 even when the rest is ready.

That is probably usually a good idea in almost all cases. That much, I
don't know how hard it is to implement. But I speculate that it's
easier to deal with it at that level than as a function of the
dependency graph.

On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:
 I'm trying to do the scheduling myself now- to determine that rdd2 depends
 on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can
 do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure
 this out so I don't need to think about all this.





Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Zhan Zhang
Here is my understanding.

When running on top of YARN, the number of cores determines how many tasks can run concurrently in one executor, but all of those cores are located in the same JVM.

Parallelism typically controls the balance of tasks. For example, if you have 200 cores but only 50 partitions, 150 cores will sit idle.
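
As an illustration (the RDD names and numbers below are made up), the partition count can be raised explicitly so that the idle cores get work:

val counts = pairs.reduceByKey(_ + _, 200)   // ask for 200 shuffle partitions explicitly
val balanced = someRdd.repartition(200)      // or rebalance an existing RDD before a heavy stage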

OOM: increasing the memory size and the JVM memory overhead may help here.

Thanks.

Zhan Zhang

On Feb 26, 2015, at 2:03 PM, Yana Kadiyska 
yana.kadiy...@gmail.commailto:yana.kadiy...@gmail.com wrote:

Imran, I have also observed the phenomenon of reducing the cores helping with 
OOM. I wanted to ask this (hopefully without straying off topic): we can 
specify the number of cores and the executor memory. But we don't get to 
specify _how_ the cores are spread among executors.

Is it possible that with 24G memory and 4 cores we get a spread of 1 core per 
executor thus ending up with 24G for the task, but with 24G memory and 10 cores 
some executor ends up with 3 cores on the same machine and thus we have only 8G 
per task?

On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid 
iras...@cloudera.commailto:iras...@cloudera.com wrote:
Hi Yong,

mostly correct except for:

  *   Since we are doing reduceByKey, shuffling will happen. Data will be 
shuffled into 1000 partitions, as we have 1000 unique keys.

no, you will not get 1000 partitions.  Spark has to decide how many partitions 
to use before it even knows how many unique keys there are.  If you have 200 as 
the default parallelism (or you just explicitly make it the second parameter to 
reduceByKey()), then you will get 200 partitions.  The 1000 unique keys will be 
distributed across the 200 partitions.  ideally they will be distributed pretty 
equally, but how they get distributed depends on the partitioner (by default 
you will have a HashPartitioner, so it depends on the hash of your keys).

Note that this is more or less the same as in Hadoop MapReduce.

the amount of parallelism matters b/c there are various places in spark where 
there is some overhead proportional to the size of a partition.  So in your 
example, if you have 1000 unique keys in 200 partitions, you expect about 5 
unique keys per partitions -- if instead you had 10 partitions, you'd expect 
100 unique keys per partitions, and thus more data and you'd be more likely to 
hit an OOM.  But there are many other possible sources of OOM, so this is 
definitely not the *only* solution.
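
For example, a quick way to see where a given key lands under the default HashPartitioner (a small illustration, not from the original mail):

import org.apache.spark.HashPartitioner
val p = new HashPartitioner(200)
val idx = p.getPartition(42)   // index (0..199) of the partition that key 42 hashes to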

Sorry I can't comment in particular about Spark SQL -- hopefully somebody more 
knowledgeable can comment on that.



On Wed, Feb 25, 2015 at 8:58 PM, java8964 
java8...@hotmail.commailto:java8...@hotmail.com wrote:
Hi, Sparkers:

I come from the Hadoop MapReducer world, and try to understand some internal 
information of spark. From the web and this list, I keep seeing people talking 
about increase the parallelism if you get the OOM error. I tried to read 
document as much as possible to understand the RDD partition, and parallelism 
usage in the spark.

I understand that for RDD from HDFS, by default, one partition will be one HDFS 
block, pretty straightforward. I saw that lots of RDD operations support 2nd 
parameter of parallelism. This is the part that confuses me. From my understanding, the 
parallelism is totally controlled by how many cores you give to your job. 
Adjust that parameter, or spark.default.parallelism shouldn't have any impact.

For example, if I have a 10G data in HDFS, and assume the block size is 128M, 
so we get 100 blocks/partitions in RDD. Now if I transfer that RDD to a Pair 
RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey action, using 
200 as the default parallelism. Here is what I assume:


  *   We have 100 partitions, as the data comes from 100 blocks. Most likely 
the spark will generate 100 tasks to read and shuffle them?
  *   The 1000 unique keys mean the 1000 reducer group, like in MR
  *   If I set the max core to be 50, so there will be up to 50 tasks can be 
run concurrently. The rest tasks just have to wait for the core, if there are 
50 tasks are running.
  *   Since we are doing reduceByKey, shuffling will happen. Data will be 
shuffled into 1000 partitions, as we have 1000 unique keys.
  *   I don't know these 1000 partitions will be processed by how many tasks, 
maybe this is the parallelism parameter comes in?
  *   No matter what parallelism this will be, there are ONLY 50 tasks that can be 
run concurrently. So if we set more cores, more partitions' data will be 
processed in the executor (which runs more thread in this case), so more memory 
needs. I don't see how increasing parallelism could help the OOM in this case.
  *   In my test case of Spark SQL, I gave 24G as the executor heap, my join 
between 2 big datasets keeps getting OOM. I keep increasing the 
spark.default.parallelism, from 200 to 400, to 2000, even to 4000, no help. 
What really makes the query finish finally without OOM is after I change the 
--total-executor-cores from 10 to 4

Re: Running spark function on parquet without sql

2015-02-26 Thread Zhan Zhang
When you use SQL (or the API from SchemaRDD/DataFrame) to read data from Parquet, the optimizer will do column pruning, predicate pushdown, etc., so you get the benefit of Parquet's columnar format. After that, you can operate on the SchemaRDD (DF) like a regular RDD.
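
A small sketch of what that looks like in spark-shell (the path, table name and schema are assumptions; in the shell the pair-RDD implicits are already imported):

val events = sqlContext.parquetFile("/data/events.parquet")   // SchemaRDD in 1.2
events.registerTempTable("events")
// column pruning and predicate pushdown happen in this optimized read
val narrowed = sqlContext.sql("SELECT key, value FROM events WHERE key > 10")
// from here on it behaves like a regular RDD of Rows
val keyCounts = narrowed.map(row => (row.getInt(0), 1)).reduceByKey(_ + _)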

Thanks.

Zhan Zhang
 
On Feb 26, 2015, at 1:50 PM, tridib tridib.sama...@live.com wrote:

 Hello Experts,
 In one of my projects we are having parquet files and we are using spark SQL
 to get our analytics. I am encountering situation where simple SQL is not
 getting me what I need or the complex SQL is not supported by Spark Sql. In
 scenarios like this I am able to get things done using low level spark
 constructs like MapFunction and reducers.
 
 My question is if I create a JavaSchemaRdd on Parquet and use basic spark
 constructs, will I still get the benefit of parquets columnar format? Will
 my aggregation be as fast as it would have been if I have used SQL?
 
 Please advise.
 
 Thanks  Regards
 Tridib
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-function-on-parquet-without-sql-tp21833.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to tell if one RDD depends on another

2015-02-26 Thread Zhan Zhang
Currently in Spark, it looks like there is no easy way to know the dependencies up front; they are resolved at run time.
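
That said, a rough sketch is possible by walking the lineage exposed by the developer-level rdd.dependencies API; this is only an illustration and it only sees whatever lineage Spark keeps:

import org.apache.spark.rdd.RDD

// Recursively walk the lineage of `child` and report whether `parent` appears in it.
def dependsOn(child: RDD[_], parent: RDD[_]): Boolean =
  (child == parent) || child.dependencies.exists(dep => dependsOn(dep.rdd, parent))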

Thanks.

Zhan Zhang
On Feb 26, 2015, at 4:20 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:


Ted. That one I know. It was the dependency part I was curious about

On Feb 26, 2015 7:12 PM, Ted Yu 
yuzhih...@gmail.commailto:yuzhih...@gmail.com wrote:
bq. whether or not rdd1 is a cached rdd

RDD has getStorageLevel method which would return the RDD's current storage 
level.

SparkContext has this method:
   * Return information about what RDDs are cached, if they are in mem or on 
disk, how much space
   * they take, etc.
   */
  @DeveloperApi
  def getRDDStorageInfo: Array[RDDInfo] = {

Cheers

On Thu, Feb 26, 2015 at 4:00 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:
Zhan,

This is exactly what I'm trying to do except, as I metnioned in my first 
message, I am being given rdd1 and rdd2 only and I don't necessarily know at 
that point whether or not rdd1 is a cached rdd. Further, I don't know at that 
point whether or not rdd2 depends on rdd1.

On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:
In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to finish, probably due to writing to HDFS. A workaround for this particular case may be as follows.

val rdd1 = (...).cache()

val rdd2 = rdd1.map(...)
rdd1.count()
future { rdd1.saveAsHadoopFile(...) }
future { rdd2.saveAsHadoopFile(...) }

In this way, rdd1 will be calculated once, and two saveAsHadoopFile will happen 
concurrently.

Thanks.

Zhan Zhang



On Feb 26, 2015, at 3:28 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:

 What confused me is the statement “The final result is that rdd1 is calculated twice.” Is that the expected behavior?

To be perfectly honest, performing an action on a cached RDD in two different 
threads and having them (at the partition level) block until the parent are 
cached would be the behavior and myself and all my coworkers expected.

On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:
I should probably mention that my example case is much over simplified- Let's 
say I've got a tree, a fairly complex one where I begin a series of jobs at the 
root which calculates a bunch of really really complex joins and as I move down 
the tree, I'm creating reports from the data that's already been joined (i've 
implemented logic to determine when cached items can be cleaned up, e.g. the 
last report has been done in a subtree).

My issue is that the 'actions' on the rdds are currently being implemented in a 
single thread- even if I'm waiting on a cache to complete fully before I run 
the children jobs, I'm still in a better place than I was because I'm able 
to run those jobs concurrently- right now this is not the case.

 What you want is for a request for partition X to wait if partition X is 
 already being calculated in a persisted RDD.

I totally agree and if I could get it so that it's waiting at the granularity 
of the partition, I'd be in a much much better place. I feel like I'm going 
down a rabbit hole and working against the Spark API.


On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen 
so...@cloudera.commailto:so...@cloudera.com wrote:
To distill this a bit further, I don't think you actually want rdd2 to
wait on rdd1 in this case. What you want is for a request for
partition X to wait if partition X is already being calculated in a
persisted RDD. Otherwise the first partition of rdd2 waits on the
final partition of rdd1 even when the rest is ready.

That is probably usually a good idea in almost all cases. That much, I
don't know how hard it is to implement. But I speculate that it's
easier to deal with it at that level than as a function of the
dependency graph.

On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet 
cjno...@gmail.commailto:cjno...@gmail.com wrote:
 I'm trying to do the scheduling myself now- to determine that rdd2 depends
 on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can
 do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure
 this out so I don't need to think about all this.








Re: NullPointerException in ApplicationMaster

2015-02-25 Thread Zhan Zhang
Look at the trace again. It is a very weird error. SparkSubmit is running on the client side, but YarnClusterSchedulerBackend is supposed to be running in the YARN AM.

I suspect you are running the cluster in yarn-client mode, but in JavaSparkContext you set “yarn-cluster”. As a result, the Spark context initiates YarnClusterSchedulerBackend instead of YarnClientSchedulerBackend, which I think is the root cause.
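
A sketch of the usual fix (shown in Scala; the same idea applies to JavaSparkContext): leave the master out of the code and let spark-submit supply it.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("my-app")   // no setMaster here
val sc = new SparkContext(conf)
// then launch with: spark-submit --master yarn-client ... (or yarn-cluster)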

Thanks.

Zhan Zhang

On Feb 25, 2015, at 1:53 PM, Zhan Zhang 
zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote:

Hi Mate,

When you initialize the JavaSparkContext, you don’t need to specify the mode 
“yarn-cluster”. I suspect that is the root cause.

Thanks.

Zhan Zhang

On Feb 25, 2015, at 10:12 AM, gulyasm 
mgulya...@gmail.commailto:mgulya...@gmail.com wrote:

JavaSparkContext.




Re: Can't access remote Hive table from spark

2015-02-12 Thread Zhan Zhang
When you log in, you have root access. Then you can do “su hdfs” (or switch to any other account), create the HDFS directory, change its permissions, etc.
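
For example (a sketch; substitute your own user name):

su - hdfs
hdfs dfs -mkdir /user/xiaobogu
hdfs dfs -chown xiaobogu /user/xiaobogu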


Thanks

Zhan Zhang

On Feb 11, 2015, at 11:28 PM, guxiaobo1982 
guxiaobo1...@qq.commailto:guxiaobo1...@qq.com wrote:

Hi Zhan,

Yes, I found there is a hdfs account, which is created by Ambari, but what's 
the password for this account, how can I login under this account?
Can I just change the password for the hdfs account?

Regards,



-- Original --
From:  Zhan Zhang;zzh...@hortonworks.commailto:zzh...@hortonworks.com;
Send time: Thursday, Feb 12, 2015 2:00 AM
To: guxiaobo1...@qq.commailto:guxiaobo1...@qq.com;
Cc: 
user@spark.apache.orgmailto:user@spark.apache.orguser@spark.apache.orgmailto:user@spark.apache.org;
 Cheng Lianlian.cs@gmail.commailto:lian.cs@gmail.com;
Subject:  Re: Can't access remote Hive table from spark

You need to have right hdfs account, e.g., hdfs,  to create directory and 
assign permission.

Thanks.

Zhan Zhang
On Feb 11, 2015, at 4:34 AM, guxiaobo1982 
guxiaobo1...@qq.commailto:guxiaobo1...@qq.com wrote:

Hi Zhan,
My Single Node Cluster of Hadoop is installed by Ambari 1.7.0, I tried to 
create the /user/xiaobogu directory in hdfs, but both failed with user xiaobogu 
and root

[xiaobogu@lix1 current]$ hadoop dfs -mkdir /user/xiaobogu
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

mkdir: Permission denied: user=xiaobogu, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

root@lix1 bin]# hadoop dfs -mkdir /user/xiaobogu
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.


mkdir: Permission denied: user=root, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

I notice there is an hdfs account created by Ambari, but what's the password for it? Should I use the hdfs account to create the directory?



-- Original --
From:  Zhan Zhang;zzh...@hortonworks.commailto:zzh...@hortonworks.com;
Send time: Sunday, Feb 8, 2015 4:11 AM
To: guxiaobo1...@qq.commailto:guxiaobo1...@qq.com;
Cc: 
user@spark.apache.orgmailto:user@spark.apache.orguser@spark.apache.orgmailto:user@spark.apache.org;
 Cheng Lianlian.cs@gmail.commailto:lian.cs@gmail.com;
Subject:  Re: Can't access remote Hive table from spark

Yes. You need to create xiaobogu under /user and provide right permission to 
xiaobogu.

Thanks.

Zhan Zhang

On Feb 7, 2015, at 8:15 AM, guxiaobo1982 
guxiaobo1...@qq.commailto:guxiaobo1...@qq.com wrote:

Hi Zhan Zhang,

With the pre-built version 1.2.0 of Spark against the YARN cluster installed by Ambari 1.7.0, I ran into the following errors:

[xiaobogu@lix1 spark]$ ./bin/spark-submit --class 
org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 
--driver-memory 512m  --executor-memory 512m   --executor-cores 1  
lib/spark-examples*.jar 10


Spark assembly has been built with Hive, including Datanucleus jars on classpath

15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable

15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at 
lix1.bh.com/192.168.100.3:8050http://lix1.bh.com/192.168.100.3:8050

15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster 
with 1 NodeManagers

15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (4096 MB per container)

15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB 
memory including 384 MB overhead

15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our 
AM

15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container

15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.

Exception in thread main org.apache.hadoop.security.AccessControlException: 
Permission denied: user=xiaobogu, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251

Re: Can't access remote Hive table from spark

2015-02-11 Thread Zhan Zhang
You need to have right hdfs account, e.g., hdfs,  to create directory and 
assign permission.

Thanks.

Zhan Zhang
On Feb 11, 2015, at 4:34 AM, guxiaobo1982 
guxiaobo1...@qq.commailto:guxiaobo1...@qq.com wrote:

Hi Zhan,
My Single Node Cluster of Hadoop is installed by Ambari 1.7.0, I tried to 
create the /user/xiaobogu directory in hdfs, but both failed with user xiaobogu 
and root

[xiaobogu@lix1 current]$ hadoop dfs -mkdir /user/xiaobogu
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

mkdir: Permission denied: user=xiaobogu, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

root@lix1 bin]# hadoop dfs -mkdir /user/xiaobogu
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.


mkdir: Permission denied: user=root, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

I notice there is an hdfs account created by Ambari, but what's the password for it? Should I use the hdfs account to create the directory?



-- Original --
From:  Zhan Zhang;zzh...@hortonworks.commailto:zzh...@hortonworks.com;
Send time: Sunday, Feb 8, 2015 4:11 AM
To: guxiaobo1...@qq.commailto:guxiaobo1...@qq.com;
Cc: 
user@spark.apache.orgmailto:user@spark.apache.orguser@spark.apache.orgmailto:user@spark.apache.org;
 Cheng Lianlian.cs@gmail.commailto:lian.cs@gmail.com;
Subject:  Re: Can't access remote Hive table from spark

Yes. You need to create xiaobogu under /user and provide right permission to 
xiaobogu.

Thanks.

Zhan Zhang

On Feb 7, 2015, at 8:15 AM, guxiaobo1982 
guxiaobo1...@qq.commailto:guxiaobo1...@qq.com wrote:

Hi Zhan Zhang,

With the pre-built version 1.2.0 of Spark against the YARN cluster installed by Ambari 1.7.0, I ran into the following errors:

[xiaobogu@lix1 spark]$ ./bin/spark-submit --class 
org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 
--driver-memory 512m  --executor-memory 512m   --executor-cores 1  
lib/spark-examples*.jar 10


Spark assembly has been built with Hive, including Datanucleus jars on classpath

15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable

15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at 
lix1.bh.com/192.168.100.3:8050http://lix1.bh.com/192.168.100.3:8050

15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster 
with 1 NodeManagers

15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (4096 MB per container)

15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB 
memory including 384 MB overhead

15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our 
AM

15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container

15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.

Exception in thread main org.apache.hadoop.security.AccessControlException: 
Permission denied: user=xiaobogu, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4221)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4194)

at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:813)

at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)

at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)

at java.security.AccessController.doPrivileged(Native Method

Re: Can't access remote Hive table from spark

2015-02-07 Thread Zhan Zhang
Yes. You need to create xiaobogu under /user and provide right permission to 
xiaobogu.

Thanks.

Zhan Zhang

On Feb 7, 2015, at 8:15 AM, guxiaobo1982 
guxiaobo1...@qq.commailto:guxiaobo1...@qq.com wrote:

Hi Zhan Zhang,

With the pre-built version 1.2.0 of Spark against the YARN cluster installed by Ambari 1.7.0, I ran into the following errors:

[xiaobogu@lix1 spark]$ ./bin/spark-submit --class 
org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 
--driver-memory 512m  --executor-memory 512m   --executor-cores 1  
lib/spark-examples*.jar 10


Spark assembly has been built with Hive, including Datanucleus jars on classpath

15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable

15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at 
lix1.bh.com/192.168.100.3:8050http://lix1.bh.com/192.168.100.3:8050

15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster 
with 1 NodeManagers

15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (4096 MB per container)

15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB 
memory including 384 MB overhead

15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our 
AM

15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container

15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.

Exception in thread main org.apache.hadoop.security.AccessControlException: 
Permission denied: user=xiaobogu, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4221)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4194)

at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:813)

at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:600)

at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:415)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)


at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:526)

at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)

at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)

at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2555)

at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2524)

at 
org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827)

at 
org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:823)

at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)

at 
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:823)

at 
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:816)

at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1815)

at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:595

Re: Can't access remote Hive table from spark

2015-02-05 Thread Zhan Zhang
I am not sure about Spark standalone mode, but with Spark on YARN it should work. You can check the following link:

 http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/

Thanks.

Zhan Zhang

On Feb 5, 2015, at 5:02 PM, Cheng Lian 
lian.cs@gmail.commailto:lian.cs@gmail.com wrote:


Please note that Spark 1.2.0 only supports Hive 0.13.1 or 0.12.0; no other versions are supported.

Best,
Cheng

On 1/25/15 12:18 AM, guxiaobo1982 wrote:


Hi,
I built and started a single node standalone Spark 1.2.0 cluster along with a 
single node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and 
Hive node I can create and query tables inside Hive, and on remote machines I 
can submit the SparkPi example to the Spark master. But I failed to run the 
following example code :


public class SparkTest {

    public static void main(String[] args) {

        String appName = "This is a test application";
        String master = "spark://lix1.bh.com:7077";

        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);

        // sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
        // sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");

        // Queries are expressed in HiveQL.
        List<Row> rows = sqlCtx.sql("FROM src SELECT key, value").collect();
        System.out.print("I got " + rows.size() + " rows \r\n");

        sc.close();
    }
}


Exception in thread main 
org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src

at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980)

at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)

at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)

at 
org.apache.spark.sql.hive.HiveContext$anon$2.orghttp://2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$super$lookupRelation(HiveContext.scala:253)

at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$anonfun$lookupRelation$3.apply(Catalog.scala:141)

at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$anonfun$lookupRelation$3.apply(Catalog.scala:141)

at scala.Option.getOrElse(Option.scala:120)

at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)

at 
org.apache.spark.sql.hive.HiveContext$anon$2.lookupRelation(HiveContext.scala:253)

at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$anonfun$apply$5.applyOrElse(Analyzer.scala:143)

at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$anonfun$apply$5.applyOrElse(Analyzer.scala:138)

at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:162)

at scala.collection.Iterator$anon$11.next(Iterator.scala:328)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

at scala.collection.AbstractIterator.to(Iterator.scala:1157)

at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)

at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)

at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)

at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)

at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)

at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:61)

at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:59)

at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)

at scala.collection.immutable.List.foldLeft(List.scala:84)

at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:59)

at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:51)

at scala.collection.immutable.List.foreach(List.scala:318)

at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)

at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411

Re: Spark impersonation

2015-02-02 Thread Zhan Zhang
I think you can configure Hadoop/Hive to do impersonation. There is no real difference between a secure and an insecure Hadoop cluster here, other than using kinit on the secure one.
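
For example, Hadoop-side impersonation is enabled with the proxyuser settings in core-site.xml (a sketch; "spark" here stands for whatever service user submits the jobs, and the wildcards should be tightened for production):

<property>
  <name>hadoop.proxyuser.spark.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.spark.groups</name>
  <value>*</value>
</property>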

Thanks.

Zhan Zhang

On Feb 2, 2015, at 9:32 PM, Koert Kuipers 
ko...@tresata.commailto:ko...@tresata.com wrote:

yes jobs run as the user that launched them.
if you want to run jobs on a secure cluster then use yarn. hadoop standalone 
does not support secure hadoop.

On Mon, Feb 2, 2015 at 5:37 PM, Jim Green 
openkbi...@gmail.commailto:openkbi...@gmail.com wrote:
Hi Team,

Does spark support impersonation?
For example, when spark on yarn/hive/hbase/etc..., which user is used by 
default?
The user which starts the spark job?
Any suggestions related to impersonation?

--
Thanks,
www.openkb.infohttp://www.openkb.info/
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




Re: Error when get data from hive table. Use python code.

2015-01-29 Thread Zhan Zhang
You are running yarn-client mode. How about increasing the --driver-memory and giving it a try?
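
For example (4G is only an illustration; keep the rest of your command as it is):

/usr/local/spark-default/bin/spark-submit --master yarn-client --driver-memory 4G \
  --executor-memory 8G --num-executors 20 --executor-cores 2 a.py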

Thanks.

Zhan Zhang

On Jan 29, 2015, at 6:36 PM, QiuxuanZhu 
ilsh1...@gmail.commailto:ilsh1...@gmail.com wrote:

Dear all,

I have no idea when it raises an error when I run the following code.

def getRow(data):
    return data.msg

first_sql = "select * from logs.event where dt = '20150120' and et = 'ppc' LIMIT 10"  # error
# first_sql = "select * from hivecrawler.vip_crawler where src='xx' and dt='" + timestamp + "'"  # correct
sc = SparkContext(appName="parse")
sqlContext = HiveContext(sc)
data = sqlContext.sql(first_sql)
file_target = "/tmp/test/logdd"
data.map(getRow).saveAsTextFile(file_target)
sc.stop()
print 'stop'

I submit the code by following script:


/usr/local/spark-default/bin/spark-submit --master yarn-client 
--executor-memory 8G --num-executors 20 --executor-cores 2 --py-files a.py

It would raise a error.

The Spark Log shows that

15/01/30 09:46:39 ERROR metastore.RetryingHMSHandler: 
java.lang.OutOfMemoryError: GC overhead limit exceeded

and the python code shows that:

py4j.protocol.Py4JJavaError: An error occurred while calling o26.javaToPython.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
com.mysql.jdbc.SingleByteCharsetConverter.toString(SingleByteCharsetConverter.java:333)
at com.mysql.jdbc.ResultSetRow.getString(ResultSetRow.java:819)
at com.mysql.jdbc.ByteArrayRow.getString(ByteArrayRow.java:70)
at 
com.mysql.jdbc.ResultSetImpl.getStringInternal(ResultSetImpl.java:5811)
at com.mysql.jdbc.ResultSetImpl.getString(ResultSetImpl.java:5688)
at com.mysql.jdbc.ResultSetImpl.getObject(ResultSetImpl.java:4985)
at 
org.datanucleus.store.rdbms.datasource.dbcp.DelegatingResultSet.getObject(DelegatingResultSet.java:325)
at 
org.datanucleus.store.rdbms.datasource.dbcp.DelegatingResultSet.getObject(DelegatingResultSet.java:325)
at 
org.datanucleus.store.rdbms.query.ResultClassROF.getResultObject(ResultClassROF.java:666)
at 
org.datanucleus.store.rdbms.query.ResultClassROF.getObject(ResultClassROF.java:309)
at 
org.datanucleus.store.rdbms.query.ForwardQueryResult.nextResultSetElement(ForwardQueryResult.java:181)
at 
org.datanucleus.store.rdbms.query.ForwardQueryResult$QueryResultIterator.next(ForwardQueryResult.java:403)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.loopJoinOrderedResult(MetaStoreDirectSql.java:665)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getPartitionsViaSqlFilterInternal(MetaStoreDirectSql.java:429)
at 
org.apache.hadoop.hive.metastore.MetaStoreDirectSql.getPartitions(MetaStoreDirectSql.java:224)
at 
org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:1563)
at 
org.apache.hadoop.hive.metastore.ObjectStore$1.getSqlResult(ObjectStore.java:1559)
at 
org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2208)
at 
org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsInternal(ObjectStore.java:1559)
at 
org.apache.hadoop.hive.metastore.ObjectStore.getPartitions(ObjectStore.java:1553)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:108)
at com.sun.proxy.$Proxy25.getPartitions(Unknown Source)
at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_partitions(HiveMetaStore.java:2516)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)

It looks like a memory problem. But if I switch to another Hive table to get data, 
the code works fine.

Any idea which direction should I start with?Config?

Thanks.

--
跑不完马拉松的摄影师不是好背包客。
下个目标,该是6K的峰了吧?恩。



Re: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0

2015-01-29 Thread Zhan Zhang
I think it is expected. Refer to the comment on saveAsTable: “Note that this currently only works with SchemaRDDs that are created from a HiveContext”. If I understand correctly, here the SchemaRDD means one generated by HiveContext.sql, instead of applySchema.
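
A small spark-shell sketch of that distinction (assumes a Hive table src already exists):

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val fromHive = hc.sql("SELECT key, value FROM src")   // SchemaRDD created from the HiveContext
fromHive.saveAsTable("src_copy")                      // supported
// whereas a SchemaRDD built with applySchema over a plain RDD hits the limitation described above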

Thanks.

Zhan Zhang


On Jan 29, 2015, at 9:38 PM, matroyd 
debajyoti@healthagen.commailto:debajyoti@healthagen.com wrote:

Hi, I am trying saveAsTable on a SchemaRDD created from HiveContext and it fails. This is on Spark 1.2.0. Following are details of the code, command and exceptions:
http://stackoverflow.com/questions/28222496/how-to-enable-sql-on-schemardd-via-the-jdbc-interface-is-it-even-possible
Thanks in advance for any guidance.

View this message in context: HiveContext created SchemaRDD's saveAsTable is not working on 1.2.0
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Connect to Hive metastore (on YARN) from Spark Shell?

2015-01-21 Thread Zhan Zhang
You can put hive-site.xml in your conf/ directory. It will connect to Hive when 
HiveContext is initialized.
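
After that, a quick smoke test in spark-shell (just a sketch):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SHOW TABLES").collect().foreach(println)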

Thanks.

Zhan Zhang

On Jan 21, 2015, at 12:35 PM, YaoPau jonrgr...@gmail.com wrote:

 Is this possible, and if so what steps do I need to take to make this happen? 
  
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Connect-to-Hive-metastore-on-YARN-from-Spark-Shell-tp21300.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



OOM for HiveFromSpark example

2015-01-13 Thread Zhan Zhang
Hi Folks,

I am trying to run a HiveContext in yarn-cluster mode, but hit some errors. Does anybody know what causes the issue?

I use following cmd to build the distribution: 

 ./make-distribution.sh -Phive -Phive-thriftserver  -Pyarn  -Phadoop-2.4

15/01/13 17:59:42 INFO cluster.YarnClusterScheduler: 
YarnClusterScheduler.postStartHook done
15/01/13 17:59:42 INFO storage.BlockManagerMasterActor: Registering block 
manager cn122-10.l42scl.hortonworks.com:56157 with 1589.8 MB RAM, 
BlockManagerId(2, cn122-10.l42scl.hortonworks.com, 56157)
15/01/13 17:59:43 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT 
EXISTS src (key INT, value STRING)
15/01/13 17:59:43 INFO parse.ParseDriver: Parse Completed
15/01/13 17:59:44 INFO metastore.HiveMetaStore: 0: Opening raw store with 
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/01/13 17:59:44 INFO metastore.ObjectStore: ObjectStore, initialize called
15/01/13 17:59:44 INFO DataNucleus.Persistence: Property 
datanucleus.cache.level2 unknown - will be ignored
15/01/13 17:59:44 INFO DataNucleus.Persistence: Property 
hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present 
in CLASSPATH (or one of dependencies)
15/01/13 17:59:44 WARN DataNucleus.Connection: BoneCP specified but not present 
in CLASSPATH (or one of dependencies)
15/01/13 17:59:52 INFO metastore.ObjectStore: Setting MetaStore object pin 
classes with 
hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
15/01/13 17:59:52 INFO metastore.MetaStoreDirectSql: MySQL check failed, 
assuming we are not on mysql: Lexical error at line 1, column 5.  Encountered: 
@ (64), after : .
15/01/13 17:59:53 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
15/01/13 17:59:53 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
15/01/13 17:59:59 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
15/01/13 17:59:59 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
15/01/13 18:00:00 INFO metastore.ObjectStore: Initialized ObjectStore
15/01/13 18:00:00 WARN metastore.ObjectStore: Version information not found in 
metastore. hive.metastore.schema.verification is not enabled so recording the 
schema version 0.13.1aa
15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added admin role in metastore
15/01/13 18:00:01 INFO metastore.HiveMetaStore: Added public role in metastore
15/01/13 18:00:01 INFO metastore.HiveMetaStore: No user is added in admin role, 
since config is empty
15/01/13 18:00:01 INFO session.SessionState: No Tez session required at this 
point. hive.execution.engine=mr.
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=Driver.run 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=TimeToSubmit 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:02 INFO ql.Driver: Concurrency mode is disabled, not creating a 
lock manager
15/01/13 18:00:02 INFO log.PerfLogger: PERFLOG method=compile 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO log.PerfLogger: PERFLOG method=parse 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT 
EXISTS src (key INT, value STRING)
15/01/13 18:00:03 INFO parse.ParseDriver: Parse Completed
15/01/13 18:00:03 INFO log.PerfLogger: /PERFLOG method=parse 
start=1421190003030 end=1421190003031 duration=1 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO log.PerfLogger: PERFLOG method=semanticAnalyze 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Starting Semantic Analysis
15/01/13 18:00:03 INFO parse.SemanticAnalyzer: Creating table src position=27
15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_table : db=default 
tbl=src
15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang  ip=unknown-ip-addr  
cmd=get_table : db=default tbl=src
15/01/13 18:00:03 INFO metastore.HiveMetaStore: 0: get_database: default
15/01/13 18:00:03 INFO HiveMetaStore.audit: ugi=zzhang  ip=unknown-ip-addr  
cmd=get_database: default
15/01/13 18:00:03 INFO ql.Driver: Semantic Analysis Completed
15/01/13 18:00:03 INFO log.PerfLogger: /PERFLOG method=semanticAnalyze 
start=1421190003031 end=1421190003406 duration=375 
from=org.apache.hadoop.hive.ql.Driver
15/01/13 18:00:03 INFO ql.Driver: Returning Hive schema: 
Schema(fieldSchemas:null, properties:null)
15/01/13 18:00:03 INFO log.PerfLogger: /PERFLOG method=compile 
start=1421190002998 end=1421190003416 

Re: Driver hangs on running mllib word2vec

2015-01-05 Thread Zhan Zhang
I think it is an overflow; the training data is quite big. The algorithm's scalability highly depends on the vocabSize. Even without the overflow there are still other bottlenecks, for example syn0Global and syn1Global, each of which has vocabSize * vectorSize elements.
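
A back-of-the-envelope estimate of those two arrays (the vocabulary and vector sizes below are assumptions, not numbers from the original post):

val vocabSize = 10000000L     // e.g. 10M distinct words (assumed)
val vectorSize = 100L
val bytes = 2 * vocabSize * vectorSize * 4   // syn0Global + syn1Global, Float = 4 bytes each
println(f"${bytes / math.pow(1024, 3)}%.1f GB just for these two arrays on the driver")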

Thanks.

Zhan Zhang


On Jan 5, 2015, at 7:47 PM, Eric Zhen zhpeng...@gmail.com wrote:

 Hi Xiangrui,
 
 Our dataset is about 80GB(10B lines). 
 
 In the driver's log, we foud this:
 
 INFO Word2Vec: trainWordsCount = -1610413239
 
  it seems that there is an integer overflow?
 
 
 On Tue, Jan 6, 2015 at 5:44 AM, Xiangrui Meng men...@gmail.com wrote:
 How big is your dataset, and what is the vocabulary size? -Xiangrui
 
 On Sun, Jan 4, 2015 at 11:18 PM, Eric Zhen zhpeng...@gmail.com wrote:
  Hi,
 
  When we run mllib word2vec(spark-1.1.0), driver get stuck with 100% cup
  usage. Here is the jstack output:
 
  main prio=10 tid=0x40112800 nid=0x46f2 runnable
  [0x4162e000]
 java.lang.Thread.State: RUNNABLE
  at
  java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1847)
  at
  java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1778)
  at java.io.DataOutputStream.writeInt(DataOutputStream.java:182)
  at java.io.DataOutputStream.writeFloat(DataOutputStream.java:225)
  at
  java.io.ObjectOutputStream$BlockDataOutputStream.writeFloats(ObjectOutputStream.java:2064)
  at
  java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1310)
  at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154)
  at
  java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
  at
  java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
  at
  java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
  at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
  at
  java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
  at
  java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
  at
  java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
  at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
  at
  java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
  at
  java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
  at
  java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
  at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
  at
  java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
  at
  org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
  at
  org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
  at
  org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
  at
  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:610)
  at
  org.apache.spark.mllib.feature.Word2Vec$$anonfun$fit$1.apply$mcVI$sp(Word2Vec.scala:291)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:290)
  at com.baidu.inf.WordCount$.main(WordCount.scala:31)
  at com.baidu.inf.WordCount.main(WordCount.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
  at
  org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
  --
  Best Regards
 
 
 
 -- 
 Best Regards




Re: Spark 1.2 + Avro file does not work in HDP2.2

2014-12-16 Thread Zhan Zhang
Hi Manas,

There is a small patch needed for HDP2.2. You can refer to this PR
https://github.com/apache/spark/pull/3409

There are some other issues compiling against hadoop2.6. But we will fully 
support it very soon. You can ping me, if you want.
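
In case it helps in the meantime, one common workaround for this particular
IncompatibleClassChangeError (a hedged sbt sketch, not necessarily the patch in
the PR above) is to pull the hadoop2 flavor of avro-mapred:

  // avro-mapred publishes two flavors; the default one is built against the old
  // mapred API, so on Hadoop 2.x you generally want the "hadoop2" classifier
  val avro = "org.apache.avro" % "avro-mapred" % "1.7.7" classifier "hadoop2"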

Thanks.

Zhan Zhang

On Dec 12, 2014, at 11:38 AM, Manas Kar manasdebashis...@gmail.com wrote:

 Hi Experts, 
   I have recently installed HDP 2.2 (which depends on Hadoop 2.6).
   My Spark 1.2 is built with the hadoop-2.4 profile.
 
  My program has the following dependencies:
 val avro  = "org.apache.avro" % "avro-mapred" % "1.7.7"
 val spark = "org.apache.spark" % "spark-core_2.10" % "1.2.0" % "provided"
 
 My program to read avro files fails with the following error. What am I doing 
 wrong?
 
 
 java.lang.IncompatibleClassChangeError: Found interface 
 org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
   at 
 org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
   at 
  org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
   at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)




Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Zhan Zhang
Please check whether 
https://github.com/apache/spark/pull/3409#issuecomment-64045677 solves the 
problem for launching AM.
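
Meanwhile, a hedged sketch of the usual knobs (this may not be exactly what that
PR adds, and in practice these are passed via spark-defaults.conf or
spark-submit --conf rather than set in code):

  // In yarn-cluster mode the driver runs inside the AM, so the driver option
  // effectively covers the AM container there; executors take the second one.
  val conf = new org.apache.spark.SparkConf()
    .set("spark.driver.extraJavaOptions", "-XX:MaxMetaspaceSize=100M")
    .set("spark.executor.extraJavaOptions", "-XX:MaxMetaspaceSize=100M")
  val sc = new org.apache.spark.SparkContext(conf)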

Thanks.

Zhan Zhang
On Dec 1, 2014, at 4:49 PM, Mohammad Islam misla...@yahoo.com.INVALID wrote:

 Hi,
 How to pass the Java options (such as -XX:MaxMetaspaceSize=100M) when 
 launching the AM or task containers?
 
 This is related to running Spark on YARN (Hadoop 2.3.0). In the MapReduce case, 
 setting a property such as 
 mapreduce.map.java.opts would do the job.
 
 Any help would be highly appreciated.
 
 Regards,
 Mohammad
 
 
 
 
  




Re: Spark SQL Hive Version

2014-11-05 Thread Zhan Zhang
The original spark-project hive-0.13.1 had a packaging problem that caused 
version conflicts, and hive-0.13.1a is a repackaged version that solves it. Both 
share the same official Hive 0.13.1 source release, with the unnecessary 
packages removed from the original official Hive release artifact. You can refer 
to https://github.com/apache/spark/pull/2685 for the whole story.

Thanks.

Zhan Zhang

On Nov 5, 2014, at 4:47 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Hi, all, I noticed that when compiling the SparkSQL with profile 
 “hive-0.13.1”, it will fetch the Hive version of 0.13.1a under groupId 
 “org.spark-project.hive”, what’s the difference with the one of 
 “org.apache.hive”? And where can I get the source code for re-compiling?
  
 Thanks,
 Cheng Hao




Re: Use RDD like a Iterator

2014-10-30 Thread Zhan Zhang
RDD.toLocalIterator returns the partitions one by one, but with all elements of 
each partition materialized, so it is not lazily computed within a partition. 
Given the design of Spark, it is very hard to maintain the state of an iterator 
across runJob calls.

  def toLocalIterator: Iterator[T] = {
    def collectPartition(p: Int): Array[T] = {
      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
    }
    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
  }
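
A tiny usage sketch of what that means in practice (spark-shell):

  val rdd = sc.parallelize(1 to 1000000, 10)
  // partitions are fetched one at a time, but each partition is fully
  // materialized by runJob before its elements are handed out
  rdd.toLocalIterator.take(5).foreach(println)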

Thanks.

Zhan Zhang

On Oct 29, 2014, at 3:43 AM, Yanbo Liang yanboha...@gmail.com wrote:

 RDD.toLocalIterator() is the suitable solution.
 But I doubt whether it conforms to the design principle of Spark and RDDs.
 All RDD transformations are lazily computed until they end with some action. 
 
 2014-10-29 15:28 GMT+08:00 Sean Owen so...@cloudera.com:
 Call RDD.toLocalIterator()?
 
 https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html
 
 On Wed, Oct 29, 2014 at 4:15 AM, Dai, Kevin yun...@ebay.com wrote:
  Hi, ALL
 
 
 
  I have an RDD[T]; can I use it like an iterator?
 
  That means I can compute every element of this RDD lazily.
 
 
 
  Best Regards,
 
  Kevin.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 




Re: run multiple spark applications in parallel

2014-10-28 Thread Zhan Zhang
You can set your executor number with --num-executors. Also, switching to 
yarn-client saves you one container for the driver. Then check your YARN 
resource manager to make sure there are more containers available to serve your 
extra apps.
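
For example, the same knobs expressed as confs (a hedged sketch, assuming YARN;
equivalent to --num-executors / --executor-memory / --executor-cores on
spark-submit):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.executor.instances", "2")   // how many containers each app asks for
    .set("spark.executor.memory", "1g")
    .set("spark.executor.cores", "1")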

Thanks.

Zhan Zhang

On Oct 28, 2014, at 5:31 PM, Soumya Simanta soumya.sima...@gmail.com wrote:

 Maybe changing --master yarn-cluster to --master yarn-client would help. 
 
 
 On Tue, Oct 28, 2014 at 7:25 PM, Josh J joshjd...@gmail.com wrote:
 Sorry, I should've included some stats with my email
 
 I execute each job in the following manner
 
 ./bin/spark-submit --class CLASSNAME --master yarn-cluster --driver-memory 1g 
 --executor-memory 1g --executor-cores 1 UBER.JAR ${ZK_PORT_2181_TCP_ADDR} 
 my-consumer-group1 1
 
 
 
 The box has 
 
 24 CPUs, Intel(R) Xeon(R) CPU E5-2420 v2 @ 2.20GHz
 
 32 GB RAM
 
 
 
 Thanks,
 
 Josh
 
 
 On Tue, Oct 28, 2014 at 4:15 PM, Soumya Simanta soumya.sima...@gmail.com 
 wrote:
 Try reducing the resources (cores and memory) of each application.
 
 
 
  On Oct 28, 2014, at 7:05 PM, Josh J joshjd...@gmail.com wrote:
 
  Hi,
 
  How do I run multiple spark applications in parallel? I tried to run on 
  yarn cluster, though the second application submitted does not run.
 
  Thanks,
  Josh
 
 




Re: Use RDD like a Iterator

2014-10-28 Thread Zhan Zhang
I think it is already lazily computed, or do you mean something else? The 
following is the signature of compute in RDD:

 def compute(split: Partition, context: TaskContext): Iterator[T]
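
A small sketch of that laziness in a local spark-shell:

  val rdd = sc.parallelize(1 to 10, 2)
  val mapped = rdd.map { x => println(s"computing $x"); x * 2 }  // nothing runs yet
  mapped.take(1)  // only the elements the action actually pulls get computed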

Thanks.

Zhan Zhang

On Oct 28, 2014, at 8:15 PM, Dai, Kevin yun...@ebay.com wrote:

 Hi, ALL
  
 I have an RDD[T]; can I use it like an iterator?
 That means I can compute every element of this RDD lazily.
  
 Best Regards,
 Kevin.




Re: how to retrieve the value of a column of type date/timestamp from a Spark SQL Row

2014-10-28 Thread Zhan Zhang
Can you use row(i).asInstanceOf[]?
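
For example (a minimal sketch; the table and column names are made up):

  import java.sql.Timestamp
  val row = sqlContext.sql("SELECT name, event_time FROM events").first()
  val ts  = row(1).asInstanceOf[Timestamp]   // assuming column 1 is a timestamp
  // newer releases also expose row.getAs[Timestamp](1)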

Thanks.

Zhan Zhang


On Oct 28, 2014, at 5:03 PM, Mohammed Guller moham...@glassbeam.com wrote:

 Hi –
  
 The Spark SQL Row class has methods such as getInt, getLong, getBoolean, 
 getFloat, getDouble, etc. However, I don’t see a getDate method. So how can 
 one retrieve a date/timestamp type column from a result set?
  
 Thanks,
 Mohammed




Re: sortByKey trouble

2014-09-24 Thread Zhan Zhang
Try this:
import org.apache.spark.SparkContext._
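
A minimal sketch of why that import matters: sortByKey lives in
PairRDDFunctions, and SparkContext._ provides the implicit conversion that adds
it to an RDD of pairs, i.e. RDD[(K, V)]:

  import org.apache.spark.SparkContext._   // the import suggested above

  val pairs = sc.parallelize(Seq(("b", ("x", "y", "z")), ("a", ("p", "q", "r"))))
  pairs.sortByKey().collect()   // sorted by the String key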

Thanks.

Zhan Zhang

On Sep 24, 2014, at 6:13 AM, david david...@free.fr wrote:

 thanks
 
 I've already tried this solution, but it does not compile (in Eclipse).
 
  I'm surprised to see that in spark-shell, sortByKey works fine on these 2 
 cases:
 
   (String,String,String,String)
   (String,(String,String,String))
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-trouble-tp14989p15002.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 






Re: Converting one RDD to another

2014-09-23 Thread Zhan Zhang
Here is my understanding

  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
    if (num == 0) { // if 0, return an empty array
      Array.empty
    } else {
      mapPartitions { items =>
        // map each partition to a new one whose iterator holds a single bounded
        // priority queue containing that partition's num smallest elements.
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }.reduce { (queue1, queue2) =>
        // reduce (which runs a job) merges the per-partition queues on the driver
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord) // to array and sort
    }
  }
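
A short usage sketch, since the question below is really about the return type:
top/takeOrdered hand back a plain local Array[T] on the driver, not an RDD.

  val rdd = sc.parallelize(Seq(5, 1, 4, 2, 3))
  val smallest = rdd.takeOrdered(2)         // Array(1, 2) -- a local array
  val largest  = rdd.top(2)                 // Array(5, 4)
  val asRddAgain = sc.parallelize(largest)  // re-parallelize if an RDD is needed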



On Sep 23, 2014, at 9:33 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:

 Hi,
 Is it always possible to get one RDD from another?
 For example, if I do a top(K)(Ordering), I get an Int, right? (In my 
 example the type is Int.) I do not get an RDD.
 Can anyone explain this to me?
 Thank You



