RE: Possible long lineage issue when using DStream to update a normal RDD

2015-05-08 Thread Shao, Saisai
IIUC, only checkpoint will clean the lineage information; cache will not cut the
lineage. Also, checkpoint will put the data in HDFS, not on local disk :)

I think you can use foreachRDD to do such RDD update work; it looks OK to me
from your code snippet.
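
For reference, a minimal sketch of that pattern in Scala (assuming a pair DStream
and a checkpoint directory; the union stands in for whatever join/update you
actually do, and the every-10-batches interval is only an illustration):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")   // hypothetical path

var myRDD: RDD[(String, Int)] = sc.emptyRDD[(String, Int)]
var batch = 0L

dstream.foreachRDD { rdd =>
  val updated = myRDD.union(rdd)            // or a join, as in the snippet below
  updated.persist(StorageLevel.MEMORY_AND_DISK)
  batch += 1
  if (batch % 10 == 0) {
    updated.checkpoint()                    // mark before the first action so the lineage is cut
  }
  updated.count()                           // materialize (and write the checkpoint when marked)
  myRDD.unpersist()
  myRDD = updated
}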

From: Chunnan Yao [mailto:yaochun...@gmail.com]
Sent: Friday, May 8, 2015 2:51 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Possible long lineage issue when using DStream to update a normal 
RDD

Thank you for this suggestion! But may I ask what's the advantage of using
checkpoint instead of cache here? Because they both cut lineage. I only know that
checkpoint saves the RDD to disk, while cache keeps it in memory. So maybe it's for
reliability?

Also on http://spark.apache.org/docs/latest/streaming-programming-guide.html, I 
have not seen usage of "foreachRDD" like mine. Here I am not pushing data to 
external system. I just use it to update an RDD in Spark. Is this right?



2015-05-08 14:03 GMT+08:00 Shao, Saisai 
mailto:saisai.s...@intel.com>>:
I think you could use checkpoint to cut the lineage of `MyRDD`, I have a 
similar scenario and I use checkpoint to workaround this problem :)

Thanks
Jerry

-Original Message-
From: yaochunnan [mailto:yaochun...@gmail.com]
Sent: Friday, May 8, 2015 1:57 PM
To: user@spark.apache.org
Subject: Possible long lineage issue when using DStream to update a normal RDD

Hi all,
Recently in our project, we needed to update an RDD using data regularly received
from a DStream. I plan to use the "foreachRDD" API to achieve this:
var MyRDD = ...
dstream.foreachRDD { rdd =>
  MyRDD = MyRDD.join(rdd)...
  ...
}

Is this usage correct? My concern is: as I am repeatedly and endlessly
reassigning MyRDD in order to update it, will it create too long an RDD lineage
to process when I want to query MyRDD later on (similar to
https://issues.apache.org/jira/browse/SPARK-4672)?

Maybe I should:
1. cache or checkpoint latest MyRDD and unpersist old MyRDD every time a 
dstream comes in.
2. use the unpublished IndexedRDD
(https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD update.

As I lack experience using Spark Streaming and indexedRDD, I am here to make 
sure my thoughts are on the right track. Your wise suggestions will be greatly 
appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org For 
additional commands, e-mail: 
user-h...@spark.apache.org



Re: (no subject)

2015-05-08 Thread Akhil Das
Since it's loading 24 records, it could be that your CSV is corrupted (maybe
the newline char isn't \n but \r\n if it comes from a Windows environment).
You can check this with *cat -v yourcsvfile.csv | more*.

Thanks
Best Regards

On Fri, May 8, 2015 at 11:23 AM,  wrote:

> Hi guys,
>
>  I got a "PhoenixParserException: ERROR 601 (42P00): Syntax error.
> Encountered "FORMAT" at line 21, column 141." when creating a table by
> using "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY
> '\n'".
>
>  As I remember, previous versions of Phoenix supported this grammar, didn't
> they? Actually I want to load a CSV file of 1.7 million records into an empty
> table; however, after I finished loading this file and counted the table, I
> got a result of 24 lines...
>
>  How do I make the table load a multi-line file correctly?
>   Thanks.
>
> 
>
> Thanks&Best regards!
> San.Luo
>


Re: YARN mode startup takes too long (10+ secs)

2015-05-08 Thread Zoltán Zvara
So does this sleep occur before allocating resources for the first few
executors to start the job?

On Fri, May 8, 2015 at 6:23 AM Taeyun Kim 
wrote:

> I think I’ve found the (maybe partial, but major) reason.
>
>
>
> It’s between the following lines, (it’s newly captured, but essentially
> the same place that Zoltán Zvara picked:
>
>
>
> 15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager
>
> 15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor:
> Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753]
> with ID 1
>
>
>
> When I read the logs on cluster side, the following lines were found: (the
> exact time is different with above line, but it’s the difference between
> machines)
>
>
>
> 15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter
> thread - sleep time : 5000
>
> 15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for :
> cluster04:45454
>
>
>
> It seemed that Spark deliberately sleeps 5 secs.
>
> I’ve read the Spark source code, and in
> org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread()
> had the code for that.
>
> It loops calling allocator.allocateResources() and Thread.sleep().
>
> For sleep, it reads the configuration variable
> spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000,
> which is 5 secs).
>
> According to the comment, “we want to be reasonably responsive without
> causing too many requests to RM”.
>
> So, unless YARN immediately fulfill the allocation request, it seems that
> 5 secs will be wasted.
>
>
>
> When I modified the configuration variable to 1000, it only waited for 1
> sec.
>
>
>
> Here is the log lines after the change:
>
>
>
> 15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter
> thread - sleep time : 1000
>
> 15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for :
> cluster04:45454
>
>
>
> 4 secs saved.
>
> So, when one does not want to wait 5 secs, one can change the
> spark.yarn.scheduler.heartbeat.interval-ms.
>
> I hope that the additional overhead it incurs would be negligible.
>
>
>
>
>
> *From:* Zoltán Zvara [mailto:zoltan.zv...@gmail.com]
> *Sent:* Thursday, May 07, 2015 10:05 PM
> *To:* Taeyun Kim; user@spark.apache.org
> *Subject:* Re: YARN mode startup takes too long (10+ secs)
>
>
>
> Without considering everything, just a few hints:
>
> You are running on YARN. From 09:18:34 to 09:18:37 your application is in
> state ACCEPTED. There is a noticeable overhead introduced due to
> communicating with YARN's ResourceManager, NodeManager and given that the
> YARN scheduler needs time to make a decision. I guess somewhere
> from 09:18:38 to 09:18:43 your application JAR gets copied to another
> container requested by the Spark ApplicationMaster deployed on YARN's
> container 0. Deploying an executor needs further resource negotiations with
> the ResourceManager usually. Also, as I said, your JAR and Executor's code
> requires copying to the container's local directory - execution blocked
> until that is complete.
>
>
>
> On Thu, May 7, 2015 at 3:09 AM Taeyun Kim 
> wrote:
>
> Hi,
>
>
>
> I’m running a spark application with YARN-client or YARN-cluster mode.
>
> But it seems to take too long to startup.
>
> It takes 10+ seconds to initialize the spark context.
>
> Is this normal? Or can it be optimized?
>
>
>
> The environment is as follows:
>
> - Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)
>
> - Spark: 1.3.1
>
> - Client: Windows 7, but similar result on CentOS 6.6
>
>
>
> The following is the startup part of the application log. (Some private
> information was edited)
>
> ‘Main: Initializing context’ at the first line and ‘MainProcessor:
> Deleting previous output files’ at the last line are the logs by the
> application. Others in between are from Spark itself. Application logic is
> executed after this log is displayed.
>
>
>
> ---
>
>
>
> 15/05/07 09:18:31 INFO Main: Initializing context
>
> 15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1
>
> 15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp
>
> 15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to:
> myuser,myapp
>
> 15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(myuser,
> myapp); users with modify permissions: Set(myuser, myapp)
>
> 15/05/07 09:18:31 INFO Slf4jLogger: Slf4jLogger started
>
> 15/05/07 09:18:31 INFO Remoting: Starting remoting
>
> 15/05/07 09:18:31 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@mymachine:54449]
>
> 15/05/07 09:18:31 INFO Utils: Successfully started service 'sparkDriver'
> on port 54449.
>
> 15/05/07 09:18:31 INFO SparkEnv: Registering MapOutputTracker
>
> 15/05/07 09:18:32 INFO SparkEnv: Registering BlockManagerMaster
>
> 15/05/07 09:18:32 INFO DiskBlockManager: Created local directory at
> C:\Users\myuser\AppData\Local\Temp\spark-2d3db9d6-ea78-

Re: Master node memory usage question

2015-05-08 Thread Akhil Das
What's your use case and what are you trying to achieve? Maybe there's a
better way of doing it.

Thanks
Best Regards

On Fri, May 8, 2015 at 10:20 AM, Richard Alex Hofer 
wrote:

> Hi,
> I'm working on a project in Spark and am trying to understand what's going
> on. Right now to try and understand what's happening we came up with this
> snippet of code which very roughly resembles what we're actually doing.
> When trying to run this our master node ends up quickly using up its memory
> even though all of our RDDs are very small. Can someone explain what's
> going on here and how we can avoid it?
>
> a = sc.parallelize(xrange(100),10)
> b = a
>
> for i in xrange(10):
> a = a.map(lambda x: x + 1)
> if i % 300 == 0:
> # We do this to try and force some of our RDD to evaluate
> a.persist()
> a.foreachPartition(lambda _: None)
> b.unpersist()
> b = a
> a.collect()
> b.unpersist()
>
> -Richard Hofer
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Getting data into Spark Streaming

2015-05-08 Thread Akhil Das
I don't think you can use rawSocketStream since the RSVP is from a web
server and you will have to send a GET request first to initialize the
communication. You are better off writing a custom receiver for your use case.
For a start, you can actually look at TwitterUtils.createTwitterStream.
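
A minimal sketch of such a receiver (assuming the Meetup endpoint streams
newline-delimited JSON over HTTP; error handling and reconnection omitted):

import java.io.{BufferedReader, InputStreamReader}
import java.net.URL
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MeetupRsvpReceiver(url: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Read the HTTP stream on a background thread and hand each line to Spark.
    new Thread("Meetup RSVP Receiver") {
      override def run(): Unit = {
        val reader = new BufferedReader(
          new InputStreamReader(new URL(url).openStream(), "UTF-8"))
        var line = reader.readLine()
        while (!isStopped() && line != null) {
          store(line)                 // one RSVP JSON object per line
          line = reader.readLine()
        }
        reader.close()
      }
    }.start()
  }

  def onStop(): Unit = { /* the reading thread exits once isStopped() is true */ }
}

// Usage: val rsvps = ssc.receiverStream(new MeetupRsvpReceiver("http://stream.meetup.com/2/rsvps"))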



Thanks
Best Regards

On Fri, May 8, 2015 at 4:29 AM, Sathaye  wrote:

> Hi I am pretty new to spark and I am trying to implement a simple spark
> streaming application using Meetup's RSVP stream:
> stream.meetup.com/2/rsvps
> Any idea how to connect the stream to Spark Streaming?
> I am trying out rawSocketStream but not sure what the parameters are(viz.
> port)
>
> Thank you
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-data-into-Spark-Streaming-tp22806.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


updateStateByKey - how to generate a stream of state changes?

2015-05-08 Thread minisaw
Imagine an input stream transformed by updateStateByKey, based on some state.

As an output of the transformation, I would like to have a stream of state
changes only - not the stream of states themselves.

What is the natural way of obtaining such a stream?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/updateStateByKey-how-to-generate-a-stream-of-state-changes-tp22813.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: AWS-Credentials fails with org.apache.hadoop.fs.s3.S3Exception: FORBIDDEN

2015-05-08 Thread Akhil Das
Have a look at this SO

question,
it has discussion on various ways of accessing S3.

Thanks
Best Regards

On Fri, May 8, 2015 at 1:21 AM, in4maniac  wrote:

> Hi Guys,
>
> I think this problem is related to :
>
> http://apache-spark-user-list.1001560.n3.nabble.com/AWS-Credentials-for-private-S3-reads-td8689.html
>
> I am running pyspark 1.2.1 in AWS with my AWS credentials exported to
> master
> node as Environmental Variables.
>
> Halfway through my application, I get thrown with a
> org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException:
> S3 HEAD request failed for "file path" - ResponseCode=403,
> ResponseMessage=Forbidden
>
> Here is some important information about my job:
> + my AWS credentials exported to master node as Environmental Variables
> + there are no '/'s in my secret key
> + The earlier steps that uses this parquet file actually complete
> successsfully
> + The step before the count() does the following:
>+ reads the parquet file (SELECT STATEMENT)
>+ maps it to an RDD
>+ runs a filter on the RDD
> + The filter works as follows:
>+ extracts one field from each RDD line
>+ checks with a list of 40,000 hashes for presence (if field in
> LIST_OF_HASHES.value)
>+ LIST_OF_HASHES is a broadcast object
>
> The weirdness is that I am using this parquet file in earlier steps and it
> works fine. The other confusion I have is due to the fact that it only
> starts failing halfway through the stage. It completes a fraction of the tasks
> and then starts failing...
>
> Hoping to hear something positive. Many thanks in advance
>
> Sahanbull
>
> The stack trace is as follows:
> >>> negativeObs.count()
> [Stage 9:==>   (161 + 240) /
> 800]
>
> 15/05/07 07:55:59 ERROR TaskSetManager: Task 277 in stage 9.0 failed 4
> times; aborting job
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/root/spark/python/pyspark/rdd.py", line 829, in count
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/root/spark/python/pyspark/rdd.py", line 820, in sum
> return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>   File "/root/spark/python/pyspark/rdd.py", line 725, in reduce
> vals = self.mapPartitions(func).collect()
>   File "/root/spark/python/pyspark/rdd.py", line 686, in collect
> bytesInJava = self._jrdd.collect().iterator()
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o139.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 277 in stage 9.0 failed 4 times, most recent failure: Lost task 277.3 in
> stage 9.0 (TID 4832, ip-172-31-1-185.ec2.internal):
> org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException:
> S3 HEAD request failed for
>
> '/subbucket%2Fpath%2F2Fpath%2F2Fpath%2F2Fpath%2F2Fpath%2Ffilename.parquet%2Fpart-r-349.parquet'
> - ResponseCode=403, ResponseMessage=Forbidden
> at
>
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:122)
> at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
> at
>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
> at org.apache.hadoop.fs.s3native.$Proxy9.retrieveMetadata(Unknown
> Source)
> at
>
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:326)
> at
> parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381)
> at
>
> parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155)
> at
> parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:135)
> at
> org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> at org.apache.spark.rdd.RDD.iterator(RDD.sc

updateStateByKey - how to generate a stream of state changes?

2015-05-08 Thread mini saw
Imagine an input stream transformed by updateStateByKey, based on some
state.

As an output of the transformation, I would like to have a stream of state
changes only - not the stream of states themselves.

What is the natural way of obtaining such a stream?



SparkStreaming + Flume/PDI+Kafka

2015-05-08 Thread GARCIA MIGUEL, DAVID
Hi!
I've been using Spark for the last few months and it is awesome. I'm pretty new to
this topic, so don't be too harsh on me.
Recently I've been doing some simple tests with Spark Streaming for log 
processing and I'm considering different ETL input solutions such as Flume or 
PDI+Kafka.

My use case will be:
1.- Collect logs from different applications located in different physical 
servers.
2.- Transform and pre-process those logs.
3.- Process all the logs data with spark streaming.

I've got a question regarding processing data where it is located.
Ideally I'd like Spark Streaming (standalone, YARN or Mesos) to handle the
decision of processing data wherever it is located.

I know I can set up whatever Flume workflow (agents --> collectors) I want and
then upload the aggregated data to HDFS, where I guess the system will pick
the best worker to operate on every split of the data. Am I right?
Will the Spark Streaming + Flume integration (without sinking into HDFS) provide
this kind of behavior?
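
For context, a minimal sketch of the push-based Flume integration I am referring
to (the host and port are hypothetical; the Flume agent would have an Avro sink
pointing at this receiver):

import org.apache.spark.streaming.flume.FlumeUtils

// Spark runs a receiver on one executor and Flume pushes events to it, so the
// data initially lives wherever that receiver runs rather than being
// locality-scheduled.
val flumeStream = FlumeUtils.createStream(ssc, "collector-host", 4141)
flumeStream.map(event => new String(event.event.getBody.array()))
           .print()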

Any tips to point me in the right direction?



RE: The explanation of input text format using LDA in Spark

2015-05-08 Thread Yang, Yuhao
Hi Cui,

Try to read the scala version of LDAExample, 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
 

The matrix you're referring to is the corpus after vectorization. 

One example: given a dictionary [apple, orange, banana] and 3 documents:
apple orange
orange banana
apple banana
they can be represented by the dense vectors:
1, 1, 0
0, 1, 1
1, 0, 1
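
A minimal sketch of that vectorization plus the LDA call in Scala (the column
order [apple, orange, banana] and k=2 are just assumptions for this toy example):

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Each document becomes a term-count vector over the dictionary
// [apple, orange, banana]; LDA expects an RDD of (documentId, vector).
val corpus = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0, 0.0),   // "apple orange"
  Vectors.dense(0.0, 1.0, 1.0),   // "orange banana"
  Vectors.dense(1.0, 0.0, 1.0)    // "apple banana"
)).zipWithIndex().map { case (v, id) => (id, v) }

val ldaModel = new LDA().setK(2).run(corpus)
println(ldaModel.topicsMatrix)    // terms x topics weight matrix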

Cheers,
Yuhao


-Original Message-
From: Cui xp [mailto:lifeiniao...@gmail.com] 
Sent: Wednesday, May 6, 2015 4:28 PM
To: user@spark.apache.org
Subject: The explanation of input text format using LDA in Spark

Hi all,
   After I read the example code using LDA in Spark, I found that the input text in
the code is a matrix. The format of the text is as follows:
1 2 6 0 2 3 1 1 0 0 3
1 3 0 1 3 0 0 2 0 0 1
1 4 1 0 0 4 9 0 1 2 0
2 1 0 3 0 0 5 0 2 3 9
3 1 1 9 3 0 2 0 0 1 3
4 2 0 3 4 5 1 1 1 4 0
2 1 0 3 0 0 5 0 2 2 9
1 1 1 9 2 1 2 0 0 1 3
4 4 0 3 4 2 1 3 0 0 0
2 8 2 0 3 0 2 0 2 7 2
1 1 1 9 0 2 2 0 0 3 3
4 1 0 0 4 5 1 3 0 1 0
But I don't know what each line or each column means. And if I have
several text documents, how do I process them to use LDA in Spark? Thanks.


Cui xp



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/The-explanation-of-input-text-format-using-LDA-in-Spark-tp22781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[SparkSQL] cannot filter by a DateType column

2015-05-08 Thread Haopu Wang
I want to filter a DataFrame based on a Date column. 

 

If the DataFrame object is constructed from a scala case class, it's
working (either compare as String or Date). But if the DataFrame is
generated by specifying a Schema to an RDD, it doesn't work. Below is
the exception and test code.

 

Do you have any idea about the error? Thank you very much!

 

exception=

java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer

    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$anonfun$apply$6.apply(Cast.scala:116)
    at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:111)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.apply(Cast.scala:116)
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)
    at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predicates.scala:305)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 

code=

 

val conf = new SparkConf().setAppName("DFTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlCtx = new HiveContext(sc)
import sqlCtx.implicits._

case class Test(dt: java.sql.Date)

val df = sc.makeRDD(Seq(Test(new java.sql.Date(115, 4, 7)))).toDF

var r = df.filter("dt >= '2015-05-06'")
r.explain(true)
r.show
println("==")

var r2 = df.filter("dt >= cast('2015-05-06' as DATE)")
r2.explain(true)
r2.show
println("==")

// "df2" doesn't do the filter correctly!!
val rdd2 = sc.makeRDD(Seq(Row(new java.sql.Date(115, 4, 7))))

val schema = StructType(Array(StructField("dt", DateType, false)))

val df2 = sqlCtx.applySchema(rdd2, schema)

r = df2.filter("dt >= '2015-05-06'")
r.explain(true)
r.show
println("==")

r2 = df2.filter("dt >= cast('2015-05-06' as DATE)")
r2.explain(true)
r2.show

 



RE: YARN mode startup takes too long (10+ secs)

2015-05-08 Thread Taeyun Kim
I think so.

In fact, the flow is: allocator.allocateResources() -> sleep ->
allocator.allocateResources() -> sleep ...

But I guess that on the first allocateResources() the allocation is not 
fulfilled. So sleep occurs.

 

From: Zoltán Zvara [mailto:zoltan.zv...@gmail.com] 
Sent: Friday, May 08, 2015 4:25 PM
To: Taeyun Kim; user@spark.apache.org
Subject: Re: YARN mode startup takes too long (10+ secs)

 

So does this sleep occur before allocating resources for the first few executors
to start the job?

 

On Fri, May 8, 2015 at 6:23 AM Taeyun Kim  wrote:

I think I've found the (maybe partial, but major) reason.

 

It's between the following lines (it's newly captured, but essentially the
same place that Zoltán Zvara picked):

 

15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager

15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor: 
Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753] with 
ID 1

 

When I read the logs on the cluster side, the following lines were found (the
exact time is different from the above line, but it's just the difference between
machines):

 

15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter thread 
- sleep time : 5000

15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for : 
cluster04:45454

 

It seemed that Spark deliberately sleeps 5 secs.

I've read the Spark source code, and in
org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread()
had the code for that.

It loops calling allocator.allocateResources() and Thread.sleep(). 

For sleep, it reads the configuration variable
spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000, which is
5 secs).

According to the comment, "we want to be reasonably responsive without causing
too many requests to RM".

So, unless YARN immediately fulfills the allocation request, it seems that 5
secs will be wasted.

 

When I modified the configuration variable to 1000, it only waited for 1 sec.

 

Here is the log lines after the change:

 

15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter thread 
- sleep time : 1000

15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for : 
cluster04:45454

 

4 secs saved.

So, when one does not want to wait 5 secs, one can change the 
spark.yarn.scheduler.heartbeat.interval-ms.

I hope that the additional overhead it incurs would be negligible.  
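
For example, a sketch of setting it programmatically (the application name is
hypothetical; the same value can equally be passed with --conf on spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyYarnApp")                                   // hypothetical name
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "1000") // 1 sec instead of the 5 sec default
val sc = new SparkContext(conf)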

 

 

From: Zoltán Zvara [mailto:zoltan.zv...@gmail.com] 
Sent: Thursday, May 07, 2015 10:05 PM
To: Taeyun Kim; user@spark.apache.org
Subject: Re: YARN mode startup takes too long (10+ secs)

 

Without considering everything, just a few hints:

You are running on YARN. From 09:18:34 to 09:18:37 your application is in state 
ACCEPTED. There is a noticeable overhead introduced due to communicating with 
YARN's ResourceManager, NodeManager and given that the YARN scheduler needs 
time to make a decision. I guess somewhere from 09:18:38 to 09:18:43 your 
application JAR gets copied to another container requested by the Spark 
ApplicationMaster deployed on YARN's container 0. Deploying an executor needs 
further resource negotiations with the ResourceManager usually. Also, as I 
said, your JAR and Executor's code requires copying to the container's local 
directory - execution blocked until that is complete.

 

On Thu, May 7, 2015 at 3:09 AM Taeyun Kim  wrote:

Hi,

 

I’m running a spark application with YARN-client or YARN-cluster mode.

But it seems to take too long to startup.

It takes 10+ seconds to initialize the spark context.

Is this normal? Or can it be optimized?

 

The environment is as follows:

- Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)

- Spark: 1.3.1

- Client: Windows 7, but similar result on CentOS 6.6

 

The following is the startup part of the application log. (Some private 
information was edited)

‘Main: Initializing context’ at the first line and ‘MainProcessor: Deleting 
previous output files’ at the last line are the logs by the application. Others 
in between are from Spark itself. Application logic is executed after this log 
is displayed.

 

---

 

15/05/07 09:18:31 INFO Main: Initializing context

15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1

15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp

15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to: myuser,myapp

15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(myuser, myapp); 
users with modify permissions: Set(myuser, myapp)

15/05/07 09:18:31 INFO Slf4jLogger: Slf4jLogger started

15/05/07 09:18:31 INFO Remoting: Starting remoting

15/05/07 09:18:31 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@mymachine:54449]

15/05/07 09:18:31 INFO Utils: Successfully started service 'sparkDriver' on 
port 54449.

15/05/07 09:18:31 INFO SparkEnv: Register

Re: AWS-Credentials fails with org.apache.hadoop.fs.s3.S3Exception: FORBIDDEN

2015-05-08 Thread in4maniac
Hi guys... I realised that it was a bug in my code that caused the code to
break: I was running the filter on a SchemaRDD when I was supposed to be
running it on an RDD.

But I still don't understand why the stderr was about an S3 request rather than
a type-checking error such as "No tuple position 0 found in Row type" being
thrown. The error was kind of misleading, so I overlooked this logical
error in my code.

Just thought I should keep this posted.

-in4



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/AWS-Credentials-fails-with-org-apache-hadoop-fs-s3-S3Exception-FORBIDDEN-tp22800p22815.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to join table across data sources using sparkSQL

2015-05-08 Thread Ishwardeep Singh
Finally got it working.

I was trying to access Hive using the JDBC driver, the same way I was trying to
access Teradata.

It took me some time to figure out that the default sqlContext created by Spark
supports Hive and uses the hive-site.xml in the Spark conf folder to access
Hive.

I had to switch to my database in Hive.

spark-shell> sqlContext.sql("use terradata_live")

Then I registered my Teradata database tables as temporary tables.

spark-shell>  val itemDF=
hc.load("jdbc",Map("url"->"jdbc:teradata://192.168.145.58/DBS_PORT=1025,DATABASE=BENCHQADS,LOB_SUPPORT=OFF,USER=
BENCHQADS,PASSWORD=","dbtable" -> "item")) 

spark-shell> itemDF.registerTempTable("itemterra")

spark-shell> sqlContext.sql("select store_sales.* from store_sales join
itemterra on (store_sales.id = itemterra.sales_id)")

But there seems to be some issue when I try to do the same using the Hive JDBC
driver. Another difference that I found was in the printSchema() output: the
printSchema() output for a DataFrame created using the Hive driver prefixes the
column names with the table name, but the same does not happen for the Teradata
tables.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-join-table-across-data-sources-using-sparkSQL-tp22761p22816.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[SQL][Dataframe] Change data source after saveAsParquetFile

2015-05-08 Thread Peter Rudenko

Hi, I have a question:

val data = sc.textFile("s3:///")
val df = data.toDF
df.saveAsParquetFile("hdfs://")
df.someAction(...)


if during someAction some workers would die, would recomputation 
download files from s3 or from hdfs parquet?


Thanks,
Peter Rudenko

​


Re: Virtualenv pyspark

2015-05-08 Thread Nicholas Chammas
This is an interesting question. I don't have a solution for you, but you
may be interested in taking a look at Anaconda Cluster.

It's made by the same people behind Conda (an alternative to pip focused on
data science packages) and may offer a better way of doing this. Haven't
used it though.

On Thu, May 7, 2015 at 5:20 PM alemagnani  wrote:

> I am currently using pyspark with a virtualenv.
> Unfortunately I don't have access to the nodes file system and therefore I
> cannot  manually copy the virtual env over there.
>
> I have been using this technique:
>
> I first add a tar ball with the venv
> sc.addFile(virtual_env_tarball_file)
>
> Then in the code used on the node to do the computation I activate the venv
> like this:
> venv_location = SparkFiles.get(venv_name)
> activate_env="%s/bin/activate_this.py" % venv_location
> execfile(activate_env, dict(__file__=activate_env))
>
> Is there a better way to do this?
> One of the problem with this approach is that in
> spark/python/pyspark/statcounter.py numpy is imported
> before the venv is activated and this can cause conflicts with the venv
> numpy.
>
> Moreover this requires the venv to be sent around in the cluster all the
> time.
> Any suggestions?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Virtualenv-pyspark-tp22803.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


filterRDD and flatMap

2015-05-08 Thread hmaeda
Dear Usergroup,

I am struggling to use the SparkR package that comes with Apache Spark 1.4.0

I am having trouble getting the tutorials in the original
amplabs-extra/SparkR-pkg working.

Please see my stackoverflow question with a bounty for more details...here
(http://stackoverflow.com/questions/30057702/sparkr-filterrdd-and-flatmap-not-working)

Regards,

Hiddi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/filterRDD-and-flatMap-tp22817.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [SQL][Dataframe] Change data source after saveAsParquetFile

2015-05-08 Thread ayan guha
From S3, as the dependency of df will be on S3, and because RDDs are not
replicated.
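
A minimal sketch of making later actions depend on the HDFS copy instead
(assuming the Spark 1.3-era SQLContext API; paths are elided as in the original):

// After df.saveAsParquetFile("hdfs://..."), read the saved file back so that
// any recomputation pulls from the HDFS Parquet copy rather than from S3.
val dfFromHdfs = sqlContext.parquetFile("hdfs://")
dfFromHdfs.count()   // stand-in for someAction(...)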
On 8 May 2015 23:02, "Peter Rudenko"  wrote:

>  Hi, i have a next question:
>
> val data = sc.textFile("s3:///")
> val df = data.toDF
> df.saveAsParquetFile("hdfs://")
> df.someAction(...)
>
> if during someAction some workers would die, would recomputation download
> files from s3 or from hdfs parquet?
>
> Thanks,
> Peter Rudenko
> ​
>


Re: [SQL][Dataframe] Change data source after saveAsParquetFile

2015-05-08 Thread Peter Rudenko

Hm, thanks.
Do you know what this setting means:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L1178 
?


Thanks,
Peter Rudenko

On 2015-05-08 17:48, ayan guha wrote:


From S3. As the dependency of df will be on s3. And because rdds are 
not replicated.


On 8 May 2015 23:02, "Peter Rudenko" > wrote:


Hi, I have a question:

val data = sc.textFile("s3:///")
val df = data.toDF
df.saveAsParquetFile("hdfs://")
df.someAction(...)

if during someAction some workers would die, would recomputation
download files from s3 or from hdfs parquet?

Thanks,
Peter Rudenko

​





Submit Spark application in cluster mode and supervised

2015-05-08 Thread James King
I have two hosts host01 and host02 (lets call them)

I run one Master and two Workers on host01
I also run one Master and two Workers on host02

Now I have 1 LIVE Master on host01 and a STANDBY Master on host02
The LIVE Master is aware of all Workers in the cluster

Now I submit a Spark application using

bin/spark-submit --class SomeApp --deploy-mode cluster --supervise --master
spark://host01:7077 Some.jar

This to make the driver resilient to failure.

Now the interesting part:

If I stop the cluster (all daemons on all hosts) and restart
the Master and Workers *only* on host01 the job resumes! as expected.

But if I stop the cluster (all daemons on all hosts) and restart the Master
and Workers *only* on host02 the job *does not* resume execution! why?

I can see the driver on host02 WebUI listed but no job execution. Please
let me know why.

Am I wrong to expect it to resume execution in this case?


Re: Submit Spark application in cluster mode and supervised

2015-05-08 Thread James King
BTW I'm using Spark 1.3.0.

Thanks

On Fri, May 8, 2015 at 5:22 PM, James King  wrote:

> I have two hosts host01 and host02 (lets call them)
>
> I run one Master and two Workers on host01
> I also run one Master and two Workers on host02
>
> Now I have 1 LIVE Master on host01 and a STANDBY Master on host02
> The LIVE Master is aware of all Workers in the cluster
>
> Now I submit a Spark application using
>
> bin/spark-submit --class SomeApp --deploy-mode cluster --supervise
> --master spark://host01:7077 Some.jar
>
> This to make the driver resilient to failure.
>
> Now the interesting part:
>
> If I stop the cluster (all daemons on all hosts) and restart
> the Master and Workers *only* on host01 the job resumes! as expected.
>
> But if I stop the cluster (all daemons on all hosts) and restart the
> Master and Workers *only* on host02 the job *does not*
> resume execution! why?
>
> I can see the driver on host02 WebUI listed but no job execution. Please
> let me know why.
>
> Am I wrong to expect it to resume execution in this case?
>
>
>
>
>
>


Cluster mode and supervised app with multiple Masters

2015-05-08 Thread James King
Why does this not work

./spark-1.3.0-bin-hadoop2.4/bin/spark-submit --class SomeApp --deploy-mode
cluster --supervise --master spark://host01:7077,host02:7077 Some.jar

With exception:

Caused by: java.lang.NumberFormatException: For input string:
"7077,host02:7077"

It seems to accept only one master.

Can this be done with multiple Masters?

Thanks


dependencies on java-netlib and jblas

2015-05-08 Thread John Niekrasz
Newbie question...

Can I use any of the main ML capabilities of MLlib in a Java-only
environment, without any native library dependencies?

According to the documentation, java-netlib provides a JVM fallback. This
suggests that native netlib libraries are not required.

It appears that such a fallback is not available for jblas. However, a quick
look at the MLlib source suggests that MLlib's dependencies on jblas are
rather isolated:

> grep -R jblas
main/scala/org/apache/spark/ml/recommendation/ALS.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/optimization/NNLS.scala:import
org.jblas.{DoubleMatrix, SimpleBlas}
main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:   
org.jblas.util.Random.seed(42)
main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala:import
org.jblas.DoubleMatrix
main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala:import
org.jblas.DoubleMatrix

Is it true or false that many of MLlib's capabilities will work perfectly
fine without any native (non-Java) libraries installed at all?

Thanks for the help,
John



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/dependencies-on-java-netlib-and-jblas-tp22818.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: dependencies on java-netlib and jblas

2015-05-08 Thread Sonal Goyal
Hi John,

I have been using MLlib without installing the jblas native dependency.
Functionally I have not got stuck. I still need to explore whether there are any
performance hits.

Best Regards,
Sonal
Founder, Nube Technologies 





On Fri, May 8, 2015 at 9:34 PM, John Niekrasz 
wrote:

> Newbie question...
>
> Can I use any of the main ML capabilities of MLlib in a Java-only
> environment, without any native library dependencies?
>
> According to the documentation, java-netlib provides a JVM fallback. This
> suggests that native netlib libraries are not required.
>
> It appears that such a fallback is not available for jblas. However, a
> quick
> look at the MLlib source suggests that MLlib's dependencies on jblas are
> rather isolated:
>
> > grep -R jblas
> main/scala/org/apache/spark/ml/recommendation/ALS.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/optimization/NNLS.scala:import
> org.jblas.{DoubleMatrix, SimpleBlas}
>
> main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:
> org.jblas.util.Random.seed(42)
> main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala:import
> org.jblas.DoubleMatrix
>
> Is it true or false that many of MLlib's capabilities will work perfectly
> fine without any native (non-Java) libraries installed at all?
>
> Thanks for the help,
> John
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/dependencies-on-java-netlib-and-jblas-tp22818.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark-shell breaks for scala 2.11 (with yarn)?

2015-05-08 Thread Koert Kuipers
I searched the JIRAs but couldn't find any recent mention of this. Let me
try with the 1.4.0 branch and see if it goes away...

On Wed, May 6, 2015 at 3:05 PM, Koert Kuipers  wrote:

> hello all,
> i build spark 1.3.1 (for cdh 5.3 with yarn) twice: for scala 2.10 and
> scala 2.11. i am running on a secure cluster. the deployment configs are
> identical.
>
> i can launch jobs just fine on both the scala 2.10 and scala 2.11 versions.
>
> spark-shell works on the scala 2.10 version, but not on the scala 2.11
> version. this is what i get:
>
> $ /usr/local/lib/spark-1.3.1-cdh5.3-scala2.11/bin/spark-shell
> 15/05/06 14:58:49 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/05/06 14:58:49 INFO SecurityManager: Changing view acls to: koert
> 15/05/06 14:58:49 INFO SecurityManager: Changing modify acls to: koert
> 15/05/06 14:58:49 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(koert); users
> with modify permissions: Set(koert)
> 15/05/06 14:58:50 INFO HttpServer: Starting HTTP Server
> 15/05/06 14:58:50 INFO Server: jetty-8.y.z-SNAPSHOT
> 15/05/06 14:58:50 INFO AbstractConnector: Started
> SocketConnector@0.0.0.0:36413
> 15/05/06 14:58:50 INFO Utils: Successfully started service 'HTTP server'
> on port 36413.
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures
> timed out after [10 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
> at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:95)
> at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:95)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.ready(package.scala:95)
> at
> scala.tools.nsc.interpreter.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:907)
> at
> scala.tools.nsc.interpreter.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:895)
> at
> scala.tools.nsc.interpreter.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:895)
> at
> scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:95)
> at scala.tools.nsc.interpreter.SparkILoop.process(SparkILoop.scala:895)
> at org.apache.spark.repl.Main$.main(Main.scala:46)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>


Re: Master node memory usage question

2015-05-08 Thread Richard Alex Hofer
Our use case is a bit complicated. Essentially our RDD is storing an N x 
N matrix A (where each value is a row) and we want to do a computation 
which involves another N x N matrix B. However, we have the unfortunate 
property that to update the i-th column of A we need to first update 
columns 1 to i-1 and we need the i-th row of B.


If B fit on every machine we could simply broadcast it and do a single 
map, however that's not the case. So, instead we have a loop from 1 to N 
where we broadcast a row of B then use this row in a map to compute a 
column of A. However, even much simpler cases of this like the code I'd 
posted here result in the master node running out of memory.


We suspect this has something to do with the large lineage A develops. 
We're currently trying to checkpoint A after persisting it to break this 
lineage up, but this doesn't appear to be working.


Any help here would be greatly appreciated. We'd also love to know if 
there's a better way to be doing this, but looking at our dependencies 
we believe this is the best we could do.


-Richard

On 05/08/2015 03:26 AM, Akhil Das wrote:
Whats your usecase and what are you trying to achieve? May be there's 
a better way of doing it.


Thanks
Best Regards

On Fri, May 8, 2015 at 10:20 AM, Richard Alex Hofer 
mailto:rho...@andrew.cmu.edu>> wrote:


Hi,
I'm working on a project in Spark and am trying to understand
what's going on. Right now to try and understand what's happening
we came up with this snippet of code which very roughly resembles
what we're actually doing. When trying to run this our master node
ends up quickly using up its memory even though all of our RDDs
are very small. Can someone explain what's going on here and how
we can avoid it?

a = sc.parallelize(xrange(100),10)
b = a

for i in xrange(10):
a = a.map(lambda x: x + 1)
if i % 300 == 0:
# We do this to try and force some of our RDD to evaluate
a.persist()
a.foreachPartition(lambda _: None)
b.unpersist()
b = a
a.collect()
b.unpersist()

-Richard Hofer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







parallelism on binary file

2015-05-08 Thread tog
Hi

I have an application that currently runs using MR. It starts by
extracting information from a proprietary binary file that is copied to
HDFS. The application starts by creating business objects from information
extracted from the binary files. Later those objects are used for further
processing, again using MR jobs.

I am planning to move towards Spark and I clearly see that I could use
JavaRDD for parallel processing. However, it is not yet
obvious what the process could be to generate this RDD from my binary file
in parallel.

Today I get parallelism from the split assigned to each of the map
tasks of a job. Can I mimic such a thing using Spark? All examples I
have seen so far use text files, for which I guess the partitions are
based on a given number of contiguous lines.
Any help or pointer would be appreciated

Cheers
Guillaume



-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net


Cassandra number of Tasks

2015-05-08 Thread Vijay Pawnarkar
I am using the Spark Cassandra connector to work with a table with 3
million records, using the .where() API to work with only certain rows in
this table. The where clause filters the data to 1 rows.

CassandraJavaUtil.javaFunctions(sparkContext) .cassandraTable(KEY_SPACE,
MY_TABLE, CassandraJavaUtil.mapRowTo(MyClass.class)).where(cqlDataFilter,
cqlFilterParams)


Also using parameter spark.cassandra.input.split.size=1000

As this job is processed by the Spark cluster, it creates 3000 partitions
instead of 10, and 3000 tasks are executed on the cluster. As the data
in our table grows to 30 million rows, this will create 30,000 tasks
instead of 10.

Is there a better way to process these 10,000 records with 10
tasks?

Thanks!
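
For reference, one sketch of a workaround (not verified against the connector
itself; cassandraRdd stands for whatever cassandraTable(...).where(...) returned,
and processRow is a hypothetical per-record function): coalesce the RDD down to a
small number of partitions before the heavy work, and/or raise
spark.cassandra.input.split.size so the connector creates fewer partitions in the
first place.

// Merge the ~3000 connector partitions into 10 tasks without a shuffle.
val fewerTasks = cassandraRdd.coalesce(10)
fewerTasks.foreachPartition { rows =>
  rows.foreach(row => processRow(row))
}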


Re: Submit Spark application in cluster mode and supervised

2015-05-08 Thread Silvio Fiorito
If you’re using multiple masters with ZooKeeper then you should set your master 
URL to be

spark://host01:7077,host02:7077

And the property spark.deploy.recoveryMode=ZOOKEEPER

See here for more info: 
http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper

From: James King
Date: Friday, May 8, 2015 at 11:22 AM
To: user
Subject: Submit Spark application in cluster mode and supervised

I have two hosts host01 and host02 (lets call them)

I run one Master and two Workers on host01
I also run one Master and two Workers on host02

Now I have 1 LIVE Master on host01 and a STANDBY Master on host02
The LIVE Master is aware of all Workers in the cluster

Now I submit a Spark application using

bin/spark-submit --class SomeApp --deploy-mode cluster --supervise --master 
spark://host01:7077 Some.jar

This to make the driver resilient to failure.

Now the interesting part:

If I stop the cluster (all daemons on all hosts) and restart the Master and 
Workers only on host01 the job resumes! as expected.

But if I stop the cluster (all daemons on all hosts) and restart the Master and 
Workers only on host02 the job does not resume execution! why?

I can see the driver on host02 WebUI listed but no job execution. Please let me 
know why.

Am I wrong to expect it to resume execution in this case?







Re: [SparkSQL] cannot filter by a DateType column

2015-05-08 Thread Michael Armbrust
What version of Spark are you using?  It appears that at least in master we
are doing the conversion correctly, but its possible older versions of
applySchema do not.  If you can reproduce the same bug in master, can you
open a JIRA?

On Fri, May 8, 2015 at 1:36 AM, Haopu Wang  wrote:

>  I want to filter a DataFrame based on a Date column.
>
>
>
> If the DataFrame object is constructed from a scala case class, it's
> working (either compare as String or Date). But if the DataFrame is
> generated by specifying a Schema to an RDD, it doesn't work. Below is the
> exception and test code.
>
>
>
> Do you have any idea about the error? Thank you very much!
>
>
>
> exception=
>
> java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
>
>     at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
>     at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$anonfun$apply$6.apply(Cast.scala:116)
>     at org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:111)
>     at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.apply(Cast.scala:116)
>     at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)
>     at org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predicates.scala:305)
>     at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
>     at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
>     at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
>
>
> code=
>
>
>
> val conf = new SparkConf().setAppName("DFTest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val sqlCtx = new HiveContext(sc)
> import sqlCtx.implicits._
>
> case class Test(dt: java.sql.Date)
>
> val df = sc.makeRDD(Seq(Test(new java.sql.Date(115, 4, 7)))).toDF
>
> var r = df.filter("dt >= '2015-05-06'")
> r.explain(true)
> r.show
> println("==")
>
> var r2 = df.filter("dt >= cast('2015-05-06' as DATE)")
> r2.explain(true)
> r2.show
> println("==")
>
> // "df2" doesn't do the filter correctly!!
> val rdd2 = sc.makeRDD(Seq(Row(new java.sql.Date(115, 4, 7))))
>
> val schema = StructType(Array(StructField("dt", DateType, false)))
>
> val df2 = sqlCtx.applySchema(rdd2, schema)
>
> r = df2.filter("dt >= '2015-05-06'")
> r.explain(true)
> r.show
> println("==")
>
> r2 = df2.filter("dt >= cast('2015-05-06' as DATE)")
> r2.explain(true)
> r2.show
>
>
>


Spark Cassandra connector number of Tasks

2015-05-08 Thread vijaypawnarkar
I am using the Spark Cassandra connector to work with a table with 3 million
records. Using .where() API to work with only a certain rows in this table.
Where clause filters the data to 1 rows.

CassandraJavaUtil.javaFunctions(sparkContext) .cassandraTable(KEY_SPACE,
MY_TABLE, CassandraJavaUtil.mapRowTo(MyClass.class)).where(cqlDataFilter,
cqlFilterParams) 


Also using parameter spark.cassandra.input.split.size=1000 

As this job is processed by Spark cluster, it created 3000 partitions
instead of 10. On spark cluster 3000 tasks are being executed. As the data
in our table grows to 30 million rows, this will create 30,000 tasks instead
of 10. 

Is there a better way to approach process these 10,000 records with 10
tasks.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-connector-number-of-Tasks-tp22820.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [SQL][Dataframe] Change data source after saveAsParquetFile

2015-05-08 Thread Michael Armbrust
Thats a feature flag for a new code path for reading parquet files.  Its
only there in case bugs are found in the old path and will be removed once
we are sure the new path is solid.

On Fri, May 8, 2015 at 8:04 AM, Peter Rudenko 
wrote:

>  Hm, thanks.
> Do you know what this setting mean:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L1178
> ?
>
> Thanks,
> Peter Rudenko
>
>
> On 2015-05-08 17:48, ayan guha wrote:
>
> From S3. As the dependency of df will be on s3. And because rdds are not
> replicated.
> On 8 May 2015 23:02, "Peter Rudenko" < 
> petro.rude...@gmail.com> wrote:
>
>>  Hi, i have a next question:
>>
>> val data = sc.textFile("s3:///")
>> val df = data.toDF
>> df.saveAsParquetFile("hdfs://")
>> df.someAction(...)
>>
>> if during someAction some workers would die, would recomputation download
>> files from s3 or from hdfs parquet?
>>
>> Thanks,
>> Peter Rudenko
>> ​
>>
>
>


Re: Map one RDD into two RDD

2015-05-08 Thread anshu shukla
Any update on the above mail?
Also, can anyone tell me the logic: I have to filter tweets and submit tweets
with a particular #hashtag1 to SparkSQL databases, while tweets with
#hashtag2 will be passed to a sentiment analysis phase. The problem is how to
split the input data into two streams using hashtags.
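
A minimal sketch of that split (assuming the stream elements are twitter4j.Status
as TwitterUtils produces, and that saveToSqlStore and runSentimentAnalysis are
hypothetical downstream handlers):

import org.apache.spark.streaming.twitter.TwitterUtils

val tweetStream = TwitterUtils.createStream(ssc, None)   // None: credentials from system properties

val hashtag1Tweets = tweetStream.filter(_.getText.contains("#hashtag1"))
val hashtag2Tweets = tweetStream.filter(_.getText.contains("#hashtag2"))

hashtag1Tweets.foreachRDD { rdd =>
  saveToSqlStore(rdd)                                     // e.g. convert to a DataFrame and persist
}
hashtag2Tweets.foreachRDD { rdd =>
  rdd.foreach(status => runSentimentAnalysis(status.getText))
}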

On Fri, May 8, 2015 at 2:42 AM, anshu shukla  wrote:

> One of the best discussions on the mailing list :-) ... Please help me in
> concluding --
>
> The whole discussion concludes that:
>
> 1- The framework does not support increasing the parallelism of any task just
> by any inbuilt function.
> 2- The user has to manually write logic to filter the output of an upstream node in
> the DAG to manage the input to downstream nodes (like shuffle grouping etc. in
> STORM).
> 3- If we want to increase the level of parallelism of the Twitter streaming
> spout to get a higher rate of DStream of tweets (to increase the rate
> of input), how is that possible ...
>
>   val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth)
>
>
>
> On Fri, May 8, 2015 at 2:16 AM, Evo Eftimov  wrote:
>
>> 1. Will rdd2.filter run before rdd1.filter finish?
>>
>>
>>
>> YES
>>
>>
>>
>> 2. We have to traverse rdd twice. Any comments?
>>
>>
>>
>> You can invoke filter or whatever other transformation / function many
>> times
>>
>> Ps: you  have to study / learn the Parallel Programming Model of an OO
>> Framework like Spark – in any OO Framework lots of Behavior is hidden /
>> encapsulated by the Framework and the client code gets invoked at specific
>> points in the Flow of Control / Data based on callback functions
>>
>>
>>
>> That’s why stuff like RDD.filter(), RDD.filter() may look “sequential” to
>> you but it is not
>>
>>
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Thursday, May 7, 2015 6:27 PM
>>
>> *To:* Evo Eftimov
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Map one RDD into two RDD
>>
>>
>>
>> The multi-threading code in Scala is quite simple and you can google it
>> pretty easily. We used the Future framework. You can use Akka also.
>>
>>
>>
>> @Evo My concerns for filtering solution are: 1. Will rdd2.filter run
>> before rdd1.filter finish? 2. We have to traverse rdd twice. Any comments?
>>
>>
>>
>> On Thursday, May 7, 2015, Evo Eftimov  wrote:
>>
>> Scala is a language, Spark is an OO/Functional, Distributed Framework
>> facilitating Parallel Programming in a distributed environment
>>
>>
>>
>> Any “Scala parallelism” occurs within the Parallel Model imposed by the
>> Spark OO Framework – ie it is limited in terms of what it can achieve in
>> terms of influencing the Spark Framework behavior – that is the nature of
>> programming with/for frameworks
>>
>>
>>
>> When RDD1 and RDD2 are partitioned and different Actions applied to them
>> this will result in Parallel Pipelines / DAGs within the Spark Framework
>>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Thursday, May 7, 2015 4:55 PM
>> *To:* Evo Eftimov
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Map one RDD into two RDD
>>
>>
>>
>> Thanks for the replies. We decided to use concurrency in Scala to do the
>> two mappings using the same source RDD in parallel. So far, it seems to be
>> working. Any comments?
>>
>> On Wednesday, May 6, 2015, Evo Eftimov  wrote:
>>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com ]
>> *Sent:* Tuesday, May 5, 2015 10:42 PM
>> *To:* user@spark.apache.org
>> *Subject:* Map one RDD into two RDD
>>
>>
>>
>> Hi all,
>>
>> I have a large RDD that I map a function to it. Based on the nature of
>> each record in the input RDD, I will generate two types of data. I would
>> like to save each type into its own RDD. But I can't seem to find an
>> efficient way to do it. Any suggestions?
>>
>>
>>
>> Many thanks.
>>
>>
>>
>>
>>
>> Bill
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>



-- 
Thanks & Regards,
Anshu Shukla


Re: dependencies on java-netlib and jblas

2015-05-08 Thread Sean Owen
Yes, at this point I believe you'll find jblas used for historical reasons,
to not change some APIs. I don't believe it's used for much if any
computation in 1.4.
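For what it's worth, a quick way to see which BLAS backend is actually in use (a hedged sketch against the netlib-java API that MLlib's linear algebra goes through; with no native libraries installed it should report the pure-JVM F2J implementation):

import com.github.fommil.netlib.BLAS

println(BLAS.getInstance().getClass.getName)   // e.g. com.github.fommil.netlib.F2jBLAS when no native BLAS is found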
On May 8, 2015 5:04 PM, "John Niekrasz"  wrote:

> Newbie question...
>
> Can I use any of the main ML capabilities of MLlib in a Java-only
> environment, without any native library dependencies?
>
> According to the documentation, java-netlib provides a JVM fallback. This
> suggests that native netlib libraries are not required.
>
> It appears that such a fallback is not available for jblas. However, a
> quick
> look at the MLlib source suggests that MLlib's dependencies on jblas are
> rather isolated:
>
> > grep -R jblas
> main/scala/org/apache/spark/ml/recommendation/ALS.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/optimization/NNLS.scala:import
> org.jblas.{DoubleMatrix, SimpleBlas}
>
> main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala:
> org.jblas.util.Random.seed(42)
> main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala:import
> org.jblas.DoubleMatrix
> main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala:import
> org.jblas.DoubleMatrix
>
> Is it true or false that many of MLlib's capabilities will work perfectly
> fine without any native (non-Java) libraries installed at all?
>
> Thanks for the help,
> John
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/dependencies-on-java-netlib-and-jblas-tp22818.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-08 Thread Michal Haris
+dev
On 6 May 2015 10:45, "Michal Haris"  wrote:

> Just wanted to check if somebody has seen similar behaviour or knows what
> we might be doing wrong. We have a relatively complex spark application
> which processes half a terabyte of data at various stages. We have profiled
> it in several ways and everything seems to point to one place where 90% of
> the time is spent:  AppendOnlyMap.changeValue. The job scales and is
> relatively faster than its map-reduce alternative but it still feels slower
> than it should be. I am suspecting too much spill but I haven't seen any
> improvement by increasing number of partitions to 10k. Any idea would be
> appreciated.
>
> --
> Michal Haris
> Technical Architect
> direct line: +44 (0) 207 749 0229
> www.visualdna.com | t: +44 (0) 207 734 7033,
>


Lambda architecture using Apache Spark

2015-05-08 Thread rafac
I am implementing the lambda architecture using Apache Spark for both
streaming and batch processing.
For real-time queries I'm using Spark Streaming with Cassandra, and for batch
queries I am using Spark SQL and Spark MLlib.
The problem I'm facing now is that I need to implement a serving layer, i.e.
a database capable of random reads for storing my pre-computed batch views.
The ones I was considering (Druid / Splout SQL) don't have native connectors.
Is it possible to integrate Druid or Splout with Spark?
Any other suggestions?
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lambda-architecture-using-Apache-Spark-tp22822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
Hi All,

I am submitting the assembled fat jar file by the command:

bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class
com.xxx.Consumer -0.1-SNAPSHOT.jar

It reads the data file from kinesis using the stream name defined in a
configuration file. It turns out that it reads the data perfectly fine for
one stream name (i.e. the default), however, if I switch the stream name
and re-submit the jar, it no longer reads the data. From CloudWatch, I can
see that there is data put into the stream and spark is actually sending
get requests as well. However, it doesn't seem to be outputting the data.

Has anyone else encountered a similar issue? Does spark cache the stream
name somewhere? I also have checkpointing enabled as well.

Thanks, Mike.


Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-08 Thread Josh Rosen
Do you have any more specific profiling data that you can share?  I'm
curious to know where AppendOnlyMap.changeValue is being called from.

On Fri, May 8, 2015 at 1:26 PM, Michal Haris 
wrote:

> +dev
> On 6 May 2015 10:45, "Michal Haris"  wrote:
>
> > Just wanted to check if somebody has seen similar behaviour or knows what
> > we might be doing wrong. We have a relatively complex spark application
> > which processes half a terabyte of data at various stages. We have
> profiled
> > it in several ways and everything seems to point to one place where 90%
> of
> > the time is spent:  AppendOnlyMap.changeValue. The job scales and is
> > relatively faster than its map-reduce alternative but it still feels
> slower
> > than it should be. I am suspecting too much spill but I haven't seen any
> > improvement by increasing number of partitions to 10k. Any idea would be
> > appreciated.
> >
> > --
> > Michal Haris
> > Technical Architect
> > direct line: +44 (0) 207 749 0229
> > www.visualdna.com | t: +44 (0) 207 734 7033,
> >
>


RE: Lambda architecture using Apache Spark

2015-05-08 Thread Mohammed Guller
Why are you not using Cassandra for storing the pre-computed views?

Mohammed
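For reference, a minimal sketch (keyspace, table, and data are placeholders; assumes the DataStax spark-cassandra-connector is on the classpath) of the batch layer writing its pre-computed views straight to Cassandra, which then doubles as the random-read serving layer:

import com.datastax.spark.connector._

val batchViews = sc.parallelize(Seq(("2015-05-08T21:00", 42L)))   // (bucket, count) rows
batchViews.saveToCassandra("views", "hourly_counts", SomeColumns("bucket", "count"))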


-Original Message-
From: rafac [mailto:rafaelme...@hotmail.com] 
Sent: Friday, May 8, 2015 1:48 PM
To: user@spark.apache.org
Subject: Lambda architecture using Apache Spark

I am implementing the lambda architecture using apache spark for both streaming 
and batch processing.
For real time queries i´m using spark streaming with cassandra and for batch 
queries i am using spark sql and spark mlib.
The problem i ´m facing now is: i need to implemente one serving layer, i.e, i 
need a database capabel of randow read for storing my pre-computed batch views. 
The ones i was considering to use (druid/splout sql) don't have native 
connectors.
Is it possibel integrate druid or splout using spark?
Any other suggestion?
Tanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Lambda-architecture-using-Apache-Spark-tp22822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



Spark streaming updating a large window more frequently

2015-05-08 Thread Ankur Chauhan
Hi,

I am pretty new to Spark/Spark Streaming so please excuse my naivety. I have a 
timestamped event stream that I would like to aggregate into, let's say, hourly 
buckets. The simple answer is to use a window operation with a window length of 
1 hour and a sliding interval of 1 hour, but this sort of doesn't exactly work:

1. The time boundaries aren't exactly right, i.e. the stream aggregation may get 
started in the middle of an hour, so the first hour may actually be less than 
an hour long, and subsequent hours should be aligned to the next hour.
2. If I understand this correctly, the above method would mean that all my data 
is "collected" for 1 hour and then summarised. Though correct, how do I get the 
aggregations to occur more frequently than that? Something like "aggregate 
these events into hourly buckets, updating them every 5 seconds".

I would really appreciate pointers to code samples or some blogs that could 
help me identify best practices.

-- Ankur Chauhan




RE: Spark streaming updating a large window more frequently

2015-05-08 Thread Mohammed Guller
If I understand you correctly, you need Window duration of 1 hour and sliding 
interval of 5 seconds. 

Mohammed
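A minimal sketch of that (the events DStream and its timestampMs field are hypothetical): keying each event by the hour derived from its own timestamp keeps the buckets aligned to clock hours regardless of when the job started, while the 5-second slide refreshes the counts frequently.

import org.apache.spark.streaming.{Minutes, Seconds}

val hourlyCounts = events
  .map(e => (e.timestampMs / 3600000L * 3600000L, 1L))   // align the key to the hour boundary
  .reduceByKeyAndWindow(_ + _, Minutes(60), Seconds(5))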


-Original Message-
From: Ankur Chauhan [mailto:achau...@brightcove.com] 
Sent: Friday, May 8, 2015 2:27 PM
To: u...@spark.incubator.apache.org
Subject: Spark streaming updating a large window more frequently

Hi,

I am pretty new to spark/spark_streaming so please excuse my naivety. I have 
streaming event stream that is timestamped and I would like to aggregate it 
into, let's say, hourly buckets. Now the simple answer is to use a window 
operation with window length of 1 hr and sliding interval of 1hr. But this sort 
of doesn't exactly work:

1. The time boundaries aren't exactly perfect. i.e. the process/stream 
aggreagation may get started at the middle of the hour so the 1st hour may 
actually be less than 1 hour long and then subsequent hours should be aligned 
to the next hour.
2. The If I understand this correctly, the above method would mean that all my 
data is "collected" for 1 hour and then summarised. Though correct, how do I 
get the aggregations to occur more frequently than that. Something like 
"aggregate these events into hourly buckets updating it every 5 seconds".

I would really appreciate pointers to code samples or some blogs that could 
help me identify best practices.

-- Ankur Chauhan

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



CREATE TABLE ignores database when using PARQUET option

2015-05-08 Thread Carlos Pereira
Hi, I would like to create a Hive table on top of an existing Parquet file, as
described here:
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html

Due to network restrictions, I need to store the metadata definition in a
different path than '/user/hive/warehouse', so I first create a new database in
my own HDFS dir:

CREATE DATABASE foo_db LOCATION '/user/foo';
USE foo_db;

And then I run the following query:

CREATE TABLE mytable_parquet
USING parquet 
OPTIONS (path "/user/foo/data.parquet")

The problem is that Spark SQL is not using the database defined in the shell
context, but the default one instead:


 > CREATE TABLE mytable_parquet USING parquet OPTIONS (path
"/user/foo/data.parquet");
15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: get_table : *db=foo_db*
tbl=mytable_parquet

15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo ip=unknown-ip-addr
cmd=get_table : db=foo_db tbl=mytable_parquet

15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: create_table:
Table(tableName:mytable_parquet, *dbName:default,* owner:foo,
createTime:1431117741, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
comment:from deserializer)], location:null,
inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
parameters:{serialization.format=1, path=/user/foo/data.parquet}),
bucketCols:[], sortCols:[], parameters:{},
skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
skewedColValueLocationMaps:{})), partitionKeys:[],
parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo ip=unknown-ip-addr
cmd=create_table: Table(tableName:mytable_parquet, dbName:default,
owner:foo, createTime:1431117741, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
comment:from deserializer)], location:null,
inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
parameters:{serialization.format=1, path=/user/foo/data.parquet}),
bucketCols:[], sortCols:[], parameters:{},
skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
skewedColValueLocationMaps:{})), partitionKeys:[],
parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)

15/05/08 20:42:21 ERROR hive.log: Got exception:
org.apache.hadoop.security.AccessControlException Permission denied:
user=foo, access=WRITE,
inode="/user/hive/warehouse":hive:grp_gdoop_hdfs:drwxr-xr-x



The permission error above happens because my Linux user does not have write
access to the default metastore path. I can work around this issue if I use
CREATE TEMPORARY TABLE, but then no metadata is written to disk.

I would like to know if I am doing anything wrong here and if there is any
additional property I can use to force the database/metastore_dir I need to
write to.

Thanks,
Carlos




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CREATE-TABLE-ignores-database-when-using-PARQUET-option-tp22824.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: CREATE TABLE ignores database when using PARQUET option

2015-05-08 Thread Michael Armbrust
This is an unfortunate limitation of the datasource api which does not
support multiple databases.  For parquet in particular (if you aren't using
schema merging).  You can create a hive table using STORED AS PARQUET
today.  I hope to fix this limitation in Spark 1.5.
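A hedged sketch of that workaround (column names are placeholders, and sqlContext must be a HiveContext): a Hive table STORED AS PARQUET ends up in the intended database, at the cost of spelling out the columns.

sqlContext.sql("""
  CREATE EXTERNAL TABLE foo_db.mytable_parquet (id BIGINT, name STRING)
  STORED AS PARQUET
  LOCATION '/user/foo/data.parquet'
""")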

On Fri, May 8, 2015 at 2:41 PM, Carlos Pereira  wrote:

> Hi, I would like to create a hive table on top a existent parquet file as
> described here:
>
> https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html
>
> Due network restrictions, I need to store the metadata definition in a
> different path than '/user/hive/warehouse', so I first set a new database
> on
> my own HDFS dir:
>
> CREATE DATABASE foo_db LOCATION '/user/foo';
> USE foo_db;
>
> And then I run the following query:
>
> CREATE TABLE mytable_parquet
> USING parquet
> OPTIONS (path "/user/foo/data.parquet")
>
> The problem is that SparkSQL is not using the same database defined the in
> shell context, but the default metastore instead of:
>
> 
>  > CREATE TABLE mytable_parquet USING parquet OPTIONS (path
> "/user/foo/data.parquet");
> 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: get_table : *db=foo_db*
> tbl=mytable_parquet
>
> 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo ip=unknown-ip-addr
> cmd=get_table : db=foo_db tbl=mytable_parquet
>
> 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: create_table:
> Table(tableName:mytable_parquet, *dbName:default,* owner:foo,
> createTime:1431117741, lastAccessTime:0, retention:0,
> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
> comment:from deserializer)], location:null,
> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
> parameters:{serialization.format=1, path=/user/foo/data.parquet}),
> bucketCols:[], sortCols:[], parameters:{},
> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
> skewedColValueLocationMaps:{})), partitionKeys:[],
> parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
> viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
> 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo ip=unknown-ip-addr
> cmd=create_table: Table(tableName:mytable_parquet, dbName:default,
> owner:foo, createTime:1431117741, lastAccessTime:0, retention:0,
> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
> comment:from deserializer)], location:null,
> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
> parameters:{serialization.format=1, path=/user/foo/data.parquet}),
> bucketCols:[], sortCols:[], parameters:{},
> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
> skewedColValueLocationMaps:{})), partitionKeys:[],
> parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
> viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
>
> 15/05/08 20:42:21 ERROR hive.log: Got exception:
> org.apache.hadoop.security.AccessControlException Permission denied:
> user=foo, access=WRITE,
> inode="/user/hive/warehouse":hive:grp_gdoop_hdfs:drwxr-xr-x
> 
>
>
> The permission error above happens because my linux user does not have
> write
> access on the default metastore path. I can workaround this issue if I use
> CREATE TEMPORARY TABLE and have no metadata written on disk.
>
> I would like to know if I am doing anything wrong here and if there is any
> additional property I can use to force the database/metastore_dir I need to
> write on.
>
> Thanks,
> Carlos
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/CREATE-TABLE-ignores-database-when-using-PARQUET-option-tp22824.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Hash Partitioning and Dataframes

2015-05-08 Thread Daniel, Ronald (ELS-SDG)
Hi,

How can I ensure that a batch of DataFrames I make are all partitioned based on 
the value of one column common to them all?
For RDDs I would partitionBy a HashPartitioner, but I don't see that in the 
DataFrame API.
If I partition the RDDs that way, then do a toDF(), will the partitioning be 
preserved?

Thanks,
Ron



Re: CREATE TABLE ignores database when using PARQUET option

2015-05-08 Thread Carlos Pereira
Thanks Michael for the quick reply. I was looking forward to the automatic
schema inference (I think that's what you mean by 'schema merging'?), and I
think STORED AS would still require me to define the table columns, right?

Anyway, I am glad to hear you guys are already working to fix this in future
releases.

Thanks,
Carlos

On Fri, May 8, 2015 at 2:43 PM, Michael Armbrust 
wrote:

> This is an unfortunate limitation of the datasource api which does not
> support multiple databases.  For parquet in particular (if you aren't using
> schema merging).  You can create a hive table using STORED AS PARQUET
> today.  I hope to fix this limitation in Spark 1.5.
>
> On Fri, May 8, 2015 at 2:41 PM, Carlos Pereira 
> wrote:
>
>> Hi, I would like to create a hive table on top a existent parquet file as
>> described here:
>>
>> https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html
>>
>> Due network restrictions, I need to store the metadata definition in a
>> different path than '/user/hive/warehouse', so I first set a new database
>> on
>> my own HDFS dir:
>>
>> CREATE DATABASE foo_db LOCATION '/user/foo';
>> USE foo_db;
>>
>> And then I run the following query:
>>
>> CREATE TABLE mytable_parquet
>> USING parquet
>> OPTIONS (path "/user/foo/data.parquet")
>>
>> The problem is that SparkSQL is not using the same database defined the in
>> shell context, but the default metastore instead of:
>>
>> 
>>  > CREATE TABLE mytable_parquet USING parquet OPTIONS (path
>> "/user/foo/data.parquet");
>> 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: get_table : *db=foo_db*
>> tbl=mytable_parquet
>>
>> 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo ip=unknown-ip-addr
>> cmd=get_table : db=foo_db tbl=mytable_parquet
>>
>> 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: create_table:
>> Table(tableName:mytable_parquet, *dbName:default,* owner:foo,
>> createTime:1431117741, lastAccessTime:0, retention:0,
>> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
>> comment:from deserializer)], location:null,
>> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
>> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
>> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
>>
>> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
>> parameters:{serialization.format=1, path=/user/foo/data.parquet}),
>> bucketCols:[], sortCols:[], parameters:{},
>> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
>> skewedColValueLocationMaps:{})), partitionKeys:[],
>> parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
>> viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
>> 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo ip=unknown-ip-addr
>> cmd=create_table: Table(tableName:mytable_parquet, dbName:default,
>> owner:foo, createTime:1431117741, lastAccessTime:0, retention:0,
>> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
>> comment:from deserializer)], location:null,
>> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
>> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
>> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
>>
>> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
>> parameters:{serialization.format=1, path=/user/foo/data.parquet}),
>> bucketCols:[], sortCols:[], parameters:{},
>> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
>> skewedColValueLocationMaps:{})), partitionKeys:[],
>> parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
>> viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
>>
>> 15/05/08 20:42:21 ERROR hive.log: Got exception:
>> org.apache.hadoop.security.AccessControlException Permission denied:
>> user=foo, access=WRITE,
>> inode="/user/hive/warehouse":hive:grp_gdoop_hdfs:drwxr-xr-x
>> 
>>
>>
>> The permission error above happens because my linux user does not have
>> write
>> access on the default metastore path. I can workaround this issue if I use
>> CREATE TEMPORARY TABLE and have no metadata written on disk.
>>
>> I would like to know if I am doing anything wrong here and if there is any
>> additional property I can use to force the database/metastore_dir I need
>> to
>> write on.
>>
>> Thanks,
>> Carlos
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/CREATE-TABLE-ignores-database-when-using-PARQUET-option-tp22824.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Hash Partitioning and Dataframes

2015-05-08 Thread Michael Armbrust
What are you trying to accomplish?  Internally Spark SQL will add Exchange
operators to make sure that data is partitioned correctly for joins and
aggregations.  If you are going to do other RDD operations on the result of
dataframe operations and you need to manually control the partitioning,
call df.rdd and partition as you normally would.
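A rough sketch of that, given some DataFrame df (the key is assumed to be the first column and 200 partitions is an arbitrary choice): drop to the RDD, impose a HashPartitioner, and rebuild a DataFrame with the original schema.

import org.apache.spark.HashPartitioner

val keyed = df.rdd.map(row => (row.getString(0), row))           // (join key, full row)
val partitioned = keyed.partitionBy(new HashPartitioner(200))
val dfPartitioned = sqlContext.createDataFrame(partitioned.values, df.schema)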

On Fri, May 8, 2015 at 2:47 PM, Daniel, Ronald (ELS-SDG) <
r.dan...@elsevier.com> wrote:

> Hi,
>
> How can I ensure that a batch of DataFrames I make are all partitioned
> based on the value of one column common to them all?
> For RDDs I would partitionBy a HashPartitioner, but I don't see that in
> the DataFrame API.
> If I partition the RDDs that way, then do a toDF(), will the partitioning
> be preserved?
>
> Thanks,
> Ron
>
>


Re: CREATE TABLE ignores database when using PARQUET option

2015-05-08 Thread Michael Armbrust
Actually, I was talking about the support for inferring different but
compatible schemata from various files, automatically merging them into a
single schema.  However, you are right that I think you need to specify the
columns / types if you create it as a Hive table.

On Fri, May 8, 2015 at 3:11 PM, Carlos Pereira  wrote:

> Thanks Michael for the quick return. I was looking forward the automatic
> schema inferring (I think that's you mean by 'schema merging' ?), and I
> think the STORED AS would still require me to define the table columns
> right?
>
> Anyways, I am glad to hear you guys already working to fix this on future
> releases.
>
> Thanks,
> Carlos
>
> On Fri, May 8, 2015 at 2:43 PM, Michael Armbrust 
> wrote:
>
>> This is an unfortunate limitation of the datasource api which does not
>> support multiple databases.  For parquet in particular (if you aren't using
>> schema merging).  You can create a hive table using STORED AS PARQUET
>> today.  I hope to fix this limitation in Spark 1.5.
>>
>> On Fri, May 8, 2015 at 2:41 PM, Carlos Pereira 
>> wrote:
>>
>>> Hi, I would like to create a hive table on top a existent parquet file as
>>> described here:
>>>
>>> https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html
>>>
>>> Due network restrictions, I need to store the metadata definition in a
>>> different path than '/user/hive/warehouse', so I first set a new
>>> database on
>>> my own HDFS dir:
>>>
>>> CREATE DATABASE foo_db LOCATION '/user/foo';
>>> USE foo_db;
>>>
>>> And then I run the following query:
>>>
>>> CREATE TABLE mytable_parquet
>>> USING parquet
>>> OPTIONS (path "/user/foo/data.parquet")
>>>
>>> The problem is that SparkSQL is not using the same database defined the
>>> in
>>> shell context, but the default metastore instead of:
>>>
>>> 
>>>  > CREATE TABLE mytable_parquet USING parquet OPTIONS (path
>>> "/user/foo/data.parquet");
>>> 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: get_table :
>>> *db=foo_db*
>>> tbl=mytable_parquet
>>>
>>> 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo
>>>  ip=unknown-ip-addr
>>> cmd=get_table : db=foo_db tbl=mytable_parquet
>>>
>>> 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: create_table:
>>> Table(tableName:mytable_parquet, *dbName:default,* owner:foo,
>>> createTime:1431117741, lastAccessTime:0, retention:0,
>>> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
>>> comment:from deserializer)], location:null,
>>> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
>>> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
>>> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
>>>
>>> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
>>> parameters:{serialization.format=1, path=/user/foo/data.parquet}),
>>> bucketCols:[], sortCols:[], parameters:{},
>>> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
>>> skewedColValueLocationMaps:{})), partitionKeys:[],
>>> parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
>>> viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
>>> 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo
>>>  ip=unknown-ip-addr
>>> cmd=create_table: Table(tableName:mytable_parquet, dbName:default,
>>> owner:foo, createTime:1431117741, lastAccessTime:0, retention:0,
>>> sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
>>> comment:from deserializer)], location:null,
>>> inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
>>> outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
>>> compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
>>>
>>> serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
>>> parameters:{serialization.format=1, path=/user/foo/data.parquet}),
>>> bucketCols:[], sortCols:[], parameters:{},
>>> skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
>>> skewedColValueLocationMaps:{})), partitionKeys:[],
>>> parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
>>> viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
>>>
>>> 15/05/08 20:42:21 ERROR hive.log: Got exception:
>>> org.apache.hadoop.security.AccessControlException Permission denied:
>>> user=foo, access=WRITE,
>>> inode="/user/hive/warehouse":hive:grp_gdoop_hdfs:drwxr-xr-x
>>> 
>>>
>>>
>>> The permission error above happens because my linux user does not have
>>> write
>>> access on the default metastore path. I can workaround this issue if I
>>> use
>>> CREATE TEMPORARY TABLE and have no metadata written on disk.
>>>
>>> I would like to know if I am doing anything wrong here and if there is
>>> any
>>> additional property I can use to force the database/metastore_dir I need
>>> to
>>> write on.
>>>
>>> Thanks,
>>> Carlos
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabb

RE: Hash Partitioning and Dataframes

2015-05-08 Thread Daniel, Ronald (ELS-SDG)
Just trying to make sure that something I know in advance (the joins will 
always have an equality test on one specific field) is used to optimize the 
partitioning so the joins only use local data.

Thanks for the info.

Ron


From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Friday, May 08, 2015 3:15 PM
To: Daniel, Ronald (ELS-SDG)
Cc: user@spark.apache.org
Subject: Re: Hash Partitioning and Dataframes

What are you trying to accomplish?  Internally Spark SQL will add Exchange 
operators to make sure that data is partitioned correctly for joins and 
aggregations.  If you are going to do other RDD operations on the result of 
dataframe operations and you need to manually control the partitioning, call 
df.rdd and partition as you normally would.

On Fri, May 8, 2015 at 2:47 PM, Daniel, Ronald (ELS-SDG) 
mailto:r.dan...@elsevier.com>> wrote:
Hi,

How can I ensure that a batch of DataFrames I make are all partitioned based on 
the value of one column common to them all?
For RDDs I would partitionBy a HashPartitioner, but I don't see that in the 
DataFrame API.
If I partition the RDDs that way, then do a toDF(), will the partitioning be 
preserved?

Thanks,
Ron



Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
   - [Kinesis stream name]: The Kinesis stream that this streaming
     application receives from
      - The application name used in the streaming context becomes the
        Kinesis application name
      - The application name must be unique for a given account and region.
      - The Kinesis backend automatically associates the application name
        to the Kinesis stream using a DynamoDB table (always in the us-east-1
        region) created during Kinesis Client Library initialization.
      - *Changing the application name or stream name can lead to Kinesis
        errors in some cases. If you see errors, you may need to manually
        delete the DynamoDB table.*


On Fri, May 8, 2015 at 2:06 PM, Mike Trienis 
wrote:

> Hi All,
>
> I am submitting the assembled fat jar file by the command:
>
> bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar
> --class com.xxx.Consumer -0.1-SNAPSHOT.jar
>
> It reads the data file from kinesis using the stream name defined in a
> configuration file. It turns out that it reads the data perfectly fine for
> one stream name (i.e. the default), however, if I switch the stream name
> and re-submit the jar, it no longer reads the data. From CloudWatch, I can
> see that there is data put into the stream and spark is actually sending
> get requests as well. However, it doesn't seem to be outputting the data.
>
> Has anyone else encountered a similar issue? Does spark cache the stream
> name somewhere? I also have checkpointing enabled as well.
>
> Thanks, Mike.
>
>
>
>
>
>


Spark SQL: STDDEV working in Spark Shell but not in a standalone app

2015-05-08 Thread barmaley
Given a registered table from data frame, I'm able to execute queries like
sqlContext.sql("SELECT STDDEV(col1) FROM table") from Spark Shell just fine.
However, when I run exactly the same code in a standalone app on a cluster,
it throws an exception: "java.util.NoSuchElementException: key not found:
STDDEV"...

Is STDDEV among the default functions in Spark SQL? I'd appreciate it if you
could comment on what's going on with the above.

Thanks
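(One possible interim workaround, sketched here with the table and column names from the query above: compute the population standard deviation from plain AVG aggregates, which avoids relying on a STDDEV function being registered.)

val row = sqlContext.sql("SELECT AVG(col1 * col1), AVG(col1) FROM table").first()
val stddevPop = math.sqrt(row.getDouble(0) - row.getDouble(1) * row.getDouble(1))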



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-STDDEV-working-in-Spark-Shell-but-not-in-a-standalone-app-tp22825.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL and Hive interoperability

2015-05-08 Thread jdavidmitchell
So, why isn't this comment/question being posted to the list?  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-and-Hive-interoperability-tp22690p22827.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL: STDDEV working in Spark Shell but not in a standalone app

2015-05-08 Thread Yin Huai
Can you attach the full stack trace?

Thanks,

Yin

On Fri, May 8, 2015 at 4:44 PM, barmaley  wrote:

> Given a registered table from data frame, I'm able to execute queries like
> sqlContext.sql("SELECT STDDEV(col1) FROM table") from Spark Shell just
> fine.
> However, when I run exactly the same code in a standalone app on a cluster,
> it throws an exception: "java.util.NoSuchElementException: key not found:
> STDDEV"...
>
> Is STDDEV ia among default functions in Spark SQL? I'd appreciate if you
> could comment what's going on with the above.
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-STDDEV-working-in-Spark-Shell-but-not-in-a-standalone-app-tp22825.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Chris Fregly
hey mike-

as you pointed out here from my docs, changing the stream name is sometimes 
problematic due to the way the Kinesis Client Library manages leases and 
checkpoints, etc in DynamoDB.

I noticed this directly while developing the Kinesis connector which is why I 
highlighted the issue here.

is wiping out the DynamoDB table a suitable workaround for now?  usually in 
production, you wouldn't be changing stream names often, so hopefully that's ok 
for dev.

otherwise, can you share the relevant spark streaming logs that are generated 
when you do this?

I saw a lot of "lease not owned by this Kinesis Client" type of errors, from 
what I remember.

lemme know!

-Chris 

> On May 8, 2015, at 4:36 PM, Mike Trienis  wrote:
> 
> [Kinesis stream name]: The Kinesis stream that this streaming application 
> receives from
> The application name used in the streaming context becomes the Kinesis 
> application name
> The application name must be unique for a given account and region.
> The Kinesis backend automatically associates the application name to the 
> Kinesis stream using a DynamoDB table (always in the us-east-1 region) 
> created during Kinesis Client Library initialization.
> Changing the application name or stream name can lead to Kinesis errors in 
> some cases. If you see errors, you may need to manually delete the DynamoDB 
> table.
> 
>> On Fri, May 8, 2015 at 2:06 PM, Mike Trienis  wrote:
>> Hi All,
>> 
>> I am submitting the assembled fat jar file by the command:
>> 
>> bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class 
>> com.xxx.Consumer -0.1-SNAPSHOT.jar 
>> 
>> It reads the data file from kinesis using the stream name defined in a 
>> configuration file. It turns out that it reads the data perfectly fine for 
>> one stream name (i.e. the default), however, if I switch the stream name and 
>> re-submit the jar, it no longer reads the data. From CloudWatch, I can see 
>> that there is data put into the stream and spark is actually sending get 
>> requests as well. However, it doesn't seem to be outputting the data. 
>> 
>> Has anyone else encountered a similar issue? Does spark cache the stream 
>> name somewhere? I also have checkpointing enabled as well.
>> 
>> Thanks, Mike. 
> 


Re: Map one RDD into two RDD

2015-05-08 Thread ayan guha
Do as Evo suggested. Rdd1=rdd.filter, rdd2=rdd.filter
On 9 May 2015 05:19, "anshu shukla"  wrote:

> Any update to above mail
> and  Can anyone tell me logic - I have to filter tweets and submit tweets
>  with particular  #hashtag1 to SparkSQL  databases and tweets with
>  #hashtag2  will be passed to sentiment analysis phase ."Problem is how to
> split the input data in two streams using hashtags "
>
> On Fri, May 8, 2015 at 2:42 AM, anshu shukla 
> wrote:
>
>> One of  the best discussion in mailing list  :-)  ...Please  help me in
>> concluding --
>>
>> The whole discussion concludes that -
>>
>> 1-  Framework  does not support  increasing parallelism of any task just
>> by any inbuilt function .
>> 2-  User have to manualy write logic for filter output of upstream node
>> in DAG  to manage input to Downstream nodes (like shuffle grouping etc in
>> STORM)
>> 3- If we want to increase the level of parallelism of twitter streaming
>>  Spout  to *get higher rate of  DStream of tweets  (to increase the rate
>> of input )  , how it is possible ...  *
>>
>>   *val tweetStream = **TwitterUtils.createStream(ssc, Utils.getAuth)*
>>
>>
>>
>> On Fri, May 8, 2015 at 2:16 AM, Evo Eftimov 
>> wrote:
>>
>>> 1. Will rdd2.filter run before rdd1.filter finish?
>>>
>>>
>>>
>>> YES
>>>
>>>
>>>
>>> 2. We have to traverse rdd twice. Any comments?
>>>
>>>
>>>
>>> You can invoke filter or whatever other transformation / function many
>>> times
>>>
>>> Ps: you  have to study / learn the Parallel Programming Model of an OO
>>> Framework like Spark – in any OO Framework lots of Behavior is hidden /
>>> encapsulated by the Framework and the client code gets invoked at specific
>>> points in the Flow of Control / Data based on callback functions
>>>
>>>
>>>
>>> That’s why stuff like RDD.filter(), RDD.filter() may look “sequential”
>>> to you but it is not
>>>
>>>
>>>
>>>
>>>
>>> *From:* Bill Q [mailto:bill.q@gmail.com]
>>> *Sent:* Thursday, May 7, 2015 6:27 PM
>>>
>>> *To:* Evo Eftimov
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: Map one RDD into two RDD
>>>
>>>
>>>
>>> The multi-threading code in Scala is quite simple and you can google it
>>> pretty easily. We used the Future framework. You can use Akka also.
>>>
>>>
>>>
>>> @Evo My concerns for filtering solution are: 1. Will rdd2.filter run
>>> before rdd1.filter finish? 2. We have to traverse rdd twice. Any comments?
>>>
>>>
>>>
>>> On Thursday, May 7, 2015, Evo Eftimov  wrote:
>>>
>>> Scala is a language, Spark is an OO/Functional, Distributed Framework
>>> facilitating Parallel Programming in a distributed environment
>>>
>>>
>>>
>>> Any “Scala parallelism” occurs within the Parallel Model imposed by the
>>> Spark OO Framework – ie it is limited in terms of what it can achieve in
>>> terms of influencing the Spark Framework behavior – that is the nature of
>>> programming with/for frameworks
>>>
>>>
>>>
>>> When RDD1 and RDD2 are partitioned and different Actions applied to them
>>> this will result in Parallel Pipelines / DAGs within the Spark Framework
>>>
>>> RDD1 = RDD.filter()
>>>
>>> RDD2 = RDD.filter()
>>>
>>>
>>>
>>>
>>>
>>> *From:* Bill Q [mailto:bill.q@gmail.com]
>>> *Sent:* Thursday, May 7, 2015 4:55 PM
>>> *To:* Evo Eftimov
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: Map one RDD into two RDD
>>>
>>>
>>>
>>> Thanks for the replies. We decided to use concurrency in Scala to do the
>>> two mappings using the same source RDD in parallel. So far, it seems to be
>>> working. Any comments?
>>>
>>> On Wednesday, May 6, 2015, Evo Eftimov  wrote:
>>>
>>> RDD1 = RDD.filter()
>>>
>>> RDD2 = RDD.filter()
>>>
>>>
>>>
>>> *From:* Bill Q [mailto:bill.q@gmail.com ]
>>> *Sent:* Tuesday, May 5, 2015 10:42 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* Map one RDD into two RDD
>>>
>>>
>>>
>>> Hi all,
>>>
>>> I have a large RDD that I map a function to it. Based on the nature of
>>> each record in the input RDD, I will generate two types of data. I would
>>> like to save each type into its own RDD. But I can't seem to find an
>>> efficient way to do it. Any suggestions?
>>>
>>>
>>>
>>> Many thanks.
>>>
>>>
>>>
>>>
>>>
>>> Bill
>>>
>>>
>>>
>>> --
>>>
>>> Many thanks.
>>>
>>> Bill
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Many thanks.
>>>
>>> Bill
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Many thanks.
>>>
>>> Bill
>>>
>>>
>>>
>>
>>
>>
>> --
>> Thanks & Regards,
>> Anshu Shukla
>>
>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: CREATE TABLE ignores database when using PARQUET option

2015-05-08 Thread ayan guha
I am just wondering if CREATE TABLE supports the syntax
CREATE TABLE db.tablename
instead of the two-step process of USE db and then CREATE TABLE tablename?
On 9 May 2015 08:17, "Michael Armbrust"  wrote:

> Actually, I was talking about the support for inferring different but
> compatible schemata from various files, automatically merging them into a
> single schema.  However, you are right that I think you need to specify the
> columns / types if you create it as a Hive table.
>
> On Fri, May 8, 2015 at 3:11 PM, Carlos Pereira 
> wrote:
>
>> Thanks Michael for the quick return. I was looking forward the automatic
>> schema inferring (I think that's you mean by 'schema merging' ?), and I
>> think the STORED AS would still require me to define the table columns
>> right?
>>
>> Anyways, I am glad to hear you guys already working to fix this on future
>> releases.
>>
>> Thanks,
>> Carlos
>>
>> On Fri, May 8, 2015 at 2:43 PM, Michael Armbrust 
>> wrote:
>>
>>> This is an unfortunate limitation of the datasource api which does not
>>> support multiple databases.  For parquet in particular (if you aren't using
>>> schema merging).  You can create a hive table using STORED AS PARQUET
>>> today.  I hope to fix this limitation in Spark 1.5.
>>>
>>> On Fri, May 8, 2015 at 2:41 PM, Carlos Pereira 
>>> wrote:
>>>
 Hi, I would like to create a hive table on top a existent parquet file
 as
 described here:

 https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html

 Due network restrictions, I need to store the metadata definition in a
 different path than '/user/hive/warehouse', so I first set a new
 database on
 my own HDFS dir:

 CREATE DATABASE foo_db LOCATION '/user/foo';
 USE foo_db;

 And then I run the following query:

 CREATE TABLE mytable_parquet
 USING parquet
 OPTIONS (path "/user/foo/data.parquet")

 The problem is that SparkSQL is not using the same database defined the
 in
 shell context, but the default metastore instead of:

 
  > CREATE TABLE mytable_parquet USING parquet OPTIONS (path
 "/user/foo/data.parquet");
 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: get_table :
 *db=foo_db*
 tbl=mytable_parquet

 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo
  ip=unknown-ip-addr
 cmd=get_table : db=foo_db tbl=mytable_parquet

 15/05/08 20:42:21 INFO metastore.HiveMetaStore: 0: create_table:
 Table(tableName:mytable_parquet, *dbName:default,* owner:foo,
 createTime:1431117741, lastAccessTime:0, retention:0,
 sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
 comment:from deserializer)], location:null,
 inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
 outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
 compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,

 serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
 parameters:{serialization.format=1, path=/user/foo/data.parquet}),
 bucketCols:[], sortCols:[], parameters:{},
 skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
 skewedColValueLocationMaps:{})), partitionKeys:[],
 parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
 viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
 15/05/08 20:42:21 INFO HiveMetaStore.audit: ugi=foo
  ip=unknown-ip-addr
 cmd=create_table: Table(tableName:mytable_parquet, dbName:default,
 owner:foo, createTime:1431117741, lastAccessTime:0, retention:0,
 sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array,
 comment:from deserializer)], location:null,
 inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
 outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
 compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,

 serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
 parameters:{serialization.format=1, path=/user/foo/data.parquet}),
 bucketCols:[], sortCols:[], parameters:{},
 skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
 skewedColValueLocationMaps:{})), partitionKeys:[],
 parameters:{EXTERNAL=TRUE, spark.sql.sources.provider=parquet},
 viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)

 15/05/08 20:42:21 ERROR hive.log: Got exception:
 org.apache.hadoop.security.AccessControlException Permission denied:
 user=foo, access=WRITE,
 inode="/user/hive/warehouse":hive:grp_gdoop_hdfs:drwxr-xr-x
 


 The permission error above happens because my linux user does not have
 write
 access on the default metastore path. I can workaround this issue if I
 use
 CREATE TEMPORARY TABLE and have no metadata written o

Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
Hey Chris!

I was happy to see the documentation outlining that issue :-) However, I
must have got into a pretty terrible state because I had to delete and
recreate the kinesis streams as well as the DynamoDB tables.

Thanks for the reply, everything is sorted.

Mike




On Fri, May 8, 2015 at 7:55 PM, Chris Fregly  wrote:

> hey mike-
>
> as you pointed out here from my docs, changing the stream name is
> sometimes problematic due to the way the Kinesis Client Library manages
> leases and checkpoints, etc in DynamoDB.
>
> I noticed this directly while developing the Kinesis connector which is
> why I highlighted the issue here.
>
> is wiping out the DynamoDB table a suitable workaround for now?  usually
> in production, you wouldn't be changing stream names often, so hopefully
> that's ok for dev.
>
> otherwise, can you share the relevant spark streaming logs that are
> generated when you do this?
>
> I saw a lot of "lease not owned by this Kinesis Client" type of errors,
> from what I remember.
>
> lemme know!
>
> -Chris
>
> On May 8, 2015, at 4:36 PM, Mike Trienis  wrote:
>
>
>- [Kinesis stream name]: The Kinesis stream that this streaming
>application receives from
>   - The application name used in the streaming context becomes the
>   Kinesis application name
>   - The application name must be unique for a given account and
>   region.
>   - The Kinesis backend automatically associates the application name
>   to the Kinesis stream using a DynamoDB table (always in the us-east-1
>   region) created during Kinesis Client Library initialization.
>   - *Changing the application name or stream name can lead to Kinesis
>   errors in some cases. If you see errors, you may need to manually delete
>   the DynamoDB table.*
>
>
> On Fri, May 8, 2015 at 2:06 PM, Mike Trienis 
> wrote:
>
>> Hi All,
>>
>> I am submitting the assembled fat jar file by the command:
>>
>> bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar
>> --class com.xxx.Consumer -0.1-SNAPSHOT.jar
>>
>> It reads the data file from kinesis using the stream name defined in a
>> configuration file. It turns out that it reads the data perfectly fine for
>> one stream name (i.e. the default), however, if I switch the stream name
>> and re-submit the jar, it no longer reads the data. From CloudWatch, I can
>> see that there is data put into the stream and spark is actually sending
>> get requests as well. However, it doesn't seem to be outputting the data.
>>
>> Has anyone else encountered a similar issue? Does spark cache the stream
>> name somewhere? I also have checkpointing enabled as well.
>>
>> Thanks, Mike.
>>
>>
>>
>>
>>
>>
>


Using Pandas/Scikit Learning in Pyspark

2015-05-08 Thread Bin Wang
Hey there,

I have a CDH cluster where the default Python installed on those Red Hat
Linux machines is Python 2.6.

I am thinking about developing a Spark application using PySpark, and I want
to be able to use the Pandas and scikit-learn packages. The Anaconda Python
interpreter has the most functionality out of the box; however, when I try to
use Anaconda Python 2.7, the Spark job won't run properly and fails because
the Python interpreter is not consistent across the cluster.
Here are my questions:

(1) I took a quick look at the source code of PySpark, and it looks like it
uses spark-submit in the end. Doesn't that mean all the work will ultimately
be translated into Scala code and the workload distributed to the whole
cluster? In that case, I should not have to worry about the Python interpreter
beyond the master node, right?

(2) If the Spark job needs consistent Python libraries installed on every
node, should I install Anaconda Python on all of them? If so, what is the
modern way of managing the Python ecosystem on the cluster?

I am a big fan of Python so please guide me.

Best regards,

Bin


spark and binary files

2015-05-08 Thread tog
Hi

I have an application that currently runs using MR. It starts by extracting
information from a proprietary binary file that is copied to HDFS, creating
business objects from the information extracted from the binary file. Later
those objects are used for further processing, again with MR jobs.

I am planning to move towards Spark and I clearly see that I could use a
JavaRDD for parallel processing. However, it is not yet obvious to me how to
generate this RDD from my binary file in parallel.

Today I use parallelism based on the split assigned to each of the map tasks
of a job. Can I mimic such a thing using Spark? All the examples I have seen
so far use text files, for which I guess the partitions are based on a given
number of contiguous lines.

Any help or pointer would be appreciated

Cheers
Guillaume


-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net
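A rough sketch for the binary-file question above (parseRecords, BusinessObject, and the path are hypothetical): sc.binaryFiles hands each file to a task as a (path, stream) pair, so parsing runs in parallel across files; if a single large file must itself be split across tasks, a custom Hadoop InputFormat via sc.newAPIHadoopFile can mirror the existing MR splits.

val businessObjects = sc.binaryFiles("hdfs:///data/*.bin")
  .flatMap { case (path, stream) => parseRecords(stream.toArray()) }   // parseRecords: Array[Byte] => Seq[BusinessObject]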