Re: can distinct transform applied on DStream?

2015-03-21 Thread Akhil Das
What do you mean by "not distinct"?

It does work for me:
[image: Inline image 1]

Code:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

val ssc = new StreamingContext(sc, Seconds(1))

val data =
ssc.textFileStream("/home/akhld/mobi/localcluster/spark-1/sigmoid/")
val dist = data.transform(_.distinct())


dist.print()

ssc.start()
ssc.awaitTermination()






Thanks
Best Regards

On Fri, Mar 20, 2015 at 11:07 PM, Darren Hoo  wrote:

> val aDstream = ...
>
> val distinctStream = aDstream.transform(_.distinct())
>
> but the elements in distinctStream  are not distinct.
>
> Did I use it wrong?
>


Re: How to do nested foreach with RDD

2015-03-21 Thread Reza Zadeh
You can do this with the 'cartesian' product method on RDD. For example:

val rdd1 = ...
val rdd2 = ...

val combinations = rdd1.cartesian(rdd2).filter{ case (a,b) => a < b }
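For concreteness, a self-contained sketch of the same approach (the sample
data and the per-pair computation are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pairs"))

val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(10, 20, 30))

// every (a, b) pair across the two RDDs, processed in a distributed way
val results = rdd1.cartesian(rdd2).map { case (a, b) => (a, b, a * b) }
results.collect().foreach(println)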

Reza

On Sat, Mar 21, 2015 at 10:37 PM, Xi Shen  wrote:

> Hi,
>
> I have two big RDDs, and I need to do some math on each pair of elements
> across them. Traditionally this would be a nested for-loop, but with RDDs it
> causes a nested RDD, which is prohibited.
>
> Currently I am collecting one of them and then doing a nested for-loop to
> avoid the nested RDD, but I would like to know if there is a Spark way to do this.
>
>
> Thanks,
> David
>
>


How to do nested foreach with RDD

2015-03-21 Thread Xi Shen
Hi,

I have two big RDDs, and I need to do some math on each pair of elements
across them. Traditionally this would be a nested for-loop, but with RDDs it
causes a nested RDD, which is prohibited.

Currently I am collecting one of them and then doing a nested for-loop to
avoid the nested RDD, but I would like to know if there is a Spark way to do this.


Thanks,
David


Re: Model deployment help

2015-03-21 Thread Donald Szeto
Hi Shashidhar,

Our team at PredictionIO is trying to solve the production deployment of
models. We built a powered-by-Spark framework (also certified on Spark by
Databricks) that allows a user to build models with everything available
from the Spark API, persist the model automatically with versioning, and
deploy as a REST service using simple CLI commands.

Regarding model degeneration and updates, if half a second to a couple of
seconds of downtime is acceptable, with PIO one could simply run "pio train"
and "pio deploy" periodically with a cron job. To achieve virtually zero
downtime, a load balancer could be set up in front of 2 "pio deploy"
instances.

Porting your current algorithm / model generation to PredictionIO should
just be a copy-and-paste procedure. We would be very grateful for any
feedback that would improve the deployment process.

We do not support PMML at the moment, but definitely are interested in your
use case.

You may get started with the documentation (http://docs.prediction.io/).
You could also visit the engine template gallery (
https://templates.prediction.io/) for quick, ready-to-use examples.
PredictionIO is open source software under APL2 at
https://github.com/PredictionIO/PredictionIO.

Looking forward to hearing your feedback!


Best Regards,
Donald

On Sat, Mar 21, 2015 at 10:40 AM, Shashidhar Rao  wrote:

> Hi,
>
> Apologies for the generic question.
>
> As I am developing predictive models for the first time, the model
> will be deployed in production very soon.
>
> Could somebody help me with the  model deployment in production , I have
> read quite a few on model deployment and have read some books on Database
> deployment .
>
> My queries relate to how  updates to model happen when current model
> degenerates without any downtime and how others are deploying in production
> servers and a few lines on adoption of PMML currently in production.
>
> Please provide me with some good links  or some forums  so that I can
> learn as most of the books do not cover it extensively except for 'Mahout
> in action' where it is explained in some detail and have also checked
> stackoverflow but have not got any relevant answers.
>
> What I understand:
> 1. Build model using current training set and test the model.
> 2. Deploy the model,put it in some location and load it and predict when
> request comes for scoring.
> 3. Model degenerates , now build new model with new data.(Here some
> confusion , whether the old data is discarded completely or it is done with
> purely new data or a mix)
> 4. Here I am stuck , how to update the model without any downtime, the
> transition period when old model and new model happens.
>
> My naive solution would be, build the new model , save it in a new
> location and update the new path in some properties file or update the
> location in database when the saving is done. Is this correct or some best
> practices are available.
> Database is unlikely in my case.
>
> Thanks in advance.
>
>
>
>


-- 
Donald Szeto
PredictionIO


Re: How to set Spark executor memory?

2015-03-21 Thread Ted Yu
bq. the BLAS native cannot be loaded

Have you tried specifying --driver-library-path option ?

Cheers

On Sat, Mar 21, 2015 at 4:42 PM, Xi Shen  wrote:

> Yeah, I think it is harder to troubleshoot the properties issues in an IDE.
> But the reason I stick to the IDE is that if I use spark-submit, the native
> BLAS library cannot be loaded. Maybe I should open another thread to discuss
> that.
>
> Thanks,
> David
>
> On Sun, 22 Mar 2015 10:38 Xi Shen  wrote:
>
>> In the log, I saw
>>
>>   MemoryStorage: MemoryStore started with capacity 6.7GB
>>
>> But I still can not find where to set this storage capacity.
>>
>> On Sat, 21 Mar 2015 20:30 Xi Shen  wrote:
>>
>>> Hi Sean,
>>>
>>> It's getting strange now. If I run from the IDE, my executor memory is
>>> always set to 6.7G, no matter what value I set in code. I have checked my
>>> environment variables, and there's no value of 6.7 or 12.5.
>>>
>>> Any idea?
>>>
>>> Thanks,
>>> David
>>>
>>> On Tue, 17 Mar 2015 00:35 null  wrote:
>>>
  Hi Xi Shen,

 You could set the spark.executor.memory in the code itself: new
 SparkConf().set("spark.executor.memory", "2g")

 Or you can try --conf spark.executor.memory=2g while submitting the jar.



 Regards

 Jishnu Prathap



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Monday, March 16, 2015 2:06 PM
 *To:* Xi Shen
 *Cc:* user@spark.apache.org
 *Subject:* Re: How to set Spark executor memory?



 By default spark.executor.memory is set to 512m. I'm assuming that since you
 are submitting the job using spark-submit and running in local mode, it is not
 able to override the value. Can you try it without using spark-submit, as a
 standalone project?


   Thanks

 Best Regards



 On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen  wrote:

 I set it in code, not by configuration. I submit my jar file to local.
 I am working in my developer environment.



 On Mon, 16 Mar 2015 18:28 Akhil Das  wrote:

 How are you setting it? and how are you submitting the job?


   Thanks

 Best Regards



 On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen 
 wrote:

 Hi,



 I have set spark.executor.memory to 2048m, and in the UI "Environment"
 page, I can see this value has been set correctly. But in the "Executors"
 page, I saw there's only 1 executor and its memory is 265.4MB. Very strange
 value. Why not 256MB, or just the value I set?



 What am I missing here?





 Thanks,

 David







>>>


Error while installing Spark 1.3.0 on local machine

2015-03-21 Thread HARIPRIYA AYYALASOMAYAJULA
Hello,

I am trying to install Spark 1.3.0 on my mac. Earlier, I was working with
Spark 1.1.0. Now, I come across this error :

sbt.ResolveException: unresolved dependency:
org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public
in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was
required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:278)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:175)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:157)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:128)
at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56)
at sbt.IvySbt$$anon$4.call(Ivy.scala:64)
at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
at
xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
at xsbt.boot.Using$.withResource(Using.scala:10)
at xsbt.boot.Using$.apply(Using.scala:9)
at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
at xsbt.boot.Locks$.apply0(Locks.scala:31)
at xsbt.boot.Locks$.apply(Locks.scala:28)
at sbt.IvySbt.withDefaultLogger(Ivy.scala:64)
at sbt.IvySbt.withIvy(Ivy.scala:123)
at sbt.IvySbt.withIvy(Ivy.scala:120)
at sbt.IvySbt$Module.withModule(Ivy.scala:151)
at sbt.IvyActions$.updateEither(IvyActions.scala:157)
at
sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1318)
at
sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1315)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1345)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1343)
at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1348)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1342)
at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
at sbt.Classpaths$.cachedUpdate(Defaults.scala:1360)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1300)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1275)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:235)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at
sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
[error] (network-shuffle/*:update) sbt.ResolveException: unresolved
dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration
not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It
was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
[error] Total time: 5 s, completed Mar 21, 2015 7:48:45 PM

I tried uninstalling and re-installing. When I browsed the internet,
I came across suggestions to include -Phadoop, but even if I use

 build/sbt -Pyarn -Phadoop-2.3 assembly

It gives me an error.

I greatly appreciate any help. Thank you for your time.


-- 
Regards,
Haripriya Ayyalasomayajula
Graduate Student
Department of Computer Science
University of Houston
Contact : 650-796-7112


Re: netlib-java cannot load native lib in Windows when using spark-submit

2015-03-21 Thread Ted Yu
Can you try the --driver-library-path option ?

spark-submit --driver-library-path /opt/hadoop/lib/native ...

Cheers

On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen  wrote:

> Hi,
>
> I use the *OpenBLAS* DLL, and have configured my application to work in
> IDE. When I start my Spark application from IntelliJ IDE, I can see in the
> log that the native lib is loaded successfully.
>
> But if I use *spark-submit* to start my application, the native lib still
> cannot be loaded. I saw the WARN message that it failed to load both the
> native and native-ref library. I checked the *Environment* tab in the
> Spark UI, and the *java.library.path* is set correctly.
>
>
> Thanks,
>
> David
>
>
>


Reducing Spark's logging verbosity

2015-03-21 Thread Edmon Begoli
Hi,
Does anyone have concrete recommendations on how to reduce Spark's logging
verbosity?

We have attempted on several occasions to address this by setting various
log4j properties, both in configuration property files and in
$SPARK_HOME/conf/spark-env.sh; however, all of those attempts have failed.

Any suggestions are welcome.

Thank you,
Edmon


netlib-java cannot load native lib in Windows when using spark-submit

2015-03-21 Thread Xi Shen
Hi,

I use the *OpenBLAS* DLL, and have configured my application to work in
IDE. When I start my Spark application from IntelliJ IDE, I can see in the
log that the native lib is loaded successfully.

But if I use *spark-submit* to start my application, the native lib still
cannot be loaded. I saw the WARN message that it failed to load both the
native and native-ref library. I checked the *Environment* tab in the Spark
UI, and the *java.library.path* is set correctly.


Thanks,

David


Re: How to set Spark executor memory?

2015-03-21 Thread Xi Shen
Yeah, I think it is harder to troubleshoot the properties issues in an IDE.
But the reason I stick to the IDE is that if I use spark-submit, the native
BLAS library cannot be loaded. Maybe I should open another thread to discuss
that.

Thanks,
David

On Sun, 22 Mar 2015 10:38 Xi Shen  wrote:

> In the log, I saw
>
>   MemoryStorage: MemoryStore started with capacity 6.7GB
>
> But I still can not find where to set this storage capacity.
>
> On Sat, 21 Mar 2015 20:30 Xi Shen  wrote:
>
>> Hi Sean,
>>
>> It's getting strange now. If I run from the IDE, my executor memory is always
>> set to 6.7G, no matter what value I set in code. I have checked my
>> environment variables, and there's no value of 6.7 or 12.5.
>>
>> Any idea?
>>
>> Thanks,
>> David
>>
>> On Tue, 17 Mar 2015 00:35 null  wrote:
>>
>>>  Hi Xi Shen,
>>>
>>> You could set the spark.executor.memory in the code itself: new
>>> SparkConf().set("spark.executor.memory", "2g")
>>>
>>> Or you can try --conf spark.executor.memory=2g while submitting the jar.
>>>
>>>
>>>
>>> Regards
>>>
>>> Jishnu Prathap
>>>
>>>
>>>
>>> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
>>> *Sent:* Monday, March 16, 2015 2:06 PM
>>> *To:* Xi Shen
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: How to set Spark executor memory?
>>>
>>>
>>>
>>> By default spark.executor.memory is set to 512m. I'm assuming that since you
>>> are submitting the job using spark-submit and running in local mode, it is not
>>> able to override the value. Can you try it without using spark-submit, as a
>>> standalone project?
>>>
>>>
>>>   Thanks
>>>
>>> Best Regards
>>>
>>>
>>>
>>> On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen  wrote:
>>>
>>> I set it in code, not by configuration. I submit my jar file to local. I
>>> am working in my developer environment.
>>>
>>>
>>>
>>> On Mon, 16 Mar 2015 18:28 Akhil Das  wrote:
>>>
>>> How are you setting it? and how are you submitting the job?
>>>
>>>
>>>   Thanks
>>>
>>> Best Regards
>>>
>>>
>>>
>>> On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen  wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I have set spark.executor.memory to 2048m, and in the UI "Environment"
>>> page, I can see this value has been set correctly. But in the "Executors"
>>> page, I saw there's only 1 executor and its memory is 265.4MB. Very strange
>>> value. Why not 256MB, or just the value I set?
>>>
>>>
>>>
>>> What am I missing here?
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>>
>>> David
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: How to set Spark executor memory?

2015-03-21 Thread Xi Shen
In the log, I saw

  MemoryStorage: MemoryStore started with capacity 6.7GB

But I still can not find where to set this storage capacity.

On Sat, 21 Mar 2015 20:30 Xi Shen  wrote:

> Hi Sean,
>
> It's getting strange now. If I run from the IDE, my executor memory is always
> set to 6.7G, no matter what value I set in code. I have checked my
> environment variables, and there's no value of 6.7 or 12.5.
>
> Any idea?
>
> Thanks,
> David
>
> On Tue, 17 Mar 2015 00:35 null  wrote:
>
>>  Hi Xi Shen,
>>
>> You could set the spark.executor.memory in the code itself: new
>> SparkConf().set("spark.executor.memory", "2g")
>>
>> Or you can try --conf spark.executor.memory=2g while submitting the jar.
>>
>>
>>
>> Regards
>>
>> Jishnu Prathap
>>
>>
>>
>> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
>> *Sent:* Monday, March 16, 2015 2:06 PM
>> *To:* Xi Shen
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: How to set Spark executor memory?
>>
>>
>>
>> By default spark.executor.memory is set to 512m. I'm assuming that since you
>> are submitting the job using spark-submit and running in local mode, it is not
>> able to override the value. Can you try it without using spark-submit, as a
>> standalone project?
>>
>>
>>   Thanks
>>
>> Best Regards
>>
>>
>>
>> On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen  wrote:
>>
>> I set it in code, not by configuration. I submit my jar file to local. I
>> am working in my developer environment.
>>
>>
>>
>> On Mon, 16 Mar 2015 18:28 Akhil Das  wrote:
>>
>> How are you setting it? and how are you submitting the job?
>>
>>
>>   Thanks
>>
>> Best Regards
>>
>>
>>
>> On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen  wrote:
>>
>> Hi,
>>
>>
>>
>> I have set spark.executor.memory to 2048m, and in the UI "Environment"
>> page, I can see this value has been set correctly. But in the "Executors"
>> page, I saw there's only 1 executor and its memory is 265.4MB. Very strange
>> value. Why not 256MB, or just the value I set?
>>
>>
>>
>> What am I missing here?
>>
>>
>>
>>
>>
>> Thanks,
>>
>> David
>>
>>
>>
>>
>>
>>
>>
>


join two DataFrames, same column name

2015-03-21 Thread Eric Friedman
I have a couple of data frames that I pulled from SparkSQL and the primary
key of one is a foreign key of the same name in the other.  I'd rather not
have to specify each column in the SELECT statement just so that I can
rename this single column.

When I try to join the data frames, I get an exception because it finds the
two columns of the same name to be ambiguous.  Is there a way to specify
which side of the join comes from data frame A and which comes from B?

var df1 = sqlContext.sql("select * from table1")
var df2 = sqlContext.sql("select * from table2")

df1.join(df2, df1("column_id") === df2("column_id"))


Re: Did DataFrames break basic SQLContext?

2015-03-21 Thread Michael Armbrust
>
> Now, I am not able to directly use my RDD object and have it implicitly
> become a DataFrame. It can be used as a DataFrameHolder, of which I could
> write:
>
> rdd.toDF.registerTempTable("foo")
>

The rationale here was that we added a lot of methods to DataFrame and made
the implicits more powerful, but that increased the likelihood of
accidental application of the implicit.  I personally have had to explain
the accidental application of implicits (and the confusing compiler
messages that can result) to beginners so many times that we decided to
remove the subtle conversion from RDD to DataFrame and instead make it an
explicit method call.
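For reference, a minimal sketch of the explicit conversion in 1.3 (run in the
shell where sc already exists; the case class and table name are made up):

import org.apache.spark.sql.SQLContext

case class Record(key: Int, value: String)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // brings the toDF enrichment into scope explicitly

val rdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
rdd.toDF().registerTempTable("foo")
sqlContext.sql("SELECT key FROM foo WHERE value = 'a'").show()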


Re: saveAsTable broken in v1.3 DataFrames?

2015-03-21 Thread Michael Armbrust
I believe that you can get what you want by using HiveQL instead of the
pure programmatic API.  This is a little verbose, so perhaps a specialized
function would also be useful here.  I'm not sure I would call it
saveAsExternalTable as there are also "external" spark sql data source
tables that have nothing to do with hive.

The following should create a proper hive table:
df.registerTempTable("df")
sqlContext.sql("CREATE TABLE newTable AS SELECT * FROM df")

At the very least we should clarify in the documentation to avoid future
confusion.  The piggybacking is a little unfortunate but also gives us a
lot of new functionality that we can't get when strictly following the way
that Hive expects tables to be formatted.

I'd suggest opening a JIRA for the specialized method you describe.  Feel
free to mention me and Yin in a comment when you create it.

On Fri, Mar 20, 2015 at 12:55 PM, Christian Perez 
wrote:

> Any other users interested in a feature
> DataFrame.saveAsExternalTable() for making _useful_ external tables in
> Hive, or am I the only one? Bueller? If I start a PR for this, will it
> be taken seriously?
>
> On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez 
> wrote:
> > Hi Yin,
> >
> > Thanks for the clarification. My first reaction is that if this is the
> > intended behavior, it is a wasted opportunity. Why create a managed
> > table in Hive that cannot be read from inside Hive? I think I
> > understand now that you are essentially piggybacking on Hive's
> > metastore to persist table info between/across sessions, but I imagine
> > others might expect more (as I have.)
> >
> > We find ourselves wanting to do work in Spark and persist the results
> > where other users (e.g. analysts using Tableau connected to
> > Hive/Impala) can explore it. I imagine this is very common. I can, of
> > course, save it as parquet and create an external table in hive (which
> > I will do now), but saveAsTable seems much less useful to me now.
> >
> > Any other opinions?
> >
> > Cheers,
> >
> > C
> >
> > On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai  wrote:
> >> I meant table properties and serde properties are used to store
> metadata of
> >> a Spark SQL data source table. We do not set other fields like SerDe
> lib.
> >> For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source
> table
> >> should not show unrelated stuff like Serde lib and InputFormat. I have
> >> created https://issues.apache.org/jira/browse/SPARK-6413 to track the
> >> improvement on the output of DESCRIBE statement.
> >>
> >> On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai 
> wrote:
> >>>
> >>> Hi Christian,
> >>>
> >>> Your table is stored correctly in Parquet format.
> >>>
> >>> For saveAsTable, the table created is not a Hive table, but a Spark SQL
> >>> data source table
> >>> (
> http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources
> ).
> >>> We are only using Hive's metastore to store the metadata (to be
> specific,
> >>> only table properties and serde properties). When you look at table
> >>> property, there will be a field called "spark.sql.sources.provider"
> and the
> >>> value will be "org.apache.spark.sql.parquet.DefaultSource". You can
> also
> >>> look at your files in the file system. They are stored by Parquet.
> >>>
> >>> Thanks,
> >>>
> >>> Yin
> >>>
> >>> On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez 
> >>> wrote:
> 
>  Hi all,
> 
>  DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
>  CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
>  schema _and_ storage format in the Hive metastore, so that the table
>  cannot be read from inside Hive. Spark itself can read the table, but
>  Hive throws a Serialization error because it doesn't know it is
>  Parquet.
> 
>  val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education",
>  "income")
>  df.saveAsTable("spark_test_foo")
> 
>  Expected:
> 
>  COLUMNS(
>    education BIGINT,
>    income BIGINT
>  )
> 
>  SerDe Library:
>  org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
>  InputFormat:
>  org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
> 
>  Actual:
> 
>  COLUMNS(
>    col array COMMENT "from deserializer"
>  )
> 
>  SerDe Library:
> org.apache.hadoop.hive.serd2.MetadataTypedColumnsetSerDe
>  InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
> 
>  ---
> 
>  Manually changing schema and storage restores access in Hive and
>  doesn't affect Spark. Note also that Hive's table property
>  "spark.sql.sources.schema" is correct. At first glance, it looks like
>  the schema data is serialized when sent to Hive but not deserialized
>  properly on receive.
> 
>  I'm tracing execution through source code... but before I get any
>  deeper, can anyone reproduce this behavior?
> 
>  Cheers,
> 
>  Ch

Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues

2015-03-21 Thread Eason Hu
Thank you for your help Akhil!  We found that, starting from version 1.2.0
(v1.1.1 and below are fine), remotely connecting from our laptop to the remote
Spark cluster no longer works, but it does work if the client is on the remote
cluster as well.  Not sure if this is related to Spark's internal communication
being upgraded to a Netty-based implementation in v1.2.0, which may not fit our
firewall / network setup between the laptop and the remote servers:
https://issues.apache.org/jira/browse/SPARK-2468.  This is not very good for
project development & debugging, since for every little change we need to
recompile the entire jar, upload it to the remote server and then execute,
instead of running it right away on the local machine, but at least it works
now.

Best,
Eason

On Thu, Mar 19, 2015 at 11:35 PM, Akhil Das 
wrote:

> Are you submitting your application from local to a remote host?
> If you want to run the spark application from a remote machine, then you have
> to at least set the following configurations properly.
>
>  - *spark.driver.host* - points to the ip/host from where you are
> submitting
>  the job (make sure you are able to ping this from the cluster)
>
>  - *spark.driver.port* - set it to a port number which is accessible from
>  the spark cluster.
>
>  You can look at more configuration options over here.
> 
>
>
> Thanks
> Best Regards
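A minimal sketch of the two settings quoted above, as they would look in the
driver code (the master URL and app name are placeholders; the host and port
mirror the values that show up in the log further down):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://remote-master:7077")
  .setAppName("MyApp")
  .set("spark.driver.host", "192.168.1.53")   // IP of the submitting laptop, pingable from the cluster
  .set("spark.driver.port", "65001")          // a port on the laptop the cluster can connect back to
val sc = new SparkContext(conf)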
>
> On Fri, Mar 20, 2015 at 4:02 AM, Eason Hu  wrote:
>
>> Hi Akhil,
>>
>> Thank you for your help.  I just found that the problem is related to my
>> local spark application, since I ran it in IntelliJ and I didn't reload the
>> project after I recompile the jar via maven.  If I didn't reload, it will
>> use some local cache data to run the application which leads to two
>> different versions.  After I reloaded the project and reran, it was running
>> fine for v1.1.1 and I no longer saw that class incompatible issues.
>>
>> However, I now encounter a new issue starting from v1.2.0 and above.
>>
>> Using Spark's default log4j profile: 
>> org/apache/spark/log4j-defaults.properties
>> 15/03/19 01:10:17 INFO CoarseGrainedExecutorBackend: Registered signal 
>> handlers for [TERM, HUP, INT]
>> 15/03/19 01:10:17 WARN NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> 15/03/19 01:10:17 INFO SecurityManager: Changing view acls to: 
>> hduser,eason.hu
>> 15/03/19 01:10:17 INFO SecurityManager: Changing modify acls to: 
>> hduser,eason.hu
>> 15/03/19 01:10:17 INFO SecurityManager: SecurityManager: authentication 
>> disabled; ui acls disabled; users with view permissions: Set(hduser, 
>> eason.hu); users with modify permissions: Set(hduser, eason.hu)
>> 15/03/19 01:10:18 INFO Slf4jLogger: Slf4jLogger started
>> 15/03/19 01:10:18 INFO Remoting: Starting remoting
>> 15/03/19 01:10:18 INFO Remoting: Remoting started; listening on addresses 
>> :[akka.tcp://driverPropsFetcher@hduser-07:59122]
>> 15/03/19 01:10:18 INFO Utils: Successfully started service 
>> 'driverPropsFetcher' on port 59122.
>> 15/03/19 01:10:21 WARN ReliableDeliverySupervisor: Association with remote 
>> system [akka.tcp://sparkDriver@192.168.1.53:65001] has failed, address is 
>> now gated for [5000] ms. Reason is: [Association failed with 
>> [akka.tcp://sparkDriver@192.168.1.53:65001]].
>> 15/03/19 01:10:48 ERROR UserGroupInformation: PriviledgedActionException 
>> as:eason.hu (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: 
>> Futures timed out after [30 seconds]
>> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: 
>> Unknown exception in doAs
>>  at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
>>  at 
>> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
>>  at 
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
>>  at 
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
>>  at 
>> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
>> Caused by: java.security.PrivilegedActionException: 
>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at javax.security.auth.Subject.doAs(Subject.java:415)
>>  at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>>  ... 4 more
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
>> [30 seconds]
>>  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>  at 
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>  at 
>> scala.concurrent.BlockC

Re: Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)

2015-03-21 Thread Steve Loughran

1. make sure your secret key doesn't have a "/" in it. If it does, generate a 
new key.
2. jets3t and hadoop JAR versions need to be in sync;  jets3t 0.9.0 was picked 
up in Hadoop 2.4 and not AFAIK
3. Hadoop 2.6 has a new S3 client, "s3a", which is compatible with s3n data. It 
uses the AWS toolkit instead of JetS3t, and is where all future dev is going. Assuming it is 
up to date with the AWS toolkit, it will do the auth. Not knowingly tested against 
Frankfurt though; just Ireland, US east, US west & Japan.  S3a still has some 
quirks being worked through; HADOOP-11571 lists the set fixed.
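As a hedged sketch of that route (assuming the Hadoop 2.6+ hadoop-aws jar and
the AWS SDK are on the classpath; the bucket name is a placeholder, the
property names are the standard s3a ones and the endpoint is eu-central-1):

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.access.key", "myAccessKey")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "myAccessSecret")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

val lines = sc.textFile("s3a://my-bucket/EAN/2015-03-09-72640385/input/HotelImageList.gz")
println(lines.count())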

On 20 Mar 2015, at 15:15, Ralf Heyde <r...@hubrick.com> wrote:

Good Idea, will try that.
But assuming, "only" data is located there, the problem will still occur.

On Fri, Mar 20, 2015 at 3:08 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
Hi Ralf,

using secret keys and authorization details is a strict NO for AWS, they are 
major security lapses and should be avoided at any cost.

Have you tried starting the clusters using ROLES, they are wonderful way to 
start clusters or EC2 nodes and you do not have to copy and paste any 
permissions either.

Try going through this article in AWS: 
http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-iam-roles.html 
(though for datapipeline, they show the correct set of permissions to enable).

I start EC2 nodes using roles (as mentioned in the link above), run the aws cli 
commands (without copying any keys or files).

Please let me know if the issue was resolved.

Regards,
Gourav

On Fri, Mar 20, 2015 at 1:53 PM, Ralf Heyde <r...@hubrick.com> wrote:
Hey,

We want to run a Job, accessing S3, from EC2 instances. The Job runs in a 
self-provided Spark Cluster (1.3.0) on EC2 instances. In Irland everything 
works as expected.

i just tried to move data from Irland -> Frankfurt. AWS S3 is forcing v4 of 
their API there, means: access is only possible via: AWS4-HMAC-SHA256

This is still ok, but I dont get access there. What I tried already:

All of the approaches I tried with these URLs:
A) "s3n://<AccessKey>:<SecretKey>@<bucket>/<path>"
B) "s3://<AccessKey>:<SecretKey>@<bucket>/<path>"
C) "s3n://<bucket>/<path>"
D) "s3://<bucket>/<path>"

1a. setting Environment Variables in the operating system
1b. found something, to set AccessKey/Secret in SparkConf like that (I guess, 
this does not have any effect)
   sc.set("AWS_ACCESS_KEY_ID", id)
   sc.set("AWS_SECRET_ACCESS_KEY", secret)

2. tried to use a "more up to date" jets3t client (somehow I was not able to 
get the "new" version running)
3. tried in-URL basic authentication (A+B)
4. Setting the hadoop configuration:
hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3.S3FileSystem");
hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key);
hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret);

hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem");
hadoopConfiguration.set("fs.s3.awsAccessKeyId", "myAccessKey");
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "myAccessSecret");

-->
Caused by: org.jets3t.service.S3ServiceException: S3 GET failed for 
'/%2FEAN%2F2015-03-09-72640385%2Finput%2FHotelImageList.gz' XML Error Message: 
InvalidRequest: The authorization
mechanism you have provided is not supported. Please use
AWS4-HMAC-SHA256. (RequestId: 43F8F02E767DC4A2, HostId: wgMeAEYcZZa/2BazQ9TA+PAkUxt5l+ExnT4Emb+1Uk5KhWfJu5C8Xcesm1AXCfJ9nZJMyh4wPX8=)

2. setting Hadoop Configuration
hadoopConfiguration.set("fs.s3n.impl", 
"org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key);
hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret);

hadoopConfiguration.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConfiguration.set("fs.s3.awsAccessKeyId", "myAccessKey");
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "myAccessSecret");

-->
Caused by: org.jets3t.service.S3ServiceException: S3 HEAD request failed for 
'/EAN%2F2015-03-09-72640385%2Finput%2FHotelImageList.gz' - ResponseCode=400, 
ResponseMessage=Bad Request

5. without Hadoop Config
Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key 
ID and Secret Access Key must be specified as the username or password 
(respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or 
fs.s3.awsSecretAccessKey properties (respectively).

6. without Hadoop Config but passed in S3 URL
with A) Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: 
org.jets3t.service.S3ServiceException: S3 HEAD request failed for 
'/EAN%2F2015-03-09-72640385%2Finput%2FHotelImageList.gz' - ResponseCode=400, 
ResponseMessage=Bad Request
with B) Exception in thread "main" java.lang.IllegalArgumentException: AWS 
Access Key ID and Secret Access Key must be specified as the username or 
password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or 
fs.s3.awsSecretAccessKey properties (respectively).


Drilled down in the Job, I can see, that the RestStorageService recognizes 
AWS4-HMAC-SHA256 ... but somehow it gets a Res

Spark streaming alerting

2015-03-21 Thread Mohit Anchlia
Is there a module in Spark Streaming that lets you listen to
alerts/conditions as they happen in the streaming job? Generally,
Spark Streaming components execute on large clusters, like HDFS
or Cassandra; however, when it comes to alerting you generally can't send
alerts directly from the Spark workers, which means you need a way to listen
for them.


ArrayIndexOutOfBoundsException in ALS.trainImplicit

2015-03-21 Thread Sabarish Sasidharan
I am consistently running into this ArrayIndexOutOfBoundsException issue
when using trainImplicit. I have tried changing the partitions and
switching to JavaSerializer. But they don't seem to help. I see that this
is the same as https://issues.apache.org/jira/browse/SPARK-3080. My lambda
is 0.01, rank is 5,  iterations is 10 and alpha is 0.01. I am using 41
executors, each with 8GB on a 48 million dataset.

15/03/21 13:07:29 ERROR executor.Executor: Exception in task 12.0 in stage
2808.0 (TID 40575)
java.lang.ArrayIndexOutOfBoundsException: 692
at
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:548)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.mllib.recommendation.ALS.org
$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:542)
at
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:510)
at
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:509)
at
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
at
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)

How can I get around this issue?

​Regards
Sab

-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Model deployment help

2015-03-21 Thread Shashidhar Rao
Hi,

Apologies for the generic question.

As I am developing predictive models for the first time, the model will
be deployed in production very soon.

Could somebody help me with the  model deployment in production , I have
read quite a few on model deployment and have read some books on Database
deployment .

My queries relate to how  updates to model happen when current model
degenerates without any downtime and how others are deploying in production
servers and a few lines on adoption of PMML currently in production.

Please provide me with some good links  or some forums  so that I can learn
as most of the books do not cover it extensively except for 'Mahout in
action' where it is explained in some detail and have also checked
stackoverflow but have not got any relevant answers.

What I understand:
1. Build model using current training set and test the model.
2. Deploy the model,put it in some location and load it and predict when
request comes for scoring.
3. Model degenerates , now build new model with new data.(Here some
confusion , whether the old data is discarded completely or it is done with
purely new data or a mix)
4. Here I am stuck , how to update the model without any downtime, the
transition period when old model and new model happens.

My naive solution would be, build the new model , save it in a new location
and update the new path in some properties file or update the location in
database when the saving is done. Is this correct or some best practices
are available.
Database is unlikely in my case.

Thanks in advance.


'nested' RDD problem, advise needed

2015-03-21 Thread Michael Lewis
Hi,

I wonder if someone can help suggest a solution to my problem. I had a simple
process working using Strings and now
want to convert to RDD[Char]; the problem is that I end up with a nested call
as follows:


1) Load a text file into an RDD[Char]

val inputRDD = sc.textFile("myFile.txt").flatMap(_.toIterator)


2) I have a method that takes two parameters:

object Foo
{
  def myFunction(inputRDD: RDD[Char], shift: Int): RDD[Char] = ...


3) I have a method 'bar' that the driver process calls once it has loaded the
inputRDD, as follows:

def bar(inputRDD: RDD[Char]): Int = {

  val solutionSet = sc.parallelize((1 to alphabetLength).toList)
    .map(shift => (shift, Foo.myFunction(inputRDD, shift)))



What I'm trying to do is take a list 1..26 and generate a set of tuples {
(1, RDD(1)), ..., (26, RDD(26)) }, which is the inputRDD passed through
the function above, but with a different shift parameter each time.

In my original version I could parallelise the algorithm fine, but my input string had
to be in a 'String' variable; I'd rather it be an RDD
(the string could be large). I think the way I'm trying to do it above won't work
because it's a nested RDD call.

Can anybody suggest a solution?

Regards,
Mike Lewis





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming S3 Performance Implications

2015-03-21 Thread Ted Yu
Mike:
Once hadoop 2.7.0 is released, you should be able to enjoy the enhanced
performance of s3a.
See HADOOP-11571

Cheers

On Sat, Mar 21, 2015 at 8:09 AM, Chris Fregly  wrote:

> hey mike!
>
> you'll definitely want to increase your parallelism by adding more shards
> to the stream - as well as spinning up 1 receiver per shard and unioning
> all the shards per the KinesisWordCount example that is included with the
> kinesis streaming package.
>
> you'll need more cores (cluster) or threads (local) to support this -
> equalling at least the number of shards/receivers + 1.
>
> also, it looks like you're writing to S3 per RDD.  you'll want to broaden
> that out to write DStream batches - or expand  even further and write
> window batches (where the window interval is a multiple of the batch
> interval).
>
> this goes for any spark streaming implementation - not just Kinesis.
>
> lemme know if that works for you.
>
> thanks!
>
> -Chris
> _
> From: Mike Trienis 
> Sent: Wednesday, March 18, 2015 2:45 PM
> Subject: Spark Streaming S3 Performance Implications
> To: 
>
>
>
>  Hi All,
>
>  I am pushing data from Kinesis stream to S3 using Spark Streaming and
> noticed that during testing (i.e. master=local[2]) the batches (1 second
> intervals) were falling behind the incoming data stream at about 5-10
> events / second. It seems that the rdd.saveAsTextFile(s3n://...) is taking
> a few seconds to complete.
>
>   val saveFunc = (rdd: RDD[String], time: Time) => {
>
>  val count = rdd.count()
>
>  if (count > 0) {
>
>  val s3BucketInterval = time.milliseconds.toString
>
> rdd.saveAsTextFile(s3n://...)
>
>  }
>  }
>
>  dataStream.foreachRDD(saveFunc)
>
>
>  Should I expect the same behaviour in a deployed cluster? Or does the
> rdd.saveAsTextFile(s3n://...) distribute the push work to each worker node?
>
>  "Write the elements of the dataset as a text file (or set of text files)
> in a given directory in the local filesystem, HDFS or any other
> Hadoop-supported file system. Spark will call toString on each element to
> convert it to a line of text in the file."
>
>  Thanks, Mike.
>
>
>


Re: How to set Spark executor memory?

2015-03-21 Thread Sean Owen
If you are running from your IDE, then I don't know what you are
running or in what mode. The discussion here concerns using standard
mechanisms like spark-submit to configure executor memory. Please try
these first instead of trying to directly invoke Spark, which will
require more understanding of how the props are set.
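For reference, a minimal spark-submit invocation along those lines (class and
jar names are placeholders); note that in local mode the executor runs inside
the driver JVM, so --driver-memory is the setting that actually governs the heap:

./bin/spark-submit \
  --master local[4] \
  --driver-memory 4g \
  --executor-memory 2g \
  --class com.example.MyApp \
  target/my-app.jar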

On Sat, Mar 21, 2015 at 5:30 AM, Xi Shen  wrote:
> Hi Sean,
>
> It's getting strange now. If I run from the IDE, my executor memory is always
> set to 6.7G, no matter what value I set in code. I have checked my environment
> variables, and there's no value of 6.7 or 12.5.
>
> Any idea?
>
> Thanks,
> David
>
>
> On Tue, 17 Mar 2015 00:35 null  wrote:
>>
>> Hi Xi Shen,
>>
>> You could set the spark.executor.memory in the code itself: new
>> SparkConf().set("spark.executor.memory", "2g")
>>
>> Or you can try --conf spark.executor.memory=2g while submitting the jar.
>>
>>
>>
>> Regards
>>
>> Jishnu Prathap
>>
>>
>>
>> From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
>> Sent: Monday, March 16, 2015 2:06 PM
>> To: Xi Shen
>> Cc: user@spark.apache.org
>> Subject: Re: How to set Spark executor memory?
>>
>>
>>
>> By default spark.executor.memory is set to 512m. I'm assuming that since you
>> are submitting the job using spark-submit and running in local mode, it is not
>> able to override the value. Can you try it without using spark-submit, as a
>> standalone project?
>>
>>
>> Thanks
>>
>> Best Regards
>>
>>
>>
>> On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen  wrote:
>>
>> I set it in code, not by configuration. I submit my jar file to local. I
>> am working in my developer environment.
>>
>>
>>
>> On Mon, 16 Mar 2015 18:28 Akhil Das  wrote:
>>
>> How are you setting it? and how are you submitting the job?
>>
>>
>> Thanks
>>
>> Best Regards
>>
>>
>>
>> On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen  wrote:
>>
>> Hi,
>>
>>
>>
>> I have set spark.executor.memory to 2048m, and in the UI "Environment"
>> page, I can see this value has been set correctly. But in the "Executors"
>> page, I saw there's only 1 executor and its memory is 265.4MB. Very strange
>> value. Why not 256MB, or just the value I set?
>>
>>
>>
>> What am I missing here?
>>
>>
>>
>>
>>
>> Thanks,
>>
>> David
>>
>>
>>
>>
>>
>>
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming S3 Performance Implications

2015-03-21 Thread Chris Fregly
hey mike!
you'll definitely want to increase your parallelism by adding more shards to 
the stream - as well as spinning up 1 receiver per shard and unioning all the 
shards per the KinesisWordCount example that is included with the kinesis 
streaming package. 
you'll need more cores (cluster) or threads (local) to support this - equalling 
at least the number of shards/receivers + 1.
also, it looks like you're writing to S3 per RDD.  you'll want to broaden that 
out to write DStream batches - or expand  even further and write window batches 
(where the window interval is a multiple of the batch interval).
this goes for any spark streaming implementation - not just Kinesis.
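for example, a minimal sketch of the window-batch variant, assuming the
dataStream and 1-second batch interval from your snippet (the bucket name is a
placeholder):

import org.apache.spark.streaming.{Seconds, Time}
import org.apache.spark.rdd.RDD

// write once every 30 seconds instead of once per 1-second batch
val windowed = dataStream.window(Seconds(30), Seconds(30))
windowed.foreachRDD { (rdd: RDD[String], time: Time) =>
  if (rdd.count() > 0) {
    rdd.saveAsTextFile(s"s3n://my-bucket/${time.milliseconds}")
  }
}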
lemme know if that works for you.
thanks!
-Chris 
_
From: Mike Trienis 
Sent: Wednesday, March 18, 2015 2:45 PM
Subject: Spark Streaming S3 Performance Implications
To:  


Hi All,

I am pushing data from Kinesis stream to S3 using Spark Streaming and
noticed that during testing (i.e. master=local[2]) the batches (1 second
intervals) were falling behind the incoming data stream at about 5-10 events /
second. It seems that the rdd.saveAsTextFile(s3n://...) is taking a few
seconds to complete.

    val saveFunc = (rdd: RDD[String], time: Time) => {
      val count = rdd.count()
      if (count > 0) {
        val s3BucketInterval = time.milliseconds.toString
        rdd.saveAsTextFile(s3n://...)
      }
    }

    dataStream.foreachRDD(saveFunc)

Should I expect the same behaviour in a deployed cluster? Or does the
rdd.saveAsTextFile(s3n://...) distribute the push work to each worker node?

"Write the elements of the dataset as a text file (or set of text files)
in a given directory in the local filesystem, HDFS or any other
Hadoop-supported file system. Spark will call toString on each element to
convert it to a line of text in the file."

Thanks, Mike.

Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-21 Thread Ted Yu
bq. Requesting 1 new executor(s) because tasks are backlogged

1 executor was requested.

Which hadoop release are you using ?

Can you check resource manager log to see if there is some clue ?

Thanks

On Fri, Mar 20, 2015 at 4:17 PM, Manoj Samel 
wrote:

> Forgot to add - the cluster is idle otherwise so there should be no
> resource issues. Also the configuration works when not using Dynamic
> allocation.
>
> On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel 
> wrote:
>
>> Hi,
>>
>> Running Spark 1.3 with secured Hadoop.
>>
>> Spark-shell with Yarn client mode runs without issue when not using
>> Dynamic Allocation.
>>
>> When Dynamic allocation is turned on, the shell comes up but same SQL
>> etc. causes it to loop.
>>
>> spark.dynamicAllocation.enabled=true
>> spark.dynamicAllocation.initialExecutors=1
>> spark.dynamicAllocation.maxExecutors=10
>> # Set IdleTime low for testing
>> spark.dynamicAllocation.executorIdleTimeout=60
>> spark.shuffle.service.enabled=true
>>
>> Following is the start of the messages and then it keeps looping with
>> "Requesting 0 new executors"
>>
>> 15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
>> broadcast_1_piece0
>> 15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
>> broadcast at DAGScheduler.scala:839
>> 15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
>> from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
>> 15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
>> tasks
>> 15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not
>> accepted any resources; check your cluster UI to ensure that workers are
>> registered and have sufficient resources
>> 15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not
>> accepted any resources; check your cluster UI to ensure that workers are
>> registered and have sufficient resources
>> 15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>> 15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
>> executor(s) because tasks are backlogged (new desired total will be 1)
>>
>
>


Re: Spark per app logging

2015-03-21 Thread Jeffrey Jedele
Hi,
I'm not completely sure about this either, but this is what we are doing
currently:
Configure your logging to write to STDOUT, not to a file explicitly. Spark
will capture stdout and stderr and separate the messages into an app/driver
folder structure in the configured worker directory.
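For example, a minimal conf/log4j.properties sketch of a console appender
pointed at stdout, with quieter levels for the noisiest packages (property
names are standard log4j 1.x):

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.spark=WARN
log4j.logger.org.eclipse.jetty=WARN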

We then use Logstash to collect the logs and index them into an Elasticsearch
cluster (Spark seems to produce a lot of logging data). With some simple
regex processing, you also get the application id as a searchable field.

Regards,
Jeff

2015-03-20 22:37 GMT+01:00 Ted Yu :

> Are these jobs the same jobs, just run by different users or, different
> jobs ?
> If the latter, can each application use its own log4j.properties ?
>
> Cheers
>
> On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta  wrote:
>
>> Hi,
>>
>> We have spark setup such that there are various users running multiple
>> jobs at the same time. Currently all the logs go to 1 file specified in the
>> log4j.properties.
>> Is it possible to configure log4j in spark for per app/user logging
>> instead of sending all logs to 1 file mentioned in the log4j.properties?
>>
>> Thanks
>> Udit
>>
>
>


Re: Spark Streaming Not Reading Messages From Multiple Kafka Topics

2015-03-21 Thread Jeffrey Jedele
Hey Eason!
Weird problem indeed. More information will probably help to find the issue:

Have you searched the logs for peculiar messages?
How does your Spark environment look like? #workers, #threads, etc?
Does it work if you create separate receivers for the topics?
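For the last point, a minimal sketch of one receiver per topic unioned into a
single stream (connection details copied from your snippet; one consumer
thread per topic is an assumption):

import org.apache.spark.streaming.kafka.KafkaUtils

val topicNames = Seq("raw_0", "raw_1", "raw_2")
val perTopicStreams = topicNames.map { t =>
  KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka", "group01", Map(t -> 1)).map(_._2)
}
val msgs = ssc.union(perTopicStreams)

Each receiver occupies a core, so make sure there are at least
topicNames.size + 1 cores/threads available.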

Regards,
Jeff

2015-03-21 2:27 GMT+01:00 EH :

> Hi all,
>
> I'm building a Spark Streaming application that will continuously read
> multiple kafka topics at the same time.  However, I found a weird issue
> that
> it reads only hundreds of messages then it stopped reading any more.  If I
> changed the three topic to only one topic, then it is fine and it will
> continue to consume.  Below is the code I have.
>
> val consumerThreadsPerInputDstream = 1
> val topics = Map("raw_0" -> consumerThreadsPerInputDstream,
>  "raw_1" -> consumerThreadsPerInputDstream,
>  "raw_2" -> consumerThreadsPerInputDstream)
>
> val msgs = KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka",
> "group01", topics).map(_._2)
> ...
>
> How come it no longer consumes after hundreds of messages when reading
> three topics?  How to resolve this issue?
>
> Thank you for your help,
> Eason
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Not-Reading-Messages-From-Multiple-Kafka-Topics-tp22170.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to set Spark executor memory?

2015-03-21 Thread Xi Shen
Hi Sean,

It's getting strange now. If I run from the IDE, my executor memory is always
set to 6.7G, no matter what value I set in code. I have checked my
environment variables, and there's no value of 6.7 or 12.5.

Any idea?

Thanks,
David

On Tue, 17 Mar 2015 00:35 null  wrote:

>  Hi Xi Shen,
>
> You could set the spark.executor.memory in the code itself: new
> SparkConf().set("spark.executor.memory", "2g")
>
> Or you can try --conf spark.executor.memory=2g while submitting the jar.
>
>
>
> Regards
>
> Jishnu Prathap
>
>
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Monday, March 16, 2015 2:06 PM
> *To:* Xi Shen
> *Cc:* user@spark.apache.org
> *Subject:* Re: How to set Spark executor memory?
>
>
>
> By default spark.executor.memory is set to 512m. I'm assuming that since you
> are submitting the job using spark-submit and running in local mode, it is not
> able to override the value. Can you try it without using spark-submit, as a
> standalone project?
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen  wrote:
>
> I set it in code, not by configuration. I submit my jar file to local. I
> am working in my developer environment.
>
>
>
> On Mon, 16 Mar 2015 18:28 Akhil Das  wrote:
>
> How are you setting it? and how are you submitting the job?
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen  wrote:
>
> Hi,
>
>
>
> I have set spark.executor.memory to 2048m, and in the UI "Environment"
> page, I can see this value has been set correctly. But in the "Executors"
> page, I saw there's only 1 executor and its memory is 265.4MB. Very strange
> value. Why not 256MB, or just the value I set?
>
>
>
> What am I missing here?
>
>
>
>
>
> Thanks,
>
> David
>
>
>
>
>
>
>


Re: Can I start multiple executors in local mode?

2015-03-21 Thread Xi Shen
No, I didn't mean local-cluster. I mean running locally, like in an IDE.

On Mon, 16 Mar 2015 23:12 xu Peng  wrote:

> Hi David,
>
> You can try the local-cluster.
>
> the numbers in local-cluster[2,2,1024] mean that there are 2 workers,
> 2 cores per worker and 1024M of memory per worker
>
> Best Regards
>
> Peng Xu
>
> 2015-03-16 19:46 GMT+08:00 Xi Shen :
>
>> Hi,
>>
>> In YARN mode you can specify the number of executors. I wonder if we can
>> also start multiple executors at local, just to make the test run faster.
>>
>> Thanks,
>> David
>>
>
>


Re: About the env of Spark1.2

2015-03-21 Thread sandeep vura
If you are using 127.0.0.1, please check /etc/hosts and uncomment or create a
127.0.1.1 entry there that names the host, alongside the localhost entry.
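For example, an /etc/hosts along these lines (using the hostname from the
stack trace below) usually resolves the UnknownHostException:

127.0.0.1   localhost
127.0.1.1   dhcp-10-35-14-100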

On Sat, Mar 21, 2015 at 9:57 AM, Ted Yu  wrote:

> bq. Caused by: java.net.UnknownHostException: dhcp-10-35-14-100: Name or
> service not known
>
> Can you check your DNS ?
>
> Cheers
>
> On Fri, Mar 20, 2015 at 8:54 PM, tangzilu  wrote:
>
>> Hi All:
>> I recently started to deploy Spark1.2 in my VisualBox Linux.
>> But when I run the command "./spark-shell" in the path of
>> "/opt/spark-1.2.1/bin", I got the result like this:
>>
>> [root@dhcp-10-35-14-100 bin]# ./spark-shell
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> 15/03/20 13:56:06 INFO SecurityManager: Changing view acls to: root
>> 15/03/20 13:56:06 INFO SecurityManager: Changing modify acls to: root
>> 15/03/20 13:56:06 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(root); users
>> with modify permissions: Set(root)
>> 15/03/20 13:56:06 INFO HttpServer: Starting HTTP Server
>> 15/03/20 13:56:06 INFO Utils: Successfully started service 'HTTP class
>> server' on port 47691.
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.2.1
>>   /_/
>>
>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>> java.net.UnknownHostException: dhcp-10-35-14-100: dhcp-10-35-14-100: Name
>> or service not known
>> at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
>> at
>> org.apache.spark.util.Utils$.findLocalIpAddress(Utils.scala:710)
>> at
>> org.apache.spark.util.Utils$.localIpAddress$lzycompute(Utils.scala:702)
>> at org.apache.spark.util.Utils$.localIpAddress(Utils.scala:702)
>> at org.apache.spark.HttpServer.uri(HttpServer.scala:158)
>> at
>> org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:982)
>> at $iwC$$iwC.(:9)
>> at $iwC.(:18)
>> at (:20)
>> at .(:24)
>> at .()
>> at .(:7)
>> at .()
>> at $print()
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
>> at
>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
>> at
>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
>> at
>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
>> at
>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
>> at
>> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
>> at
>> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
>> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
>> at
>> org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
>> at
>> org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
>> at
>> org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
>> at
>> org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
>> at
>> org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
>> at
>> org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
>> at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
>> at
>> org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
>> at
>> org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>> at
>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
>> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
>> at org.apache.spark.repl.Main$.main(Main.scala:31)
>> at org.apache.spark.repl.Main.main(Main.scala)
>> at sun.reflect.NativeMetho