How to use Eclipse on Windows to build Spark environment?

2015-05-27 Thread Nan Xiao
Hi all,

I want to use Eclipse on Windows to set up a Spark build environment, but I find
that the reference
page (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
doesn't contain any guidance about Eclipse.

Could anyone share tutorials or links on how to use Eclipse on
Windows to set up a Spark build environment? Thanks in advance!

Best Regards
Nan Xiao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Model weights of linear regression becomes abnormal values

2015-05-27 Thread 吴明瑜
Sorry, I meant the parameter step.
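
For example (a sketch only; the values are illustrative and parsedData stands
for your RDD[LabeledPoint]), a smaller step can be passed straight into train:

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

// numIterations = 200, step = 0.01 -- the PySpark train() quoted below exposes
// the same iterations/step arguments.
val model = LinearRegressionWithSGD.train(parsedData, 200, 0.01)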

2015-05-28 12:21 GMT+08:00 Maheshakya Wijewardena :

> What is the parameter for the learning rate alpha? LinearRegressionWithSGD
> has only the following parameters.
>
>
> @param data:  The training data.
>> @param iterations:The number of iterations (default: 100).
>> @param step:  The step parameter used in SGD
>>   (default: 1.0).
>> @param miniBatchFraction: Fraction of data to be used for each SGD
>>   iteration.
>> @param initialWeights:The initial weights (default: None).
>> @param regParam:  The regularizer parameter (default: 1.0).
>> @param regType:   The type of regularizer used for training
>>   our model.
>>   Allowed values: "l1" for using L1Updater,
>>   "l2" for using
>>SquaredL2Updater,
>>   "none" for no regularizer.
>>   (default: "none")
>> @param intercept: Boolean parameter which indicates the use
>>   or not of the augmented representation for
>>   training data (i.e. whether bias features
>>   are activated or not).
>>
>>
> On Thu, May 28, 2015 at 9:42 AM, 吴明瑜  wrote:
>
>> The problem may occur when your algorithm cannot converge. Maybe you can
>> check if the learning rate alpha is too large. Try reducing it.
>>
>> 2015-05-28 12:08 GMT+08:00 Maheshakya Wijewardena :
>>
>>>
>>> Hi,
>>>
>>> I'm trying to use Spark's *LinearRegressionWithSGD* in PySpark with the
>>> attached dataset. The code is attached. When I check the model weights
>>> vector after training, it contains `nan` values.
>>>
>>> [nan,nan,nan,nan,nan,nan,nan,nan]
>>>
>>> But for some data sets, this problem does not occur. What might be the 
>>> reason for this?
>>> Is this an issue with the data I'm using or a bug?
>>>
>>> Best regards.
>>>
>>> --
>>> Pruthuvi Maheshakya Wijewardena
>>> Software Engineer
>>> WSO2 Lanka (Pvt) Ltd
>>> Email: mahesha...@wso2.com
>>> Mobile: +94711228855
>>>
>>>
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>
>>
>> --
>> Mingyu Wu
>>
>> Institute of Parallel and Distributed Systems
>>
>> School of Software Engineering
>>
>> Shanghai Jiao Tong University
>>
>>
>
>
> --
> Pruthuvi Maheshakya Wijewardena
> Software Engineer
> WSO2 Lanka (Pvt) Ltd
> Email: mahesha...@wso2.com
> Mobile: +94711228855
>
>
>


-- 
Mingyu Wu

Institute of Parallel and Distributed Systems

School of Software Engineering

Shanghai Jiao Tong University


Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-27 Thread Ji ZHANG
Hi,

Yes, I'm using createStream, but the storageLevel param is by default
MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
don't think Kafka messages will be cached in driver.
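
For reference, the direct (receiver-less) API that Akhil mentions looks roughly
like this in Scala on Spark 1.3+ (a sketch; the broker list and topic name are
placeholders, and ssc is the existing StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receiver and no StorageLevel: each batch's tasks read their offset ranges
// straight from Kafka instead of caching received blocks.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))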


On Thu, May 28, 2015 at 12:24 AM, Akhil Das 
wrote:

> Are you using the createStream or createDirectStream api? If it's the
> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
> slow things down though). Another way would be to try the latter one.
>
> Thanks
> Best Regards
>
> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG  wrote:
>
>> Hi Akhil,
>>
>> Thanks for your reply. According to the Streaming tab of the Web UI, the
>> Processing Time is around 400ms, and there's no Scheduling Delay, so I
>> suppose it's not the Kafka messages that eat up the off-heap memory. Or
>> maybe it is, but how to tell?
>>
>> I googled about how to check the off-heap memory usage, there's a tool
>> called pmap, but I don't know how to interpret the results.
>>
>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das 
>> wrote:
>>
>>> After submitting the job, if you do a ps aux | grep spark-submit then
>>> you can see all the JVM params. Are you using the high-level consumer (receiver
>>> based) for receiving data from Kafka? In that case, if your throughput is
>>> high and the processing delay exceeds the batch interval, then you will hit
>>> these memory issues, as data keeps arriving and is dumped to memory.
>>> You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
>>> Another alternative is to use the low-level Kafka consumer
>>>  or to use the
>>> non-receiver based directStream
>>> 
>>> that ships with Spark.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG  wrote:
>>>
 Hi,

 I'm using Spark Streaming 1.3 on CDH5.1 in yarn-cluster mode. I found
 that YARN is killing the driver and executor processes because of
 excessive use of memory. Here's what I tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but
 the memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in
 standalone mode.
 4. The test job just receives messages from Kafka with a batch interval
 of 1, does some filtering and aggregation, and then prints to the executor
 logs. So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry

>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry


Re: Model weights of linear regression becomes abnormal values

2015-05-27 Thread Maheshakya Wijewardena
Thanks for the information. I'll try that out with Spark 1.4.

On Thu, May 28, 2015 at 9:54 AM, DB Tsai  wrote:

> LinearRegressionWithSGD requires tuning the step size and # of
> iterations very carefully. Please try the Linear Regression with elastic
> net implementation in the Spark 1.4 ML framework, which uses a
> quasi-Newton method and determines the step size automatically. That
> implementation also matches the results from R.
>
> Sincerely,
>
> DB Tsai
> ---
> Blog: https://www.dbtsai.com
>
>
> On Wed, May 27, 2015 at 9:08 PM, Maheshakya Wijewardena
>  wrote:
> >
> > Hi,
> >
> > I'm trying to use Spark's LinearRegressionWithSGD in PySpark with the
> > attached dataset. The code is attached. When I check the model weights
> > vector after training, it contains `nan` values.
> >
> > [nan,nan,nan,nan,nan,nan,nan,nan]
> >
> > But for some data sets, this problem does not occur. What might be the
> > reason for this?
> > Is this an issue with the data I'm using or a bug?
> >
> > Best regards.
> >
> > --
> > Pruthuvi Maheshakya Wijewardena
> > Software Engineer
> > WSO2 Lanka (Pvt) Ltd
> > Email: mahesha...@wso2.com
> > Mobile: +94711228855
> >
> >
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Pruthuvi Maheshakya Wijewardena
Software Engineer
WSO2 Lanka (Pvt) Ltd
Email: mahesha...@wso2.com
Mobile: +94711228855


Re: Model weights of linear regression becomes abnormal values

2015-05-27 Thread DB Tsai
LinearRegressionWithSGD requires tuning the step size and # of
iterations very carefully. Please try the Linear Regression with elastic
net implementation in the Spark 1.4 ML framework, which uses a
quasi-Newton method and determines the step size automatically. That
implementation also matches the results from R.
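
A rough sketch of that 1.4 ML API in Scala (training is assumed to be a
DataFrame with "label" and "features" columns; the regularization values are
illustrative):

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.1)          // overall regularization strength
  .setElasticNetParam(0.5)   // 0.0 = pure L2, 1.0 = pure L1
val lrModel = lr.fit(training)
println(lrModel.weights)     // no step size to tune here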

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Wed, May 27, 2015 at 9:08 PM, Maheshakya Wijewardena
 wrote:
>
> Hi,
>
> > I'm trying to use Spark's LinearRegressionWithSGD in PySpark with the
> attached dataset. The code is attached. When I check the model weights
> vector after training, it contains `nan` values.
>
> [nan,nan,nan,nan,nan,nan,nan,nan]
>
> But for some data sets, this problem does not occur. What might be the
> reason for this?
> Is this an issue with the data I'm using or a bug?
>
> Best regards.
>
> --
> Pruthuvi Maheshakya Wijewardena
> Software Engineer
> WSO2 Lanka (Pvt) Ltd
> Email: mahesha...@wso2.com
> Mobile: +94711228855
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Model weights of linear regression becomes abnormal values

2015-05-27 Thread Maheshakya Wijewardena
Hi,

I'm trying to use Spark's *LinearRegressionWithSGD* in PySpark with the
attached dataset. The code is attached. When I check the model weights
vector after training, it contains `nan` values.

[nan,nan,nan,nan,nan,nan,nan,nan]

But for some data sets, this problem does not occur. What might be the
reason for this?
Is this an issue with the data I'm using or a bug?

Best regards.

-- 
Pruthuvi Maheshakya Wijewardena
Software Engineer
WSO2 Lanka (Pvt) Ltd
Email: mahesha...@wso2.com
Mobile: +94711228855
6,148,72,35,0,336,627,50,1
1,85,66,29,0,266,351,31,0
8,183,64,0,0,233,672,32,1
1,89,66,23,94,281,167,21,0
0,137,40,35,168,431,2288,33,1
5,116,74,0,0,256,201,30,0
3,78,50,32,88,310,248,26,1
10,115,0,0,0,353,134,29,0
2,197,70,45,543,305,158,53,1
8,125,96,0,0,0,232,54,1
4,110,92,0,0,376,191,30,0
10,168,74,0,0,380,537,34,1
10,139,80,0,0,271,1441,57,0
1,189,60,23,846,301,398,59,1
5,166,72,19,175,258,587,51,1
7,100,0,0,0,300,484,32,1
0,118,84,47,230,458,551,31,1
7,107,74,0,0,296,254,31,1
1,103,30,38,83,433,183,33,0
1,115,70,30,96,346,529,32,1
3,126,88,41,235,393,704,27,0
import sys
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array

# Load and parse data
def parse_point(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[0], values[1:])

sc = SparkContext(appName='LinearRegression')
# Add path to your dataset.
data = sc.textFile('dummy_data_sest.csv')
parsedData = data.map(parse_point)

# Build the model
model = LinearRegressionWithSGD.train(parsedData)

# Check model weight vector
print(model.weights)
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread Ted Yu
bq. detect the presence of a new node and start utilizing it

My understanding is that Spark is concerned with managing executors.
Whether a request for an executor is fulfilled on an existing node or a new
node is up to the underlying cluster manager (e.g. YARN).
Assuming the cluster is single-tenant, executor requests are more likely to
be fulfilled on the new nodes.

Please correct me if I am wrong.

On Wed, May 27, 2015 at 8:26 PM, Dmitry Goldenberg  wrote:

> Thanks, Rajesh.  I think that acquiring/relinquishing executors is
> important but I feel like there are at least two layers for resource
> allocation and autoscaling.  It seems that acquiring and relinquishing
> executors is a way to optimize resource utilization within a pre-set Spark
> cluster of machines.
>
> However, to accommodate big spikes in input data, we also need the
> actual cluster scaling, i.e. adding (or removing, when no longer needed)
> worker node machines automatically.  On that front, I wonder how Spark
> reacts to a machine being added or removed and what the actual procedure
> would be.  If we're running on a Hadoop cluster, there's a description of
> adding a node
> 
> there.  There's also discussions of Hadoop node adding/removal such as this
> one
> 
> .
>
> My worry is, will Spark "gracefully" and "quickly" detect the presence of
> a new node and start utilizing it (i.e. how much does it communicate with
> the Hadoop cluster manager?)...  By the same token, if a node is removed,
> how can it be removed gracefully so as not to affect/kill any running Spark
> jobs?
>
> On Wed, May 27, 2015 at 10:57 PM,  wrote:
>
>> *Dell - Internal Use - Confidential *
>>
>> Did you check
>> https://drive.google.com/file/d/0B7tmGAdbfMI2OXl6azYySk5iTGM/edit and
>> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> Not sure if the spark kafka receiver emits metrics on the lag, check this
>>  link out
>> http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer
>>
>>
>>
>> You should be able to whip up a script that runs the Kafka
>> ConsumerOffsetChecker periodically and pipe it to a metrics backend of your
>> choice. Based on this you can work the dynamic resource allocation magic.
>>
>> -Original Message-
>> From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
>> Sent: Wednesday, May 27, 2015 6:21 PM
>> To: user@spark.apache.org
>> Subject: Autoscaling Spark cluster based on topic sizes/rate of growth in
>> Kafka or Spark's metrics?
>>
>> Hi,
>>
>> I'm trying to understand if there are design patterns for autoscaling
>> Spark (add/remove slave machines to the cluster) based on the throughput.
>>
>> Assuming we can throttle Spark consumers, the respective Kafka topics we
>> stream data from would start growing. What are some of the ways to generate
>> the metrics on the number of new messages and the rate they are piling up?
>> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
>> with the Metric interface and not much else...
>>
>> What are some of the ways to expand/contract the Spark cluster? Someone
>> has mentioned Mesos...
>>
>> I see some info on Spark metrics in the Spark monitoring guide . Do we
>> want to perhaps implement a custom sink that would help us autoscale up or
>> down based on the throughput?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>
>


Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-27 Thread ayan guha
Yes, you are on the right path. The only thing to remember is to place
hive-site.xml on the correct path so Spark can talk to the Hive metastore.

Best
Ayan
On 28 May 2015 10:53, "Sanjay Subramanian"
 wrote:

> hey guys
>
> On the Hive/Hadoop ecosystem we have, using the Cloudera distribution CDH
> 5.2.x, there are about 300+ Hive tables.
> The data is stored as text (moving slowly to Parquet) on HDFS.
> I want to use SparkSQL, point to the Hive metadata, and be able to
> define JOINs etc. using a programming structure like this:
>
> import org.apache.spark.sql.hive.HiveContext
> val sqlContext = new HiveContext(sc)
> val schemaRdd = sqlContext.sql("some complex SQL")
>
>
> Is that the way to go ? Some guidance will be great.
>
> thanks
>
> sanjay
>
>
>
>


Re: Spark on Mesos vs Yarn

2015-05-27 Thread Bharath Ravi Kumar
A follow-up: considering that Spark on Mesos is indeed important to
Databricks, its partners, and the community, fundamental issues like
SPARK-6284 shouldn't be languishing for this long. A Mesos cluster hosting
diverse (i.e. multi-tenant) workloads is a common scenario in production
for serious users. The ability to auth a framework & assign roles would be
a fairly basic ask, one would imagine. Is the lack of time / effort a
constraint? If so, I'd be glad to help (as mentioned in the jira).

On Fri, May 15, 2015 at 5:29 PM, Iulian Dragoș 
wrote:

> Hi Ankur,
>
> Just to add a thought to Tim's excellent answer, Spark on Mesos is very
> important to us and is the recommended deployment for our customers at
> Typesafe.
>
> Thanks for pointing to your PR, I see Tim already went through a round of
> reviews. It seems very useful, I'll give it a try as well.
>
> thanks,
> iulian
>
>
>
> On Fri, May 15, 2015 at 9:53 AM, Ankur Chauhan  wrote:
>
>> -BEGIN PGP SIGNED MESSAGE-
>> Hash: SHA1
>>
>> Hi Tim,
>>
>> Thanks for such a detailed email. I am excited to hear about the new
>> features, I had a pull request going for adding "attribute based
>> filtering in the mesos scheduler" but it hasn't received much love -
>> https://github.com/apache/spark/pull/5563 . I am a fan of
>> mesos/marathon/mesosphere and spark ecosystems and trying to push
>> adoption at my workplace.
>>
>> I would love to see documentation, tutorials (anything actually) that
>> would make mesos + spark a better and more fleshed out solution. Would
>> it be possible for you to share some links to the JIRA and pull
>> requests so that I can keep track of the progress/features?
>>
>> Again, thanks for replying.
>>
>> - -- Ankur Chauhan
>>
>> On 15/05/2015 00:39, Tim Chen wrote:
>> > Hi Ankur,
>> >
>> > This is a great question as I've heard similar concerns about Spark
>> > on Mesos.
>> >
>> > At the time when I started to contribute to Spark on Mesos approx
>> > half year ago, the Mesos scheduler and related code hasn't really
>> > got much attention from anyone and it was pretty much in
>> > maintenance mode.
>> >
>> > As a Mesos PMC that is really interested in Spark I started to
>> > refactor and check out different JIRAs and PRs around the Mesos
>> > scheduler, and after that started to fix various bugs in Spark,
>> > added documentation and also fixed related Mesos issues as well.
>> >
>> > Just recently for 1.4 we've merged in Cluster mode and Docker
>> > support, and there are also pending PRs around framework
>> > authentication, multi-role support, dynamic allocation, more finer
>> > tuned coarse grain mode scheduling configurations, etc.
>> >
>> > And finally just want to mention that Mesosphere and Typesafe is
>> > collaborating to bring a certified distribution
>> > (https://databricks.com/spark/certification/certified-spark-distributi
>> on)
>> > of Spark on Mesos and DCOS, and we will be pouring resources into
>> > not just maintain Spark on Mesos but drive more features into the
>> > Mesos scheduler and also in Mesos so stateful services can leverage
>> > new APIs and features to make better scheduling decisions and
>> > optimizations.
>> >
>> > I don't have a solidified roadmap to share yet, but we will be
>> > discussing this and hopefully can share with the community soon.
>> >
>> > In summary Spark on Mesos is not dead or in maintenance mode, and
>> > look forward to see a lot more changes from us and the community.
>> >
>> > Tim
>> >
>> > On Thu, May 14, 2015 at 11:30 PM, Ankur Chauhan
>> > mailto:an...@malloc64.com>> wrote:
>> >
>> > Hi,
>> >
>> > This is both a survey type as well as a roadmap query question. It
>> > seems like of the cluster options to run spark (i.e. via YARN and
>> > Mesos), YARN seems to be getting a lot more attention and patches
>> > when compared to Mesos.
>> >
>> > Would it be correct to assume that spark on mesos is more or less
>> > a dead or something like a maintenance-only feature and YARN is
>> > the recommended way to go?
>> >
>> > What is the roadmap for spark on mesos? and what is the roadmap
>> > for spark on yarn. I like mesos so as much as I would like to see
>> > it thrive I don't think spark community is active (or maybe it
>> > just appears that way).
>> >
>> > Another more community oriented question: what do most people use
>> > to run spark in production or more-than-POC products? Why did you
>> > make that decision?
>> >
>> > There was a similar post form early 2014 where Metei answered that
>> > mesos and yarn were equally important, but has this changed as
>> > spark has now reached almost 1.4.0 stage?
>> >
>> > -- Ankur Chauhan
>> >
>> > -
>> >
>> >
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >  For additional commands,
>> > e-mail: user-h...@spark.apache.org
>> > 
>> >
>> >
>> -BEGIN PGP SIGNATURE-

Re: View all user's application logs in history server

2015-05-27 Thread Jianshi Huang
Hmm...all files under the event log folder have permission 770, but strangely
my account cannot read other users' files. Permission denied.

I'll sort it out with our Hadoop admin. Thanks for the help!

Jianshi

On Thu, May 28, 2015 at 12:13 AM, Marcelo Vanzin 
wrote:

> Then:
> - Are all files readable by the user running the history server?
> - Did all applications call sc.stop() correctly (i.e. files do not have
> the ".inprogress" suffix)?
>
> Other than that, always look at the logs first, looking for any errors
> that may be thrown.
>
>
> On Wed, May 27, 2015 at 9:10 AM, Jianshi Huang 
> wrote:
>
>> Yes, all written to the same directory on HDFS.
>>
>> Jianshi
>>
>> On Wed, May 27, 2015 at 11:57 PM, Marcelo Vanzin 
>> wrote:
>>
>>> You may be the only one not seeing all the logs. Are you sure all the
>>> users are writing to the same log directory? The HS can only read from a
>>> single log directory.
>>>
>>> On Wed, May 27, 2015 at 5:33 AM, Jianshi Huang 
>>> wrote:
>>>
 No one using History server? :)

 Am I the only one need to see all user's logs?

 Jianshi

 On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang >>> > wrote:

> Hi,
>
> I'm using Spark 1.4.0-rc1 and I'm using default settings for history
> server.
>
> But I can only see my own logs. Is it possible to view all user's
> logs? The permission is fine for the user group.
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/

>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Marcelo
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: View all user's application logs in history server

2015-05-27 Thread Jianshi Huang
BTW, is there an option to set file permissions for Spark event logs?

Jianshi

On Thu, May 28, 2015 at 11:25 AM, Jianshi Huang 
wrote:

> Hmm...all files under the event log folder have permission 770, but
> strangely my account cannot read other users' files. Permission denied.
>
> I'll sort it out with our Hadoop admin. Thanks for the help!
>
> Jianshi
>
> On Thu, May 28, 2015 at 12:13 AM, Marcelo Vanzin 
> wrote:
>
>> Then:
>> - Are all files readable by the user running the history server?
>> - Did all applications call sc.stop() correctly (i.e. files do not have
>> the ".inprogress" suffix)?
>>
>> Other than that, always look at the logs first, looking for any errors
>> that may be thrown.
>>
>>
>> On Wed, May 27, 2015 at 9:10 AM, Jianshi Huang 
>> wrote:
>>
>>> Yes, all written to the same directory on HDFS.
>>>
>>> Jianshi
>>>
>>> On Wed, May 27, 2015 at 11:57 PM, Marcelo Vanzin 
>>> wrote:
>>>
 You may be the only one not seeing all the logs. Are you sure all the
 users are writing to the same log directory? The HS can only read from a
 single log directory.

 On Wed, May 27, 2015 at 5:33 AM, Jianshi Huang >>> > wrote:

> No one using History server? :)
>
> Am I the only one need to see all user's logs?
>
> Jianshi
>
> On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang <
> jianshi.hu...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using Spark 1.4.0-rc1 and I'm using default settings for history
>> server.
>>
>> But I can only see my own logs. Is it possible to view all user's
>> logs? The permission is fine for the user group.
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



 --
 Marcelo

>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Marcelo
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread Dmitry Goldenberg
Thanks, Rajesh.  I think that acquiring/relinquishing executors is important
but I feel like there are at least two layers for resource allocation and
autoscaling.  It seems that acquiring and relinquishing executors is a way
to optimize resource utilization within a pre-set Spark cluster of machines.

However, to accommodate big spikes in input data, we also need the
actual cluster scaling, i.e. adding (or removing, when no longer needed)
worker node machines automatically.  On that front, I wonder how Spark
reacts to a machine being added or removed and what the actual procedure
would be.  If we're running on a Hadoop cluster, there's a description of
adding a node

there.  There's also discussions of Hadoop node adding/removal such as this
one

.

My worry is, will Spark "gracefully" and "quickly" detect the presence of a
new node and start utilizing it (i.e. how much does it communicate with the
Hadoop cluster manager?)...  By the same token, if a node is removed, how
can it be removed gracefully so as not to affect/kill any running Spark
jobs?

On Wed, May 27, 2015 at 10:57 PM,  wrote:

> *Dell - Internal Use - Confidential *
>
> Did you check
> https://drive.google.com/file/d/0B7tmGAdbfMI2OXl6azYySk5iTGM/edit and
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Not sure if the spark kafka receiver emits metrics on the lag, check this
>  link out
> http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer
>
>
>
> You should be able to whip up a script that runs the Kafka
> ConsumerOffsetChecker periodically and pipe it to a metrics backend of your
> choice. Based on this you can work the dynamic resource allocation magic.
>
> -Original Message-
> From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
> Sent: Wednesday, May 27, 2015 6:21 PM
> To: user@spark.apache.org
> Subject: Autoscaling Spark cluster based on topic sizes/rate of growth in
> Kafka or Spark's metrics?
>
> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling
> Spark (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we
> stream data from would start growing. What are some of the ways to generate
> the metrics on the number of new messages and the rate they are piling up?
> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
> with the Metric interface and not much else...
>
> What are some of the ways to expand/contract the Spark cluster? Someone
> has mentioned Mesos...
>
> I see some info on Spark metrics in the Spark monitoring guide . Do we
> want to perhaps implement a custom sink that would help us autoscale up or
> down based on the throughput?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>


RE: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-27 Thread Cheng, Hao
Yes, but be sure you put hive-site.xml on your classpath.

Did you run into any problems?

Cheng Hao

From: Sanjay Subramanian [mailto:sanjaysubraman...@yahoo.com.INVALID]
Sent: Thursday, May 28, 2015 8:53 AM
To: user
Subject: Pointing SparkSQL to existing Hive Metadata with data file locations 
in HDFS

hey guys

On the Hive/Hadoop ecosystem we have, using the Cloudera distribution CDH 5.2.x,
there are about 300+ Hive tables.
The data is stored as text (moving slowly to Parquet) on HDFS.
I want to use SparkSQL, point to the Hive metadata, and be able to define
JOINs etc. using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")


Is that the way to go ? Some guidance will be great.

thanks

sanjay




RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread Rajesh_Kalluri
Dell - Internal Use - Confidential
Did you check https://drive.google.com/file/d/0B7tmGAdbfMI2OXl6azYySk5iTGM/edit 
and 
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

Not sure if the Spark Kafka receiver emits metrics on the lag; check this link
out:
http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer

You should be able to whip up a script that runs the Kafka 
ConsumerOffsetChecker periodically and pipe it to a metrics backend of your 
choice. Based on this you can work the dynamic resource allocation magic.
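
For what it's worth, the dynamic allocation knobs from that second link boil
down to a handful of settings (a sketch; the executor counts are illustrative,
and the external shuffle service must be set up on the worker nodes):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")    // needed so executors can be released safely
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")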

-Original Message-
From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Wednesday, May 27, 2015 6:21 PM
To: user@spark.apache.org
Subject: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka 
or Spark's metrics?

Hi,

I'm trying to understand if there are design patterns for autoscaling Spark 
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we stream 
data from would start growing. What are some of the ways to generate the 
metrics on the number of new messages and the rate they are piling up?
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with 
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has 
mentioned Mesos...

I see some info on Spark metrics in the Spark monitoring guide . Do we want to 
perhaps implement a custom sink that would help us autoscale up or down based 
on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


Re: SparkR Jobs Hanging in collectPartitions

2015-05-27 Thread Shivaram Venkataraman
Could you try to see which phase is causing the hang? I.e., if you do a
count() after the flatMap, does that work correctly? My guess is that the hang
is somehow related to data not fitting in the R process memory, but it's hard
to say without more diagnostic information.

Thanks
Shivaram

On Tue, May 26, 2015 at 7:28 AM, Eskilson,Aleksander <
alek.eskil...@cerner.com> wrote:

>  I’ve been attempting to run a SparkR translation of a similar Scala job
> that identifies words from a corpus not existing in a newline delimited
> dictionary. The R code is:
>
>  dict <- SparkR:::textFile(sc, src1)
> corpus <- SparkR:::textFile(sc, src2)
> words <- distinct(SparkR:::flatMap(corpus, function(line) {
> gsub(“[[:punct:]]”, “”, tolower(strsplit(line, “ |,|-“)[[1]]))}))
> found <- subtract(words, dict)
>
>  (where src1, src2 are locations on HDFS)
>
>  Then attempting something like take(found, 10) or saveAsTextFile(found,
> dest) should realize the collection, but that stage of the DAG hangs in
> Scheduler Delay during the collectPartitions phase.
>
>  Synonymous Scala code however,
> val corpus = sc.textFile(src1).flatMap(_.split(“ |,|-“))
> val dict = sc.textFile(src2)
> val words = corpus.map(word =>
> word.filter(Character.isLetter(_))).disctinct()
> val found = words.subtract(dict)
>
>  performs as expected. Any thoughts?
>
>  Thanks,
> Alek Eskilson
>  CONFIDENTIALITY NOTICE This message and any included attachments are from
> Cerner Corporation and are intended only for the addressee. The information
> contained in this message is confidential and may constitute inside or
> non-public information under international, federal, or state securities
> laws. Unauthorized forwarding, printing, copying, distribution, or use of
> such information is strictly prohibited and may be unlawful. If you are not
> the addressee, please promptly delete this message and notify the sender of
> the delivery error by e-mail or you may call Cerner's corporate offices in
> Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
>


Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-27 Thread Sanjay Subramanian
hey guys
On the Hive/Hadoop ecosystem we have, using the Cloudera distribution CDH 5.2.x,
there are about 300+ Hive tables.
The data is stored as text (moving slowly to Parquet) on HDFS.
I want to use SparkSQL, point to the Hive metadata, and be able to define
JOINs etc. using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

Is that the way to go? Some guidance will be great.

thanks

sanjay




Re: Intermittent difficulties for Worker to contact Master on same machine in standalone

2015-05-27 Thread Yana Kadiyska
What does your master log say -- normally the master should NEVER shut
down...you should be able to spark-submit to infinity with no issues...So
the question about high variance on upstart is one issue, but the other
thing that's puzzling to me is why your master is ever down to begin
with...(assuming you're not manually restarting it and I missed that part).

One thing that does occur to me, perhaps since you're running on the same
box -- is it possible that the UNIX OOM killer is getting your process --
we had that occur with a long running driver a few times. It's a nasty one
as it leaves no good trace in the spark logs, you have to know to look at
the system logs.

If you're running spark-submit by hand, you can verify via the UI that your
master is up and has a worker connected prior to submitting...if it's not
investigate why it went down...

Lastly a random thought -- your "memory per Node" shows up at 512MB which
seems really really low on the executor side. I don't know how much memory
you have on that machine, but normally the executors do all the work- I'd
try to give it a few G if you can. Your worker shows 15G of memory, I'd
give your executor at least 4...

On Wed, May 27, 2015 at 4:06 PM, Stephen Boesch  wrote:

> Here is example after git clone-ing latest 1.4.0-SNAPSHOT.  The first 3
> runs (FINISHED) were successful and connected quickly.  Fourth run (ALIVE)
> is failing on connection/association.
>
>
> URL: spark://mellyrn.local:7077
> REST URL: spark://mellyrn.local:6066 (cluster mode)
> Workers: 1
> Cores: 8 Total, 0 Used
> Memory: 15.0 GB Total, 0.0 B Used
> Applications: 0 Running, 3 Completed
> Drivers: 0 Running, 0 Completed
> Status: ALIVE
> Workers
>
> Worker Id Address ▾ State Cores Memory
> worker-20150527122155-10.0.0.3-60847 10.0.0.3:60847 ALIVE 8 (0 Used) 15.0
> GB (0.0 B Used)
> Running Applications
>
> Application ID Name Cores Memory per Node Submitted Time User State
> Duration
> Completed Applications
>
> Application ID Name Cores Memory per Node Submitted Time User State
> Duration
> app-20150527125945-0002 TestRunner: power-iteration-clustering 8 512.0 MB 
> 2015/05/27
> 12:59:45 steve FINISHED 7 s
> app-20150527124403-0001 TestRunner: power-iteration-clustering 8 512.0 MB 
> 2015/05/27
> 12:44:03 steve FINISHED 6 s
> app-20150527123822- TestRunner: power-iteration-clustering 8 512.0 MB 
> 2015/05/27
> 12:38:22 steve FINISHED 6 s
>
>
>
> 2015-05-27 11:42 GMT-07:00 Stephen Boesch :
>
> Thanks Yana,
>>
>>My current experience here is after running some small spark-submit
>> based tests the Master once again stopped being reachable.  No change in
>> the test setup.  I restarted Master/Worker and still not reachable.
>>
>> What might be the variables here in which association with the
>> Master/Worker stops succeedng?
>>
>> For reference here are the Master/worker
>>
>>
>>   501 34465 1   0 11:35AM ?? 0:06.50
>> /Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java
>> -cp  -Xms512m -Xmx512m -XX:MaxPermSize=128m
>> org.apache.spark.deploy.worker.Worker spark://mellyrn.local:7077
>>   501 34361 1   0 11:35AM ttys0180:07.08
>> /Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java
>> -cp   -Xms512m -Xmx512m -XX:MaxPermSize=128m
>> org.apache.spark.deploy.master.Master --ip mellyrn.local --port 7077
>> --webui-port 8080
>>
>>
>> 15/05/27 11:36:37 INFO SparkUI: Started SparkUI at
>> http://25.101.19.24:4040
>> 15/05/27 11:36:37 INFO SparkContext: Added JAR
>> file:/shared/spark-perf/mllib-tests/target/mllib-perf-tests-assembly.jar at
>> http://25.101.19.24:60329/jars/mllib-perf-tests-assembly.jar with
>> timestamp 1432751797662
>> 15/05/27 11:36:37 INFO AppClient$ClientActor: Connecting to master
>> akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
>> 15/05/27 11:36:37 WARN AppClient$ClientActor: Could not connect to
>> akka.tcp://sparkMaster@mellyrn.local:7077:
>> akka.remote.InvalidAssociation: Invalid address:
>> akka.tcp://sparkMaster@mellyrn.local:7077
>> 15/05/27 11:36:37 WARN Remoting: Tried to associate with unreachable
>> remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is
>> now gated for 5000 ms, all messages to this address will be delivered to
>> dead letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
>> 15/05/27 11:36:57 INFO AppClient$ClientActor: Connecting to master
>> akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
>> 15/05/27 11:36:57 WARN AppClient$ClientActor: Could not connect to
>> akka.tcp://sparkMaster@mellyrn.local:7077:
>> akka.remote.InvalidAssociation: Invalid address:
>> akka.tcp://sparkMaster@mellyrn.local:7077
>> 15/05/27 11:36:57 WARN Remoting: Tried to associate with unreachable
>> remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is
>> now gated for 5000 ms, all messages to this address will be delivered to
>> dead letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
>> 15/05/27 11:37:17 INFO A

Value for SPARK_EXECUTOR_CORES

2015-05-27 Thread Mulugeta Mammo
My executor has the following spec (lscpu):

CPU(s): 16
Core(s) per socket: 4
Socket(s): 2
Thread(s) per core: 2

The CPU count is obviously 4*2*2 = 16. My question is what value Spark
expects in SPARK_EXECUTOR_CORES: the CPU count (16) or the total # of
physical cores (4 * 2 = 8)?

Thanks


Re: How does spark manage the memory of executor with multiple tasks

2015-05-27 Thread canan chen
Thanks Yong, this is very helpful. I also found ShuffleMemoryManager, which is
used to allocate memory across tasks in one executor.

>> These 2 tasks have to share the 2G heap memory. I don't think specifying
the memory per task is a good idea, as tasks run at the thread level,
and memory applies only to the JVM process.

Yes, it's not a good idea to specify memory per task in Spark, because one
executor can run multiple tasks. Actually, this is what confuses me. In
Spark, I can only specify the total cores + memory per executor for each app.
Based on this I can calculate how many cores/tasks run per executor and then
estimate how much memory each task can consume on average. I am not sure
whether it is a good idea to do it this way. Users think about a job at the
task level and consider tasks independent and isolated, but at runtime the
tasks may run in one JVM and share the memory specified per executor. This
is confusing to me, at least.
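
To make that concrete, here is a rough Scala sketch of the knobs that exist
today: resources are fixed per application/executor, while parallelism is
chosen per shuffle (the path and numbers are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("WordCount")
  .set("spark.cores.max", "10")        // total cores for the app (standalone mode)
  .set("spark.executor.memory", "2g")  // heap shared by all tasks in one executor
val sc = new SparkContext(conf)

val counts = sc.textFile("hdfs:///data/words.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _, 400)             // per-stage parallelism: 400 reduce tasks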



On Wed, May 27, 2015 at 9:06 PM, java8964  wrote:

> Same as you, there are lots of people coming from the MapReduce world, trying
> to understand the internals of Spark. Hope the notes below help you in some way.
>
> For the end users, they only have concept of Job. I want to run a word
> count job from this one big file, that is the job I want to run. How many
> stages and tasks this job will generate depends on the file size and
> parallelism you specify in your job.
>
> For word count, it will generate 2 stages, as we have shuffle in it,
> thinking of it the same way as the Mapper and Reducer parts.
>
> If the file is 1280M size in HDFS with 128M block, so the first stage will
> generate 10 tasks. If you use the default parallelism in spark, the 2nd
> stage should generate 200 tasks.
>
> Forget about Executors right now, so the above job will have 210 tasks to
> run. In the standalone mode, you need to specify the cores and memory for
> your job. Let's assume you have 5 worker nodes with 4 cores + 8G each. Now,
> if you ask 10 cores and 2G per executor, and cluster does have the enough
> resources available, then you will get 1 executor from each work node, with
> 2 cores + 2G per executor to run your job.
> In this case, first 10 tasks in the stage one can start concurrently at
> the same time, after that, every 10 tasks in stage 2 can be run
> concurrently. You get 5 executors, as you have 5 worker nodes. There is a
> coming feature to start multi executors per worker, but we are talking
> about the normally case here. In fact, you can start multi workers in one
> physical box, if you have enough resource.
>
> In the above case, 2 tasks will be run concurrently per executor. You
> control this by specifying how many cores you want for your job, plus how many
> workers are pre-configured in your cluster. These 2 tasks have to share the
> 2G heap memory. I don't think specifying the memory per task is a good
> idea, as tasks run at the thread level, and memory applies only to the
> JVM process.
>
> In MR, every mapper and reducer maps to a Java process, but in Spark,
> a task just maps to a thread/core.
>
> In Spark, memory tuning is more like an art, but there are still lots of rules
> to follow. In the above case, you can increase the parallelism to 400; then
> you will have 400 tasks in stage 2, so each task will come with less
> data, provided the file has enough unique words. Or you can
> lower the cores from 10 to 5, then each executor will only process one task
> at a time, but your job will run slower.
>
> Overall, you want to max the parallelism to gain the best speed, but also
> make sure the memory is enough for your job at this speed, to avoid OOM. It
> is a balance.
>
> Keep in mind:
>
>- Cluster pre-config with number of workers with total cores + max
>heap memory you can ask
>- Per application, you specify total cores you want + heap memory per
>executor
>- In your application, you can specify the parallelism level, as lots
>of "Action" supporting it. So parallelism is dynamic, from job to job, or
>even from stage to stage.
>
>
> Yong
>
> --
> Date: Wed, 27 May 2015 15:48:57 +0800
> Subject: Re: How does spark manage the memory of executor with multiple
> tasks
> From: ccn...@gmail.com
> To: evo.efti...@isecc.com
> CC: ar...@sigmoidanalytics.com; user@spark.apache.org
>
>
> Does anyone can answer my question ? I am curious to know if there's
> multiple reducer tasks in one executor, how to allocate memory between
> these reducers tasks since each shuffle will consume a lot of memory ?
>
> On Tue, May 26, 2015 at 7:27 PM, Evo Eftimov 
> wrote:
>
>  the link you sent says multiple executors per node
>
> Worker is just demon process launching Executors / JVMs so it can execute
> tasks - it does that by cooperating with the master and the driver
>
> There is a one to one maping between Executor and JVM
>
>
> Sent from Samsung Mobile
>
>
>  Original messag

Re: Multilabel classification using logistic regression

2015-05-27 Thread Peter Garbers
Hi Joseph,

I looked at that, but it seems that LogisticRegressionWithLBFGS's run
method takes RDD[LabeledPoint] objects, so I'm not sure exactly how
one would use it in the way you're describing.
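
Or did you mean a single multiclass model trained on the full RDD[LabeledPoint]?
Something like this, perhaps (a sketch only, reusing the names from my earlier
snippet):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors

// One model over all of the labeled data, instead of one model per label.
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(101)
  .run(labeledPoints)          // labeledPoints: RDD[LabeledPoint], labels 0.0 .. 100.0
val predictedLabel = model.predict(Vectors.dense(query.features))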

On Wed, May 27, 2015 at 4:04 PM, Joseph Bradley  wrote:
> It looks like you are training each model i (for label i) by only using data
> with label i.  You need to use all of your data to train each model so the
> models can compare each label i with the other labels (roughly speaking).
>
> However, what you're doing is multiclass (not multilabel) classification,
> which LogisticRegressionWithLBFGS already supports.  Can you not just use
> LogisticRegressionWithLBFGS directly?
>
> On Wed, May 27, 2015 at 8:53 AM, peterg  wrote:
>>
>> Hi all
>>
>> I believe I have created a multi-label classifier using LogisticRegression
>> but there is one snag. No matter what features I use to get the
>> prediction,
>> it will always return the label. I feel like I need to set a threshold but
>> can't seem to figure out how to do that. I attached the code below. It's
>> super simple. Hopefully someone can point me in the correct direction:
>>
>> val labels = labeledPoints.map(l => l.label).take(1000).distinct // stupid
>> hack
>> val groupedRDDs = labels.map { l => labeledPoints.filter (m => m.label ==
>> l)
>> }.map(l => l.cache()) // should use groupBy
>> val models = groupedRDDs.map(rdd => new
>> LogisticRegressionWithLBFGS().setNumClasses(101).run(rdd))
>> val results = models.map(m => m.predict(Vectors.dense(query.features)))
>>
>> Thanks
>>
>> Peter
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Multilabel-classification-using-logistic-regression-tp23054.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?

2015-05-27 Thread Dmitry Goldenberg
Got it, thank you, Tathagata and Ted.

Could you comment on my other question

as well?  Basically, I'm trying to get a handle on a good approach to
throttling, on the one hand, and autoscaling the cluster, on the
other.  Are there any recommended approaches or design patterns for
autoscaling that you have implemented or could point me at? Thanks!

On Wed, May 27, 2015 at 8:08 PM, Tathagata Das  wrote:

> You can throttle the no receiver direct Kafka stream using
> spark.streaming.kafka.maxRatePerPartition
> 
>
>
> On Wed, May 27, 2015 at 4:34 PM, Ted Yu  wrote:
>
>> Have you seen
>> http://stackoverflow.com/questions/29051579/pausing-throttling-spark-spark-streaming-application
>> ?
>>
>> Cheers
>>
>> On Wed, May 27, 2015 at 4:11 PM, dgoldenberg 
>> wrote:
>>
>>> Hi,
>>>
>>> With the no receivers approach to streaming from Kafka, is there a way to
>>> set something like spark.streaming.receiver.maxRate so as not to
>>> overwhelm
>>> the Spark consumers?
>>>
>>> What would be some of the ways to throttle the streamed messages so that
>>> the
>>> consumers don't run out of memory?
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-from-Kafka-no-receivers-and-spark-streaming-receiver-maxRate-tp23061.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?

2015-05-27 Thread Tathagata Das
You can throttle the no receiver direct Kafka stream using
spark.streaming.kafka.maxRatePerPartition
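
For example (the value is illustrative):

import org.apache.spark.SparkConf

// At most ~1000 records per second will be consumed from each Kafka partition.
val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")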



On Wed, May 27, 2015 at 4:34 PM, Ted Yu  wrote:

> Have you seen
> http://stackoverflow.com/questions/29051579/pausing-throttling-spark-spark-streaming-application
> ?
>
> Cheers
>
> On Wed, May 27, 2015 at 4:11 PM, dgoldenberg 
> wrote:
>
>> Hi,
>>
>> With the no receivers approach to streaming from Kafka, is there a way to
>> set something like spark.streaming.receiver.maxRate so as not to overwhelm
>> the Spark consumers?
>>
>> What would be some of the ways to throttle the streamed messages so that
>> the
>> consumers don't run out of memory?
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-from-Kafka-no-receivers-and-spark-streaming-receiver-maxRate-tp23061.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?

2015-05-27 Thread Ted Yu
Have you seen
http://stackoverflow.com/questions/29051579/pausing-throttling-spark-spark-streaming-application
?

Cheers

On Wed, May 27, 2015 at 4:11 PM, dgoldenberg 
wrote:

> Hi,
>
> With the no receivers approach to streaming from Kafka, is there a way to
> set something like spark.streaming.receiver.maxRate so as not to overwhelm
> the Spark consumers?
>
> What would be some of the ways to throttle the streamed messages so that
> the
> consumers don't run out of memory?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-from-Kafka-no-receivers-and-spark-streaming-receiver-maxRate-tp23061.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread dgoldenberg
Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up? 
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in the Spark monitoring guide. Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?

2015-05-27 Thread dgoldenberg
Hi,

With the no receivers approach to streaming from Kafka, is there a way to
set something like spark.streaming.receiver.maxRate so as not to overwhelm
the Spark consumers?

What would be some of the ways to throttle the streamed messages so that the
consumers don't run out of memory?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-from-Kafka-no-receivers-and-spark-streaming-receiver-maxRate-tp23061.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SF / East Bay Area Stream Processing Meetup next Thursday (6/4)

2015-05-27 Thread Siva Jagadeesan
http://www.meetup.com/Bay-Area-Stream-Processing/events/219086133/

Thursday, June 4, 2015

6:45 PM
TubeMogul
1250 53rd St #1
Emeryville, CA

6:45PM to 7:00PM - Socializing

7:00PM to 8:00PM - Talks

8:00PM to 8:30PM - Socializing

Speaker :

*Bill Zhao (from TubeMogul)*

Bill was working as a researcher in the UC Berkeley AMP lab during the
creation of Spark and Tachyon, and worked on improving Spark memory
utilization and Spark Tachyon integration.  Working at the
intersection of three massive trends: powerful machine learning, cloud
computing, and crowdsourcing, the AMPLab is integrating Algorithms,
Machines, and People to make sense of Big Data.

Topic:

*Introduction to Spark and Tachyon*

Description:

Spark is a fast and general processing engine compatible with Hadoop data.
It can run in Hadoop clusters through YARN or Spark's standalone mode, and
it can process data in HDFS, etc.  It is designed to perform both batch
processing (similar to MapReduce) and newer workloads such as streaming,
interactive queries, and machine learning.  Tachyon is a memory-centric distributed
storage system enabling reliable data sharing at memory-speed across
cluster frameworks, such as Spark and MapReduce.  It achieves high
performance by leveraging lineage information and using memory
aggressively. Tachyon caches working set files in memory, thereby avoiding
going to disk to load datasets that are frequently read. This enables
different jobs/queries and frameworks to access cached files at memory
speed.


Re: How to get the best performance with LogisticRegressionWithSGD?

2015-05-27 Thread Joseph Bradley
The model is learned using an iterative convex optimization algorithm.
"numIterations", "stepSize" and "miniBatchFraction" control that optimization;
you can see the details here:
http://spark.apache.org/docs/latest/mllib-linear-methods.html#implementation-developer
http://spark.apache.org/docs/latest/mllib-optimization.html

I would set miniBatchFraction at 1.0 and not mess with it.
For LogisticRegressionWithSGD, to know whether you have the other 2
parameters set correctly, you should try running with more iterations.
If running with more iterations changes your result significantly, then:
 - If the result is blowing up (really big model weights), then you need to
decrease stepSize.
 - If the result is not blowing up but keeps changing, then you need to
increase numIterations.

You should not need to set initialWeights, but it can help if you have some
estimate already calculated.
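
A concrete (if crude) way to run that check, with illustrative values
(training is your RDD[LabeledPoint]):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Same stepSize, more iterations; then compare the two weight vectors.
val m1 = LogisticRegressionWithSGD.train(training, 100, 1.0, 1.0)  // numIterations, stepSize, miniBatchFraction
val m2 = LogisticRegressionWithSGD.train(training, 500, 1.0, 1.0)
// Huge weights in m2             -> decrease stepSize.
// m2 still far from m1's weights -> increase numIterations.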

If you have access to a build of the current Spark master (or can wait for
1.4), then the org.apache.spark.ml.classification.LogisticRegression
implementation has been compared with R and should get very similar results.

Good luck!
Joseph

On Wed, May 27, 2015 at 8:22 AM, SparknewUser 
wrote:

> I'm new to Spark and I'm getting bad performance with classification
> methods
> on Spark MLlib (worse than R in terms of AUC).
> I am trying to put my own parameters rather than the default parameters.
> Here is the method I want to use:
> train(RDD input,
>       int numIterations,
>       double stepSize,
>       double miniBatchFraction,
>       Vector initialWeights)
> How to choose "numIterations" and "stepSize"?
> What does miniBatchFraction mean?
> Is initialWeights necessary to have a good model? Then, how to choose them?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-best-performance-with-LogisticRegressionWithSGD-tp23053.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: debug jsonRDD problem?

2015-05-27 Thread Ted Yu
Looks like the exception was caused by resolved.get(prefix ++ a) returning
None :
a => StructField(a.head, resolved.get(prefix ++ a).get, nullable =
true)

There are three occurrences of resolved.get() in createSchema() - None
should be better handled in these places.

My two cents.

On Wed, May 27, 2015 at 1:46 PM, Michael Stone  wrote:

> On Wed, May 27, 2015 at 01:13:43PM -0700, Ted Yu wrote:
>
>> Can you tell us a bit more about (schema of) your JSON ?
>>
>
> It's fairly simple, consisting of 22 fields with values that are mostly
> strings or integers, except that some of the fields are objects
> with http header/value pairs. I'd guess it's something in those latter
> fields that is causing the problems. The data is 800M rows that I didn't
> create in the first place and I'm in the process of making a simpler test
> case. What I was mostly wondering is if there were an obvious mechanism
> that I'm just missing to get jsonRDD to spit out more information about
> which specific rows it's having problems with.
>
>  You can find sample JSON
>> in sql/core/src/test//scala/org/apache/spark/sql/json/
>> TestJsonData.scala
>>
>
> I know the jsonRDD works in general, I've used it before without problems.
> It even works on subsets of this data.
> Mike Stone
>


Re: debug jsonRDD problem?

2015-05-27 Thread Michael Stone

On Wed, May 27, 2015 at 01:13:43PM -0700, Ted Yu wrote:

Can you tell us a bit more about (schema of) your JSON ?


It's fairly simple, consisting of 22 fields with values that are mostly 
strings or integers, except that some of the fields are objects
with http header/value pairs. I'd guess it's something in those latter 
fields that is causing the problems. The data is 800M rows that I didn't 
create in the first place and I'm in the process of making a simpler 
test case. What I was mostly wondering is if there were an obvious 
mechanism that I'm just missing to get jsonRDD to spit out more 
information about which specific rows it's having problems with.



You can find sample JSON in sql/core/src/test//scala/org/apache/spark/sql/json/
TestJsonData.scala


I know the jsonRDD works in general, I've used it before without 
problems. It even works on subsets of this data. 


Mike Stone

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: debug jsonRDD problem?

2015-05-27 Thread Ted Yu
Can you tell us a bit more about (schema of) your JSON ?

You can find sample JSON
in sql/core/src/test//scala/org/apache/spark/sql/json/TestJsonData.scala

Cheers

On Wed, May 27, 2015 at 12:33 PM, Michael Stone  wrote:

> Can anyone provide some suggestions on how to debug this? Using spark
> 1.3.1. The json itself seems to be valid (other programs can parse it) and
> the problem seems to lie in jsonRDD trying to describe & use a schema.
>
> scala> sqlContext.jsonRDD(rdd).count()
> java.util.NoSuchElementException: None.get
>at scala.None$.get(Option.scala:313)
>at scala.None$.get(Option.scala:311)
>at
> org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:105)
>at
> org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:101)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>at
> org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$makeStruct$1(JsonRDD.scala:101)
>at
> org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:104)
>at
> org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:101)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>at
> org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$makeStruct$1(JsonRDD.scala:101)
>at
> org.apache.spark.sql.json.JsonRDD$.createSchema(JsonRDD.scala:132)
>at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:56)
>at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:635)
>at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:581)
>[...]
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Intermittent difficulties for Worker to contact Master on same machine in standalone

2015-05-27 Thread Stephen Boesch
Here is example after git clone-ing latest 1.4.0-SNAPSHOT.  The first 3
runs (FINISHED) were successful and connected quickly.  Fourth run (ALIVE)
is failing on connection/association.


URL: spark://mellyrn.local:7077
REST URL: spark://mellyrn.local:6066 (cluster mode)
Workers: 1
Cores: 8 Total, 0 Used
Memory: 15.0 GB Total, 0.0 B Used
Applications: 0 Running, 3 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
Workers

Worker Id Address ▾ State Cores Memory
worker-20150527122155-10.0.0.3-60847 10.0.0.3:60847 ALIVE 8 (0 Used) 15.0
GB (0.0 B Used)
Running Applications

Application ID Name Cores Memory per Node Submitted Time User State Duration
Completed Applications

Application ID Name Cores Memory per Node Submitted Time User State Duration
app-20150527125945-0002 TestRunner: power-iteration-clustering 8 512.0
MB 2015/05/27
12:59:45 steve FINISHED 7 s
app-20150527124403-0001 TestRunner: power-iteration-clustering 8 512.0
MB 2015/05/27
12:44:03 steve FINISHED 6 s
app-20150527123822- TestRunner: power-iteration-clustering 8 512.0
MB 2015/05/27
12:38:22 steve FINISHED 6 s



2015-05-27 11:42 GMT-07:00 Stephen Boesch :

> Thanks Yana,
>
>My current experience here is after running some small spark-submit
> based tests the Master once again stopped being reachable.  No change in
> the test setup.  I restarted Master/Worker and still not reachable.
>
> What might be the variables here in which association with the
> Master/Worker stops succeedng?
>
> For reference here are the Master/worker
>
>
>   501 34465 1   0 11:35AM ?? 0:06.50
> /Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java
> -cp  -Xms512m -Xmx512m -XX:MaxPermSize=128m
> org.apache.spark.deploy.worker.Worker spark://mellyrn.local:7077
>   501 34361 1   0 11:35AM ttys0180:07.08
> /Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java
> -cp   -Xms512m -Xmx512m -XX:MaxPermSize=128m
> org.apache.spark.deploy.master.Master --ip mellyrn.local --port 7077
> --webui-port 8080
>
>
> 15/05/27 11:36:37 INFO SparkUI: Started SparkUI at
> http://25.101.19.24:4040
> 15/05/27 11:36:37 INFO SparkContext: Added JAR
> file:/shared/spark-perf/mllib-tests/target/mllib-perf-tests-assembly.jar at
> http://25.101.19.24:60329/jars/mllib-perf-tests-assembly.jar with
> timestamp 1432751797662
> 15/05/27 11:36:37 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
> 15/05/27 11:36:37 WARN AppClient$ClientActor: Could not connect to
> akka.tcp://sparkMaster@mellyrn.local:7077:
> akka.remote.InvalidAssociation: Invalid address:
> akka.tcp://sparkMaster@mellyrn.local:7077
> 15/05/27 11:36:37 WARN Remoting: Tried to associate with unreachable
> remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is
> now gated for 5000 ms, all messages to this address will be delivered to
> dead letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
> 15/05/27 11:36:57 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
> 15/05/27 11:36:57 WARN AppClient$ClientActor: Could not connect to
> akka.tcp://sparkMaster@mellyrn.local:7077:
> akka.remote.InvalidAssociation: Invalid address:
> akka.tcp://sparkMaster@mellyrn.local:7077
> 15/05/27 11:36:57 WARN Remoting: Tried to associate with unreachable
> remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is
> now gated for 5000 ms, all messages to this address will be delivered to
> dead letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
> 15/05/27 11:37:17 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
> 15/05/27 11:37:17 WARN AppClient$ClientActor: Could not connect to
> akka.tcp://sparkMaster@mellyrn.local:7077:
> akka.remote.InvalidAssociation: Invalid address:
> akka.tcp://sparkMaster@mellyrn.local:7077
> 15/05/27 11:37:17 WARN Remoting: Tried to associate with unreachable
> remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is
> now gated for 5000 ms, all messages to this address will be delivered to
> dead letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
> 15/05/27 11:37:37 ERROR SparkDeploySchedulerBackend: Application has been
> killed. Reason: All masters are unresponsive! Giving up.
> 15/05/27 11:37:37 WARN SparkDeploySchedulerBackend: Application ID is not
> initialized yet.
> 1
>
>
> Even when successful, the time for the Master to come up has a
> surprisingly high variance. I am running on a single machine for which
> there is plenty of RAM. Note that was one problem before the present series
> :  if RAM is tight then the failure modes can be unpredictable. But now the
> RAM is not an issue: plenty available for both Master and Worker.
>
> Within the same hour period and starting/stopping maybe a dozen times, the
> startup time for the Master may be a few seconds up to  a couple to several
> minut

Re: Multilabel classification using logistic regression

2015-05-27 Thread Joseph Bradley
It looks like you are training each model i (for label i) by only using
data with label i.  You need to use all of your data to train each model so
the models can compare each label i with the other labels (roughly
speaking).

However, what you're doing is multiclass (not multilabel) classification,
which LogisticRegressionWithLBFGS already supports.  Can you not just use
LogisticRegressionWithLBFGS directly?
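
A minimal sketch of that suggestion (not from the original message;
labeledPoints, query and the 101 classes come from the code quoted below):

    import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
    import org.apache.spark.mllib.linalg.Vectors

    // One model trained on all of the data, instead of one model per label.
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(101)
      .run(labeledPoints)

    // Predict the class index for one feature vector
    // (query.features is the variable from the quoted code).
    val predictedClass = model.predict(Vectors.dense(query.features))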

On Wed, May 27, 2015 at 8:53 AM, peterg  wrote:

> Hi all
>
> I believe I have created a multi-label classifier using LogisticRegression
> but there is one snag. No matter what features I use to get the prediction,
> it will always return the label. I feel like I need to set a threshold but
> can't seem to figure out how to do that. I attached the code below. It's
> super simple. Hopefully someone can point me in the correct direction:
>
> val labels = labeledPoints.map(l => l.label).take(1000).distinct // stupid
> hack
> val groupedRDDs = labels.map { l => labeledPoints.filter (m => m.label ==
> l)
> }.map(l => l.cache()) // should use groupBy
> val models = groupedRDDs.map(rdd => new
> LogisticRegressionWithLBFGS().setNumClasses(101).run(rdd))
> val results = models.map(m => m.predict(Vectors.dense(query.features)))
>
> Thanks
>
> Peter
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Multilabel-classification-using-logistic-regression-tp23054.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RDD boundaries and triggering processing using tags in the data

2015-05-27 Thread David Webber
Hi All,

I'm new to Spark and I'd like some help understanding if a particular use
case would be a good fit for Spark Streaming.

I have an imaginary stream of sensor data consisting of integers 1-10. 
Every time the sensor reads "10" I'd like to average all the numbers that
were received since the last "10"

example input: 10 5 8 4 6 2 1 2 8 8 8 1 6 9 1 3 10 1 3 10 ...
desired output: 4.8, 2.0

I'm confused about what happens if sensor readings fall into different RDDs.  

RDD1:  10 5 8 4 6 2 1 2 8 8 8
RDD2:  1 6 9 1 3 10 1 3 10
output: ???, 2.0

My imaginary sensor doesn't read at fixed time intervals, so breaking the
stream into RDDs by time interval won't ensure the data is packaged
properly.  Additionally, multiple sensors are writing to the same stream
(though I think flatMap can parse the origin stream into streams for
individual sensors, correct?).  

My best guess for processing goes like
1) flatMap() to break out individual sensor streams
2) Custom parser to accumulate input data until "10" is found, then create a
new output RDD for each sensor and data grouping
3) average the values from step 2

I would greatly appreciate pointers to some specific documentation or
examples if you have seen something like this before.

Thanks,
David
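
One possible shape for step 2, sketched here with Spark Streaming's
updateStateByKey rather than taken from the thread. It assumes the stream has
already been keyed as (sensorId, value) pairs, that values arrive in order
within a batch (real data may need sequence numbers to guarantee this), and
that ssc.checkpoint(...) has been set, which updateStateByKey requires:

    // State per sensor: (readings buffered since the last "10",
    //                    averages completed during this batch).
    def update(newValues: Seq[Int],
               state: Option[(List[Int], List[Double])]): Option[(List[Int], List[Double])] = {
      var buffer = state.map(_._1).getOrElse(Nil)
      var completed = List.empty[Double]          // reset every batch
      for (v <- newValues) {
        if (v == 10) {
          if (buffer.nonEmpty) completed :+= buffer.sum.toDouble / buffer.size
          buffer = Nil                            // start a new group
        } else {
          buffer :+= v                            // keep buffering across RDD boundaries
        }
      }
      Some((buffer, completed))
    }

    // readings: DStream[(String, Int)] keyed by sensor id (an assumption):
    // val averages = readings.updateStateByKey(update _).flatMapValues(_._2)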



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-boundaries-and-triggering-processing-using-tags-in-the-data-tp23060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



debug jsonRDD problem?

2015-05-27 Thread Michael Stone
Can anyone provide some suggestions on how to debug this? Using spark 
1.3.1. The json itself seems to be valid (other programs can parse it) 
and the problem seems to lie in jsonRDD trying to describe & use a 
schema.


scala> sqlContext.jsonRDD(rdd).count()
java.util.NoSuchElementException: None.get
   at scala.None$.get(Option.scala:313)
   at scala.None$.get(Option.scala:311)
   at org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:105)
   at org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:101)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at 
org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$makeStruct$1(JsonRDD.scala:101)
   at org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:104)
   at org.apache.spark.sql.json.JsonRDD$$anonfun$14.apply(JsonRDD.scala:101)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at 
org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$makeStruct$1(JsonRDD.scala:101)
   at org.apache.spark.sql.json.JsonRDD$.createSchema(JsonRDD.scala:132)
   at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:56)
   at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:635)
   at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:581)
   [...]


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Invoking Hive UDF programmatically

2015-05-27 Thread Punyashloka Biswal
Dear Spark users,

Given a DataFrame df with a column named foo bar, I can call a Spark SQL
built-in function on it like so:

df.select(functions.max(df("foo bar")))

However, if I want to apply a Hive UDF named myCustomFunction, I need to
write

df.selectExpr("myCustomFunction(`foo bar`)")

which forces me to deal with escaping the name of the column so I can put
it inside a well-formed SQL query. Is there a programmatic way to invoke a
Hive function by name, so that I don’t have to worry about escaping?
Ideally, I’d like to do something like

val myCustomFunction = functions.udf("myCustomFunction")
df.select(myCustomFunction(df("foo bar")))

… but I couldn’t find any such API.

Regards,

Punya
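
For reference, a hedged sketch rather than part of the thread: later Spark
releases expose functions.callUDF(name, columns...), which resolves a function
by name through the session's function registry (including Hive UDFs when Hive
support is enabled) without building a SQL string. Whether it is available in
the 1.3/1.4 timeframe of this thread is an assumption to verify.

    import org.apache.spark.sql.functions.callUDF

    // "myCustomFunction" and the column name are taken from the question above.
    val result = df.select(callUDF("myCustomFunction", df("foo bar")))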


Re: Intermittent difficulties for Worker to contact Master on same machine in standalone

2015-05-27 Thread Stephen Boesch
Thanks Yana,

   My current experience here is after running some small spark-submit
based tests the Master once again stopped being reachable.  No change in
the test setup.  I restarted Master/Worker and still not reachable.

What might be the variables here in which association with the
Master/Worker stops succeeding?

For reference here are the Master/worker


  501 34465 1   0 11:35AM ?? 0:06.50
/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java
-cp  -Xms512m -Xmx512m -XX:MaxPermSize=128m
org.apache.spark.deploy.worker.Worker spark://mellyrn.local:7077
  501 34361 1   0 11:35AM ttys0180:07.08
/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java
-cp   -Xms512m -Xmx512m -XX:MaxPermSize=128m
org.apache.spark.deploy.master.Master --ip mellyrn.local --port 7077
--webui-port 8080


15/05/27 11:36:37 INFO SparkUI: Started SparkUI at http://25.101.19.24:4040
15/05/27 11:36:37 INFO SparkContext: Added JAR
file:/shared/spark-perf/mllib-tests/target/mllib-perf-tests-assembly.jar at
http://25.101.19.24:60329/jars/mllib-perf-tests-assembly.jar with timestamp
1432751797662
15/05/27 11:36:37 INFO AppClient$ClientActor: Connecting to master
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
15/05/27 11:36:37 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@mellyrn.local:7077: akka.remote.InvalidAssociation:
Invalid address: akka.tcp://sparkMaster@mellyrn.local:7077
15/05/27 11:36:37 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated
for 5000 ms, all messages to this address will be delivered to dead
letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
15/05/27 11:36:57 INFO AppClient$ClientActor: Connecting to master
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
15/05/27 11:36:57 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@mellyrn.local:7077: akka.remote.InvalidAssociation:
Invalid address: akka.tcp://sparkMaster@mellyrn.local:7077
15/05/27 11:36:57 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated
for 5000 ms, all messages to this address will be delivered to dead
letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
15/05/27 11:37:17 INFO AppClient$ClientActor: Connecting to master
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...
15/05/27 11:37:17 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@mellyrn.local:7077: akka.remote.InvalidAssociation:
Invalid address: akka.tcp://sparkMaster@mellyrn.local:7077
15/05/27 11:37:17 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated
for 5000 ms, all messages to this address will be delivered to dead
letters. Reason: Connection refused: mellyrn.local/25.101.19.24:7077
15/05/27 11:37:37 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: All masters are unresponsive! Giving up.
15/05/27 11:37:37 WARN SparkDeploySchedulerBackend: Application ID is not
initialized yet.
1


Even when successful, the time for the Master to come up has a surprisingly
high variance. I am running on a single machine for which there is plenty
of RAM. Note that was one problem before the present series :  if RAM is
tight then the failure modes can be unpredictable. But now the RAM is not
an issue: plenty available for both Master and Worker.

Within the same hour period and starting/stopping maybe a dozen times, the
startup time for the Master may be a few seconds up to  a couple to several
minutes.

2015-05-20 7:39 GMT-07:00 Yana Kadiyska :

> But if I'm reading his email correctly he's saying that:
>
> 1. The master and slave are on the same box (so network hiccups are
> unlikely culprit)
> 2. The failures are intermittent -- i.e program works for a while then
> worker gets disassociated...
>
> Is it possible that the master restarted? We used to have problems like
> this where we'd restart the master process, it won't be listening on 7077
> for some time, but the worker process is trying to connect and by the time
> the master is up the worker has given up...
>
>
> On Wed, May 20, 2015 at 5:16 AM, Evo Eftimov 
> wrote:
>
>> Check whether the name can be resolved in the /etc/hosts file (or DNS) of
>> the worker
>>
>>
>>
>> (the same btw applies for the Node where you run the driver app – all
>> other nodes must be able to resolve its name)
>>
>>
>>
>> *From:* Stephen Boesch [mailto:java...@gmail.com]
>> *Sent:* Wednesday, May 20, 2015 10:07 AM
>> *To:* user
>> *Subject:* Intermittent difficulties for Worker to contact Master on
>> same machine in standalone
>>
>>
>>
>>
>>
>> What conditions would cause the following delays / failure for a
>> standalone machine/cluster to have the Worker contact the Master?
>>
>>
>>
>> 15/05/20 02:02:53 INFO WorkerWebUI: St

Re: Spark and Stanford CoreNLP

2015-05-27 Thread mathewvinoj
Evan,

could you please look into this post.Below is the link.Any thoughts or
suggestion is really appreciated

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-partition-issue-with-Stanford-NLP-td23048.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Stanford-CoreNLP-tp19654p23059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark partition issue with Stanford NLP

2015-05-27 Thread mathewvinoj
Hi There, 

I am trying to process millions of records with Spark/Scala integrated with
Stanford NLP (3.4.1).
Since I am using social media data, I have to use NLP for theme
generation (POS tagging) and sentiment calculation.

I have to deal with Twitter data and non-Twitter data separately, so I have
two classes that deal with Twitter/non-Twitter.

I am using lazy val initialization in each class for loading the
Stanford NLP pipeline.

Twitter class:

  val features: Seq[String] =
    Seq("tokenize", "ssplit", "pos", "parse", "sentiment")
  val props = new Properties()
  props.put("annotators", features.mkString(", "))
  props.put("pos.model", "tagger/gate-EN-twitter.model")
  props.put("parse.model", "tagger/englishSR.ser.gz")
  val pipeline = new StanfordCoreNLP(props)

Note: for the Twitter class above I am using a different POS model and a
shift-reduce parser model for parsing. The reason I use the shift-reduce parser
is that for some of the junk data, at run time the default PCFG model takes a
lot of time to process and throws some exceptions. The shift-reduce parser
takes around 15 seconds at load time and is faster at run time while processing
the data.

Non-Twitter class:

  val features: Seq[String] =
    Seq("tokenize", "ssplit", "pos", "parse", "sentiment")
  val props = new Properties()
  props.put("annotators", features.mkString(", "))
  props.put("parse.model", "tagger/englishSR.ser.gz")

Here I am using the default POS model and the shift-reduce parser.

Problem: 

Currently we are running with 8 nodes with 6 cores each, so I can run with 48
partitions. To process millions of records with the above configuration and a
smaller number of partitions, it works fine for me.

With 8 nodes and 6 cores each we have about 48 cores, and if I run with 42
partitions it takes around 1 hr to finish the processing.

With the current configuration I need to scale it to 200 partitions.

With 8 nodes and 6 cores each (about 48 cores) and 200 partitions, it takes
around 2 hr and finally throws an exception saying that one node is lost, or
java.lang.IllegalArgumentException: annotator "sentiment" requires
annotator "binarized_trees", etc.

The problem only appears when we scale the number of partitions up to 200 on
the 8 nodes with 6 cores each, where we have only 48 cores.

I suspect it is because the shift-reduce parser is loaded for each partition. I
thought of loading this class once and then broadcasting it, but the StanfordNLP
class is not serializable, so I cannot broadcast it. Any thoughts or suggestions?

The reason we need to scale to 200 partitions is that it will run more quickly,
with less time to process this data. Any thoughts or suggestions are really helpful.
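
One common workaround, sketched here as an assumption rather than taken from
the thread: keep the pipeline behind a lazy val inside a Scala object so that
nothing non-serializable is shipped from the driver, and build it at most once
per executor JVM, touching it from mapPartitions. The annotator list and model
path are taken from the post above; the input shape is assumed.

    import java.util.Properties
    import edu.stanford.nlp.pipeline.StanfordCoreNLP

    object NlpHolder {
      // Initialized lazily, at most once per executor JVM; never serialized.
      lazy val pipeline: StanfordCoreNLP = {
        val props = new Properties()
        props.put("annotators", "tokenize, ssplit, pos, parse, sentiment")
        props.put("parse.model", "tagger/englishSR.ser.gz")
        new StanfordCoreNLP(props)
      }
    }

    // texts: RDD[String] (an assumption about the input shape)
    // val annotated = texts.mapPartitions { docs =>
    //   val pipeline = NlpHolder.pipeline   // loaded once per executor, reused for the partition
    //   docs.map(doc => pipeline.process(doc))
    // }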



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-partition-issue-with-Stanford-NLP-tp23048p23057.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark partition issue with Stanford NLP

2015-05-27 Thread vishalvibhandik
I am facing a similar issue. When the job runs with more partitions than the
number of cores, sometimes the executors get lost and the job doesn't complete.
Is there any additional logging that can be turned on to see the exact cause
of this issue?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-partition-issue-with-Stanford-NLP-tp23048p23055.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hive external metastore connection timeout

2015-05-27 Thread Yana Kadiyska
I have not run into this particular issue but I'm not using latest bits in
production. However, testing your theory should be easy -- MySQL is just a
database, so you should be able to use a regular mysql client and see how
many connections are active. You can then compare to the maximum allowed
connections, and/or up the maximum allowed and see if you still hit the
ceiling. Hopefully someone will have a more direct answer for you...

also, not sure what query you're executing to reconnect, but try something
inexpensive like "show tables" -- I had connection timeout issues with the
actual query when pulling data...that was helped by setting
"hive.metastore.client.socket.timeout" higher...

On Wed, May 27, 2015 at 11:03 AM, jamborta  wrote:

> Hi all,
>
> I am setting up a spark standalone server with an external hive metastore
> (using mysql), there is an issue that after 5 minutes inactivity, if I try
> to reconnect to the metastore (i.e. by executing a new query), it hangs for
> about 10 mins then times out. My guess is that datanucleus does not close
> the existing connections from the pool, but still tries to create new ones
> for some reason.
>
> Tried different type of connection pools, didn't help either.
>
> thanks,
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/hive-external-metastore-connection-timeout-tp23052.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-27 Thread Akhil Das
Are you using the createStream or createDirectStream API? If it's the
former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
slow things down though). Another way would be to try the latter one.

Thanks
Best Regards
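
A minimal sketch of the two APIs mentioned (not from the original message; ssc,
the ZooKeeper/broker addresses, group id and topic name are placeholders):

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Receiver-based stream with an explicit, non-replicated, disk-spilling storage level
    // (the topic map value is the number of receiver threads):
    val receiverStream = KafkaUtils.createStream(
      ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    // Receiver-less direct stream: no long-running receiver holding blocks in memory.
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker-host:9092"), Set("my-topic"))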

On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG  wrote:

> Hi Akhil,
>
> Thanks for your reply. Accoding to the Streaming tab of Web UI, the
> Processing Time is around 400ms, and there's no Scheduling Delay, so I
> suppose it's not the Kafka messages that eat up the off-heap memory. Or
> maybe it is, but how to tell?
>
> I googled about how to check the off-heap memory usage, there's a tool
> called pmap, but I don't know how to interprete the results.
>
> On Wed, May 27, 2015 at 3:08 PM, Akhil Das 
> wrote:
>
>> After submitting the job, if you do a ps aux | grep spark-submit then you
>> can see all JVM params. Are you using the highlevel consumer (receiver
>> based) for receiving data from Kafka? In that case if your throughput is
>> high and the processing delay exceeds batch interval then you will hit this
>> memory issues as the data will keep on receiving and is dumped to memory.
>> You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
>> Another alternate will be to use the lowlevel kafka consumer
>>  or to use the
>> non-receiver based directStream
>> 
>> that comes up with spark.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG  wrote:
>>
>>> Hi,
>>>
>>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I find
>>> out that YARN is killing the driver and executor process because of
>>> excessive use of memory. Here's something I tried:
>>>
>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
>>> extra memory is not used by heap.
>>> 2. I set the two memoryOverhead params to 1024 (default is 384), but the
>>> memory just keeps growing and then hits the limit.
>>> 3. This problem is not shown in low-throughput jobs, neither in
>>> standalone mode.
>>> 4. The test job just receives messages from Kafka, with batch interval
>>> of 1, do some filtering and aggregation, and then print to executor logs.
>>> So it's not some 3rd party library that causes the 'leak'.
>>>
>>> Spark 1.3 is built by myself, with correct hadoop versions.
>>>
>>> Any ideas will be appreciated.
>>>
>>> Thanks.
>>>
>>> --
>>> Jerry
>>>
>>
>>
>
>
> --
> Jerry
>


Re: View all user's application logs in history server

2015-05-27 Thread Marcelo Vanzin
Then:
- Are all files readable by the user running the history server?
- Did all applications call sc.stop() correctly (i.e. files do not have the
".inprogress" suffix)?

Other than that, always look at the logs first, looking for any errors that
may be thrown.


On Wed, May 27, 2015 at 9:10 AM, Jianshi Huang 
wrote:

> Yes, all written to the same directory on HDFS.
>
> Jianshi
>
> On Wed, May 27, 2015 at 11:57 PM, Marcelo Vanzin 
> wrote:
>
>> You may be the only one not seeing all the logs. Are you sure all the
>> users are writing to the same log directory? The HS can only read from a
>> single log directory.
>>
>> On Wed, May 27, 2015 at 5:33 AM, Jianshi Huang 
>> wrote:
>>
>>> No one using History server? :)
>>>
>>> Am I the only one need to see all user's logs?
>>>
>>> Jianshi
>>>
>>> On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang 
>>> wrote:
>>>
 Hi,

 I'm using Spark 1.4.0-rc1 and I'm using default settings for history
 server.

 But I can only see my own logs. Is it possible to view all user's logs?
 The permission is fine for the user group.

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/

>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Marcelo
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Marcelo


Re: View all user's application logs in history server

2015-05-27 Thread Jianshi Huang
Yes, all written to the same directory on HDFS.

Jianshi

On Wed, May 27, 2015 at 11:57 PM, Marcelo Vanzin 
wrote:

> You may be the only one not seeing all the logs. Are you sure all the
> users are writing to the same log directory? The HS can only read from a
> single log directory.
>
> On Wed, May 27, 2015 at 5:33 AM, Jianshi Huang 
> wrote:
>
>> No one using History server? :)
>>
>> Am I the only one need to see all user's logs?
>>
>> Jianshi
>>
>> On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm using Spark 1.4.0-rc1 and I'm using default settings for history
>>> server.
>>>
>>> But I can only see my own logs. Is it possible to view all user's logs?
>>> The permission is fine for the user group.
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Marcelo
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Adding columns to DataFrame

2015-05-27 Thread Masf
Hi.

I think that it's possible to do:

df.select($"*", lit(null).as("col17"), lit(null).as("col18"), lit(null).as("col19"),
  ..., lit(null).as("col26"))

Any other advice?

Miguel.
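
A sketch of one way to automate that (not from the original message; it assumes
the 10 extra columns should simply be nulls typed according to df2's schema):

    import org.apache.spark.sql.functions.lit

    // Add every column of df2 that df1 lacks, as a typed null, then align the
    // column order to df2 before the union.
    val padded = df2.schema.fields.foldLeft(df1) { (df, field) =>
      if (df.columns.contains(field.name)) df
      else df.withColumn(field.name, lit(null).cast(field.dataType))
    }
    val unioned = padded.select(df2.columns.map(padded(_)): _*).unionAll(df2)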

On Wed, May 27, 2015 at 5:02 PM, Masf  wrote:

> Hi.
>
> I have a DataFrame with 16 columns (df1) and another with 26 columns(df2).
> I want to do a UnionAll.  So, I want to add 10 columns to df1 in order to
> have the same number of columns in both dataframes.
>
> Is there some alternative to "withColumn"?
>
> Thanks
>
> --
> Regards.
> Miguel Ángel
>



-- 


Saludos.
Miguel Ángel


Re: View all user's application logs in history server

2015-05-27 Thread Marcelo Vanzin
You may be the only one not seeing all the logs. Are you sure all the users
are writing to the same log directory? The HS can only read from a single
log directory.

On Wed, May 27, 2015 at 5:33 AM, Jianshi Huang 
wrote:

> No one using History server? :)
>
> Am I the only one need to see all user's logs?
>
> Jianshi
>
> On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang 
> wrote:
>
>> Hi,
>>
>> I'm using Spark 1.4.0-rc1 and I'm using default settings for history
>> server.
>>
>> But I can only see my own logs. Is it possible to view all user's logs?
>> The permission is fine for the user group.
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Marcelo


Multilabel classification using logistic regression

2015-05-27 Thread peterg
Hi all

I believe I have created a multi-label classifier using LogisticRegression
but there is one snag. No matter what features I use to get the prediction,
it will always return the label. I feel like I need to set a threshold but
can't seem to figure out how to do that. I attached the code below. It's
super simple. Hopefully someone can point me in the correct direction:

val labels = labeledPoints.map(l => l.label).take(1000).distinct // stupid
hack
val groupedRDDs = labels.map { l => labeledPoints.filter (m => m.label == l)
}.map(l => l.cache()) // should use groupBy
val models = groupedRDDs.map(rdd => new
LogisticRegressionWithLBFGS().setNumClasses(101).run(rdd))
val results = models.map(m => m.predict(Vectors.dense(query.features)))

Thanks

Peter



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Multilabel-classification-using-logistic-regression-tp23054.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question about Serialization in Storage Level

2015-05-27 Thread Imran Rashid
Hi Zhipeng,

Yes, your understanding is correct.  The "SER" portion just refers to how
it's stored in memory.  On disk, the data always has to be serialized.
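
A small example of the distinction (a sketch, not from the original message;
sc is the spark-shell SparkContext):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 1000000)
    rdd.persist(StorageLevel.MEMORY_AND_DISK)       // deserialized objects in memory; spilled blocks go to disk serialized
    // rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized byte arrays even while in memory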


On Fri, May 22, 2015 at 10:40 PM, Jiang, Zhipeng 
wrote:

>  Hi Todd, Howard,
>
>
>
> Thanks for your reply, I might not present my question clearly.
>
>
>
> What I mean is, if I call *rdd.persist(StorageLevel.MEMORY_AND_DISK)*,
> the BlockManager will cache the rdd to MemoryStore. RDD will be migrated to
> DiskStore when it cannot fit in memory. I think this migration does require
> data serialization and compression (if spark.rdd.compress is set to be
> true). So the data in Disk is serialized, even if I didn’t choose a
> serialized storage level, am I right?
>
>
>
> Thanks,
>
> Zhipeng
>
>
>
>
>
> *From:* Todd Nist [mailto:tsind...@gmail.com]
> *Sent:* Thursday, May 21, 2015 8:49 PM
> *To:* Jiang, Zhipeng
> *Cc:* user@spark.apache.org
> *Subject:* Re: Question about Serialization in Storage Level
>
>
>
> From the docs,
> https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
> :
>
>
>
> *Storage Level*
>
> *Meaning*
>
> MEMORY_ONLY
>
> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit
> in memory, some partitions will not be cached and will be recomputed on the
> fly each time they're needed. This is the default level.
>
> MEMORY_AND_DISK
>
> Store RDD as *deserialized* Java objects in the JVM. If the RDD does not
> fit in memory, store the partitions that don't fit on disk, and read them
> from there when they're needed.
>
> MEMORY_ONLY_SER
>
> Store RDD as *serialized* Java objects (one byte array per partition).
> This is generally more space-efficient than deserialized objects,
> especially when using a fast serializer
> , but more
> CPU-intensive to read.
>
> MEMORY_AND_DISK_SER
>
> Similar to *MEMORY_ONLY_SER*, but spill partitions that don't fit in
> memory to disk instead of recomputing them on the fly each time they're
> needed.
>
>
>
> On Thu, May 21, 2015 at 3:52 AM, Jiang, Zhipeng 
> wrote:
>
>  Hi there,
>
>
>
> This question may seem to be kind of naïve, but what’s the difference
> between *MEMORY_AND_DISK* and *MEMORY_AND_DISK_SER*?
>
>
>
> If I call *rdd.persist(StorageLevel.MEMORY_AND_DISK)*, the BlockManager
> won’t serialize the *rdd*?
>
>
>
> Thanks,
>
> Zhipeng
>
>
>


How to get the best performance with LogisticRegressionWithSGD?

2015-05-27 Thread SparknewUser
I'm new to Spark and I'm getting bad performance with classification methods
on Spark MLlib (worse than R in terms of AUC).
I am trying to put my own parameters rather than the default parameters.
Here is the method I want to use : 
train(RDD<LabeledPoint> input,
      int numIterations,
      double stepSize,
      double miniBatchFraction,
      Vector initialWeights)
How to choose "numIterations" and "stepSize"? 
What does miniBatchFraction mean?
Is initialWeights necessary to have a good model? Then, how to choose them?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-the-best-performance-with-LogisticRegressionWithSGD-tp23053.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to get the best performance with LogisticRegressionWithSGD?

2015-05-27 Thread mélanie gallois
I'm new to Spark and I'm getting bad performance with classification
methods on Spark MLlib (worse than R in terms of AUC).
I am trying to put my own parameters rather than the default parameters.
Here is the method I want to use :

train(RDD<LabeledPoint> input,
      int numIterations,
      double stepSize,
      double miniBatchFraction,
      Vector initialWeights)

How to choose "numIterations" and "stepSize"?
What does miniBatchFraction mean?
Is initialWeights necessary to have a good model? Then, how to choose them?


Regards,

Mélanie Gallois


hive external metastore connection timeout

2015-05-27 Thread jamborta
Hi all,

I am setting up a spark standalone server with an external hive metastore
(using mysql), there is an issue that after 5 minutes inactivity, if I try
to reconnect to the metastore (i.e. by executing a new query), it hangs for
about 10 mins then times out. My guess is that datanucleus does not close
the existing connections from the pool, but still tries to create new ones
for some reason. 

Tried different type of connection pools, didn't help either.

thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/hive-external-metastore-connection-timeout-tp23052.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and logging

2015-05-27 Thread Imran Rashid
only an answer to one of your questions:


What about log statements in the
> partition processing functions?  Will their log statements get logged into
> a
> file residing on a given 'slave' machine, or will Spark capture this log
> output and divert it into the log file of the driver's machine?
>

they get logged to files on the remote nodes.  You can view the logs for
each executor through the UI.  If you are using spark on yarn, you can grab
all the logs with "yarn logs".


Adding columns to DataFrame

2015-05-27 Thread Masf
Hi.

I have a DataFrame with 16 columns (df1) and another with 26 columns(df2).
I want to do a UnionAll.  So, I want to add 10 columns to df1 in order to
have the same number of columns in both dataframes.

Is there some alternative to "withColumn"?

Thanks

-- 
Regards.
Miguel Ángel


Decision Trees / Random Forest Multiple Labels

2015-05-27 Thread cfusting
Hi,

I'd like to use a DT or RF to describe data with multiple dimensions
(lat/long). I'm new to Spark but this doesn't seem possible with a
LabeledPoint. Am I mistaken? Will it be in the future?

Cheers,

_Chris 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Decision-Trees-Random-Forest-Multiple-Labels-tp23051.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Where does partitioning and data loading happen?

2015-05-27 Thread ayan guha
I hate to say this, but your friend is right. Spark slaves (executors)
really pull the data. In fact, this is standard practice in the distributed
world, e.g. Hadoop. It is not practical to pass large amounts of data through
the master, nor does that give a way to read the data in parallel.

You can either use Spark's own way of splitting the data, or you can use
Hadoop-style input formats which give a set of splits, or you can use
self-describing formats like Parquet. But essentially your API to the data has
to have a concept of cutting the data into chunks, and Spark just uses that to
decide which "slave" pulls what.

(mesos has nothing to do with this. its just a cluster manager as far as
spark is concerned)

On Thu, May 28, 2015 at 12:22 AM, Stephen Carman 
wrote:

> A colleague and I were having a discussion and we were disagreeing about
> something in Spark/Mesos that perhaps someone can shed some light into.
>
> We have a mesos cluster that runs spark via a sparkHome, rather than
> downloading an executable and such.
>
> My colleague says that say we have parquet files in S3, that slaves should
> know what data is in their partition and only pull from the S3 the
> partitions of parquet data they need, but this seems inherinitly wrong to
> me.
> as I have no idea how it’s possible for Spark or Mesos to know what
> partitions to know what to pull on the slave. It makes much more sense to
> me for the partitioning to be done on the driver and then distributed to the
> slaves so the slaves don’t have to necessarily worry about these details.
> If this were the case there is some data loading that is done on the
> driver, correct? Or does spark/mesos do some magic to pass a reference so
> the slaves
> know what to pull per say?
>
> So I guess in summation, where does partitioning and data loading happen?
> On the driver or on the executor?
>
> Thanks,
> Steve
> This e-mail is intended solely for the above-mentioned recipient and it
> may contain confidential or privileged information. If you have received it
> in error, please notify us immediately and delete the e-mail. You must not
> copy, distribute, disclose or take any action in reliance on it. In
> addition, the contents of an attachment to this e-mail may contain software
> viruses which could damage your own computer system. While ColdLight
> Solutions, LLC has taken every reasonable precaution to minimize this risk,
> we cannot accept liability for any damage which you sustain as a result of
> software viruses. You should perform your own virus checks before opening
> the attachment.
>



-- 
Best Regards,
Ayan Guha


Spark and logging

2015-05-27 Thread dgoldenberg
I'm wondering how logging works in Spark.

I see that there's the log4j.properties.template file in the conf directory. 
Safe to assume Spark is using log4j 1?  What's the approach if we're using
log4j 2?  I've got a log4j2.xml file in the job jar which seems to be
working for my log statements but Spark's logging seems to be taking its own
default route despite me setting Spark's log to 'warn' only.

More interestingly, what happens if file-based loggers are at play?

If a log statement is in the driver program I assume it'll get logged into a
log file that's collocated with the driver. What about log statements in the
partition processing functions?  Will their log statements get logged into a
file residing on a given 'slave' machine, or will Spark capture this log
output and divert it into the log file of the driver's machine?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-logging-tp23049.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Where does partitioning and data loading happen?

2015-05-27 Thread Stephen Carman
A colleague and I were having a discussion and we were disagreeing about 
something in Spark/Mesos that perhaps someone can shed some light into.

We have a mesos cluster that runs spark via a sparkHome, rather than 
downloading an executable and such.

My colleague says that, say we have Parquet files in S3, the slaves should know
what data is in their partition and pull from S3 only the partitions of
Parquet data they need, but this seems inherently wrong to me,
as I have no idea how it’s possible for Spark or Mesos to know what partitions
to pull on the slave. It makes much more sense to me for the
partitioning to be done on the driver and then distributed to the
slaves, so the slaves don’t have to necessarily worry about these details. If
this were the case there is some data loading that is done on the driver,
correct? Or does Spark/Mesos do some magic to pass a reference so the slaves
know what to pull, per se?

So I guess in summation, where does partitioning and data loading happen? On 
the driver or on the executor?

Thanks,
Steve
This e-mail is intended solely for the above-mentioned recipient and it may 
contain confidential or privileged information. If you have received it in 
error, please notify us immediately and delete the e-mail. You must not copy, 
distribute, disclose or take any action in reliance on it. In addition, the 
contents of an attachment to this e-mail may contain software viruses which 
could damage your own computer system. While ColdLight Solutions, LLC has taken 
every reasonable precaution to minimize this risk, we cannot accept liability 
for any damage which you sustain as a result of software viruses. You should 
perform your own virus checks before opening the attachment.


RE: How does spark manage the memory of executor with multiple tasks

2015-05-27 Thread java8964
Same as you, there are lots of people coming from the MapReduce world and
trying to understand the internals of Spark. I hope the following helps.

For end users there is only the concept of a Job: "I want to run a word count
job over this one big file." How many stages and tasks that job generates
depends on the file size and the parallelism you specify.

For word count, it will generate 2 stages, because there is a shuffle in it;
think of them the same way as the Mapper and Reducer parts. If the file is
1280M in HDFS with a 128M block size, the first stage will generate 10 tasks.
If you use the default parallelism in Spark, the 2nd stage should generate 200
tasks.

Forget about executors for now: the above job will have 210 tasks to run. In
standalone mode, you need to specify the cores and memory for your job. Let's
assume you have 5 worker nodes with 4 cores + 8G each. Now, if you ask for 10
cores total and 2G per executor, and the cluster has enough resources
available, you will get 1 executor from each worker node, with 2 cores + 2G per
executor, to run your job. In this case, the first 10 tasks of stage one can
start concurrently; after that, every 10 tasks of stage 2 can run concurrently.
You get 5 executors because you have 5 worker nodes. There is a coming feature
to start multiple executors per worker, but we are talking about the normal
case here. In fact, you can start multiple workers on one physical box if you
have enough resources.

In the above case, 2 tasks run concurrently per executor. You control this by
specifying how many cores you want for your job, plus how many workers are
pre-configured in your cluster. Those 2 tasks have to share the 2G heap memory.
I don't think specifying memory per task is a good idea, because a task runs at
the thread level, and memory applies only to the JVM process. In MR, every
mapper and reducer maps to a Java process, but in Spark a task just maps to a
thread/core.

In Spark, memory tuning is more of an art, but there are still lots of rules to
follow. In the above case, you can increase the parallelism to 400; then you
will have 400 tasks in stage 2, so each task gets less data, provided the file
has enough unique words. Or you can lower the cores from 10 to 5; then each
executor only processes one task at a time, but your job runs slower.

Overall, you want to maximize parallelism to gain the best speed, but also make
sure the memory is enough for your job at that speed, to avoid OOM. It is a
balance.

Keep in mind:
- The cluster is pre-configured with a number of workers, the total cores, and
  the maximum heap memory you can ask for.
- Per application, you specify the total cores you want plus the heap memory
  per executor.
- In your application, you can specify the parallelism level, as lots of
  actions support it. So parallelism is dynamic, from job to job, or even from
  stage to stage.
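
To make the walkthrough concrete, a hedged sketch of the corresponding
application-side settings (not from the original message; the master URL,
paths and numbers are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("spark://master:7077")          // standalone master (placeholder)
      .set("spark.cores.max", "10")              // total cores requested for this application
      .set("spark.executor.memory", "2g")        // heap per executor
      .set("spark.default.parallelism", "200")   // default number of stage-2 (shuffle) tasks
    val sc = new SparkContext(conf)

    val counts = sc.textFile("hdfs:///data/big-file.txt")  // ~10 tasks for a 1280M file with 128M blocks
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _, 400)                   // or raise the parallelism per stage, as described above
    counts.saveAsTextFile("hdfs:///data/word-counts")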
Yong
Date: Wed, 27 May 2015 15:48:57 +0800
Subject: Re: How does spark manage the memory of executor with multiple tasks
From: ccn...@gmail.com
To: evo.efti...@isecc.com
CC: ar...@sigmoidanalytics.com; user@spark.apache.org

Can anyone answer my question? I am curious to know, if there are multiple
reducer tasks in one executor, how memory is allocated between these reducer
tasks, since each shuffle will consume a lot of memory.
On Tue, May 26, 2015 at 7:27 PM, Evo Eftimov  wrote:
 the link you sent says multiple executors per node
Worker is just demon process launching Executors / JVMs so it can execute tasks 
- it does that by cooperating with the master and the driver 
There is a one to one maping between Executor and JVM 

Sent from Samsung Mobile

 Original message From: Arush Kharbanda  Date:2015/05/26  10:55 
 (GMT+00:00) To: canan chen  Cc: Evo Eftimov ,user@spark.apache.org Subject: 
Re: How does spark manage the memory of executor with multiple tasks 
Hi Evo,
Worker is the JVM and an executor runs on the JVM. And after Spark 1.4 you 
would be able to run multiple executors on the same JVM/worker.
https://issues.apache.org/jira/browse/SPARK-1706.

ThanksArush
On Tue, May 26, 2015 at 2:54 PM, canan chen  wrote:
I think the concept of task in spark should be on the same level of task in MR. 
Usually in MR, we need to specify the memory the each mapper/reducer task. And 
I believe executor is not a user-facing concept, it's a spark internal concept. 
For spark users they don't need to know the concept of executor, but need to 
know the concept of task. 
On Tue, May 26, 2015 at 5:09 PM, Evo Eftimov  wrote:
This is the first time I hear that “one can specify the RAM per task” – the RAM 
is granted per Executor (JVM). On the other hand each Task operates on ONE RDD 
Partition – so you can say that this is “the RAM allocated to the Task to 
process” – but it is still within the boundaries allocated to the Executor 
(JVM) within which the Task is running. Also while runni

Re: View all user's application logs in history server

2015-05-27 Thread Jianshi Huang
No one using History server? :)

Am I the only one need to see all user's logs?

Jianshi

On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang 
wrote:

> Hi,
>
> I'm using Spark 1.4.0-rc1 and I'm using default settings for history
> server.
>
> But I can only see my own logs. Is it possible to view all user's logs?
> The permission is fine for the user group.
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


[POWERED BY] Please add our organization

2015-05-27 Thread Antonio Giambanco
Name: Objectway.com
URL: www.objectway.com

Description:

We're building a Big Data solution on Spark. We use Apache Flume for
parallel message queuing infrastructure and Apache Spark Streaming for near
real time datastream processing combined with a rule engine for complex
events catching.


Re: Re: Re: Re: Re: how to distributed run a bash shell in spark

2015-05-27 Thread luohui20001
Hi Akhil and all
    My previous code has some problems: all the executors are looping and
running the same command. That's not what I am expecting.

Previous code:

    val shellcompare = List("run","sort.sh")
    val shellcompareRDD = sc.makeRDD(shellcompare)
    val result = List("aggregate","result")
    val resultRDD = sc.makeRDD(result)
    for (j <- 1 to 21) {
      resultRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + j + ".txt " +
        "/opt/data/shellcompare/samplechr" + j + ".txt " +
        "/opt/data/shellcompare/result" + j + ".txt 600").collect()
    }

What I want is all the executors running different commands concurrently. So I
modified the code like below:

    val resultRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
      12, 13, 14, 15, 16, 17, 18, 19, 20, 21))
    resultRDD.map { i =>
      resultRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + i + ".txt " +
        "/opt/data/shellcompare/samplechr" + i + ".txt " +
        "/opt/data/shellcompare/result" + i + ".txt 600")
    }.collect()

However I got an exception below:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in 
stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 
57, slave1): org.apache.spark.SparkException: RDD transformations and actions 
can only be invoked by the driver, not inside of other transformations; for 
example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values 
transformation and count action cannot be performed inside of the rdd1.map 
transformation. For more information, see SPARK-5063.
Any advice on this? Thanks.
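
One common alternative, sketched here rather than taken from the thread: keep
the RDD operations on the driver and run the external command inside map with
scala.sys.process, so no RDD is referenced from inside another transformation
(the restriction behind SPARK-5063). The paths are taken from the post above.

    import scala.sys.process._

    // sc: SparkContext (predefined in spark-shell). 21 elements in 21 partitions,
    // so each comparison can run on a different executor core.
    val indices = sc.parallelize(1 to 21, 21)
    val exitCodes = indices.map { i =>
      Seq("sh", "/opt/sh/bin/sort.sh",
          s"/opt/data/shellcompare/chr$i.txt",
          s"/opt/data/shellcompare/samplechr$i.txt",
          s"/opt/data/shellcompare/result$i.txt",
          "600").!                              // runs on the executor, returns the exit code
    }.collect()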



 

Thanks&Best regards!
San.Luo

- Original Message -
From: Akhil Das 
To: 罗辉 
Cc: user 
Subject: Re: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-26 14:46

If you open up the driver UI (running on 4040), you can see multiple tasks per 
stage which will be happening concurrently. If it is a single task, and you 
want to increase the parallelism, then you can simply do a 
re-partition.

Thanks
Best Regards

On Tue, May 26, 2015 at 8:27 AM,   wrote:
I am trying to run some shell scripts in my Spark app, hoping they run more
concurrently in my Spark cluster. However, I am not sure whether my code will
run concurrently in my executors. Diving into my code, you can see that I am
trying to:
1. Split both db and sample into 21 small files. That will generate 42 files in
total. By splitting db I will get "chr1", "chr2", ..., "chr21", and by
splitting sample I will get "samplechr1", "samplechr2", ..., "samplechr21".
2. Merge those split files from HDFS and save them to a local path. Those
merged files will be "chr1.txt", "chr2.txt", ..., "chr21.txt" and
"samplechr1.txt", "samplechr2.txt", ..., "samplechr21.txt".
3. Run modify.sh to clean the data, that is, to delete some characters that are
not useful.
4. Run shellcompare.sh to compare chr1.txt and samplechr1.txt and get a
result1.txt, looping from 1 to 21 so that those 42 files are compared and
I get 21 files like result1.txt, result2.txt, ..., result21.txt.
Sorry for not adding comments to my code.




 

Thanks&Best regards!
San.Luo

- Original Message -
From: Akhil Das 
To: 罗辉 
Cc: user 
Subject: Re: Re: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 22:41

Can you tell us what exactly you are trying to achieve?

Thanks
Best Regards

On Mon, May 25, 2015 at 5:00 PM,   wrote:
thanks, madhu and Akhil
I modified my code like below; however, I think it is not so distributed. Do
you guys have a better idea to run this app more efficiently and distributed?
So I add some comments with my understanding:
import org.apache.spark._
import www.celloud.com.model._

object GeneCompare3 {
  def main(args: Array[String]) {
val conf = new 
SparkConf().setAppName("GeneCompare").setMaster("spark://master:7077").set("spark.executor.memory",
 "6g").set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val db = sc.textFile("/data/db.txt").map(_.split("\t")).map(x => 
Database(x(0),
  x(1).trim().toInt,
  x(2).trim().toInt,
  x(3).trim().toInt,
  x(4).trim().toInt)).toDF()
val sample = sc.textFile("/data/sample.txt").map(_.split("\t")).map(x => 
Sample(x(0),
  x(1).trim().toInt,
  x(2),
  x(3).trim().toInt,
  x(4).trim().toInt,
  x(5))).toDF()
  
      // using Akhil Das's idea: 1to21 is a file with 21 lines, each line a single number
//    val test = sc.textFile("1to21.txt").foreach { i =>            // running on the driver
//      db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)   // running on the executor
//      db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt")
//      sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
//      db.rdd.pipe("hado

Re: DataFrame. Conditional aggregation

2015-05-27 Thread ayan guha
Till 1.3, you have to prepare the DF appropriately

def setupCondition(t):
    if t[1] > 100:
        v = 1
    else:
        v = 0
    return Row(col1=t[0], col2=t[1], col3=t[2], col4=v)


d1 = [[1001,100,50],[1001,200,100],[1002,100,99]]
d1RDD = sc.parallelize(d1).map(setupCondition)
d1DF = ssc.createDataFrame(d1RDD)
d1DF.printSchema()
d1DF.show()
res = d1DF.groupBy("col1").agg({'col3':'min','col4':'sum'})
print "\n\n"
res.show()

root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)
 |-- col4: long (nullable = true)

col1 col2 col3 col4
1001 100  50   0
1001 200  100  1
1002 100  99   0



col1 SUM(col4) MIN(col3)
1001 1 50
1002 0 99

Good news is that since 1.4, DF will have methods like when and otherwise (and a LOT
more). Can't wait to get my hands on 1.4 :)
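
For the curious, a rough Scala sketch of what that could look like on 1.4 (untested; joinedData, the column names and the >100 condition are taken from the earlier messages in this thread):

import org.apache.spark.sql.functions._

// Sketch only (Spark 1.4 functions): the conditional sum from this thread, without a UDF.
val result = joinedData.groupBy("col1", "col2").agg(
  count(lit(1)).as("counter"),
  min(col("col3")).as("minimum"),
  sum(when(col("endrscp") > 100, 1).otherwise(0)).as("test")
)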




On Wed, May 27, 2015 at 5:12 PM, Masf  wrote:

> Yes. I think that your sql solution will work but I was looking for a
> solution with DataFrame API. I had thought to use UDF such as:
>
> val myFunc = udf {(input: Int) => {if (input > 100) 1 else 0}}
>
> Although I'd like to know if it's possible to do it directly in the
> aggregation inserting a lambda function or something else.
>
> Thanks
>
> Regards.
> Miguel.
>
>
> On Wed, May 27, 2015 at 1:06 AM, ayan guha  wrote:
>
>> For this, I can give you a SQL solution:
>>
>> joined data.registerTempTable('j')
>>
>> Res=ssc.sql('select col1,col2, count(1) counter, min(col3) minimum,
>> sum(case when endrscp>100 then 1 else 0 end test from j'
>>
>> Let me know if this works.
>> On 26 May 2015 23:47, "Masf"  wrote:
>>
>>> Hi
>>> I don't know how it works. For example:
>>>
>>> val result = joinedData.groupBy("col1","col2").agg(
>>>   count(lit(1)).as("counter"),
>>>   min(col3).as("minimum"),
>>>   sum("case when endrscp> 100 then 1 else 0 end").as("test")
>>> )
>>>
>>> How can I do it?
>>>
>>> Thanks
>>> Regards.
>>> Miguel.
>>>
>>> On Tue, May 26, 2015 at 12:35 AM, ayan guha  wrote:
>>>
 Case when col2>100 then 1 else col2 end
 On 26 May 2015 00:25, "Masf"  wrote:

> Hi.
>
> In a dataframe, How can I execution a conditional sentence in a
> aggregation. For example, Can I translate this SQL statement to 
> DataFrame?:
>
> SELECT name, SUM(IF table.col2 > 100 THEN 1 ELSE table.col1)
> FROM table
> GROUP BY name
>
> Thanks
> --
> Regards.
> Miguel
>

>>>
>>>
>>> --
>>>
>>>
>>> Saludos.
>>> Miguel Ángel
>>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>



-- 
Best Regards,
Ayan Guha


Re: How many executors can I acquire in standalone mode ?

2015-05-27 Thread ayan guha
You can request the number of cores and the amount of memory for each executor.
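For example, something along these lines (a sketch; the values are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: per-executor memory and the total core count can be requested in code.
// In 1.x standalone mode you typically get one executor per worker, so the number
// of executors follows from how many workers can satisfy these requests.
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.executor.memory", "2g")  // memory granted to each executor
  .set("spark.cores.max", "8")         // cap on the total cores the application acquires
val sc = new SparkContext(conf)
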
On 27 May 2015 18:25, "canan chen"  wrote:

> Thanks Arush.
> My scenario is that In standalone mode, if I have one worker, when I start
> spark-shell, there will be one executor launched. But if I have 2 workers,
> there will be 2 executors launched, so I am wondering the mechanism of
> executor allocation.
> Is it possible to specify how many executors I want in the code ?
>
> On Tue, May 26, 2015 at 5:57 PM, Arush Kharbanda <
> ar...@sigmoidanalytics.com> wrote:
>
>> I believe you would be restricted by the number of cores you have in your
>> cluster. Having a worker running without a core is useless.
>>
>> On Tue, May 26, 2015 at 3:04 PM, canan chen  wrote:
>>
>>> In spark standalone mode, there will be one executor per worker. I am
>>> wondering how many executor can I acquire when I submit app ? Is it greedy
>>> mode (as many as I can acquire )?
>>>
>>
>>
>>
>> --
>>
>> [image: Sigmoid Analytics] 
>>
>> *Arush Kharbanda* || Technical Teamlead
>>
>> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>>
>
>


Re: How to give multiple directories as input ?

2015-05-27 Thread Eugen Cepoi
Try something like that:


 def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
   // assuming a list of paths
   val paths: Seq[String] = getInputPaths(inputDir, startDate, endDate)

   val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))
   paths.drop(1).foreach(p => FileInputFormat.addInputPath(job, new Path(p)))

   sc.newAPIHadoopFile(paths.head,
     classOf[AvroKeyInputFormat[GenericRecord]],
     classOf[AvroKey[GenericRecord]],
     classOf[NullWritable],
     job.getConfiguration())
 }

2015-05-27 10:55 GMT+02:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :

>
>  def readGenericRecords(sc: SparkContext, inputDir: String, startDate:
> Date, endDate: Date) = {
>
> val path = getInputPaths(inputDir, startDate, endDate)
>
>sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
> AvroKeyInputFormat[GenericRecord]]("/A/B/C/D/D/2015/05/22/out-r-*.avro")
>
>   }
>
>
> This is my method, can you show me where should i modify to use
> FileInputFormat ? If you add the path there what should you give while
> invoking newAPIHadoopFile
>
> On Wed, May 27, 2015 at 2:20 PM, Eugen Cepoi 
> wrote:
>
>> You can do that using FileInputFormat.addInputPath
>>
>> 2015-05-27 10:41 GMT+02:00 ayan guha :
>>
>>> What about /blah/*/blah/out*.avro?
>>> On 27 May 2015 18:08, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:
>>>
 I am doing that now.
 Is there no other way ?

 On Wed, May 27, 2015 at 12:40 PM, Akhil Das >>> > wrote:

> How about creating two and union [ sc.union(first, second) ] them?
>
> Thanks
> Best Regards
>
> On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
> wrote:
>
>> I have this piece
>>
>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>> AvroKeyInputFormat[GenericRecord]](
>> "/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")
>>
>> that takes ("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro") this as
>> input.
>>
>> I want to give a second directory as input but this is a invalid
>> syntax
>>
>>
>> that takes ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro",
>> "/a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>
>> OR
>>
>> ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
>> /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>
>>
>> Please suggest.
>>
>>
>>
>> --
>> Deepak
>>
>>
>


 --
 Deepak


>>
>
>
> --
> Deepak
>
>


Avro CombineInputFormat ?

2015-05-27 Thread ๏̯͡๏
Can someone share some code to use CombineInputFormat to read Avro data?

Today I use

def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date,
endDate: Date) = {

val path = getInputPaths(inputDir, startDate, endDate)

   sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]]("/A/B/C/D/D/2015/05/22/out-r-*.avro")

  }

And I want to use CombineInputFormat. Will it be helpful?
-- 
Deepak


Re: How to give multiple directories as input ?

2015-05-27 Thread ๏̯͡๏
 def readGenericRecords(sc: SparkContext, inputDir: String, startDate:
Date, endDate: Date) = {

val path = getInputPaths(inputDir, startDate, endDate)

   sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]]("/A/B/C/D/D/2015/05/22/out-r-*.avro")

  }


This is my method. Can you show me where I should modify it to use
FileInputFormat? If you add the paths there, what should you pass when
invoking newAPIHadoopFile?

On Wed, May 27, 2015 at 2:20 PM, Eugen Cepoi  wrote:

> You can do that using FileInputFormat.addInputPath
>
> 2015-05-27 10:41 GMT+02:00 ayan guha :
>
>> What about /blah/*/blah/out*.avro?
>> On 27 May 2015 18:08, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:
>>
>>> I am doing that now.
>>> Is there no other way ?
>>>
>>> On Wed, May 27, 2015 at 12:40 PM, Akhil Das 
>>> wrote:
>>>
 How about creating two and union [ sc.union(first, second) ] them?

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
 wrote:

> I have this piece
>
> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
> AvroKeyInputFormat[GenericRecord]](
> "/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")
>
> that takes ("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro") this as
> input.
>
> I want to give a second directory as input but this is a invalid syntax
>
>
> that takes ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro",
> "/a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>
> OR
>
> ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
> /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>
>
> Please suggest.
>
>
>
> --
> Deepak
>
>

>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>


-- 
Deepak


Re: How to give multiple directories as input ?

2015-05-27 Thread Eugen Cepoi
You can do that using FileInputFormat.addInputPath
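
Roughly like this (a sketch using the new Hadoop mapreduce API classes and one of the paths from your mail):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Sketch: register the extra directories on a Job, then pass job.getConfiguration()
// to newAPIHadoopFile together with the first path.
val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))
FileInputFormat.addInputPath(job, new Path("/a/b/c/d/exptsession/2015/05/21"))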

2015-05-27 10:41 GMT+02:00 ayan guha :

> What about /blah/*/blah/out*.avro?
> On 27 May 2015 18:08, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:
>
>> I am doing that now.
>> Is there no other way ?
>>
>> On Wed, May 27, 2015 at 12:40 PM, Akhil Das 
>> wrote:
>>
>>> How about creating two and union [ sc.union(first, second) ] them?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
>>> wrote:
>>>
 I have this piece

 sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
 AvroKeyInputFormat[GenericRecord]](
 "/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")

 that takes ("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro") this as
 input.

 I want to give a second directory as input but this is a invalid syntax


 that takes ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro",
 "/a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")

 OR

 ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
 /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")


 Please suggest.



 --
 Deepak


>>>
>>
>>
>> --
>> Deepak
>>
>>


Re: How to give multiple directories as input ?

2015-05-27 Thread ayan guha
What about /blah/*/blah/out*.avro?
On 27 May 2015 18:08, "ÐΞ€ρ@Ҝ (๏̯͡๏)"  wrote:

> I am doing that now.
> Is there no other way ?
>
> On Wed, May 27, 2015 at 12:40 PM, Akhil Das 
> wrote:
>
>> How about creating two and union [ sc.union(first, second) ] them?
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
>> wrote:
>>
>>> I have this piece
>>>
>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>> AvroKeyInputFormat[GenericRecord]](
>>> "/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")
>>>
>>> that takes ("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro") this as
>>> input.
>>>
>>> I want to give a second directory as input but this is a invalid syntax
>>>
>>>
>>> that takes ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro",
>>> "/a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>>
>>> OR
>>>
>>> ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
>>> /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>>
>>>
>>> Please suggest.
>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


Re: How many executors can I acquire in standalone mode ?

2015-05-27 Thread canan chen
Thanks Arush.
My scenario is that in standalone mode, if I have one worker, there will be one
executor launched when I start spark-shell. But if I have 2 workers, there will
be 2 executors launched, so I am wondering about the mechanism of executor
allocation.
Is it possible to specify how many executors I want in the code?

On Tue, May 26, 2015 at 5:57 PM, Arush Kharbanda  wrote:

> I believe you would be restricted by the number of cores you have in your
> cluster. Having a worker running without a core is useless.
>
> On Tue, May 26, 2015 at 3:04 PM, canan chen  wrote:
>
>> In spark standalone mode, there will be one executor per worker. I am
>> wondering how many executor can I acquire when I submit app ? Is it greedy
>> mode (as many as I can acquire )?
>>
>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Re: How to give multiple directories as input ?

2015-05-27 Thread ๏̯͡๏
I am doing that now.
Is there no other way ?

On Wed, May 27, 2015 at 12:40 PM, Akhil Das 
wrote:

> How about creating two and union [ sc.union(first, second) ] them?
>
> Thanks
> Best Regards
>
> On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
> wrote:
>
>> I have this piece
>>
>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>> AvroKeyInputFormat[GenericRecord]](
>> "/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")
>>
>> that takes ("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro") this as
>> input.
>>
>> I want to give a second directory as input but this is a invalid syntax
>>
>>
>> that takes ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro",
>> "/a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>
>> OR
>>
>> ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
>> /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>>
>>
>> Please suggest.
>>
>>
>>
>> --
>> Deepak
>>
>>
>


-- 
Deepak


Inconsistent behavior with Dataframe Timestamp between 1.3.1 and 1.4.0

2015-05-27 Thread Justin Yip
Hello,

I am trying out 1.4.0 and have noticed some differences in Timestamp behavior
between 1.3.1 and 1.4.0.

In 1.3.1, I can compare a Timestamp with string.
scala> val df = sqlContext.createDataFrame(Seq((1,
Timestamp.valueOf("2015-01-01 00:00:00")), (2,
Timestamp.valueOf("2014-01-01 00:00:00"))))
...
scala> df.filter($"_2" <= "2014-06-01").show
...
_1 _2
2  2014-01-01 00:00:...

However, in 1.4.0, the filter is always false:
scala> val df = sqlContext.createDataFrame(Seq((1,
Timestamp.valueOf("2015-01-01 00:00:00")), (2,
Timestamp.valueOf("2014-01-01 00:00:00"))))
df: org.apache.spark.sql.DataFrame = [_1: int, _2: timestamp]

scala> df.filter($"_2" <= "2014-06-01").show
+--+--+
|_1|_2|
+--+--+
+--+--+

Not sure if that is intended, but I cannot find any doc mentioning these
inconsistencies.
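
In case it helps, one workaround worth trying is to make the comparison type explicit
instead of relying on the implicit string comparison (untested sketch):

import org.apache.spark.sql.functions.lit

// Sketch: cast the string literal to a timestamp so the filter compares
// timestamp to timestamp rather than depending on implicit coercion.
val cutoff = lit("2014-06-01 00:00:00").cast("timestamp")
df.filter($"_2" <= cutoff).show()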

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-behavior-with-Dataframe-Timestamp-between-1-3-1-and-1-4-0-tp23045.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How does spark manage the memory of executor with multiple tasks

2015-05-27 Thread canan chen
Can anyone answer my question? I am curious to know, if there are multiple reducer
tasks in one executor, how memory is allocated between these reducer tasks, since
each shuffle consumes a lot of memory.

On Tue, May 26, 2015 at 7:27 PM, Evo Eftimov  wrote:

>  the link you sent says multiple executors per node
>
> Worker is just demon process launching Executors / JVMs so it can execute
> tasks - it does that by cooperating with the master and the driver
>
> There is a one to one maping between Executor and JVM
>
>
> Sent from Samsung Mobile
>
>
>  Original message 
> From: Arush Kharbanda
> Date:2015/05/26 10:55 (GMT+00:00)
> To: canan chen
> Cc: Evo Eftimov ,user@spark.apache.org
> Subject: Re: How does spark manage the memory of executor with multiple
> tasks
>
> Hi Evo,
>
> Worker is the JVM and an executor runs on the JVM. And after Spark 1.4 you
> would be able to run multiple executors on the same JVM/worker.
>
> https://issues.apache.org/jira/browse/SPARK-1706.
>
> Thanks
> Arush
>
> On Tue, May 26, 2015 at 2:54 PM, canan chen  wrote:
>
>> I think the concept of task in spark should be on the same level of task
>> in MR. Usually in MR, we need to specify the memory the each mapper/reducer
>> task. And I believe executor is not a user-facing concept, it's a spark
>> internal concept. For spark users they don't need to know the concept of
>> executor, but need to know the concept of task.
>>
>> On Tue, May 26, 2015 at 5:09 PM, Evo Eftimov 
>> wrote:
>>
>>> This is the first time I hear that “one can specify the RAM per task” –
>>> the RAM is granted per Executor (JVM). On the other hand each Task operates
>>> on ONE RDD Partition – so you can say that this is “the RAM allocated to
>>> the Task to process” – but it is still within the boundaries allocated to
>>> the Executor (JVM) within which the Task is running. Also while running,
>>> any Task like any JVM Thread can request as much additional RAM e.g. for
>>> new Object instances  as there is available in the Executor aka JVM Heap
>>>
>>>
>>>
>>> *From:* canan chen [mailto:ccn...@gmail.com]
>>> *Sent:* Tuesday, May 26, 2015 9:30 AM
>>> *To:* Evo Eftimov
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: How does spark manage the memory of executor with
>>> multiple tasks
>>>
>>>
>>>
>>> Yes, I know that one task represent a JVM thread. This is what I
>>> confused. Usually users want to specify the memory on task level, so how
>>> can I do it if task if thread level and multiple tasks runs in the same
>>> executor. And even I don't know how many threads there will be. Besides
>>> that, if one task cause OOM, it would cause other tasks in the same
>>> executor fail too. There's no isolation between tasks.
>>>
>>>
>>>
>>> On Tue, May 26, 2015 at 4:15 PM, Evo Eftimov 
>>> wrote:
>>>
>>> An Executor is a JVM instance spawned and running on a Cluster Node
>>> (Server machine). Task is essentially a JVM Thread – you can have as many
>>> Threads as you want per JVM. You will also hear about “Executor Slots” –
>>> these are essentially the CPU Cores available on the machine and granted
>>> for use to the Executor
>>>
>>>
>>>
>>> Ps: what creates ongoing confusion here is that the Spark folks have
>>> “invented” their own terms to describe the design of their what is
>>> essentially a Distributed OO Framework facilitating Parallel Programming
>>> and Data Management in a Distributed Environment, BUT have not provided
>>> clear dictionary/explanations linking these “inventions” with standard
>>> concepts familiar to every Java, Scala etc developer
>>>
>>>
>>>
>>> *From:* canan chen [mailto:ccn...@gmail.com]
>>> *Sent:* Tuesday, May 26, 2015 9:02 AM
>>> *To:* user@spark.apache.org
>>> *Subject:* How does spark manage the memory of executor with multiple
>>> tasks
>>>
>>>
>>>
>>> Since spark can run multiple tasks in one executor, so I am curious to
>>> know how does spark manage memory across these tasks. Say if one executor
>>> takes 1GB memory, then if this executor can run 10 tasks simultaneously,
>>> then each task can consume 100MB on average. Do I understand it correctly ?
>>> It doesn't make sense to me that spark run multiple tasks in one executor.
>>>
>>>
>>>
>>
>>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


Re: Is the executor number fixed during the lifetime of one app ?

2015-05-27 Thread Saisai Shao
I'm not sure about Mesos; maybe someone with Mesos experience can help answer this.

Thanks
Jerry

2015-05-27 15:21 GMT+08:00 DB Tsai :

> Typo. We can not figure a way to increase the number of executor in one
> node in mesos.
>
>
> On Wednesday, May 27, 2015, DB Tsai  wrote:
>
>> If with mesos, how do we control the number of executors? In our cluster,
>> each node only has one executor with very big JVM. Sometimes, if the
>> executor dies, all the concurrent running tasks will be gone. We would like
>> to have multiple executors in one node but can not figure out a way to do
>> it in Yarn.
>>
>> On Wednesday, May 27, 2015, Saisai Shao  wrote:
>>
>>> The drive has a heuristic mechanism to decide the number of executors in
>>> the run-time according the pending tasks. You could enable with
>>> configuration, you could refer to spark document to find the details.
>>>
>>> 2015-05-27 15:00 GMT+08:00 canan chen :
>>>
 How does the dynamic allocation works ? I mean does it related
 with parallelism of my RDD and how does driver know how many executor it
 needs ?

 On Wed, May 27, 2015 at 2:49 PM, Saisai Shao 
 wrote:

> It depends on how you use Spark, if you use Spark with Yarn and enable
> dynamic allocation, the number of executor is not fixed, will change
> dynamically according to the load.
>
> Thanks
> Jerry
>
> 2015-05-27 14:44 GMT+08:00 canan chen :
>
>> It seems the executor number is fixed for the standalone mode, not
>> sure other modes.
>>
>
>

>>>
>>
>> --
>> Sent from my iPhone
>>
>
>
> --
> Sent from my iPhone
>


Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-27 Thread Ji ZHANG
Hi Akhil,

Thanks for your reply. According to the Streaming tab of the Web UI, the
Processing Time is around 400ms and there's no Scheduling Delay, so I
suppose it's not the Kafka messages that eat up the off-heap memory. Or
maybe it is, but how can I tell?

I googled how to check off-heap memory usage; there's a tool called pmap,
but I don't know how to interpret the results.

On Wed, May 27, 2015 at 3:08 PM, Akhil Das 
wrote:

> After submitting the job, if you do a ps aux | grep spark-submit then you
> can see all JVM params. Are you using the highlevel consumer (receiver
> based) for receiving data from Kafka? In that case if your throughput is
> high and the processing delay exceeds batch interval then you will hit this
> memory issues as the data will keep on receiving and is dumped to memory.
> You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
> Another alternate will be to use the lowlevel kafka consumer
>  or to use the
> non-receiver based directStream
> 
> that comes up with spark.
>
> Thanks
> Best Regards
>
> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG  wrote:
>
>> Hi,
>>
>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I find
>> out that YARN is killing the driver and executor process because of
>> excessive use of memory. Here's something I tried:
>>
>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
>> extra memory is not used by heap.
>> 2. I set the two memoryOverhead params to 1024 (default is 384), but the
>> memory just keeps growing and then hits the limit.
>> 3. This problem is not shown in low-throughput jobs, neither in
>> standalone mode.
>> 4. The test job just receives messages from Kafka, with batch interval of
>> 1, do some filtering and aggregation, and then print to executor logs. So
>> it's not some 3rd party library that causes the 'leak'.
>>
>> Spark 1.3 is built by myself, with correct hadoop versions.
>>
>> Any ideas will be appreciated.
>>
>> Thanks.
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry


Re: Is the executor number fixed during the lifetime of one app ?

2015-05-27 Thread DB Tsai
Typo. We cannot figure out a way to increase the number of executors on one
node in Mesos.

On Wednesday, May 27, 2015, DB Tsai  wrote:

> If with mesos, how do we control the number of executors? In our cluster,
> each node only has one executor with very big JVM. Sometimes, if the
> executor dies, all the concurrent running tasks will be gone. We would like
> to have multiple executors in one node but can not figure out a way to do
> it in Yarn.
>
> On Wednesday, May 27, 2015, Saisai Shao  > wrote:
>
>> The drive has a heuristic mechanism to decide the number of executors in
>> the run-time according the pending tasks. You could enable with
>> configuration, you could refer to spark document to find the details.
>>
>> 2015-05-27 15:00 GMT+08:00 canan chen :
>>
>>> How does the dynamic allocation works ? I mean does it related
>>> with parallelism of my RDD and how does driver know how many executor it
>>> needs ?
>>>
>>> On Wed, May 27, 2015 at 2:49 PM, Saisai Shao 
>>> wrote:
>>>
 It depends on how you use Spark, if you use Spark with Yarn and enable
 dynamic allocation, the number of executor is not fixed, will change
 dynamically according to the load.

 Thanks
 Jerry

 2015-05-27 14:44 GMT+08:00 canan chen :

> It seems the executor number is fixed for the standalone mode, not
> sure other modes.
>


>>>
>>
>
> --
> Sent from my iPhone
>


-- 
Sent from my iPhone


Re: Is the executor number fixed during the lifetime of one app ?

2015-05-27 Thread DB Tsai
With Mesos, how do we control the number of executors? In our cluster,
each node has only one executor with a very big JVM. Sometimes, if the
executor dies, all the concurrently running tasks are gone. We would like
to have multiple executors on one node but cannot figure out a way to do
it in Yarn.

On Wednesday, May 27, 2015, Saisai Shao  wrote:

> The drive has a heuristic mechanism to decide the number of executors in
> the run-time according the pending tasks. You could enable with
> configuration, you could refer to spark document to find the details.
>
> 2015-05-27 15:00 GMT+08:00 canan chen  >:
>
>> How does the dynamic allocation works ? I mean does it related
>> with parallelism of my RDD and how does driver know how many executor it
>> needs ?
>>
>> On Wed, May 27, 2015 at 2:49 PM, Saisai Shao > > wrote:
>>
>>> It depends on how you use Spark, if you use Spark with Yarn and enable
>>> dynamic allocation, the number of executor is not fixed, will change
>>> dynamically according to the load.
>>>
>>> Thanks
>>> Jerry
>>>
>>> 2015-05-27 14:44 GMT+08:00 canan chen >> >:
>>>
 It seems the executor number is fixed for the standalone mode, not sure
 other modes.

>>>
>>>
>>
>

-- 
Sent from my iPhone


Re: Is the executor number fixed during the lifetime of one app ?

2015-05-27 Thread Saisai Shao
The driver has a heuristic mechanism to decide the number of executors at
run-time according to the pending tasks. You can enable it with configuration;
refer to the Spark documentation for the details.
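
Roughly, the relevant settings look like this (a sketch; the min/max bounds are placeholders):

import org.apache.spark.SparkConf

// Sketch: dynamic allocation lets the driver grow and shrink the executor count
// based on pending tasks; it also needs the external shuffle service on the nodes.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")    // placeholder bound
  .set("spark.dynamicAllocation.maxExecutors", "20")   // placeholder bound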

2015-05-27 15:00 GMT+08:00 canan chen :

> How does the dynamic allocation works ? I mean does it related
> with parallelism of my RDD and how does driver know how many executor it
> needs ?
>
> On Wed, May 27, 2015 at 2:49 PM, Saisai Shao 
> wrote:
>
>> It depends on how you use Spark, if you use Spark with Yarn and enable
>> dynamic allocation, the number of executor is not fixed, will change
>> dynamically according to the load.
>>
>> Thanks
>> Jerry
>>
>> 2015-05-27 14:44 GMT+08:00 canan chen :
>>
>>> It seems the executor number is fixed for the standalone mode, not sure
>>> other modes.
>>>
>>
>>
>


Re: DataFrame. Conditional aggregation

2015-05-27 Thread Masf
Yes. I think that your SQL solution will work, but I was looking for a
solution with the DataFrame API. I had thought of using a UDF such as:

val myFunc = udf {(input: Int) => {if (input > 100) 1 else 0}}

Although I'd like to know whether it's possible to do it directly in the
aggregation by inserting a lambda function or something else.
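
For what it's worth, the UDF route can be plugged straight into agg. A rough sketch
(untested; joinedData and the endrscp column come from the earlier message):

import org.apache.spark.sql.functions._

// Sketch: apply the UDF to the column inside the aggregation (Spark 1.3 API).
val overThreshold = udf { (input: Int) => if (input > 100) 1 else 0 }
val result = joinedData.groupBy("col1", "col2").agg(
  count(lit(1)).as("counter"),
  min(col("col3")).as("minimum"),
  sum(overThreshold(col("endrscp"))).as("test")
)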

Thanks

Regards.
Miguel.


On Wed, May 27, 2015 at 1:06 AM, ayan guha  wrote:

> For this, I can give you a SQL solution:
>
> joined data.registerTempTable('j')
>
> Res=ssc.sql('select col1,col2, count(1) counter, min(col3) minimum,
> sum(case when endrscp>100 then 1 else 0 end test from j'
>
> Let me know if this works.
> On 26 May 2015 23:47, "Masf"  wrote:
>
>> Hi
>> I don't know how it works. For example:
>>
>> val result = joinedData.groupBy("col1","col2").agg(
>>   count(lit(1)).as("counter"),
>>   min(col3).as("minimum"),
>>   sum("case when endrscp> 100 then 1 else 0 end").as("test")
>> )
>>
>> How can I do it?
>>
>> Thanks
>> Regards.
>> Miguel.
>>
>> On Tue, May 26, 2015 at 12:35 AM, ayan guha  wrote:
>>
>>> Case when col2>100 then 1 else col2 end
>>> On 26 May 2015 00:25, "Masf"  wrote:
>>>
 Hi.

 In a dataframe, How can I execution a conditional sentence in a
 aggregation. For example, Can I translate this SQL statement to DataFrame?:

 SELECT name, SUM(IF table.col2 > 100 THEN 1 ELSE table.col1)
 FROM table
 GROUP BY name

 Thanks
 --
 Regards.
 Miguel

>>>
>>
>>
>> --
>>
>>
>> Saludos.
>> Miguel Ángel
>>
>


-- 


Saludos.
Miguel Ángel


Re: How to give multiple directories as input ?

2015-05-27 Thread Akhil Das
How about creating two and union [ sc.union(first, second) ] them?
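
Roughly, for example (a sketch reusing the paths from your mail):

// Sketch: read each day's directory as its own RDD, then union them.
val first = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
  AvroKeyInputFormat[GenericRecord]]("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")
val second = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
  AvroKeyInputFormat[GenericRecord]]("/a/b/c/d/exptsession/2015/05/21/out-r-*.avro")
val both = sc.union(first, second)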

Thanks
Best Regards

On Wed, May 27, 2015 at 11:51 AM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> I have this piece
>
> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
> AvroKeyInputFormat[GenericRecord]](
> "/a/b/c/d/exptsession/2015/05/22/out-r-*.avro")
>
> that takes ("/a/b/c/d/exptsession/2015/05/22/out-r-*.avro") this as input.
>
> I want to give a second directory as input but this is a invalid syntax
>
>
> that takes ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro",
> "/a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>
> OR
>
> ("/a/b/c/d/exptsession/2015/05/*22*/out-r-*.avro,
> /a/b/c/d/exptsession/2015/05/*21*/out-r-*.avro")
>
>
> Please suggest.
>
>
>
> --
> Deepak
>
>


Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-27 Thread Akhil Das
After submitting the job, if you do a ps aux | grep spark-submit then you
can see all the JVM params. Are you using the high-level consumer (receiver
based) for receiving data from Kafka? In that case, if your throughput is
high and the processing delay exceeds the batch interval, then you will hit
these memory issues as data keeps being received and dumped into memory.
You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
Another alternative is to use the low-level Kafka consumer or the
non-receiver-based directStream that comes with Spark.
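
Roughly, the direct approach looks like this (a sketch for Spark 1.3; ssc is your
StreamingContext, broker list and topic are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: receiver-less Kafka input, so no received blocks pile up in executor memory.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)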

Thanks
Best Regards

On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG  wrote:

> Hi,
>
> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I find out
> that YARN is killing the driver and executor process because of excessive
> use of memory. Here's something I tried:
>
> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
> extra memory is not used by heap.
> 2. I set the two memoryOverhead params to 1024 (default is 384), but the
> memory just keeps growing and then hits the limit.
> 3. This problem is not shown in low-throughput jobs, neither in standalone
> mode.
> 4. The test job just receives messages from Kafka, with batch interval of
> 1, do some filtering and aggregation, and then print to executor logs. So
> it's not some 3rd party library that causes the 'leak'.
>
> Spark 1.3 is built by myself, with correct hadoop versions.
>
> Any ideas will be appreciated.
>
> Thanks.
>
> --
> Jerry
>


Re: Is the executor number fixed during the lifetime of one app ?

2015-05-27 Thread canan chen
How does the dynamic allocation work? I mean, is it related to the parallelism of my
RDD, and how does the driver know how many executors it needs?

On Wed, May 27, 2015 at 2:49 PM, Saisai Shao  wrote:

> It depends on how you use Spark, if you use Spark with Yarn and enable
> dynamic allocation, the number of executor is not fixed, will change
> dynamically according to the load.
>
> Thanks
> Jerry
>
> 2015-05-27 14:44 GMT+08:00 canan chen :
>
>> It seems the executor number is fixed for the standalone mode, not sure
>> other modes.
>>
>
>