Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-28 Thread trackissue121
I had already tested the query in the Hive CLI and it works fine. The same
query shows an error in Spark SQL.

On May 29, 2015 4:14 AM, ayan guha  wrote:
>
> Probably a naive question: can you try the same in the Hive CLI and see if
> your SQL is working? It looks like a Hive thing to me, as Spark is faithfully
> delegating the query to Hive.
>
> On 29 May 2015 03:22, "Abhishek Tripathi"  wrote:
>>
>> Hi ,
>> I'm using the CDH 5.4.0 quick start VM and tried to build Spark with Hive 
>> compatibility so that I can run Spark SQL and access temp tables remotely.
>>
>> I used the command below to build Spark. The build was successful, but when I 
>> tried to access Hive data from Spark SQL, I got the error below.
>>
>> Thanks,
>> Abhi
>>
>> ---
>> mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive 
>> -Phive-thriftserver -DskipTests clean package
>>
>> [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/
>> [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql 
>> SET spark.sql.hive.version=0.13.1
>> spark-sql> show tables;
>> sample_07 false
>> t1 false
>> Time taken: 3.901 seconds, Fetched 2 row(s)
>> spark-sql> select * from t1;
>> 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
>> java.lang.VerifyError: class 
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto
>>  overrides final method 
>> getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>>
>>  
>>


Re: Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Ashish Mukherjee
Hi Mohit,

Thanks for your reply.

If my use case is purely querying read-only data (no transaction
scenarios), at what scale is one of them a better option than the other? I
am aware that for scale which can be supported on a single node, VoltDB is
a better choice. However, when the scale grows to a clustered scenario,
which is the right engine at various degrees of scale?

Regards,
Ashish

On Fri, May 29, 2015 at 6:57 AM, Mohit Jaggi  wrote:

> I have used VoltDB and Spark. The use cases for the two are quite
> different. VoltDB is intended for transactions and also supports queries on
> the same(custom to voltdb) store. Spark(SQL) is NOT suitable for
> transactions; it is designed for querying immutable data (which may exist
> in several different forms of stores).
>
> > On May 28, 2015, at 7:48 AM, Ashish Mukherjee <
> ashish.mukher...@gmail.com> wrote:
> >
> > Hello,
> >
> > I was wondering if there is any documented comparison of SparkSQL with
> MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. too allow
> queries to be run in a clustered environment. What is  the major
> differentiation?
> >
> > Regards,
> > Ashish
>
>


Registering Custom metrics [Spark-Streaming-monitoring]

2015-05-28 Thread Snehal Nagmote
Hello All,

I am using Spark Streaming 1.3. I want to capture a few custom metrics based
on accumulators; I followed an approach similar to this one:

val instrumentation = new SparkInstrumentation("example.metrics")
val numReqs = sc.accumulator(0L)
instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
instrumentation.register()

https://gist.github.com/ibuenros/9b94736c2bad2f4b8e23

After registering the metrics via accumulators, I am not able to see the values
of these metrics on the sink. I tried the console sink, but still no luck.

Do I need to set any properties in metrics.conf to enable this custom
source?
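
For reference, the console sink itself is enabled in conf/metrics.properties
roughly like this (taken from the metrics.properties.template that ships with
Spark); the custom source still has to be registered with Spark's metrics
system, which the register() call above is meant to do:

  # metrics.properties (sketch) -- enable the console sink for all instances
  *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
  *.sink.console.period=10
  *.sink.console.unit=seconds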


I also noticed that accumulators are displayed only on the task/stage pages;
it is difficult to identify on the UI which task/stage that information will
be available under.

Is there a way accumulators can be displayed on the overall job stats page?

Any pointers/examples to achieve this would be helpful; the Spark monitoring
documentation is not very helpful here,

Thanks in advance,

- Snehal


Re: How to use Eclipse on Windows to build Spark environment?

2015-05-28 Thread Nan Xiao
Hi Somnath,

Are there step-by-step instructions for using Eclipse to develop Spark
applications? I think many people need them. Thanks!
Best Regards
Nan Xiao


On Thu, May 28, 2015 at 3:15 PM, Somnath Pandeya
 wrote:
> Try the Scala Eclipse plugin to eclipsify the Spark project and import Spark 
> as an Eclipse project.
>
> -Somnath
>
> -Original Message-
> From: Nan Xiao [mailto:xiaonan830...@gmail.com]
> Sent: Thursday, May 28, 2015 12:32 PM
> To: user@spark.apache.org
> Subject: How to use Eclipse on Windows to build Spark environment?
>
> Hi all,
>
> I want to use Eclipse on Windows to set up a Spark development environment, but 
> I find that the reference page 
> (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
> doesn't contain any guidance about Eclipse.
>
> Could anyone give tutorials or links on how to use Eclipse on Windows to set up 
> a Spark environment? Thanks in advance!
>
> Best Regards
> Nan Xiao
>
>
>




Re: Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Mohit Jaggi
I have used VoltDB and Spark. The use cases for the two are quite different. 
VoltDB is intended for transactions and also supports queries on the 
same(custom to voltdb) store. Spark(SQL) is NOT suitable for transactions; it 
is designed for querying immutable data (which may exist in several different 
forms of stores).

> On May 28, 2015, at 7:48 AM, Ashish Mukherjee  
> wrote:
> 
> Hello,
> 
> I was wondering if there is any documented comparison of SparkSQL with 
> MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. too allow queries 
> to be run in a clustered environment. What is  the major differentiation?
> 
> Regards,
> Ashish





Twitter Streaming HTTP 401 Error

2015-05-28 Thread tracynj
I am working on the Databricks Reference applications, porting them to my
company's platform, and extending them to emit RDF. I have already gotten
them working with the extension on EC2, and have the Log Analyzer
application working on our platform. But the Twitter Language Classifier
application keeps getting an HTTP 401 Error. This error shows the following
message:
15/05/28 16:30:11 ERROR scheduler.ReceiverTracker: Deregistered receiver for
stream 0: Restarting receiver with delay 2000ms: Error receiving tweets -
401:Authentication credentials (https://dev.twitter.com/pages/auth) were
missing or incorrect. Ensure that you have set valid consumer key/secret,
access token/secret, and the system clock is in sync.
Error 401 Unauthorized
HTTP ERROR: 401
Problem accessing '/1.1/statuses/sample.json?stall_warnings=true'.
Reason: Unauthorized

Relevant discussions can be found on the Internet at:
http://www.google.co.jp/search?q=d0031b0b or
http://www.google.co.jp/search?q=1db75513
TwitterException{exceptionCode=[d0031b0b-1db75513], statusCode=401,
message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
at 
twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:177)
at
twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
at 
twitter4j.internal.http.HttpClientWrapper.get(HttpClientWrapper.java:89)
at 
twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:176)
at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:164)
at
twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)

15/05/28 16:30:13 INFO scheduler.ReceiverTracker: Registered receiver for
stream 0 from akka.tcp://sparkExecutor@mesos-0020-n3:56412

As the message says, the 401 code can be thrown by either bad credentials,
or by unsynchronized clocks. However, I know my credentials work, as I've
tested them both on EC2, and directly on the OAuth Testing Tool on the
Twitter Application Manager. Likewise, all the nodes on our cluster are
running ntpd, so that shouldn't be the problem either. I did some looking at
the API for the codes, and saw that the 401 could also be thrown for any
calls to the now deprecated v1 API endpoints, but given that the Databricks
applications run correctly in other contexts, such as EC2, I also do not
think this is the problem. I am at a loss to know why I am getting this
error, and have run out of ideas and could use some help.
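
One more thing worth double-checking is whether the OAuth credentials actually
reach the receiver on the cluster. A minimal sketch of wiring the twitter4j
credentials through system properties on the driver before creating the stream
(the key values are placeholders, and this assumes spark-streaming-twitter is
on the classpath):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.twitter.TwitterUtils

  // Placeholder credentials -- substitute the real consumer key/secret and
  // access token/secret from the Twitter application manager.
  System.setProperty("twitter4j.oauth.consumerKey", "CONSUMER_KEY")
  System.setProperty("twitter4j.oauth.consumerSecret", "CONSUMER_SECRET")
  System.setProperty("twitter4j.oauth.accessToken", "ACCESS_TOKEN")
  System.setProperty("twitter4j.oauth.accessTokenSecret", "ACCESS_TOKEN_SECRET")

  val ssc = new StreamingContext(new SparkConf().setAppName("twitter-check"), Seconds(10))
  // With no explicit Authorization argument, TwitterUtils falls back to the
  // twitter4j.oauth.* system properties set above.
  val tweets = TwitterUtils.createStream(ssc, None)
  tweets.map(_.getText).print()
  ssc.start()
  ssc.awaitTermination()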







Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Which would imply that if there was a load manager type of service, it
could signal to the driver(s) that they need to quiesce, i.e. process
what's at hand and terminate.  Then bring up a new machine, then restart
the driver(s)...  Same deal with removing machines from the cluster. Send a
signal for the drivers to pipe down and terminate, then restart them.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger  wrote:

> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
>> Hi all,
>>
>> As the author of the dynamic allocation feature I can offer a few
>> insights here.
>>
>> Gerard's explanation was both correct and concise: dynamic allocation is
>> not intended to be used in Spark streaming at the moment (1.4 or before).
>> This is because of two things:
>>
>> (1) Number of receivers is necessarily fixed, and these are started in
>> executors. Since we need a receiver for each InputDStream, if we kill these
>> receivers we essentially stop the stream, which is not what we want. It
>> makes little sense to close and restart a stream the same way we kill and
>> relaunch executors.
>>
>> (2) Records come in every batch, and when there is data to process your
>> executors are not idle. If your idle timeout is less than the batch
>> duration, then you'll end up having to constantly kill and restart
>> executors. If your idle timeout is greater than the batch duration, then
>> you'll never kill executors.
>>
>> Long answer short, with Spark streaming there is currently no
>> straightforward way to scale the size of your cluster. I had a long
>> discussion with TD (Spark streaming lead) about what needs to be done to
>> provide some semblance of dynamic scaling to streaming applications, e.g.
>> take into account the batch queue instead. We came up with a few ideas that
>> I will not detail here, but we are looking into this and do intend to
>> support it in the near future.
>>
>> -Andrew
>>
>>
>>
>> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>>> – it will be your insurance policy against sys crashes due to memory leaks.
>>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>>> and of course resorting to disk from time to time (ie when there is no free
>>> RAM ) and taking a performance hit from that, BUT only until there is no
>>> free RAM
>>>
>>>
>>>
>>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>>> *Sent:* Thursday, May 28, 2015 2:34 PM
>>> *To:* Evo Eftimov
>>> *Cc:* Gerard Maas; spark users
>>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>>> sizes/rate of growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> Evo, good points.
>>>
>>>
>>>
>>> On the dynamic resource allocation, I'm surmising this only works within
>>> a particular cluster setup.  So it improves the usage of current cluster
>>> resources but it doesn't make the cluster itself elastic. At least, that's
>>> my understanding.
>>>
>>>
>>>
>>> Memory + disk would be good and hopefully it'd take *huge* load on the
>>> system to start exhausting the disk space too.  I'd guess that falling onto
>>> disk will make things significantly slower due to the extra I/O.
>>>
>>>
>>>
>>> Perhaps we'll really want all of these elements eventually.  I think
>>> we'd want to start with memory only, keeping maxRate low enough not to
>>> overwhelm the consumers; implement the cluster autoscaling.  We might
>>> experiment with dynamic resource allocation before we get to implement the
>>> cluster autoscale.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>>> wrote:
>>>
>>> You can also try Dynamic Resource Allocation
>>>
>>>
>>>
>>>
>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>
>>>
>>>
>>> Also re the Feedback Loop for automatic message consumption rate
>>> adjustment – there is a “dumb” solution option – simply set the storage
>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>> exhausted spark streaming will resort to keeping new RDDs on disk which
>>> will prevent it from crashing and hence losing them. Then some memory will
>>> get freed and it will resort back to RAM and so on and so forth
>>>
>>>
>>>
>>>
>>>
>>> Sent from Samsung Mobile
>>>
>>>  Original message 
>>>
>>> From: Evo Eftimov
>>>
>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>
>>> To: Dmitry Goldenberg
>>>
>>> Cc: Gerard Maas ,spark users
>>>
>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> You can always spin new boxes in the background and bring them into the
>>> cluster fold when fully operational and time that with job relaunch and
>>> param change
>>>
>>>
>>>
>>> Kafka offsets are managed automatically for 

Re: Batch aggregation by sliding window + join

2015-05-28 Thread ayan guha
Which version of Spark? In 1.4, window queries will show up for these kinds of
scenarios.

One thing I can suggest is to keep the daily aggregates materialised,
partitioned by key, and sorted by the key-day combination using the
repartitionAndSortWithinPartitions method. It allows you to use a custom
partitioner and a custom sorter, e.g. as in the sketch below.
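
A rough sketch of that layout (the partitioner, value types and paths are made
up for illustration; the real job would plug in its own counter objects):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{Partitioner, SparkConf, SparkContext}

  // Route records by id only, so every day of the same id lands in the same
  // partition even though the RDD key is (id, day).
  class IdPartitioner(override val numPartitions: Int) extends Partitioner {
    override def getPartition(key: Any): Int = key match {
      case (id: Long, _) => (id.hashCode & Int.MaxValue) % numPartitions
      case other         => (other.hashCode & Int.MaxValue) % numPartitions
    }
  }

  object DailyAggregates {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("daily-aggregates"))

      // Illustrative data: key = (id, day), value = a counter.
      val daily: RDD[((Long, Int), Long)] = sc.parallelize(Seq(
        ((1L, 20150527), 10L), ((1L, 20150528), 7L), ((2L, 20150528), 3L)))

      // Partition by id, sort within each partition by (id, day) via the
      // default tuple ordering, and materialise the result so later joins
      // against the small per-day RDDs can reuse the same layout.
      val materialised = daily.repartitionAndSortWithinPartitions(new IdPartitioner(8))
      materialised.saveAsObjectFile("/tmp/daily_aggregates")

      sc.stop()
    }
  }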

Best
Ayan
On 29 May 2015 03:31, "igor.berman"  wrote:

> Hi,
> I have a batch daily job that computes daily aggregate of several counters
> represented by some object.
> After daily aggregation is done, I want to compute block of 3 days
> aggregation(3,7,30 etc)
> To do so I need to add new daily aggregation to the current block and then
> subtract from current block the daily aggregation of the last day within
> the
> current block(sliding window...)
> I've implemented it with something like:
>
> baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
> All rdds are keyed by unique id(long). Each rdd is saved in avro files
> after
> the job finishes and loaded when job starts(on next day). baseBlockRdd is
> much larger than lastDay and newDay rdds(depends on the size of the block)
>
> Unfortunately the performance is not satisfactory due to many shuffles(I
> have parallelism etc) I was looking for the way to improve performance
> somehow, to make sure that one task "joins" same local keys without
> reshuffling baseBlockRdd(which is big) each time the job starts(see
> https://spark-project.atlassian.net/browse/SPARK-1061 as related issue)
> so bottom line - how to join big rdd with smaller rdd without reshuffling
> big rdd over and over again?
> As soon as I've saved this big rdd and reloaded it from disk I want that
> every other rdd will be partitioned and collocated by the same
> "partitioner"(which is absent for hadooprdd) ... somehow, so that only
> small
> rdds will be sent over network.
>
> Another idea I had  - somehow split baseBlock into 2 parts with filter by
> keys of small rdds and then join, however I'm not sure it's possible to
> implement this filter without join.
>
> any ideas would be appreciated,
> thanks in advance
> Igor
>
>
>
>
>
>


spark mlib variance analysis

2015-05-28 Thread rafac
I have a simple problem:
I've got the mean number of people at one place by hour (time-series like), and
now I want to know whether the weather conditions have an impact on that mean
number. I would normally do this with variance analysis, like an ANOVA in SPSS,
or by analysing the resulting regression model summary.
How is it possible to do this with Apache Spark MLlib?

Thanks







Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-28 Thread ayan guha
Probably a naive question: can you try the same in the Hive CLI and see if your
SQL is working? It looks like a Hive thing to me, as Spark is faithfully
delegating the query to Hive.
On 29 May 2015 03:22, "Abhishek Tripathi"  wrote:

> Hi ,
> I'm using the CDH 5.4.0 quick start VM and tried to build Spark with Hive
> compatibility so that I can run Spark SQL and access temp tables remotely.
>
> I used the command below to build Spark. The build was successful, but when I
> tried to access Hive data from Spark SQL, I got the error below.
>
> Thanks,
> Abhi
>
> ---
> *mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive
> -Phive-thriftserver -DskipTests clean package*
>
> [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/
> [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
> SET spark.sql.hive.version=0.13.1
> spark-sql> show tables;
> sample_07 false
> t1 false
> Time taken: 3.901 seconds, Fetched 2 row(s)
> spark-sql> select * from t1;
> 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
> java.lang.VerifyError: class
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto
> overrides final method *getUnknownFields*
> .()Lcom/google/protobuf/UnknownFieldSet;
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>
>
>
>


UDF accessing hive struct array fails with buffer underflow from kryo

2015-05-28 Thread yluo
Hi all, I'm using Spark 1.3.1 with Hive 0.13.1. When running a UDF that accesses
a Hive struct array, the query fails with:

Caused by: com.esotericsoftware.kryo.KryoException: Buffer underflow.
Serialization trace:
fieldName
(org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector$MyField)
fields
(org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector)
listElementObjectInspector
(org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector)
argStructArrayOI (com.groupon.hive.udf.filter.StructStringMemberFilterUDF)
at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
at com.esotericsoftware.kryo.io.Input.readAscii_slow(Input.java:580)
at com.esotericsoftware.kryo.io.Input.readAscii(Input.java:558)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:436)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
at
org.apache.hadoop.hive.ql.exec.Utilities.deserializeObjectByKryo(Utilities.java:918)
... 102 more

Anyone seen anything similar? argStructArrayOI is a Hive
ListObjectInspector. The field the argStructArrayOI is accessing looks like:

array>

The table is a hive table.

Running the same query on Hive works... what's going on here? Any
suggestions on how to debug this?







spark java.io.FileNotFoundException: /user/spark/applicationHistory/application

2015-05-28 Thread roy
Hi,

Suddenly Spark jobs started failing with the following error:

Exception in thread "main" java.io.FileNotFoundException:
/user/spark/applicationHistory/application_1432824195832_1275.inprogress (No
such file or directory)

full trace here

[21:50:04 x...@hadoop-client01.dev:~]$ spark-submit --class
org.apache.spark.examples.SparkPi --master yarn
/usr/lib/spark/lib/spark-examples.jar 10
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/parquet/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/avro/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/05/28 21:55:21 INFO SparkContext: Running Spark version 1.3.0
15/05/28 21:55:21 WARN SparkConf: In Spark 1.0 and later spark.local.dir
will be overridden by the value set by the cluster manager (via
SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/05/28 21:55:21 WARN SparkConf:
SPARK_JAVA_OPTS was detected (set to ' -Dspark.local.dir=/srv/tmp/xyz ').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with conf/spark-defaults.conf to set defaults for an
application
 - ./spark-submit with --driver-java-options to set -X options for a driver
 - spark.executor.extraJavaOptions to set -X options for executors
 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master
or worker)

15/05/28 21:55:21 WARN SparkConf: Setting 'spark.executor.extraJavaOptions'
to ' -Dspark.local.dir=/srv/tmp/xyz ' as a work-around.
15/05/28 21:55:21 WARN SparkConf: Setting 'spark.driver.extraJavaOptions' to
' -Dspark.local.dir=/srv/tmp/xyz ' as a work-around.
15/05/28 21:55:22 INFO SecurityManager: Changing view acls to: xyz
15/05/28 21:55:22 INFO SecurityManager: Changing modify acls to: xyz
15/05/28 21:55:22 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(xyz); users
with modify permissions: Set(xyz)
15/05/28 21:55:22 INFO Slf4jLogger: Slf4jLogger started
15/05/28 21:55:22 INFO Remoting: Starting remoting
15/05/28 21:55:22 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkdri...@hadoop-client01.abc.com:51876]
15/05/28 21:55:22 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkdri...@hadoop-client01.abc.com:51876]
15/05/28 21:55:22 INFO Utils: Successfully started service 'sparkDriver' on
port 51876.
15/05/28 21:55:22 INFO SparkEnv: Registering MapOutputTracker
15/05/28 21:55:22 INFO SparkEnv: Registering BlockManagerMaster
15/05/28 21:55:22 INFO DiskBlockManager: Created local directory at
/srv/tmp/xyz/spark-1e66e6eb-7ad6-4f62-87fc-f0cfaa631e36/blockmgr-61f866b8-6475-4a11-88b2-792d2ba22662
15/05/28 21:55:22 INFO MemoryStore: MemoryStore started with capacity 265.4
MB
15/05/28 21:55:23 INFO HttpFileServer: HTTP File server directory is
/srv/tmp/xyz/spark-2b676170-3f88-44bf-87a3-600de1b7ee24/httpd-b84f76d5-26c7-4c63-9223-f6c5aa3899f0
15/05/28 21:55:23 INFO HttpServer: Starting HTTP Server
15/05/28 21:55:23 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/28 21:55:23 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:41538
15/05/28 21:55:23 INFO Utils: Successfully started service 'HTTP file
server' on port 41538.
15/05/28 21:55:23 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/28 21:55:23 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/28 21:55:23 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/05/28 21:55:23 INFO Utils: Successfully started service 'SparkUI' on port
4040.
15/05/28 21:55:23 INFO SparkUI: Started SparkUI at
http://hadoop-client01.abc.com:4040
15/05/28 21:55:23 INFO SparkContext: Added JAR
file:/usr/lib/spark/lib/spark-examples.jar at
http://10.0.3.62:41538/jars/spark-examples.jar with timestamp 1432850123523
15/05/28 21:55:24 INFO Client: Requesting a new application from cluster
with 16 NodeManagers
15/05/28 21:55:24 INFO Client: Verifying our application has not requested
more than the maximum memory capability of the cluster (61440 MB per
container)
15/05/28 21:55:24 INFO Client: Will allocate AM container, with 896 MB
memory including 384 MB overhead
15/05/28 21:55:24 INFO Client: Setting up container launch context for our
AM
15/05/28 21:55:24 INFO Client: Preparing resources for our AM container
15/05/28 21:55:24 INFO Client: Setting up the launch environment for our AM
container
15/05/28 21:55:24 INFO SecurityManager: Changing view acls to: xyz
15/05/28 21:55:24 INFO SecurityManager: Changing modify acls to: xyz
15/05/28 21:55:24 INFO SecurityManager: SecurityManager: au

Adding an indexed column

2015-05-28 Thread Cesar Flores
Assuming that I have the next data frame:

flag | price
-----+-----------------
   1 | 47.808764653746
   1 | 47.808764653746
   1 | 31.9869279512204
   1 | 47.7907893713564
   1 | 16.7599200038239
   1 | 16.7599200038239
   1 | 20.3916014172137

How can I create a data frame with an extra indexed column as the next one:

flag | price            | index
-----+------------------+------
   1 | 47.808764653746  | 0
   1 | 47.808764653746  | 1
   1 | 31.9869279512204 | 2
   1 | 47.7907893713564 | 3
   1 | 16.7599200038239 | 4
   1 | 16.7599200038239 | 5
   1 | 20.3916014172137 | 6

-- 
Cesar Flores
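
One way to get there, sketched below, is to zip the underlying RDD with an
index and rebuild the DataFrame with the extra column (the tiny sample
DataFrame stands in for the real one):

  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType}
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("indexed-column").setMaster("local[2]"))
  val sqlContext = new SQLContext(sc)

  // A small stand-in for the original DataFrame.
  val df = sqlContext.createDataFrame(
    sc.parallelize(Seq(Row(1, 47.808764653746), Row(1, 31.9869279512204))),
    StructType(Seq(StructField("flag", IntegerType), StructField("price", DoubleType))))

  // zipWithIndex assigns one Long per row; append it and extend the schema.
  val withIndex = sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) },
    StructType(df.schema.fields :+ StructField("index", LongType, nullable = false)))

  withIndex.show()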


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Cody Koeninger
I'm not sure that points 1 and 2 really apply to the kafka direct stream.
There are no receivers, and you know at the driver how big each of your
batches is.

On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:

> Hi all,
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
> -Andrew
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>
> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>> – it will be your insurance policy against sys crashes due to memory leaks.
>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>> and of course resorting to disk from time to time (ie when there is no free
>> RAM ) and taking a performance hit from that, BUT only until there is no
>> free RAM
>>
>>
>>
>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>> *Sent:* Thursday, May 28, 2015 2:34 PM
>> *To:* Evo Eftimov
>> *Cc:* Gerard Maas; spark users
>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>> sizes/rate of growth in Kafka or Spark's metrics?
>>
>>
>>
>> Evo, good points.
>>
>>
>>
>> On the dynamic resource allocation, I'm surmising this only works within
>> a particular cluster setup.  So it improves the usage of current cluster
>> resources but it doesn't make the cluster itself elastic. At least, that's
>> my understanding.
>>
>>
>>
>> Memory + disk would be good and hopefully it'd take *huge* load on the
>> system to start exhausting the disk space too.  I'd guess that falling onto
>> disk will make things significantly slower due to the extra I/O.
>>
>>
>>
>> Perhaps we'll really want all of these elements eventually.  I think we'd
>> want to start with memory only, keeping maxRate low enough not to overwhelm
>> the consumers; implement the cluster autoscaling.  We might experiment with
>> dynamic resource allocation before we get to implement the cluster
>> autoscale.
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>> wrote:
>>
>> You can also try Dynamic Resource Allocation
>>
>>
>>
>>
>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> Also re the Feedback Loop for automatic message consumption rate
>> adjustment – there is a “dumb” solution option – simply set the storage
>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>> exhausted spark streaming will resort to keeping new RDDs on disk which
>> will prevent it from crashing and hence losing them. Then some memory will
>> get freed and it will resort back to RAM and so on and so forth
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>>  Original message 
>>
>> From: Evo Eftimov
>>
>> Date:2015/05/28 13:22 (GMT+00:00)
>>
>> To: Dmitry Goldenberg
>>
>> Cc: Gerard Maas ,spark users
>>
>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> You can always spin new boxes in the background and bring them into the
>> cluster fold when fully operational and time that with job relaunch and
>> param change
>>
>>
>>
>> Kafka offsets are managed automatically for you by the Kafka clients,
>> which keep them in ZooKeeper; don't worry about that as long as you shut down
>> your job gracefully. Besides, managing the offsets explicitly is not a big
>> deal if necessary
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>>
>>
>>  Original message 
>>
>> From: Dmitry Goldenberg
>>
>> Date:2015/05/28 13:16 (GMT+00:00)
>>
>> To: Evo Eftimov
>>
>> Cc: Gerard Maas ,spark users
>>
>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> Thanks, Evo.  Pe

Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-28 Thread Sanjay Subramanian
OK guys, I finally figured out how to get it running. I have detailed the steps
I took below. Perhaps it's clear to all you folks; to me it was not :-)

Our Hadoop development environment:
   - 3-node development Hadoop cluster
   - Current version CDH 5.3.3
   - Hive 0.13.1
   - Spark 1.2.0 (standalone mode)
  - node1 (worker1, master)
  - node2 (worker2)
  - node3 (worker3)
   - Cloudera Manager to manage and update (using parcels)

Steps to get spark-sql running:
   - On every node (node1, node2, node3 above):
  - sudo cp -avi /etc/hive/conf/hive-site.xml /etc/spark/conf
   - Edit and add a line:
  - sudo vi /opt/cloudera/parcels/CDH/lib/spark/bin/compute-classpath.sh
 - # added by sanjay for running Spark using hive metadata
 - CLASSPATH="$CLASSPATH:/opt/cloudera/parcels/CDH/lib/hive/lib/*"
   - Run Spark SQL in CLI mode:
  - /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql
   - Run Spark SQL in async mode:
  - /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -e "select * from band.beatles where upper(first_name) like '%GEORGE%'"
   - Run Spark SQL in "SQL file" mode:
  - /opt/cloudera/parcels/CDH/lib/spark/bin/spark-sql -f get_names.hql


  From: Andrew Otto 
 To: Sanjay Subramanian  
Cc: user  
 Sent: Thursday, May 28, 2015 7:26 AM
 Subject: Re: Pointing SparkSQL to existing Hive Metadata with data file 
locations in HDFS
   

val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

It mostly works, but I have been having issues with tables that contain a large
amount of data:
https://issues.apache.org/jira/browse/SPARK-6910




On May 27, 2015, at 20:52, Sanjay Subramanian 
 wrote:
hey guys
On the Hive/Hadoop ecosystem we have, using Cloudera distribution CDH 5.2.x,
there are about 300+ Hive tables. The data is stored as text (moving slowly to
Parquet) on HDFS. I want to use Spark SQL to point to the Hive metadata and be
able to define JOINs etc. using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

Is that the way to go? Some guidance would be great.
thanks
sanjay
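
For the JOIN part specifically, the same HiveContext pattern applies. A rough
sketch (the band.albums table and its columns are made up purely to illustrate
joining existing Hive tables; on 1.2 the result is a SchemaRDD, on 1.3+ a
DataFrame, and take() works on both):

  import org.apache.spark.sql.hive.HiveContext
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("hive-join"))
  val sqlContext = new HiveContext(sc)

  // Join two tables that already exist in the Hive metastore.
  val joined = sqlContext.sql(
    """SELECT b.first_name, a.title
      |FROM band.beatles b
      |JOIN band.albums a ON b.id = a.band_id""".stripMargin)

  joined.take(10).foreach(println)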






  

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thanks, Andrew.

From speaking with customers, this is one of the most pressing issues for
them (burning hot, to be precise), especially in a SaaS type of environment
and especially with commodity hardware at play. Understandably, folks don't
want to pay for more hardware usage than necessary, and they want to be able
to handle the peaks and valleys of usage (especially the peaks) optimally.

It looks like there needs to be a generic 'watchdog' type of service which
would get metrics/signals from things like Kafka, then call into a
(potentially custom) handler which will cause new hardware to be
provisioned or decommissioned. Needless to say, Spark, the watchdog,
and the provisioner all need to be completely in sync and mindful of currently
running Spark jobs, so that new hardware immediately picks up extra load and
hardware is only decommissioned once any running Spark jobs have been
quiesced...

As I learn more about the configuration parameters and dynamic resource
allocation, I'm starting to feel that a dashboard with all these knobs
exposed would be so useful. Being able to test/simulate load volumes and
tweak the knobs as necessary, to arrive at the optimal patterns...

Regards,
- Dmitry

On Thu, May 28, 2015 at 3:21 PM, Andrew Or  wrote:

> Hi all,
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
> -Andrew
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>> – it will be your insurance policy against sys crashes due to memory leaks.
>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>> and of course resorting to disk from time to time (ie when there is no free
>> RAM ) and taking a performance hit from that, BUT only until there is no
>> free RAM
>>
>>
>>
>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>> *Sent:* Thursday, May 28, 2015 2:34 PM
>> *To:* Evo Eftimov
>> *Cc:* Gerard Maas; spark users
>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>> sizes/rate of growth in Kafka or Spark's metrics?
>>
>>
>>
>> Evo, good points.
>>
>>
>>
>> On the dynamic resource allocation, I'm surmising this only works within
>> a particular cluster setup.  So it improves the usage of current cluster
>> resources but it doesn't make the cluster itself elastic. At least, that's
>> my understanding.
>>
>>
>>
>> Memory + disk would be good and hopefully it'd take *huge* load on the
>> system to start exhausting the disk space too.  I'd guess that falling onto
>> disk will make things significantly slower due to the extra I/O.
>>
>>
>>
>> Perhaps we'll really want all of these elements eventually.  I think we'd
>> want to start with memory only, keeping maxRate low enough not to overwhelm
>> the consumers; implement the cluster autoscaling.  We might experiment with
>> dynamic resource allocation before we get to implement the cluster
>> autoscale.
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>> wrote:
>>
>> You can also try Dynamic Resource Allocation
>>
>>
>>
>>
>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> Also re the Feedback Loop for automatic message consumption rate
>> adjustment – there is a “dumb” solution option – simply set the storage
>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>> exhausted spark streaming will resort to keeping new RDDs on disk which
>> will prevent it from crashing and hence losing them. Then some memory will
>> get freed and it will resort back to RAM and so on and so forth
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>>  

spark submit debugging

2015-05-28 Thread boci
Hi!

I have a little problem... If I start my Spark application as a Java app
(locally) it works like a charm, but if I start it on the Hadoop cluster (I
tried spark-submit --master local[5] and --master yarn-client), it's not
working. There is no error and no exception; the job runs periodically but
nothing happens.

(I using docker container)

Thanks

b0c1
--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
Thanks Sandy- I was digging through the code in the deploy.yarn.Client and
literally found that property right before I saw your reply. I'm on 1.2.x
right now which doesn't have the property. I guess I need to update sooner
rather than later.

On Thu, May 28, 2015 at 3:56 PM, Sandy Ryza  wrote:

> Hi Corey,
>
> As of this PR https://github.com/apache/spark/pull/5297/files, this can
> be controlled with spark.yarn.submit.waitAppCompletion.
>
> -Sandy
>
> On Thu, May 28, 2015 at 11:48 AM, Corey Nolet  wrote:
>
>> I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
>> noticing the jvm that fires up to allocate the resources, etc... is not
>> going away after the application master and executors have been allocated.
>> Instead, it just sits there printing 1 second status updates to the
>> console. If I kill it, my job still runs (as expected).
>>
>> Is there an intended way to stop this from happening and just have the
>> local JVM die when it's done allocating the resources and deploying the
>> application master?
>>
>
>


Re: yarn-cluster spark-submit process not dying

2015-05-28 Thread Sandy Ryza
Hi Corey,

As of this PR https://github.com/apache/spark/pull/5297/files, this can be
controlled with spark.yarn.submit.waitAppCompletion.
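
For example (a sketch; the application class and jar are placeholders):

  spark-submit --master yarn-cluster \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --class com.example.MyApp my-app.jar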

-Sandy

On Thu, May 28, 2015 at 11:48 AM, Corey Nolet  wrote:

> I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
> noticing the jvm that fires up to allocate the resources, etc... is not
> going away after the application master and executors have been allocated.
> Instead, it just sits there printing 1 second status updates to the
> console. If I kill it, my job still runs (as expected).
>
> Is there an intended way to stop this from happening and just have the
> local JVM die when it's done allocating the resources and deploying the
> application master?
>


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Andrew Or
Hi all,

As the author of the dynamic allocation feature I can offer a few insights
here.

Gerard's explanation was both correct and concise: dynamic allocation is
not intended to be used in Spark streaming at the moment (1.4 or before).
This is because of two things:

(1) Number of receivers is necessarily fixed, and these are started in
executors. Since we need a receiver for each InputDStream, if we kill these
receivers we essentially stop the stream, which is not what we want. It
makes little sense to close and restart a stream the same way we kill and
relaunch executors.

(2) Records come in every batch, and when there is data to process your
executors are not idle. If your idle timeout is less than the batch
duration, then you'll end up having to constantly kill and restart
executors. If your idle timeout is greater than the batch duration, then
you'll never kill executors.

Long answer short, with Spark streaming there is currently no
straightforward way to scale the size of your cluster. I had a long
discussion with TD (Spark streaming lead) about what needs to be done to
provide some semblance of dynamic scaling to streaming applications, e.g.
take into account the batch queue instead. We came up with a few ideas that
I will not detail here, but we are looking into this and do intend to
support it in the near future.
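
(For batch jobs, where the feature is supported today, enabling it is purely a
matter of configuration, roughly along these lines; the external shuffle
service must also be running on the workers:)

  spark-submit \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=2 \
    --conf spark.dynamicAllocation.maxExecutors=20 \
    ...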

-Andrew



2015-05-28 8:02 GMT-07:00 Evo Eftimov :

> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
> – it will be your insurance policy against sys crashes due to memory leaks.
> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
> and of course resorting to disk from time to time (ie when there is no free
> RAM ) and taking a performance hit from that, BUT only until there is no
> free RAM
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Thursday, May 28, 2015 2:34 PM
> *To:* Evo Eftimov
> *Cc:* Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo, good points.
>
>
>
> On the dynamic resource allocation, I'm surmising this only works within a
> particular cluster setup.  So it improves the usage of current cluster
> resources but it doesn't make the cluster itself elastic. At least, that's
> my understanding.
>
>
>
> Memory + disk would be good and hopefully it'd take *huge* load on the
> system to start exhausting the disk space too.  I'd guess that falling onto
> disk will make things significantly slower due to the extra I/O.
>
>
>
> Perhaps we'll really want all of these elements eventually.  I think we'd
> want to start with memory only, keeping maxRate low enough not to overwhelm
> the consumers; implement the cluster autoscaling.  We might experiment with
> dynamic resource allocation before we get to implement the cluster
> autoscale.
>
>
>
>
>
>
>
> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
> wrote:
>
> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also re the Feedback Loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option – simply set the storage
> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
> exhausted spark streaming will resort to keeping new RDDs on disk which
> will prevent it from crashing and hence losing them. Then some memory will
> get freed and it will resort back to RAM and so on and so forth
>
>
>
>
>
> Sent from Samsung Mobile
>
>  Original message 
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and
> param change
>
>
>
> Kafka offsets are managed automatically for you by the Kafka clients, which
> keep them in ZooKeeper; don't worry about that as long as you shut down your
> job gracefully. Besides, managing the offsets explicitly is not a big deal if
> necessary
>
>
>
>
>
> Sent from Samsung Mobile
>
>
>
>  Original message 
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> Thanks, Evo.  Per the last part of your comment, it sounds like we will
> need to implement a job manager which will be in control of starting the
> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
> marking them as ones to relaunch, scaling the cluster up/down by
> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
>
>
> I suspect that relaunching the j

RE: Value for SPARK_EXECUTOR_CORES

2015-05-28 Thread Evo Eftimov
I don’t think the number of CPU cores controls the “number of parallel tasks”. 
The number of Tasks corresponds first and foremost to the number of (Dstream) 
RDD Partitions  

 

The Spark documentation doesn’t mention what is meant by “Task” in terms of 
Standard Multithreading Terminology ie a Thread or Process so your point is 
good 

 

Ps: time and time again every product and dev team and company invent their own 
terminology so 50% of the time using the product is spent on deciphering and 
reinventing the wheel 

 

From: Mulugeta Mammo [mailto:mulugeta.abe...@gmail.com] 
Sent: Thursday, May 28, 2015 7:24 PM
To: Ruslan Dautkhanov
Cc: user
Subject: Re: Value for SPARK_EXECUTOR_CORES

 

Thanks for the valuable information. The blog states:

 

"The cores property controls the number of concurrent tasks an executor can 
run. --executor-cores 5 means that each executor can run a maximum of five 
tasks at the same time. " 

 

So, I guess the max number of executor-cores I can assign is the CPU count 
(which includes the number of threads per core), not just the number of cores. 
I just want to be sure what Spark means by the "cores" term.

 

Thanks

 

On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov  
wrote:

It's not only about cores. Keep in mind spark.executor.cores also affects the 
available memory for each task:

 

From 
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

 

The memory available to each task is (spark.executor.memory * 
spark.shuffle.memoryFraction 
*spark.shuffle.safetyFraction)/spark.executor.cores. Memory fraction and safety 
fraction default to 0.2 and 0.8 respectively.




I'd test spark.executor.cores with 2,4,8 and 16 and see what makes your job run 
faster..

 


-- 
Ruslan Dautkhanov

 

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo  
wrote:

My executor has the following spec (lscpu):

 

CPU(s): 16

Core(s) per socket: 4

Socket(s): 2

Thread(s) per core: 2

 

The CPU count is obviously 4*2*2 = 16. My question is what value is Spark 
expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores (2 * 
2 = 4) ?

 

Thanks

 

 



Add Custom Aggregate Column to Spark DataFrame

2015-05-28 Thread calstad
I have a Spark DataFrame that looks like:

| id | value | bin |
|----+-------+-----|
|  1 |   3.4 |   2 |
|  2 |   2.6 |   1 |
|  3 |   1.8 |   1 |
|  4 |   9.6 |   2 |

I have a function `f` that takes an array of values and returns a number.  I
want to add a column to the above DataFrame where the value for the new
column in each row is the value of `f` for all the `value` entries that have
the same `bin` entry, i.e:

| id | value | bin | f_value   |
|----+-------+-----+---------------|
|  1 |   3.4 |   2 | f([3.4, 9.6]) |
|  2 |   2.6 |   1 | f([2.6, 1.8]) |
|  3 |   1.8 |   1 | f([2.6, 1.8]) |
|  4 |   9.6 |   2 | f([3.4, 9.6]) |

Since I need to aggregate all `value`s per `bin`, I cannot use the
`withColumn` function to add this new column. What is the best way to do
this until user-defined aggregation functions make their way into Spark?
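
One workaround in the meantime is to compute the per-bin aggregate on the RDD
side and join it back on, roughly like this (a sketch; `f` is stood in by a
simple sum, and the join uses an explicit column expression so it also works on
1.3):

  import org.apache.spark.sql.SQLContext
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("agg-column").setMaster("local[2]"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val df = sc.parallelize(Seq((1, 3.4, 2), (2, 2.6, 1), (3, 1.8, 1), (4, 9.6, 2)))
    .toDF("id", "value", "bin")

  // Placeholder for the real aggregation over all values of a bin.
  def f(values: Array[Double]): Double = values.sum

  // Compute f per bin on the RDD side, turn it back into a DataFrame,
  // and join it onto the original rows.
  val fByBin = df.rdd
    .map(r => (r.getInt(2), r.getDouble(1)))   // (bin, value)
    .groupByKey()
    .mapValues(vs => f(vs.toArray))
    .toDF("bin", "f_value")

  val result = df.join(fByBin, df("bin") === fByBin("bin"))
  result.show()

The joined result carries the bin column from both sides, so select just the
columns you need afterwards.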







yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
noticing the jvm that fires up to allocate the resources, etc... is not
going away after the application master and executors have been allocated.
Instead, it just sits there printing 1 second status updates to the
console. If I kill it, my job still runs (as expected).

Is there an intended way to stop this from happening and just have the
local JVM die when it's done allocating the resources and deploying the
application master?


Re: Value for SPARK_EXECUTOR_CORES

2015-05-28 Thread Mulugeta Mammo
Thanks for the valuable information. The blog states:

"The cores property controls the number of concurrent tasks an executor can
run. --executor-cores 5 means that each executor can run a maximum of five
tasks at the same time. "

So, I guess the max number of executor-cores I can assign is the CPU count
(which includes the number of threads per core), not just the number of cores.
I just want to be sure what Spark means by the "cores" term.

Thanks

On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov 
wrote:

> It's not only about cores. Keep in mind spark.executor.cores also affects the
> available memory for each task:
>
> From
> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
>
> The memory available to each task is (spark.executor.memory *
> spark.shuffle.memoryFraction *spark.shuffle.safetyFraction)/
> spark.executor.cores. Memory fraction and safety fraction default to 0.2
> and 0.8 respectively.
>
> I'd test spark.executor.cores with 2,4,8 and 16 and see what makes your
> job run faster..
>
>
> --
> Ruslan Dautkhanov
>
> On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo  > wrote:
>
>> My executor has the following spec (lscpu):
>>
>> CPU(s): 16
>> Core(s) per socket: 4
>> Socket(s): 2
>> Thread(s) per core: 2
>>
>> The CPU count is obviously 4*2*2 = 16. My question is what value is Spark
>> expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores
>> (2 * 2 = 4) ?
>>
>> Thanks
>>
>
>


spark sql lateral view unresolved attribute exception

2015-05-28 Thread weoccc
Hi,

It seems a LATERAL VIEW explode column named 'some_col' can't be resolved if
it is expressed in a subquery. Any idea why?

SELECT `fc_clickq`.`some_col` FROM ( SELECT *  FROM fc_clickq LATERAL VIEW
explode(`overlap`) ltr_table_3 AS `some_col`) fc_clickq ;

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: 'fc_clickq.some_col, tree:

'Project ['fc_clickq.some_col]

 Subquery fc_clickq

  Project [..,`some_col`#180]

   Generate explode(overlap#170), true, false, Some(ltr_table_3)

MetastoreRelation default, fc_clickq, None

Thx a lot,

Weide


Re: Value for SPARK_EXECUTOR_CORES

2015-05-28 Thread Ruslan Dautkhanov
It's not only about cores. Keep in mind spark.executor.cores also affects the
available memory for each task:

From
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

The memory available to each task is (spark.executor.memory *
spark.shuffle.memoryFraction *spark.shuffle.safetyFraction)/
spark.executor.cores. Memory fraction and safety fraction default to 0.2
and 0.8 respectively.
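
As a rough worked example with those defaults: an 8 GB (8192 MB) executor
running with spark.executor.cores=4 leaves each task about
8192 MB * 0.2 * 0.8 / 4 ≈ 328 MB of shuffle memory, so doubling the cores
halves the per-task shuffle space.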

I'd test spark.executor.cores with 2,4,8 and 16 and see what makes your job
run faster..


-- 
Ruslan Dautkhanov

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo 
wrote:

> My executor has the following spec (lscpu):
>
> CPU(s): 16
> Core(s) per socket: 4
> Socket(s): 2
> Thread(s) per core: 2
>
> The CPU count is obviously 4*2*2 = 16. My question is what value is Spark
> expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores
> (2 * 2 = 4) ?
>
> Thanks
>


Re: PySpark with OpenCV causes python worker to crash

2015-05-28 Thread Davies Liu
Could you try to comment out some lines in
`extract_sift_features_opencv` to find which line causes the crash?

If the bytes that came from sequenceFile() are broken, it's easy to crash a
C library in Python (OpenCV).

On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga  wrote:
> Hi sparkers,
>
> I am working on a PySpark application which uses the OpenCV library. It runs
> fine when running the code locally but when I try to run it on Spark on the
> same Machine it crashes the worker.
>
> The code can be found here:
> https://gist.github.com/samos123/885f9fe87c8fa5abf78f
>
> This is the error message taken from STDERR of the worker log:
> https://gist.github.com/samos123/3300191684aee7fc8013
>
> Would like pointers or tips on how to debug further? Would be nice to know
> the reason why the worker crashed.
>
> Thanks,
> Sam Stoelinga
>
>
> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
> at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readInt(DataInputStream.java:392)
> at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: [Streaming] Configure executor logging on Mesos

2015-05-28 Thread Tim Chen
-- Forwarded message --
From: Tim Chen 
Date: Thu, May 28, 2015 at 10:49 AM
Subject: Re: [Streaming] Configure executor logging on Mesos
To: Gerard Maas 


Hi Gerard,

The log line you referred to is not Spark logging but Mesos's own logging,
which uses glog.

Our own executor logs should only contain very few lines though.

Most of the log lines you'll see are from Spark, and they can be controlled by
specifying a log4j.properties to be downloaded with your Mesos task.
Alternatively, if you are downloading the Spark executor via spark.executor.uri,
you can include log4j.properties in that tarball.
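
For example, a minimal log4j.properties along these lines (an assumption on my
side that the executor ends up picking it up from its sandbox / classpath once
downloaded) would cut the noise down to WARN:

# hypothetical log4j.properties to ship alongside the executor
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n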

I think we probably need some more configurations for Spark scheduler to
pick up extra files to be downloaded into the sandbox.

Tim





On Thu, May 28, 2015 at 6:46 AM, Gerard Maas  wrote:

> Hi,
>
> I'm trying to control the verbosity of the logs on the Mesos executors
> with no luck so far. The default behaviour is INFO on stderr dump with an
> unbounded growth that gets too big at some point.
>
> I noticed that when the executor is instantiated, it locates a default log
> configuration in the spark assembly:
>
> I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
> 20150528-063307-780930314-5050-8152-S5
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
>
> So, no matter what I provide in my job jar files (I also tried with
> spark.executor.extraClassPath=log4j.properties), nothing takes effect in the
> executor's configuration.
>
> How should I configure the log on the executors?
>
> thanks, Gerard.
>


Hyperthreading

2015-05-28 Thread Mulugeta Mammo
Hi guys,

Does the SPARK_EXECUTOR_CORES assume Hyper threading? For example, if I
have 4 cores with 2 threads per core, should the SPARK_EXECUTOR_CORES be
4*2 = 8 or just 4?

Thanks,


Batch aggregation by sliding window + join

2015-05-28 Thread igor.berman
Hi,
I have a daily batch job that computes a daily aggregate of several counters
represented by some object.
After the daily aggregation is done, I want to compute block aggregations over
several days (3, 7, 30, etc.).
To do so I need to add the new daily aggregation to the current block and then
subtract from the current block the daily aggregation of the last day within the
current block (sliding window...).
I've implemented it with something like:
baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
All rdds are keyed by unique id(long). Each rdd is saved in avro files after
the job finishes and loaded when job starts(on next day). baseBlockRdd is
much larger than lastDay and newDay rdds(depends on the size of the block)

Unfortunately the performance is not satisfactory due to the many shuffles (I
have set parallelism etc.). I was looking for a way to improve performance
somehow, to make sure that one task "joins" the same local keys without
reshuffling baseBlockRdd (which is big) each time the job starts (see
https://spark-project.atlassian.net/browse/SPARK-1061 as a related issue).
So, bottom line - how do I join a big rdd with a smaller rdd without reshuffling
the big rdd over and over again?
Once I've saved this big rdd and reloaded it from disk, I want every other rdd
to be partitioned and collocated by the same "partitioner" (which is absent for
a hadooprdd) ... somehow, so that only the small rdds are sent over the network.

Another idea I had - somehow split baseBlock into 2 parts by filtering on the
keys of the small rdds and then join; however, I'm not sure it's possible to
implement this filter without a join.
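
For reference, the kind of co-located join I'm after looks roughly like this
(an untested sketch; the Counters type and field names are made up, and the big
rdd has to be re-partitioned and persisted after reload since the partitioner is
lost when saving to files):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// made-up counter type, just for illustration
case class Counters(views: Long, clicks: Long)

def slidingBlock(baseBlock: RDD[(Long, Counters)],
                 lastDay: RDD[(Long, Counters)],
                 newDay: RDD[(Long, Counters)],
                 numPartitions: Int): RDD[(Long, Counters)] = {
  val part = new HashPartitioner(numPartitions)

  // partition the big rdd once after reload and keep it in memory;
  // joins with rdds using the same partitioner then only shuffle the small side
  val base = baseBlock.partitionBy(part).persist()

  base
    .leftOuterJoin(lastDay.partitionBy(part))
    .mapValues { case (block, last) =>
      Counters(block.views  - last.map(_.views).getOrElse(0L),
               block.clicks - last.map(_.clicks).getOrElse(0L))
    }
    .fullOuterJoin(newDay.partitionBy(part))
    .mapValues { case (block, day) =>
      Counters(block.map(_.views).getOrElse(0L)  + day.map(_.views).getOrElse(0L),
               block.map(_.clicks).getOrElse(0L) + day.map(_.clicks).getOrElse(0L))
    }
}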

any ideas would be appreciated,
thanks in advance
Igor



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-28 Thread Abhishek Tripathi
Hi ,
I'm using CDH5.4.0  quick start VM and tried to build Spark with Hive
compatibility so that I can run Spark sql and access temp table remotely.

I used below command to build  Spark, it was build successful but when I
tried to access Hive data from Spark sql, I get error.

Thanks,
Abhi

---
*mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive
-Phive-thriftserver -DskipTests clean package*

[cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/
[cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
SET spark.sql.hive.version=0.13.1
spark-sql> show tables;
sample_07 false
t1 false
Time taken: 3.901 seconds, Fetched 2 row(s)
spark-sql> select * from t1;
15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
java.lang.VerifyError: class
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto
overrides final method *getUnknownFields*
.()Lcom/google/protobuf/UnknownFieldSet;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)


PySpark with OpenCV causes python worker to crash

2015-05-28 Thread Sam Stoelinga
Hi sparkers,

I am working on a PySpark application which uses the OpenCV library. It
runs fine when running the code locally but when I try to run it on Spark
on the same machine it crashes the worker.

The code can be found here:
https://gist.github.com/samos123/885f9fe87c8fa5abf78f

This is the error message taken from STDERR of the worker log:
https://gist.github.com/samos123/3300191684aee7fc8013

Would like pointers or tips on how to debug further? Would be nice to know
the reason why the worker crashed.

Thanks,
Sam Stoelinga


org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)


Loading CSV to DataFrame and saving it into Parquet for speedup

2015-05-28 Thread M Rez
I am using Spark-CSV to load about 50GB of data in around 10,000 CSV files into
a couple of unified DataFrames. Since this process is slow I have written this snippet:

targetList.foreach { target =>
  // this is using sqlContext.load by getting the list of files, then
  // loading them according to the schema files that were read before
  // and built into their StructType
  getTrace(target, sqlContext)
    .reduce(_ unionAll _)
    .registerTempTable(target.toUpperCase())
  sqlContext.sql("SELECT * FROM " + target.toUpperCase())
    .saveAsParquetFile(processedTraces + target)
}

to load the csv files, union all the csv files with the same schema,
and write them into a single parquet file with their parts. The problem is that
my cpu (not all cores are busy) and disk (ssd, 1MB/s at most) are
barely utilized. I wonder what I am doing wrong?!

snippet for getTrace: 

def getTrace(target: String, sqlContext: SQLContext): Seq[DataFrame] = {
  logFiles(mainLogFolder + target).map { file =>
    sqlContext.load(
      driver,
      // schemaSelect builds the StructType once
      schemaSelect(schemaFile, target, sqlContext),
      Map("path" -> file, "header" -> "false", "delimiter" -> ","))
  }
}

thanks for any help




-
regards,
mohamad
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-CSV-to-DataFrame-and-saving-it-into-Parquet-for-speedup-tp23071.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: debug jsonRDD problem?

2015-05-28 Thread Michael Stone

On Wed, May 27, 2015 at 02:06:16PM -0700, Ted Yu wrote:

Looks like the exception was caused by resolved.get(prefix ++ a) returning None
:
        a => StructField(a.head, resolved.get(prefix ++ a).get, nullable =
true)

There are three occurrences of resolved.get() in createSchema() - None should
be better handled in these places.

My two cents.


Here's the simplest test case I've come up with:

sqlContext.jsonRDD(sc.parallelize(Array("{\"'```'\":\"\"}"))).count()

Mike Stone

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK – it 
will be your insurance policy against system crashes due to memory leaks. While 
there is free RAM, spark streaming (spark) will NOT resort to disk – it will of 
course resort to disk from time to time (i.e. when there is no free RAM) and 
take a performance hit from that, BUT only while there is no free RAM.
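
For the receiver-based Kafka API that is just the storage level parameter, e.g.
(an untested sketch, assuming ssc, zkQuorum and group are already defined; the
topic name and parallelism are made up):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// spill received blocks to disk instead of dropping them when RAM runs out
val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
  Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

// or, for a DStream that already exists:
// lines.persist(StorageLevel.MEMORY_AND_DISK)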

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo, good points.

 

On the dynamic resource allocation, I'm surmising this only works within a 
particular cluster setup.  So it improves the usage of current cluster 
resources but it doesn't make the cluster itself elastic. At least, that's my 
understanding.

 

Memory + disk would be good and hopefully it'd take *huge* load on the system 
to start exhausting the disk space too.  I'd guess that falling onto disk will 
make things significantly slower due to the extra I/O.

 

Perhaps we'll really want all of these elements eventually.  I think we'd want 
to start with memory only, keeping maxRate low enough not to overwhelm the 
consumers; implement the cluster autoscaling.  We might experiment with dynamic 
resource allocation before we get to implement the cluster autoscale.

 

 

 

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov  wrote:

You can also try Dynamic Resource Allocation

 

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
 

 

Also re the Feedback Loop for automatic message consumption rate adjustment – 
there is a “dumb” solution option – simply set the storage policy for the 
DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark 
streaming will resort to keeping new RDDs on disk which will prevent it from 
crashing and hence losing them. Then some memory will get freed and it will 
resort back to RAM and so on and so forth  

 

 

Sent from Samsung Mobile

 Original message 

From: Evo Eftimov 

Date:2015/05/28 13:22 (GMT+00:00) 

To: Dmitry Goldenberg 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

 

Kafka offsets are managed automatically for you by the kafka clients which keep 
them in ZooKeeper, so don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary

 

 

Sent from Samsung Mobile

 

 Original message 

From: Dmitry Goldenberg 

Date:2015/05/28 13:16 (GMT+00:00) 

To: Evo Eftimov 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

Thanks, Evo.  Per the last part of your comment, it sounds like we will need to 
implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

 

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

 

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

 

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

 

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

 

Sounds like a great candidate for a set of enhancements in Spark...

 

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:

@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can ev

Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Ji ZHANG
Hi,

Unfortunately, they're still growing, both driver and executors.

I ran the same job in local mode, and everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
wrote:

> Can you replace your counting part with this?
>
> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>
>
>
> Thanks
> Best Regards
>
> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG  wrote:
>
>> Hi,
>>
>> I wrote a simple test job, it only does very basic operations. for
>> example:
>>
>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic
>> -> 1)).map(_._2)
>> val logs = lines.flatMap { line =>
>>   try {
>> Some(parse(line).extract[Impression])
>>   } catch {
>> case _: Exception => None
>>   }
>> }
>>
>> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>   rdd.foreachPartition { iter =>
>> iter.foreach(count => logger.info(count.toString))
>>   }
>> }
>>
>> It receives messages from Kafka, parse the json, filter and count the
>> records, and then print it to logs.
>>
>> Thanks.
>>
>>
>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das 
>> wrote:
>>
>>> Hi Zhang,
>>>
>>> Could you paste your code in a gist? Not sure what you are doing inside
>>> the code to fill up memory.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG  wrote:
>>>
 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das >>> > wrote:

> Are you using the createStream or createDirectStream api? If its the
> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
> slow things down though). Another way would be to try the later one.
>
> Thanks
> Best Regards
>
> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG  wrote:
>
>> Hi Akhil,
>>
>> Thanks for your reply. Accoding to the Streaming tab of Web UI, the
>> Processing Time is around 400ms, and there's no Scheduling Delay, so I
>> suppose it's not the Kafka messages that eat up the off-heap memory. Or
>> maybe it is, but how to tell?
>>
>> I googled about how to check the off-heap memory usage, there's a
>> tool called pmap, but I don't know how to interpret the results.
>>
>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>> ak...@sigmoidanalytics.com> wrote:
>>
>>> After submitting the job, if you do a ps aux | grep spark-submit
>>> then you can see all JVM params. Are you using the highlevel consumer
>>> (receiver based) for receiving data from Kafka? In that case if your
>>> throughput is high and the processing delay exceeds batch interval then 
>>> you
>>> will hit this memory issues as the data will keep on receiving and is
>>> dumped to memory. You can set StorageLevel to MEMORY_AND_DISK (but it 
>>> slows
>>> things down). Another alternate will be to use the lowlevel kafka
>>> consumer  or to
>>> use the non-receiver based directStream
>>> 
>>> that comes up with spark.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG 
>>> wrote:
>>>
 Hi,

 I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I
 find out that YARN is killing the driver and executor process because 
 of
 excessive use of memory. Here's something I tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so
 the extra memory is not used by heap.
 2. I set the two memoryOverhead params to 1024 (default is 384),
 but the memory just keeps growing and then hits the limit.
 3. This problem is not shown in low-throughput jobs, neither in
 standalone mode.
 4. The test job just receives messages from Kafka, with batch
 interval of 1, do some filtering and aggregation, and then print to
 executor logs. So it's not some 3rd party library that causes the 
 'leak'.

 Spark 1.3 is built by myself, with correct hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry

>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


 --
 Jerry

>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry


Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Ashish Mukherjee
Hello,

I was wondering if there is any documented comparison of SparkSQL with
MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. too allow
queries to be run in a clustered environment. What is  the major
differentiation?

Regards,
Ashish


Best practice to update a MongoDB document from Sparks

2015-05-28 Thread nibiau
Hello,
I'm evaluating Spark/SparkStreaming .
I use SparkStreaming to receive messages from a Kafka topic.
As soon as I have a JavaReceiverInputDStream, I have to treat each message:
for each one I have to search in MongoDB to find whether a document already exists.
If I find the document I have to update it with some values, else I have to 
create a new one.
I have succeeded in creating new documents by using saveAsNewAPIHadoopFiles, but I 
don't know the best practice for updating an existing document from Spark.
In a basic Java program I would use :
   myColl.update(updateQuery, updateCommand);

So, should I use this same approach to update existing documents, skipping those 
messages and letting saveAsNewAPIHadoopFiles create only the documents that need 
to be created?
Or any other better way ?
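
(For context, the kind of Spark-side pattern I am wondering about would look
roughly like this – an untested sketch using the plain Mongo Java driver from
Scala; the message type, database, collection and field names are all made up:)

import com.mongodb.{BasicDBObject, MongoClient}
import org.apache.spark.streaming.dstream.DStream

// made-up message type
case class Msg(id: String, value: String)

def upsertToMongo(messages: DStream[Msg]): Unit = {
  messages.foreachRDD { rdd =>
    rdd.foreachPartition { iter =>
      val client = new MongoClient("localhost", 27017)   // one client per partition
      try {
        val coll = client.getDB("mydb").getCollection("mycoll")
        iter.foreach { msg =>
          val query  = new BasicDBObject("_id", msg.id)
          val update = new BasicDBObject("$set", new BasicDBObject("value", msg.value))
          coll.update(query, update, true, false)         // upsert = true, multi = false
        }
      } finally {
        client.close()
      }
    }
  }
}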

Tks
Nicolas 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-28 Thread Andrew Otto
> val sqlContext = new HiveContext(sc)
> val schemaRdd = sqlContext.sql("some complex SQL")


It mostly works, but I have been having issues with tables that contain a large 
amount of data:

https://issues.apache.org/jira/browse/SPARK-6910 



> On May 27, 2015, at 20:52, Sanjay Subramanian 
>  wrote:
> 
> hey guys
> 
> On the Hive/Hadoop ecosystem we are using the Cloudera distribution CDH 5.2.x; 
> there are about 300+ hive tables.
> The data is stored an text (moving slowly to Parquet) on HDFS.
> I want to use SparkSQL and point to the Hive metadata and be able to define 
> JOINS etc using a programming structure like this 
> 
> import org.apache.spark.sql.hive.HiveContext
> val sqlContext = new HiveContext(sc)
> val schemaRdd = sqlContext.sql("some complex SQL")
> 
> 
> Is that the way to go ? Some guidance will be great.
> 
> thanks
> 
> sanjay
> 
> 
> 



Re: Dataframe Partitioning

2015-05-28 Thread Silvio Fiorito
That’s due to the config setting spark.sql.shuffle.partitions which defaults to 
200
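
If 200 is not what you want, you can change it before the join, e.g. (a sketch;
the value 12 is just an example):

sqlContext.setConf("spark.sql.shuffle.partitions", "12")
val joined = df1.join(df2, df1("id") === df2("id"), "inner")
// alternatively via SQL: sqlContext.sql("SET spark.sql.shuffle.partitions=12")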


From: Masf
Date: Thursday, May 28, 2015 at 10:02 AM
To: "user@spark.apache.org"
Subject: Dataframe Partitioning

Hi.

I have 2 dataframes with 1 and 12 partitions respectively. When I do an inner 
join between these dataframes, the result contains 200 partitions. Why?

df1.join(df2, df1("id") === df2("id"), "Inner") => returns 200 partitions


Thanks!!!
--


Regards.
Miguel Ángel


Dataframe Partitioning

2015-05-28 Thread Masf
Hi.

I have 2 dataframes with 1 and 12 partitions respectively. When I do an inner
join between these dataframes, the result contains 200 partitions. *Why?*

df1.join(df2, df1("id") === df2("id"), "Inner") => returns 200 partitions


Thanks!!!
-- 


Regards.
Miguel Ángel


[Streaming] Configure executor logging on Mesos

2015-05-28 Thread Gerard Maas
Hi,

I'm trying to control the verbosity of the logs on the Mesos executors with
no luck so far. The default behaviour is INFO on stderr dump with an
unbounded growth that gets too big at some point.

I noticed that when the executor is instantiated, it locates a default log
configuration in the spark assembly:

I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

So, no matter what I provide in my job jar files (I also tried with
spark.executor.extraClassPath=log4j.properties), nothing takes effect in the
executor's configuration.

How should I configure the log on the executors?

thanks, Gerard.


Re: SPARK STREAMING PROBLEM

2015-05-28 Thread Sourav Chandra
The problem lies in the way you are doing the processing.

Only after the g.foreach(x => {println(x); println("")}) do you
call ssc.start. It means that until then all you have done is set up the
computation steps, but Spark has not started any real processing. So when
you do g.foreach, what it iterates over is the empty list you are returning
from the g method, hence it does not print anything.
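
In other words, do the printing inside the streaming computation itself and only
then start the context – roughly like this (an untested sketch; the "file:" filter
and the split index are assumptions based on your snippet):

lines.foreachRDD { rdd =>
  if (rdd.count > 0) {
    rdd.toDebugString.split("\n")
      .filter(_.contains("file:"))
      .foreach(l => println(l.split(" ")(2)))
  }
}

ssc.start()
ssc.awaitTermination()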

On Thu, May 28, 2015 at 7:02 PM, Animesh Baranawal <
animeshbarana...@gmail.com> wrote:

> I also started the streaming context by running ssc.start() but still
> apart from logs nothing of g gets printed.
>
> -- Forwarded message --
> From: Animesh Baranawal 
> Date: Thu, May 28, 2015 at 6:57 PM
> Subject: SPARK STREAMING PROBLEM
> To: user@spark.apache.org
>
>
> Hi,
>
> I am trying to extract the filenames from which a Dstream is generated by
> parsing the toDebugString method on RDD
> I am implementing the following code in spark-shell:
>
> import org.apache.spark.streaming.{StreamingContext, Seconds}
> val ssc = new StreamingContext(sc,Seconds(10))
> val lines = ssc.textFileStream(// directory //)
>
> def g : List[String] = {
>var res = List[String]()
>lines.foreachRDD{ rdd => {
>   if(rdd.count > 0){
>   val files = rdd.toDebugString.split("\n").filter(_.contains(":\"))
>   files.foreach{ ms => {
>  res = ms.split(" ")(2)::res
>   }}   }
>}}
>res
> }
>
> g.foreach(x => {println(x); println("")})
>
> However when I run the code, nothing gets printed on the console apart
> from the logs. Am I doing something wrong?
> And is there any better way to extract the file names from DStream ?
>
> Thanks in advance
>
>
> Animesh
>
>
>


-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Soft distinct on data frames.

2015-05-28 Thread Jan-Paul Bultmann
Hey,
Is there a way to do a distinct operation on each partition only?
My program generates quite a few duplicate tuples and it would be nice to 
remove some of these as an optimisation
without having to reshuffle the data.
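
(At the RDD level I mean something like the following – a rough sketch assuming a
keyed RDD[(Long, Long)] called rdd, matching the _0/_1 columns in the plan below;
it trades per-partition memory for the dedup and keeps the existing partitioning:)

// dedupe within each partition only, no shuffle involved
val locallyDistinct = rdd.mapPartitions(iter => iter.toSet.iterator, preservesPartitioning = true)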

I’ve also noticed that plans generated with an unique transformation have this 
peculiar form:

== Physical Plan ==
Distinct false
 Exchange (HashPartitioning [_0#347L,_1#348L], 200)
  Distinct true
   PhysicalRDD [_0#347L,_1#348L], MapPartitionsRDD[247] at map at 
SQLContext.scala:394

Does this mean that set semantics are just a flag that can be turned off and on 
for each shuffling operation?
If so, is it possible to do so in general, so that one always uses set 
semantics instead of bag?
Or will the optimiser try to propagate the set semantics?

Cheers Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Evo, good points.

On the dynamic resource allocation, I'm surmising this only works within a
particular cluster setup.  So it improves the usage of current cluster
resources but it doesn't make the cluster itself elastic. At least, that's
my understanding.

Memory + disk would be good and hopefully it'd take *huge* load on the
system to start exhausting the disk space too.  I'd guess that falling onto
disk will make things significantly slower due to the extra I/O.

Perhaps we'll really want all of these elements eventually.  I think we'd
want to start with memory only, keeping maxRate low enough not to overwhelm
the consumers; implement the cluster autoscaling.  We might experiment with
dynamic resource allocation before we get to implement the cluster
autoscale.



On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov  wrote:

> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also re the Feedback Loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option – simply set the storage
> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
> exhausted spark streaming will resort to keeping new RDDs on disk which
> will prevent it from crashing and hence losing them. Then some memory will
> get freed and it will resort back to RAM and so on and so forth
>
>
>
>
>
> Sent from Samsung Mobile
>
>  Original message 
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and
> param change
>
>
>
> Kafka offsets are managed automatically for you by the kafka clients which
> keep them in ZooKeeper, so don't worry about that as long as you shut down your
> job gracefully. Besides, managing the offsets explicitly is not a big deal if
> necessary
>
>
>
>
>
> Sent from Samsung Mobile
>
>
>
>  Original message 
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> Thanks, Evo.  Per the last part of your comment, it sounds like we will
> need to implement a job manager which will be in control of starting the
> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
> marking them as ones to relaunch, scaling the cluster up/down by
> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
>
>
> I suspect that relaunching the jobs may be tricky since that means keeping
> track of the starter offsets in Kafka topic(s) from which the jobs started
> working on.
>
>
>
> Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
> of jobs, coupled with the wait for the new machines to come online may turn
> out quite time-consuming which will make for lengthy request times, and our
> requests are not asynchronous.  Ideally, the currently running jobs would
> continue to run on the machines currently available in the cluster.
>
>
>
> In the scale-down case, the job manager would want to signal to Spark's
> job scheduler not to send work to the node being taken out, find out when
> the last job has finished running on the node, then take the node out.
>
>
>
> This is somewhat like changing the number of cylinders in a car engine
> while the car is running...
>
>
>
> Sounds like a great candidate for a set of enhancements in Spark...
>
>
>
> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov 
> wrote:
>
> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to see whether it is decreasing relentlessly at a constant rate –
> if it touches a predetermined RAM threshold that should be your third
> metric
>
>
>
> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
> you can implement one on your own without waiting for Jiras and new
> features whenever they might be implemented by the Spark dev team –
> moreover you can avoid using slow mechanisms such as ZooKeeper and even
> incorporate some Machine Lea

Spark streaming with kafka

2015-05-28 Thread boci
Hi guys,

I'm using spark streaming with kafka... On my local machine (started as a java
application without using spark-submit) it works: it connects to kafka and does
the job (*). I tried to put it into a spark docker container (hadoop 2.6, spark
1.3.1, tried spark-submit with local[5] and yarn-client too) but with no
success...

No error on the console (the application started); I see something received
from kafka but the result is not written out to elasticsearch...

Where can I start debugging? I see two jobs in the spark console, both 0/1...

Thanks
--
Skype: boci13, Hangout: boci.b...@gmail.com


Fwd: SPARK STREAMING PROBLEM

2015-05-28 Thread Animesh Baranawal
I also started the streaming context by running ssc.start() but still apart
from logs nothing of g gets printed.

-- Forwarded message --
From: Animesh Baranawal 
Date: Thu, May 28, 2015 at 6:57 PM
Subject: SPARK STREAMING PROBLEM
To: user@spark.apache.org


Hi,

I am trying to extract the filenames from which a Dstream is generated by
parsing the toDebugString method on RDD
I am implementing the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc,Seconds(10))
val lines = ssc.textFileStream(// directory //)

def g : List[String] = {
   var res = List[String]()
   lines.foreachRDD{ rdd => {
  if(rdd.count > 0){
  val files = rdd.toDebugString.split("\n").filter(_.contains(":\"))
  files.foreach{ ms => {
 res = ms.split(" ")(2)::res
  }}   }
   }}
   res
}

g.foreach(x => {println(x); println("")})

However when I run the code, nothing gets printed on the console apart from
the logs. Am I doing something wrong?
And is there any better way to extract the file names from DStream ?

Thanks in advance


Animesh


Re: SPARK STREAMING PROBLEM

2015-05-28 Thread Sourav Chandra
You must start the StreamingContext by calling ssc.start()

On Thu, May 28, 2015 at 6:57 PM, Animesh Baranawal <
animeshbarana...@gmail.com> wrote:

> Hi,
>
> I am trying to extract the filenames from which a Dstream is generated by
> parsing the toDebugString method on RDD
> I am implementing the following code in spark-shell:
>
> import org.apache.spark.streaming.{StreamingContext, Seconds}
> val ssc = new StreamingContext(sc,Seconds(10))
> val lines = ssc.textFileStream(// directory //)
>
> def g : List[String] = {
>var res = List[String]()
>lines.foreachRDD{ rdd => {
>   if(rdd.count > 0){
>   val files = rdd.toDebugString.split("\n").filter(_.contains(":\"))
>   files.foreach{ ms => {
>  res = ms.split(" ")(2)::res
>   }}   }
>}}
>res
> }
>
> g.foreach(x => {println(x); println("")})
>
> However when I run the code, nothing gets printed on the console apart
> from the logs. Am I doing something wrong?
> And is there any better way to extract the file names from DStream ?
>
> Thanks in advance
>
>
> Animesh
>
>


-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


SPARK STREAMING PROBLEM

2015-05-28 Thread Animesh Baranawal
Hi,

I am trying to extract the filenames from which a Dstream is generated by
parsing the toDebugString method on RDD
I am implementing the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc,Seconds(10))
val lines = ssc.textFileStream(// directory //)

def g : List[String] = {
   var res = List[String]()
   lines.foreachRDD{ rdd => {
  if(rdd.count > 0){
  val files = rdd.toDebugString.split("\n").filter(_.contains(":\"))
  files.foreach{ ms => {
 res = ms.split(" ")(2)::res
  }}   }
   }}
   res
}

g.foreach(x => {println(x); println("")})

However when I run the code, nothing gets printed on the console apart from
the logs. Am I doing something wrong?
And is there any better way to extract the file names from DStream ?

Thanks in advance


Animesh


FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
You can also try Dynamic Resource Allocation

 

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
 

 

Also re the Feedback Loop for automatic message consumption rate adjustment – 
there is a “dumb” solution option – simply set the storage policy for the 
DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark 
streaming will resort to keeping new RDDs on disk which will prevent it from 
crashing and hence losing them. Then some memory will get freed and it will 
resort back to RAM and so on and so forth  

 

 

Sent from Samsung Mobile

 Original message 

From: Evo Eftimov 

Date:2015/05/28 13:22 (GMT+00:00) 

To: Dmitry Goldenberg 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

 

Kafka offsets are managed automatically for you by the kafka clients which keep 
them in ZooKeeper, so don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary

 

 

Sent from Samsung Mobile

 

 Original message 

From: Dmitry Goldenberg 

Date:2015/05/28 13:16 (GMT+00:00) 

To: Evo Eftimov 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

Thanks, Evo.  Per the last part of your comment, it sounds like we will need to 
implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

 

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

 

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

 

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

 

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

 

Sounds like a great candidate for a set of enhancements in Spark...

 

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:

@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric 

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution 

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: 

Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

Kafka offsets are managed automatically for you by the kafka clients which keep 
them in ZooKeeper, so don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary


Sent from Samsung Mobile

 Original message 
From: Dmitry Goldenberg 
Date:2015/05/28 13:16 (GMT+00:00) 
To: Evo Eftimov 
Cc: Gerard Maas ,spark users 
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 
Thanks, Evo.  Per the last part of your comment, it sounds like we will 
need to implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handling Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers is fixed for the lifetime of your DStream. 

What we do is to calculate the cluster capacity and use that as a fixed upper 
bound (with a margin) for the rec

Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thanks, Evo.  Per the last part of your comment, it sounds like we will
need to implement a job manager which will be in control of starting the
jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
marking them as ones to relaunch, scaling the cluster up/down by
adding/removing machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping
track of the starter offsets in Kafka topic(s) from which the jobs started
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
of jobs, coupled with the wait for the new machines to come online may turn
out quite time-consuming which will make for lengthy request times, and our
requests are not asynchronous.  Ideally, the currently running jobs would
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job
scheduler not to send work to the node being taken out, find out when the
last job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine
while the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:

> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to see whether it is decreasing relentlessly at a constant rate –
> if it touches a predetermined RAM threshold that should be your third
> metric
>
>
>
> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
> you can implement one on your own without waiting for Jiras and new
> features whenever they might be implemented by the Spark dev team –
> moreover you can avoid using slow mechanisms such as ZooKeeper and even
> incorporate some Machine Learning in your Feedback Loop to make it handle
> the message consumption rate more intelligently and benefit from ongoing
> online learning – BUT this is STILL about voluntarily sacrificing your
> performance in the name of keeping your system stable – it is not about
> scaling your system/solution
>
>
>
> In terms of how to scale the Spark Framework Dynamically – even though
> this is not supported at the moment out of the box I guess you can have a
> sys management framework spin dynamically a few more boxes (spark worker
> nodes), stop dynamically your currently running Spark Streaming Job,
> relaunch it with new params e.g. more Receivers, larger number of
> Partitions (hence tasks), more RAM per executor etc. Obviously this will
> cause some temporary delay in fact interruption in your processing but if
> the business use case can tolerate that then go for it
>
>
>
> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
> *Sent:* Thursday, May 28, 2015 12:36 PM
> *To:* dgoldenberg
> *Cc:* spark users
> *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate of
> growth in Kafka or Spark's metrics?
>
>
>
> Hi,
>
>
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
> streaming processes is not supported.
>
>
>
>
>
> *Longer version.*
>
>
>
> I assume that you are talking about Spark Streaming as the discussion is
> about handling Kafka streaming data.
>
>
>
> Then you have two things to consider: the Streaming receivers and the
> Spark processing cluster.
>
>
>
> Currently, the receiving topology is static. One receiver is allocated
> with each DStream instantiated and it will use 1 core in the cluster. Once
> the StreamingContext is started, this topology cannot be changed, therefore
> the number of Kafka receivers is fixed for the lifetime of your DStream.
>
> What we do is to calculate the cluster capacity and use that as a fixed
> upper bound (with a margin) for the receiver throughput.
>
>
>
> There's work in progress to add a reactive model to the receiver, where
> backpressure can be applied to handle overload conditions. See
> https://issues.apache.org/jira/browse/SPARK-7398
>
>
>
> Once the data is received, it will be processed in a 'classical' Spark
> pipeline, so previous posts on spark resource scheduling might apply.
>
>
>
> Regarding metrics, the standard metrics subsystem of spark will report
> streaming job performance. Check the driver's metrics endpoint to peruse
> the available metrics:
>
>
>
> :/metrics/json
>
>
>
> -kr, Gera

RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric 
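
A crude way to do that check from inside the driver (JVM heap only; the 512 MB
threshold below is made up):

// rough free-heap estimate for the current JVM
val rt = Runtime.getRuntime
val freeMb = (rt.maxMemory - (rt.totalMemory - rt.freeMemory)) / (1024 * 1024)
if (freeMb < 512) {
  // touched the threshold -> raise an alert / trigger the feedback loop
}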

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution 

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay, in fact an 
interruption, in your processing, but if the business use case can tolerate that 
then go for it 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handling Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers is fixed for the lifetime of your DStream. 

What we do is to calculate the cluster capacity and use that as a fixed upper 
bound (with a margin) for the receiver throughput.

 

There's work in progress to add a reactive model to the receiver, where 
backpressure can be applied to handle overload conditions. See 
https://issues.apache.org/jira/browse/SPARK-7398

 

Once the data is received, it will be processed in a 'classical' Spark 
pipeline, so previous posts on spark resource scheduling might apply.

 

Regarding metrics, the standard metrics subsystem of spark will report 
streaming job performance. Check the driver's metrics endpoint to peruse the 
available metrics:

 

:/metrics/json

 

-kr, Gerard.

 

 

(*) Spark is a project that moves so fast that statements might be invalidated 
by new work every minute.

 

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg  wrote:

Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up?
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in  the Spark monitoring guide
  .  Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thank you, Gerard.

We're looking at the receiver-less setup with Kafka Spark streaming so I'm
not sure how to apply your comments to that case (not that we have to use
receiver-less but it seems to offer some advantages over the
receiver-based).

As far as "the number of Kafka receivers is fixed for the lifetime of your
DStream" -- this may be OK to start with. What I'm researching is the
ability to add worker nodes to the Spark cluster when needed and remove
them when no longer needed.  Do I understand correctly that a single
receiver may cause work to be farmed out to multiple 'slave'
machines/worker nodes?  If that's the case, we're less concerned with
multiple receivers; we're concerned with the worker node cluster itself.

If we use the ConsumerOffsetChecker class in Kafka that Rajesh mentioned
and instrument dynamic adding/removal of machines, my subsequent questions
then are, a) will Spark sense the addition of a new node / is it sufficient
that the cluster manager is aware, then work just starts flowing there?
 and  b) what would be a way to gracefully remove a worker node when the
load subsides, so that no currently running Spark job is killed?

- Dmitry

On Thu, May 28, 2015 at 7:36 AM, Gerard Maas  wrote:

> Hi,
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
> streaming processes is not supported.
>
>
> *Longer version.*
>
> I assume that you are talking about Spark Streaming as the discussion is
> about handing Kafka streaming data.
>
> Then you have two things to consider: the Streaming receivers and the
> Spark processing cluster.
>
> Currently, the receiving topology is static. One receiver is allocated
> with each DStream instantiated and it will use 1 core in the cluster. Once
> the StreamingContext is started, this topology cannot be changed, therefore
> the number of Kafka receivers is fixed for the lifetime of your DStream.
> What we do is to calculate the cluster capacity and use that as a fixed
> upper bound (with a margin) for the receiver throughput.
>
> There's work in progress to add a reactive model to the receiver, where
> backpressure can be applied to handle overload conditions. See
> https://issues.apache.org/jira/browse/SPARK-7398
>
> Once the data is received, it will be processed in a 'classical' Spark
> pipeline, so previous posts on spark resource scheduling might apply.
>
> Regarding metrics, the standard metrics subsystem of spark will report
> streaming job performance. Check the driver's metrics endpoint to peruse
> the available metrics:
>
> :/metrics/json
>
> -kr, Gerard.
>
>
> (*) Spark is a project that moves so fast that statements might be
> invalidated by new work every minute.
>
> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg 
> wrote:
>
>> Hi,
>>
>> I'm trying to understand if there are design patterns for autoscaling
>> Spark
>> (add/remove slave machines to the cluster) based on the throughput.
>>
>> Assuming we can throttle Spark consumers, the respective Kafka topics we
>> stream data from would start growing.  What are some of the ways to
>> generate
>> the metrics on the number of new messages and the rate they are piling up?
>> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
>> with
>> the Metric interface and not much else...
>>
>> What are some of the ways to expand/contract the Spark cluster? Someone
>> has
>> mentioned Mesos...
>>
>> I see some info on Spark metrics in  the Spark monitoring guide
>>   .  Do we want to
>> perhaps implement a custom sink that would help us autoscale up or down
>> based on the throughput?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Gerard Maas
Hi,

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
streaming processes is not supported.


*Longer version.*

I assume that you are talking about Spark Streaming as the discussion is
about handling Kafka streaming data.

Then you have two things to consider: the Streaming receivers and the Spark
processing cluster.

Currently, the receiving topology is static. One receiver is allocated with
each DStream instantiated and it will use 1 core in the cluster. Once the
StreamingContext is started, this topology cannot be changed, therefore the
number of Kafka receivers is fixed for the lifetime of your DStream.
What we do is to calculate the cluster capacity and use that as a fixed
upper bound (with a margin) for the receiver throughput.

There's work in progress to add a reactive model to the receiver, where
backpressure can be applied to handle overload conditions. See
https://issues.apache.org/jira/browse/SPARK-7398

Once the data is received, it will be processed in a 'classical' Spark
pipeline, so previous posts on spark resource scheduling might apply.

Regarding metrics, the standard metrics subsystem of spark will report
streaming job performance. Check the driver's metrics endpoint to peruse
the available metrics:

:/metrics/json

-kr, Gerard.


(*) Spark is a project that moves so fast that statements might be
invalidated by new work every minute.

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg 
wrote:

> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling Spark
> (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we
> stream data from would start growing.  What are some of the ways to
> generate
> the metrics on the number of new messages and the rate they are piling up?
> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
> with
> the Metric interface and not much else...
>
> What are some of the ways to expand/contract the Spark cluster? Someone has
> mentioned Mesos...
>
> I see some info on Spark metrics in  the Spark monitoring guide
>   .  Do we want to
> perhaps implement a custom sink that would help us autoscale up or down
> based on the throughput?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Akhil Das
Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



Thanks
Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG  wrote:

> Hi,
>
> I wrote a simple test job, it only does very basic operations. for example:
>
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic ->
> 1)).map(_._2)
> val logs = lines.flatMap { line =>
>   try {
> Some(parse(line).extract[Impression])
>   } catch {
> case _: Exception => None
>   }
> }
>
> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>   rdd.foreachPartition { iter =>
> iter.foreach(count => logger.info(count.toString))
>   }
> }
>
> It receives messages from Kafka, parse the json, filter and count the
> records, and then print it to logs.
>
> Thanks.
>
>
> On Thu, May 28, 2015 at 3:07 PM, Akhil Das 
> wrote:
>
>> Hi Zhang,
>>
>> Could you paste your code in a gist? Not sure what you are doing inside
>> the code to fill up memory.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG  wrote:
>>
>>> Hi,
>>>
>>> Yes, I'm using createStream, but the storageLevel param is by default
>>> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
>>> don't think Kafka messages will be cached in driver.
>>>
>>>
>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das 
>>> wrote:
>>>
 Are you using the createStream or createDirectStream api? If its the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the later one.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG  wrote:

> Hi Akhil,
>
> Thanks for your reply. Accoding to the Streaming tab of Web UI, the
> Processing Time is around 400ms, and there's no Scheduling Delay, so I
> suppose it's not the Kafka messages that eat up the off-heap memory. Or
> maybe it is, but how to tell?
>
> I googled about how to check the off-heap memory usage, there's a tool
> called pmap, but I don't know how to interprete the results.
>
> On Wed, May 27, 2015 at 3:08 PM, Akhil Das  > wrote:
>
>> After submitting the job, if you do a ps aux | grep spark-submit then
>> you can see all JVM params. Are you using the highlevel consumer 
>> (receiver
>> based) for receiving data from Kafka? In that case if your throughput is
>> high and the processing delay exceeds batch interval then you will hit 
>> this
>> memory issues as the data will keep on receiving and is dumped to memory.
>> You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
>> Another alternate will be to use the lowlevel kafka consumer
>>  or to use the
>> non-receiver based directStream
>> 
>> that comes up with spark.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I
>>> find out that YARN is killing the driver and executor process because of
>>> excessive use of memory. Here's something I tried:
>>>
>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so
>>> the extra memory is not used by heap.
>>> 2. I set the two memoryOverhead params to 1024 (default is 384), but
>>> the memory just keeps growing and then hits the limit.
>>> 3. This problem is not shown in low-throughput jobs, neither in
>>> standalone mode.
>>> 4. The test job just receives messages from Kafka, with batch
>>> interval of 1, do some filtering and aggregation, and then print to
>>> executor logs. So it's not some 3rd party library that causes the 
>>> 'leak'.
>>>
>>> Spark 1.3 is built by myself, with correct hadoop versions.
>>>
>>> Any ideas will be appreciated.
>>>
>>> Thanks.
>>>
>>> --
>>> Jerry
>>>
>>
>>
>
>
> --
> Jerry
>


>>>
>>>
>>> --
>>> Jerry
>>>
>>
>>
>
>
> --
> Jerry
>


Re: why does "com.esotericsoftware.kryo.KryoException: java.u til.ConcurrentModificationException" happen?

2015-05-28 Thread Akhil Das
Can you paste the piece of code at least? Not sure, but it seems you are
reading/writing an object at the same time. You can try disabling kryo and
that might give you a proper exception stack.
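
For reference (not from the thread), switching back to Java serialization is a one-line SparkConf change, and it usually produces a cleaner stack trace than Kryo does:

import org.apache.spark.SparkConf

// Fall back to Java serialization while debugging; this is the default unless
// KryoSerializer was enabled explicitly in the job's configuration.
val conf = new SparkConf()
  .setAppName("kryo-debug")  // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")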

Thanks
Best Regards

On Thu, May 28, 2015 at 2:45 PM, randylu  wrote:

> begs for your help
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/why-does-com-esotericsoftware-kryo-KryoException-java-u-til-ConcurrentModificationException-happen-tp23067p23068.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: FetchFailedException and MetadataFetchFailedException

2015-05-28 Thread Rok Roskar
Yes, I've had errors with too many open files before, but that doesn't seem
to be the case here.

Hmm, you're right that these errors are different from what I initially
stated -- I assumed that the failure to write caused the worker to crash,
which in turn resulted in a failed fetch. I'll try to see if I can make
sense of it from the logs.

On Fri, May 22, 2015 at 9:29 PM, Imran Rashid  wrote:

> hmm, sorry I think that disproves my theory.  Nothing else is immediately
> coming to mind.  Its possible there is more info in the logs from the
> driver, couldn't hurt to send those (though I don't have high hopes of
> finding anything that way).  Offchance this could be from too many open
> files or something?  Normally there is a different error msg, but I figure
> its worth asking anyway.
>
> The error you reported here was slightly different from your original
> post.  This error is from writing the shuffle map output, while the
> original error you reported was a fetch failed, which is from reading the
> shuffle data on the "reduce" side in the next stage.  Does the map stage
> actually finish, even though the tasks are throwing these errors while
> writing the map output?  Or do you sometimes get failures on the shuffle
> write side, and sometimes on the shuffle read side?  (Not that I think you
> are doing anything wrong, but it may help narrow down the root cause and
> possibly file a bug.)
>
> thanks
>
>
> On Fri, May 22, 2015 at 4:40 AM, Rok Roskar  wrote:
>
>> on the worker/container that fails, the "file not found" is the first
>> error -- the output below is from the yarn log. There were some python
>> worker crashes for another job/stage earlier (see the warning at 18:36) but
>> I expect those to be unrelated to this file not found error.
>>
>>
>> ==
>> LogType:stderr
>> Log Upload Time:15-May-2015 18:50:05
>> LogLength:5706
>> Log Contents:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4
>> j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting
>> to kill Python Worker
>> 15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0
>> (TID 995)
>> java.io.FileNotFoundException:
>> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3
>> -44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or
>> directory)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.(FileOutputStream.java:212)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at
>> org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>> at
>> java

Re: why does "com.esotericsoftware.kryo.KryoException: java.u til.ConcurrentModificationException" happen?

2015-05-28 Thread randylu
begs for your help 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-com-esotericsoftware-kryo-KryoException-java-u-til-ConcurrentModificationException-happen-tp23067p23068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



why does "com.esotericsoftware.kryo.KryoException: java.u til.ConcurrentModificationException" happen?

2015-05-28 Thread randylu
  My program runs for 500 iterations, but it usually fails at around 150
iterations.
  It's hard to explain the details of my program, but I think the program is
fine, since it sometimes runs successfully. *I just want to know in which
situations this exception can happen*.

  The detail error information is as follows:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 32 in stage 43644.0 failed 4 time
s, most recent failure: Lost task 32.3 in stage 43644.0 (TID 72976,
10.194.117.197): com.esotericsoftware.kryo.KryoException: java.u
til.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (org.apache.spark.executor.ExecutorURLClassLoader)
classLoader (com.twitter.chill.KryoBase)
kryo (org.apache.spark.serializer.KryoDeserializationStream)
$outer (org.apache.spark.serializer.DeserializationStream$$anon$1)
delegate (org.apache.spark.InterruptibleIterator)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

...

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:130)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:370)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
at java.util.Vector$Itr.next(Vector.java:1133)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 44 more
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler
.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.c

Re: Get all servers in security group in bash(ec2)

2015-05-28 Thread Akhil Das
You can use the Python boto library for that; in fact, the spark-ec2 script
uses it underneath. Here's the call spark-ec2 is making to get all machines
under a given security group.

Thanks
Best Regards

On Thu, May 28, 2015 at 2:22 PM, nizang  wrote:

> hi,
>
> Is there anyway in bash (from an ec2 apsrk server) to list all the servers
> in my security group (or better - in a given security group)
>
> I tried using:
>
> wget -q -O - http://instance-data/latest/meta-data/security-groups
> security_group_xxx
>
> but now, I want all the servers in security group security_group_xxx
>
> thanks, nizan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Get-all-servers-in-security-group-in-bash-ec2-tp23066.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Get all servers in security group in bash(ec2)

2015-05-28 Thread nizang
hi,

Is there any way in bash (from an EC2 Spark server) to list all the servers
in my security group (or better - in a given security group)?

I tried using:

wget -q -O - http://instance-data/latest/meta-data/security-groups
security_group_xxx

but now, I want all the servers in security group security_group_xxx

thanks, nizan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Get-all-servers-in-security-group-in-bash-ec2-tp23066.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Adding slaves on spark standalone on ec2

2015-05-28 Thread Akhil Das
1. Up to you; you can add either the internal IP or the external IP. It won't
be a problem as long as they are in the same network.

2. If you only want to start a particular slave, you can do it like this:

sbin/start-slave.sh <worker#> <spark-master-URL>



Thanks
Best Regards

On Thu, May 28, 2015 at 1:52 PM, Nizan Grauer  wrote:

> hi,
>
> thanks for your answer!
>
> I have few more:
>
> 1) the file /root/spark/conf/slaves , has the full DNS names of servers (
> ec2-52-26-7-137.us-west-2.compute.amazonaws.com), did you add there the
> internal ip?
> 2) You call to start-all. Isn't it too aggressive? Let's say I have 20
> slaves up, and I want to add one more, why should we stop the entire
> cluster for this?
>
> thanks, nizan
>
> On Thu, May 28, 2015 at 10:19 AM, Akhil Das 
> wrote:
>
>> I do this way:
>>
>> - Launch a new instance by clicking on the slave instance and choose *launch
>> more like this *
>> *- *Once its launched, ssh into it and add the master public key to
>> .ssh/authorized_keys
>> - Add the slaves internal IP to the master's conf/slaves file
>> - do sbin/start-all.sh and it will show up along with other slaves.
>>
>>
>>
>> Thanks
>> Best Regards
>>
>> On Thu, May 28, 2015 at 12:29 PM, nizang  wrote:
>>
>>> hi,
>>>
>>> I'm working on spark standalone system on ec2, and I'm having problems on
>>> resizing the cluster (meaning - adding or removing slaves).
>>>
>>> In the basic ec2 scripts
>>> (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only
>>> script
>>> for lunching the cluster, not adding slaves to it. On the
>>> spark-standalone
>>> page
>>> (
>>> http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
>>> ),
>>> I can see only options for stopping and starting slaves, not adding them.
>>>
>>> What I try to do now (as a bad workaround...), is the following:
>>>
>>> 1) Go to the ec2 UI, create image from the current slave
>>> 2) Lunch new instance based on this image
>>> 3) Copy the public DNS of this slave
>>> 4) SSH to the master, and edit the file
>>> "/root/spark-ec2/ec2-variables.sh",
>>> and add the DNS to the "export SLAVES" variables
>>> 5) Running the script /root/spark-ec2/setup.sh
>>>
>>> After doing the above steps, I can see the new slave in the UI (8080
>>> port)
>>> of the master. However, this solution is bad for many reasons:
>>>
>>> 1) It requires many manual steps
>>> 2) It requires stopping and starting the cluster
>>> 3) There's no auto-detection in case slave stopped
>>>
>>> and many other reasons...
>>>
>>> oes anybody have another idea on how to add/remove slaves for standalone
>>> on
>>> a simple and safe way?
>>>
>>> thanks, nizan
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Spark Cassandra

2015-05-28 Thread lucas
Hello, 

I am trying to save data from spark to Cassandra.

So I have a ScalaESRDD (because I take data from Elasticsearch) that
contains a lot of key/value pairs like this:

(AU16r4o_kbhIuSky3zFO , Map(@timestamp -> 2015-05-21T21:35:54.035Z, 
timestamp -> 2015-05-21 23:35:54,035, loglevel -> INFO, thread -> 
ajp-crmprod-fr-003%2F10.2.53.39-8009-178, ID_Echange -> 1432244153901,
SessionID -> -, ProcessID -> 1432244149a998000f024390, IP ->
37.163.216.219, proxy -> 37.163.216.219, ContactID -> 2255701356, Login ->
46711205, messageType -> <<))

(AU16r4o_kbhIuSky3zFT , Map(@timestamp -> 2015-05-21T21:35:55.450Z,
timestamp -> 2015-05-21 23:35:55,450, loglevel -> INFO, thread -> 
ajp-crmprod-fr-003%2F10.2.53.39-8009-168, ID_Echange -> 1432244155381,
SessionID -> 76c8ad0dc9c86840cf1ff078911a6040, ProcessID ->
1432244154a435000f024680, IP -> 82.240.167.71, proxy ->
82.240.167.71, ContactID -> 2816846047, Login -> 14642027, messageType ->
<<))

So there is a key, and a map as the value.
What I want to do is to put each key/value pair into a Cassandra table. I used
saveToCassandra(), but it is not exactly what I want: after this I have a table
with the key and the map, but it is not possible to run queries on a map column
in Cassandra (so it is not very useful).
So I want to store the data in a way that lets me query the map's contents.

I thought about creating a user-defined type, but how can I save the map into a
user type for each key/value pair?

I hope it is clear, 

Thank you very much for your help.

Best Regards,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-tp23065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Adding slaves on spark standalone on ec2

2015-05-28 Thread Nizan Grauer
hi,

thanks for your answer!

I have few more:

1) The file /root/spark/conf/slaves has the full DNS names of the servers (e.g.
ec2-52-26-7-137.us-west-2.compute.amazonaws.com). Did you add the internal IP
there?
2) You call start-all. Isn't that too aggressive? Let's say I have 20 slaves up
and I want to add one more; why should we stop the entire cluster for that?

thanks, nizan

On Thu, May 28, 2015 at 10:19 AM, Akhil Das 
wrote:

> I do this way:
>
> - Launch a new instance by clicking on the slave instance and choose *launch
> more like this *
> *- *Once its launched, ssh into it and add the master public key to
> .ssh/authorized_keys
> - Add the slaves internal IP to the master's conf/slaves file
> - do sbin/start-all.sh and it will show up along with other slaves.
>
>
>
> Thanks
> Best Regards
>
> On Thu, May 28, 2015 at 12:29 PM, nizang  wrote:
>
>> hi,
>>
>> I'm working on spark standalone system on ec2, and I'm having problems on
>> resizing the cluster (meaning - adding or removing slaves).
>>
>> In the basic ec2 scripts
>> (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only
>> script
>> for lunching the cluster, not adding slaves to it. On the spark-standalone
>> page
>> (
>> http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
>> ),
>> I can see only options for stopping and starting slaves, not adding them.
>>
>> What I try to do now (as a bad workaround...), is the following:
>>
>> 1) Go to the ec2 UI, create image from the current slave
>> 2) Lunch new instance based on this image
>> 3) Copy the public DNS of this slave
>> 4) SSH to the master, and edit the file
>> "/root/spark-ec2/ec2-variables.sh",
>> and add the DNS to the "export SLAVES" variables
>> 5) Running the script /root/spark-ec2/setup.sh
>>
>> After doing the above steps, I can see the new slave in the UI (8080 port)
>> of the master. However, this solution is bad for many reasons:
>>
>> 1) It requires many manual steps
>> 2) It requires stopping and starting the cluster
>> 3) There's no auto-detection in case slave stopped
>>
>> and many other reasons...
>>
>> oes anybody have another idea on how to add/remove slaves for standalone
>> on
>> a simple and safe way?
>>
>> thanks, nizan
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


DataFrame nested sctructure selection limit

2015-05-28 Thread Eugene Morozov
Hi!

I have a JSON file with some data. I'm able to create a DataFrame out of it, and
the schema for the particular part of it I'm interested in looks like the following:

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root
 |-- attributes: struct (nullable = true)
 |    |-- Address2: array (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- Zip: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |    |    |-- Zip5: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- value: string (nullable = true)


When I’m trying to just select the deepest field:
json.select("attributes.Address2.value.Zip.value.Zip5").collect()

It gives me a really weird exception, so I wonder if there is a particular
limit on how deeply I'm able to select fields. What should I do to select the
piece I need?
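
For illustration (not from the original message), one workaround is to explode
each array level before selecting the leaf field, since dot-selection cannot
step through nested arrays here. A minimal sketch, assuming Spark 1.4+ (for
org.apache.spark.sql.functions.explode) and that the intermediate field names
match the schema above:

import org.apache.spark.sql.functions.{col, explode}

// Unnest one array level at a time, then select the leaf string field.
val zip5 = json
  .select(explode(col("attributes.Address2")).as("addr"))
  .select(explode(col("addr.value.Zip")).as("zip"))
  .select(explode(col("zip.value.Zip5")).as("z5"))
  .select(col("z5.value"))

zip5.show()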

Here is the exception.
org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type 
ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true);
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44)
at 
org.apache.spark.sql.catalyst.analysis.Analy

Re: Recommended Scala version

2015-05-28 Thread Tathagata Das
Would be great if you guys can test out the Spark 1.4.0 RC2 (RC3 coming out
soon) with Scala 2.11 and report issues.

TD

On Tue, May 26, 2015 at 9:15 AM, Koert Kuipers  wrote:

> we are still running into issues with spark-shell not working on 2.11, but
> we are running on somewhat older master so maybe that has been resolved
> already.
>
> On Tue, May 26, 2015 at 11:48 AM, Dean Wampler 
> wrote:
>
>> Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the
>> Spark project has published maven artifacts that are compiled with 2.11 and
>> 2.10, although the downloads at http://spark.apache.org/downloads.html
>> are still all for 2.10.
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Typesafe 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh <
>> riteshoneinamill...@gmail.com> wrote:
>>
>>> Yes, recommended version is 2.10 as all the features are not supported
>>> by 2.11 version. Kafka libraries and JDBC components are yet to be ported
>>> to 2.11 version. And so if your project doesn't depend on these components,
>>> you can give v2.11 a try.
>>>
>>> Here's a link
>>> 
>>>  for
>>> building with 2.11 version.
>>>
>>> Though, you won't be running into any issues if you try v2.10 as of now.
>>> But then again, the future releases will have to shift to 2.11 version once
>>> support for v2.10 ends in the long run.
>>>
>>>
>>> On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal <
>>> punya.bis...@gmail.com> wrote:
>>>
 Dear Spark developers and users,

 Am I correct in believing that the recommended version of Scala to use
 with Spark is currently 2.10? Is there any plan to switch to 2.11 in
 future? Are there any advantages to using 2.11 today?

 Regards,
 Punya
>>>
>>>
>>>
>>
>


Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Ji ZHANG
Hi,

I wrote a simple test job, it only does very basic operations. for example:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic ->
1)).map(_._2)
val logs = lines.flatMap { line =>
  try {
Some(parse(line).extract[Impression])
  } catch {
case _: Exception => None
  }
}

logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parse the json, filter and count the
records, and then print it to logs.

Thanks.


On Thu, May 28, 2015 at 3:07 PM, Akhil Das 
wrote:

> Hi Zhang,
>
> Could you paste your code in a gist? Not sure what you are doing inside
> the code to fill up memory.
>
> Thanks
> Best Regards
>
> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG  wrote:
>
>> Hi,
>>
>> Yes, I'm using createStream, but the storageLevel param is by default
>> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
>> don't think Kafka messages will be cached in driver.
>>
>>
>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das 
>> wrote:
>>
>>> Are you using the createStream or createDirectStream api? If its the
>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
>>> slow things down though). Another way would be to try the later one.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG  wrote:
>>>
 Hi Akhil,

 Thanks for your reply. Accoding to the Streaming tab of Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how to tell?

 I googled about how to check the off-heap memory usage, there's a tool
 called pmap, but I don't know how to interprete the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das 
 wrote:

> After submitting the job, if you do a ps aux | grep spark-submit then
> you can see all JVM params. Are you using the highlevel consumer (receiver
> based) for receiving data from Kafka? In that case if your throughput is
> high and the processing delay exceeds batch interval then you will hit 
> this
> memory issues as the data will keep on receiving and is dumped to memory.
> You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
> Another alternate will be to use the lowlevel kafka consumer
>  or to use the
> non-receiver based directStream
> 
> that comes up with spark.
>
> Thanks
> Best Regards
>
> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG 
> wrote:
>
>> Hi,
>>
>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I
>> find out that YARN is killing the driver and executor process because of
>> excessive use of memory. Here's something I tried:
>>
>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
>> extra memory is not used by heap.
>> 2. I set the two memoryOverhead params to 1024 (default is 384), but
>> the memory just keeps growing and then hits the limit.
>> 3. This problem is not shown in low-throughput jobs, neither in
>> standalone mode.
>> 4. The test job just receives messages from Kafka, with batch
>> interval of 1, do some filtering and aggregation, and then print to
>> executor logs. So it's not some 3rd party library that causes the 'leak'.
>>
>> Spark 1.3 is built by myself, with correct hadoop versions.
>>
>> Any ideas will be appreciated.
>>
>> Thanks.
>>
>> --
>> Jerry
>>
>
>


 --
 Jerry

>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry


Re: Adding slaves on spark standalone on ec2

2015-05-28 Thread Akhil Das
I do this way:

- Launch a new instance by clicking on the slave instance and choose *launch
more like this *
*- *Once its launched, ssh into it and add the master public key to
.ssh/authorized_keys
- Add the slaves internal IP to the master's conf/slaves file
- do sbin/start-all.sh and it will show up along with other slaves.



Thanks
Best Regards

On Thu, May 28, 2015 at 12:29 PM, nizang  wrote:

> hi,
>
> I'm working on spark standalone system on ec2, and I'm having problems on
> resizing the cluster (meaning - adding or removing slaves).
>
> In the basic ec2 scripts
> (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only
> script
> for lunching the cluster, not adding slaves to it. On the spark-standalone
> page
> (
> http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
> ),
> I can see only options for stopping and starting slaves, not adding them.
>
> What I try to do now (as a bad workaround...), is the following:
>
> 1) Go to the ec2 UI, create image from the current slave
> 2) Lunch new instance based on this image
> 3) Copy the public DNS of this slave
> 4) SSH to the master, and edit the file "/root/spark-ec2/ec2-variables.sh",
> and add the DNS to the "export SLAVES" variables
> 5) Running the script /root/spark-ec2/setup.sh
>
> After doing the above steps, I can see the new slave in the UI (8080 port)
> of the master. However, this solution is bad for many reasons:
>
> 1) It requires many manual steps
> 2) It requires stopping and starting the cluster
> 3) There's no auto-detection in case slave stopped
>
> and many other reasons...
>
> oes anybody have another idea on how to add/remove slaves for standalone on
> a simple and safe way?
>
> thanks, nizan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: How to use Eclipse on Windows to build Spark environment?

2015-05-28 Thread Somnath Pandeya
Try the Scala Eclipse plugin to eclipsify the Spark project and import Spark as
an Eclipse project.

-Somnath

-Original Message-
From: Nan Xiao [mailto:xiaonan830...@gmail.com] 
Sent: Thursday, May 28, 2015 12:32 PM
To: user@spark.apache.org
Subject: How to use Eclipse on Windows to build Spark environment?

Hi all,

I want to use Eclipse on Windows to build Spark environment, but find the 
reference 
page(https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
doesn't contain any guide about Eclipse.

Could anyone give tutorials or links about how to using Eclipse on Windows to 
build Spark environment? Thanks in advance!

Best Regards
Nan Xiao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Akhil Das
Hi Zhang,

Could you paste your code in a gist? Not sure what you are doing inside the
code to fill up memory.

Thanks
Best Regards

On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG  wrote:

> Hi,
>
> Yes, I'm using createStream, but the storageLevel param is by default
> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
> don't think Kafka messages will be cached in driver.
>
>
> On Thu, May 28, 2015 at 12:24 AM, Akhil Das 
> wrote:
>
>> Are you using the createStream or createDirectStream api? If its the
>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
>> slow things down though). Another way would be to try the later one.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG  wrote:
>>
>>> Hi Akhil,
>>>
>>> Thanks for your reply. Accoding to the Streaming tab of Web UI, the
>>> Processing Time is around 400ms, and there's no Scheduling Delay, so I
>>> suppose it's not the Kafka messages that eat up the off-heap memory. Or
>>> maybe it is, but how to tell?
>>>
>>> I googled about how to check the off-heap memory usage, there's a tool
>>> called pmap, but I don't know how to interprete the results.
>>>
>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das 
>>> wrote:
>>>
 After submitting the job, if you do a ps aux | grep spark-submit then
 you can see all JVM params. Are you using the highlevel consumer (receiver
 based) for receiving data from Kafka? In that case if your throughput is
 high and the processing delay exceeds batch interval then you will hit this
 memory issues as the data will keep on receiving and is dumped to memory.
 You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternate will be to use the lowlevel kafka consumer
  or to use the
 non-receiver based directStream
 
 that comes up with spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG  wrote:

> Hi,
>
> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I find
> out that YARN is killing the driver and executor process because of
> excessive use of memory. Here's something I tried:
>
> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
> extra memory is not used by heap.
> 2. I set the two memoryOverhead params to 1024 (default is 384), but
> the memory just keeps growing and then hits the limit.
> 3. This problem is not shown in low-throughput jobs, neither in
> standalone mode.
> 4. The test job just receives messages from Kafka, with batch interval
> of 1, do some filtering and aggregation, and then print to executor logs.
> So it's not some 3rd party library that causes the 'leak'.
>
> Spark 1.3 is built by myself, with correct hadoop versions.
>
> Any ideas will be appreciated.
>
> Thanks.
>
> --
> Jerry
>


>>>
>>>
>>> --
>>> Jerry
>>>
>>
>>
>
>
> --
> Jerry
>


How to use Eclipse on Windows to build Spark environment?

2015-05-28 Thread Nan Xiao
Hi all,

I want to use Eclipse on Windows to build a Spark environment, but I find that
the reference page
(https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
doesn't contain any guidance for Eclipse.

Could anyone give tutorials or links on how to use Eclipse on Windows to build
a Spark environment? Thanks in advance!

Best Regards
Nan Xiao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Adding slaves on spark standalone on ec2

2015-05-28 Thread nizang
hi,

I'm working on spark standalone system on ec2, and I'm having problems on
resizing the cluster (meaning - adding or removing slaves).

In the basic ec2 scripts
(http://spark.apache.org/docs/latest/ec2-scripts.html), there's only script
for launching the cluster, not adding slaves to it. On the spark-standalone
page
(http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts),
I can see only options for stopping and starting slaves, not adding them.

What I try to do now (as a bad workaround...), is the following:

1) Go to the EC2 UI and create an image from the current slave
2) Launch a new instance based on this image
3) Copy the public DNS of this slave
4) SSH to the master, and edit the file "/root/spark-ec2/ec2-variables.sh",
and add the DNS to the "export SLAVES" variables
5) Running the script /root/spark-ec2/setup.sh

After doing the above steps, I can see the new slave in the UI (8080 port)
of the master. However, this solution is bad for many reasons:

1) It requires many manual steps
2) It requires stopping and starting the cluster
3) There's no auto-detection in case slave stopped

and many other reasons...

Does anybody have another idea on how to add/remove slaves for standalone in
a simple and safe way?

thanks, nizan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org