Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-28 Thread ayan guha
Probably a naive question: can you try the same query in the Hive CLI and see if
your SQL works? This looks like a Hive issue to me, since Spark is faithfully
delegating the query to Hive.
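
For example, a quick sanity check from the quickstart VM shell (using the table
name from the session below):

hive -e "SELECT * FROM t1;"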
On 29 May 2015 03:22, Abhishek Tripathi trackissue...@gmail.com wrote:

 Hi,
 I'm using the CDH 5.4.0 quickstart VM and tried to build Spark with Hive
 compatibility so that I can run Spark SQL and access a temp table remotely.

 I used the command below to build Spark. The build was successful, but when I
 tried to access Hive data from Spark SQL, I got an error.

 Thanks,
 Abhi

 ---
 *mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive
 -Phive-thriftserver -DskipTests clean package*

 [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/
 [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
 SET spark.sql.hive.version=0.13.1
 spark-sql> show tables;
 sample_07 false
 t1 false
 Time taken: 3.901 seconds, Fetched 2 row(s)
 spark-sql> select * from t1;
 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
 java.lang.VerifyError: class
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto
 overrides final method *getUnknownFields*
 .()Lcom/google/protobuf/UnknownFieldSet;
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)






spark mlib variance analysis

2015-05-28 Thread rafac
I have a simple problem:
I have the mean number of people at one place by hour (time-series-like data), and
I want to know whether the weather conditions have an impact on that mean number.
I would do this with a variance analysis such as ANOVA in SPSS, or by analysing
the summary of the resulting regression model.
How can I do this with Apache Spark MLlib?

Thanks
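
For reference, MLlib (as of 1.3) does not ship a ready-made ANOVA, but a one-way
F statistic can be computed by hand from per-group aggregates. A rough,
self-contained sketch with toy data (hypothetical weather labels; the resulting
F value would still have to be compared against an F(k-1, n-k) distribution
offline or with a stats library):

import org.apache.spark.{SparkConf, SparkContext}

object OneWayAnovaSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("one-way-anova-sketch"))

    // Toy (weatherCondition, hourlyMeanPeopleCount) pairs; the real data would
    // come from the time series joined with the weather observations.
    val data = sc.parallelize(Seq(
      ("sunny", 120.0), ("sunny", 130.0), ("sunny", 110.0),
      ("rainy",  80.0), ("rainy",  95.0), ("rainy",  70.0),
      ("cloudy", 100.0), ("cloudy", 105.0)))

    // Per-group (count, sum, sum of squares).
    val groupStats = data.aggregateByKey((0L, 0.0, 0.0))(
      (acc, x) => (acc._1 + 1, acc._2 + x, acc._3 + x * x),
      (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
    ).collectAsMap()

    val n = groupStats.values.map(_._1).sum
    val k = groupStats.size
    val grandMean = groupStats.values.map(_._2).sum / n

    // Between-group and within-group sums of squares.
    val ssBetween = groupStats.values.map { case (ni, s, _) =>
      val groupMean = s / ni
      ni * (groupMean - grandMean) * (groupMean - grandMean)
    }.sum
    val ssWithin = groupStats.values.map { case (ni, s, sq) => sq - s * s / ni }.sum

    // F = (SSB / (k - 1)) / (SSW / (n - k))
    val fStat = (ssBetween / (k - 1)) / (ssWithin / (n - k))
    println(s"F statistic: $fStat with degrees of freedom (${k - 1}, ${n - k})")

    sc.stop()
  }
}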



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-mlib-variance-analysis-tp23079.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Batch aggregation by sliding window + join

2015-05-28 Thread ayan guha
Which version of Spark? In 1.4, window queries will be available for these kinds
of scenarios.

One thing I can suggest is to keep the daily aggregates materialised, partitioned
by key, and sorted by the key-day combination using the
repartitionAndSortWithinPartitions method. It lets you use a custom partitioner
and a custom sort ordering (a small sketch of the co-partitioning idea follows
below).

Best
Ayan
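
A minimal sketch of that co-partitioning idea, with a hypothetical Long counter
as the value type and placeholder paths (the real job uses a richer aggregate
object stored in Avro). Note that the partitioner is lost once the block is
written to disk and reloaded by the next day's job (essentially SPARK-1061), so
each run still has to repartition the reloaded block once; after that, the joins
against the small RDDs do not reshuffle it:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object SlidingBlockSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sliding-block-sketch"))
    val partitioner = new HashPartitioner(200)

    // Placeholder loads; in the real job these come from the saved Avro files.
    val baseBlockRdd = sc.objectFile[(Long, Long)]("/data/block")
      .partitionBy(partitioner)
      .persist(StorageLevel.MEMORY_AND_DISK)
    val lastDayRdd = sc.objectFile[(Long, Long)]("/data/lastDay").partitionBy(partitioner)
    val newDayRdd  = sc.objectFile[(Long, Long)]("/data/newDay").partitionBy(partitioner)

    // All three RDDs now share the same partitioner, so the joins are narrow
    // dependencies within this run: the large baseBlockRdd is not reshuffled.
    val updated = baseBlockRdd
      .leftOuterJoin(lastDayRdd)
      .mapValues { case (block, lastDay) => block - lastDay.getOrElse(0L) }
      .fullOuterJoin(newDayRdd)
      .mapValues { case (block, newDay) => block.getOrElse(0L) + newDay.getOrElse(0L) }

    updated.saveAsObjectFile("/data/block-updated")
  }
}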
On 29 May 2015 03:31, igor.berman igor.ber...@gmail.com wrote:

 Hi,
 I have a batch daily job that computes daily aggregate of several counters
 represented by some object.
 After daily aggregation is done, I want to compute block of 3 days
 aggregation(3,7,30 etc)
 To do so I need to add new daily aggregation to the current block and then
 subtract from current block the daily aggregation of the last day within
 the
 current block(sliding window...)
 I've implemented it with something like:

 baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
 All rdds are keyed by unique id(long). Each rdd is saved in avro files
 after
 the job finishes and loaded when job starts(on next day). baseBlockRdd is
 much larger than lastDay and newDay rdds(depends on the size of the block)

 Unfortunately the performance is not satisfactory due to many shuffles(I
 have parallelism etc) I was looking for the way to improve performance
 somehow, to make sure that one task joins same local keys without
 reshuffling baseBlockRdd(which is big) each time the job starts(see
 https://spark-project.atlassian.net/browse/SPARK-1061 as related issue)
 so bottom line - how to join big rdd with smaller rdd without reshuffling
 big rdd over and over again?
 Once I've saved this big RDD and reloaded it from disk, I want every other RDD
 to be partitioned and co-located by the same partitioner (which is absent for a
 HadoopRDD), somehow, so that only the small RDDs are sent over the network.

 Another idea I had  - somehow split baseBlock into 2 parts with filter by
 keys of small rdds and then join, however I'm not sure it's possible to
 implement this filter without join.

 any ideas would be appreciated,
 thanks in advance
 Igor



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Which would imply that if there were a load-manager type of service, it could
signal to the driver(s) that they need to quiesce, i.e. process what's at hand
and terminate.  Then bring up a new machine, then restart the driver(s)...  Same
deal with removing machines from the cluster: send a signal for the drivers to
pipe down and terminate, then restart them.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote:

 I'm not sure that points 1 and 2 really apply to the kafka direct stream.
 There are no receivers, and you know at the driver how big each of your
 batches is.

 On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

 Hi all,

 As the author of the dynamic allocation feature I can offer a few
 insights here.

 Gerard's explanation was both correct and concise: dynamic allocation is
 not intended to be used in Spark streaming at the moment (1.4 or before).
 This is because of two things:

 (1) Number of receivers is necessarily fixed, and these are started in
 executors. Since we need a receiver for each InputDStream, if we kill these
 receivers we essentially stop the stream, which is not what we want. It
 makes little sense to close and restart a stream the same way we kill and
 relaunch executors.

 (2) Records come in every batch, and when there is data to process your
 executors are not idle. If your idle timeout is less than the batch
 duration, then you'll end up having to constantly kill and restart
 executors. If your idle timeout is greater than the batch duration, then
 you'll never kill executors.

 Long answer short, with Spark streaming there is currently no
 straightforward way to scale the size of your cluster. I had a long
 discussion with TD (Spark streaming lead) about what needs to be done to
 provide some semblance of dynamic scaling to streaming applications, e.g.
 take into account the batch queue instead. We came up with a few ideas that
 I will not detail here, but we are looking into this and do intend to
 support it in the near future.

 -Andrew



 2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

 Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK –
 it will be your insurance policy against system crashes due to memory leaks.
 As long as there is free RAM, Spark Streaming (Spark) will NOT resort to disk –
 it only resorts to disk from time to time (i.e. when there is no free RAM),
 taking a performance hit from that, but only until RAM is freed up again.
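
For the receiver-based Kafka stream, the storage policy is the storageLevel
argument of KafkaUtils.createStream. A minimal sketch (ZooKeeper quorum,
consumer group, and topic name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MemoryAndDiskStream {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("kafka-memory-and-disk"), Seconds(1))

    // Receiver-based stream whose blocks spill to disk once memory is exhausted.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk-host:2181",                  // placeholder ZooKeeper quorum
      "my-consumer-group",             // placeholder consumer group
      Map("my-topic" -> 1),            // placeholder topic -> receiver thread count
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}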



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Thursday, May 28, 2015 2:34 PM
 *To:* Evo Eftimov
 *Cc:* Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Evo, good points.



 On the dynamic resource allocation, I'm surmising this only works within
 a particular cluster setup.  So it improves the usage of current cluster
 resources but it doesn't make the cluster itself elastic. At least, that's
 my understanding.



 Memory + disk would be good and hopefully it'd take *huge* load on the
 system to start exhausting the disk space too.  I'd guess that falling onto
 disk will make things significantly slower due to the extra I/O.



 Perhaps we'll really want all of these elements eventually.  I think
 we'd want to start with memory only, keeping maxRate low enough not to
 overwhelm the consumers; implement the cluster autoscaling.  We might
 experiment with dynamic resource allocation before we get to implement the
 cluster autoscale.







 On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
 exhausted spark streaming will resort to keeping new RDDs on disk which
 will prevent it from crashing and hence losing them. Then some memory will
 get freed and it will resort back to RAM and so on and so forth





 Sent from Samsung Mobile

  Original message 

 From: Evo Eftimov

 Date:2015/05/28 13:22 (GMT+00:00)

 To: Dmitry Goldenberg

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
 growth in Kafka or Spark's metrics?



 You can always spin new boxes in the background and bring them into the
 cluster fold when fully operational and time that with job relaunch and
 param change



 Kafka offsets are managed automatically for you by the Kafka clients, which
 keep them in ZooKeeper, so don't worry about that as long as you shut down
 your job gracefully. Besides, managing the offsets explicitly is not a big
 deal if necessary





 Sent from Samsung Mobile



  Original message 

 From: 

Re: yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
Thanks Sandy- I was digging through the code in the deploy.yarn.Client and
literally found that property right before I saw your reply. I'm on 1.2.x
right now which doesn't have the property. I guess I need to update sooner
rather than later.
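
For reference, once on a version that includes that PR, the flag can be passed
like any other configuration property (the class and jar names below are
placeholders):

spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --class com.example.MyJob \
  my-job.jar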

On Thu, May 28, 2015 at 3:56 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Corey,

 As of this PR https://github.com/apache/spark/pull/5297/files, this can
 be controlled with spark.yarn.submit.waitAppCompletion.

 -Sandy

 On Thu, May 28, 2015 at 11:48 AM, Corey Nolet cjno...@gmail.com wrote:

 I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
 noticing the jvm that fires up to allocate the resources, etc... is not
 going away after the application master and executors have been allocated.
 Instead, it just sits there printing 1 second status updates to the
 console. If I kill it, my job still runs (as expected).

 Is there an intended way to stop this from happening and just have the
 local JVM die when it's done allocating the resources and deploying the
 application master?





UDF accessing hive struct array fails with buffer underflow from kryo

2015-05-28 Thread yluo
Hi all, I'm using Spark 1.3.1 with Hive 0.13.1. When running a UDF that accesses
a Hive struct array, the query fails with:

Caused by: com.esotericsoftware.kryo.KryoException: Buffer underflow.
Serialization trace:
fieldName
(org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector$MyField)
fields
(org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector)
listElementObjectInspector
(org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector)
argStructArrayOI (com.groupon.hive.udf.filter.StructStringMemberFilterUDF)
at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
at com.esotericsoftware.kryo.io.Input.readAscii_slow(Input.java:580)
at com.esotericsoftware.kryo.io.Input.readAscii(Input.java:558)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:436)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
at
org.apache.hadoop.hive.ql.exec.Utilities.deserializeObjectByKryo(Utilities.java:918)
... 102 more

Has anyone seen anything similar? argStructArrayOI is a Hive
ListObjectInspector. The field that argStructArrayOI is accessing looks like:

array<struct<order_by_id:bigint,subscription_id:bigint,unsubscribe_hash:string,country_id:int,optin_hash:string,city_part_id:bigint,subscription_type:string,locale:string>>

The table is a hive table.

Running the same query on Hive works... what's going on here? Any
suggestions on how to debug this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UDF-accessing-hive-struct-array-fails-with-buffer-underflow-from-kryo-tp23078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark java.io.FileNotFoundException: /user/spark/applicationHistory/application

2015-05-28 Thread roy
Hi,

Suddenly Spark jobs started failing with the following error:

Exception in thread "main" java.io.FileNotFoundException:
/user/spark/applicationHistory/application_1432824195832_1275.inprogress (No
such file or directory)

Full trace here:

[21:50:04 x...@hadoop-client01.dev:~]$ spark-submit --class
org.apache.spark.examples.SparkPi --master yarn
/usr/lib/spark/lib/spark-examples.jar 10
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/parquet/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/avro/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/05/28 21:55:21 INFO SparkContext: Running Spark version 1.3.0
15/05/28 21:55:21 WARN SparkConf: In Spark 1.0 and later spark.local.dir
will be overridden by the value set by the cluster manager (via
SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/05/28 21:55:21 WARN SparkConf:
SPARK_JAVA_OPTS was detected (set to ' -Dspark.local.dir=/srv/tmp/xyz ').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with conf/spark-defaults.conf to set defaults for an
application
 - ./spark-submit with --driver-java-options to set -X options for a driver
 - spark.executor.extraJavaOptions to set -X options for executors
 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master
or worker)

15/05/28 21:55:21 WARN SparkConf: Setting 'spark.executor.extraJavaOptions'
to ' -Dspark.local.dir=/srv/tmp/xyz ' as a work-around.
15/05/28 21:55:21 WARN SparkConf: Setting 'spark.driver.extraJavaOptions' to
' -Dspark.local.dir=/srv/tmp/xyz ' as a work-around.
15/05/28 21:55:22 INFO SecurityManager: Changing view acls to: xyz
15/05/28 21:55:22 INFO SecurityManager: Changing modify acls to: xyz
15/05/28 21:55:22 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(xyz); users
with modify permissions: Set(xyz)
15/05/28 21:55:22 INFO Slf4jLogger: Slf4jLogger started
15/05/28 21:55:22 INFO Remoting: Starting remoting
15/05/28 21:55:22 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkdri...@hadoop-client01.abc.com:51876]
15/05/28 21:55:22 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkdri...@hadoop-client01.abc.com:51876]
15/05/28 21:55:22 INFO Utils: Successfully started service 'sparkDriver' on
port 51876.
15/05/28 21:55:22 INFO SparkEnv: Registering MapOutputTracker
15/05/28 21:55:22 INFO SparkEnv: Registering BlockManagerMaster
15/05/28 21:55:22 INFO DiskBlockManager: Created local directory at
/srv/tmp/xyz/spark-1e66e6eb-7ad6-4f62-87fc-f0cfaa631e36/blockmgr-61f866b8-6475-4a11-88b2-792d2ba22662
15/05/28 21:55:22 INFO MemoryStore: MemoryStore started with capacity 265.4
MB
15/05/28 21:55:23 INFO HttpFileServer: HTTP File server directory is
/srv/tmp/xyz/spark-2b676170-3f88-44bf-87a3-600de1b7ee24/httpd-b84f76d5-26c7-4c63-9223-f6c5aa3899f0
15/05/28 21:55:23 INFO HttpServer: Starting HTTP Server
15/05/28 21:55:23 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/28 21:55:23 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:41538
15/05/28 21:55:23 INFO Utils: Successfully started service 'HTTP file
server' on port 41538.
15/05/28 21:55:23 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/28 21:55:23 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/28 21:55:23 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/05/28 21:55:23 INFO Utils: Successfully started service 'SparkUI' on port
4040.
15/05/28 21:55:23 INFO SparkUI: Started SparkUI at
http://hadoop-client01.abc.com:4040
15/05/28 21:55:23 INFO SparkContext: Added JAR
file:/usr/lib/spark/lib/spark-examples.jar at
http://10.0.3.62:41538/jars/spark-examples.jar with timestamp 1432850123523
15/05/28 21:55:24 INFO Client: Requesting a new application from cluster
with 16 NodeManagers
15/05/28 21:55:24 INFO Client: Verifying our application has not requested
more than the maximum memory capability of the cluster (61440 MB per
container)
15/05/28 21:55:24 INFO Client: Will allocate AM container, with 896 MB
memory including 384 MB overhead
15/05/28 21:55:24 INFO Client: Setting up container launch context for our
AM
15/05/28 21:55:24 INFO Client: Preparing resources for our AM container
15/05/28 21:55:24 INFO Client: Setting up the launch environment for our AM
container
15/05/28 21:55:24 INFO SecurityManager: Changing view acls to: xyz
15/05/28 21:55:24 INFO SecurityManager: Changing modify acls to: xyz
15/05/28 21:55:24 INFO SecurityManager: SecurityManager: 

Adding an indexed column

2015-05-28 Thread Cesar Flores
Assuming that I have the following data frame:

flag | price
--
1|47.808764653746
1|47.808764653746
1|31.9869279512204
1|47.7907893713564
1|16.7599200038239
1|16.7599200038239
1|20.3916014172137

How can I create a data frame with an extra index column, like the following:

flag | price  | index
--|---
1|47.808764653746 | 0
1|47.808764653746 | 1
1|31.9869279512204| 2
1|47.7907893713564| 3
1|16.7599200038239| 4
1|16.7599200038239| 5
1|20.3916014172137| 6

-- 
Cesar Flores
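
One way to do this in Spark 1.3 is to zip the DataFrame's underlying RDD with an
index and rebuild the DataFrame with an extended schema. A rough sketch,
assuming `df` is the flag/price DataFrame and `sqlContext` is in scope (note
that zipWithIndex numbers rows in partition order, i.e. the DataFrame's current
ordering, not any particular sort):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append the zipWithIndex index as an extra field on each row.
val rowsWithIndex = df.rdd.zipWithIndex().map { case (row, idx) =>
  Row.fromSeq(row.toSeq :+ idx)
}

// Extend the original schema with the new "index" column.
val schemaWithIndex =
  StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))

val indexedDf = sqlContext.createDataFrame(rowsWithIndex, schemaWithIndex)
indexedDf.show()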


Re: Value for SPARK_EXECUTOR_CORES

2015-05-28 Thread Mulugeta Mammo
Thanks for the valuable information. The blog states:

The cores property controls the number of concurrent tasks an executor can
run. --executor-cores 5 means that each executor can run a maximum of five
tasks at the same time. 

So, I guess the max number of executor cores I can assign is the CPU count
(which includes the number of threads per core), not just the number of
physical cores. I just want to be sure what Spark means by the term "cores".

Thanks

On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov dautkha...@gmail.com
wrote:

 It's not only about cores. Keep in mind spark.executor.cores also affects
 available memory for each task:

 From
 http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

 The memory available to each task is (spark.executor.memory *
 spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) /
 spark.executor.cores. Memory fraction and safety fraction default to 0.2
 and 0.8 respectively.
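
 As a worked example with assumed numbers (not from this thread): with
 spark.executor.memory=8g and spark.executor.cores=4, each task gets roughly
 8 GB * 0.2 * 0.8 / 4 ≈ 0.32 GB of working memory for shuffles.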

 I'd test spark.executor.cores with 2, 4, 8, and 16 and see what makes your
 job run faster.


 --
 Ruslan Dautkhanov

 On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com
  wrote:

 My executor has the following spec (lscpu):

 CPU(s): 16
 Core(s) per socket: 4
 Socket(s): 2
 Thread(s) per core: 2

 The CPU count is obviously 4*2*2 = 16. My question is what value is Spark
 expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores
 (2 * 2 = 4) ?

 Thanks





Fwd: [Streaming] Configure executor logging on Mesos

2015-05-28 Thread Tim Chen
-- Forwarded message --
From: Tim Chen t...@mesosphere.io
Date: Thu, May 28, 2015 at 10:49 AM
Subject: Re: [Streaming] Configure executor logging on Mesos
To: Gerard Maas gerard.m...@gmail.com


Hi Gerard,

The log line you referred to is not Spark logging but Mesos' own logging,
which uses glog.

Our own executor logs should only contain very few lines though.

Most of the log lines you'll see are from Spark, and they can be controlled by
specifying a log4j.properties to be downloaded with your Mesos task.
Alternatively, if you are downloading the Spark executor via spark.executor.uri,
you can include log4j.properties in that tarball.
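
For instance, a minimal log4j.properties along these lines (a sketch of the
stock Spark defaults with the root level lowered to WARN) could be shipped
inside that tarball or downloaded alongside the task:

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n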

I think we probably need some more configurations for Spark scheduler to
pick up extra files to be downloaded into the sandbox.

Tim





On Thu, May 28, 2015 at 6:46 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi,

 I'm trying to control the verbosity of the logs on the Mesos executors
 with no luck so far. The default behaviour is INFO on stderr dump with an
 unbounded growth that gets too big at some point.

 I noticed that when the executor is instantiated, it locates a default log
 configuration in the spark assembly:

 I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
 20150528-063307-780930314-5050-8152-S5
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties

 So, nothing I provide in my job jar files (I also tried
 spark.executor.extraClassPath=log4j.properties) takes effect in the
 executor's configuration.

 How should I configure the log on the executors?

 thanks, Gerard.



Hyperthreading

2015-05-28 Thread Mulugeta Mammo
Hi guys,

Does SPARK_EXECUTOR_CORES assume hyper-threading? For example, if I
have 4 cores with 2 threads per core, should SPARK_EXECUTOR_CORES be
4*2 = 8 or just 4?

Thanks,


RE: Value for SPARK_EXECUTOR_CORES

2015-05-28 Thread Evo Eftimov
I don’t think the number of CPU cores controls the “number of parallel tasks”.
The number of tasks corresponds first and foremost to the number of (DStream)
RDD partitions (a small sketch follows below).

 

The Spark documentation doesn’t say what is meant by “Task” in terms of standard
multithreading terminology, i.e. a thread or a process, so your point is a good
one.

 

PS: time and time again, every product, dev team, and company invents its own
terminology, so 50% of the time using the product is spent on deciphering it
and reinventing the wheel.
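
A small sketch of that distinction (hypothetical app name): the partition count
sets the task count, while executor cores only bound per-executor concurrency:

import org.apache.spark.{SparkConf, SparkContext}

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("parallelism-sketch"))

    val rdd = sc.parallelize(1 to 1000000, 8)  // 8 partitions -> 8 tasks in the next stage
    val widened = rdd.repartition(48)          // 48 partitions -> 48 tasks in the next stage
    println(widened.partitions.length)         // prints 48

    // --executor-cores (spark.executor.cores) only caps how many of those tasks
    // run concurrently on each executor; it does not change the task count.
    sc.stop()
  }
}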

 

From: Mulugeta Mammo [mailto:mulugeta.abe...@gmail.com] 
Sent: Thursday, May 28, 2015 7:24 PM
To: Ruslan Dautkhanov
Cc: user
Subject: Re: Value for SPARK_EXECUTOR_CORES

 

Thanks for the valuable information. The blog states:

 

The cores property controls the number of concurrent tasks an executor can 
run. --executor-cores 5 means that each executor can run a maximum of five 
tasks at the same time.  

 

So, I guess the max number of executor cores I can assign is the CPU count
(which includes the number of threads per core), not just the number of physical
cores. I just want to be sure what Spark means by the term "cores".

 

Thanks

 

On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov dautkha...@gmail.com 
wrote:

It's not only about cores. Keep in mind spark.executor.cores also affects 
available memory for each task:

 

From 
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

 

The memory available to each task is (spark.executor.memory *
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) /
spark.executor.cores. Memory fraction and safety fraction default to 0.2 and
0.8 respectively.




I'd test spark.executor.cores with 2, 4, 8, and 16 and see what makes your job
run faster.

 


-- 
Ruslan Dautkhanov

 

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com 
wrote:

My executor has the following spec (lscpu):

 

CPU(s): 16

Core(s) per socket: 4

Socket(s): 2

Thread(s) per core: 2

 

The CPU count is obviously 4*2*2 = 16. My question is what value is Spark 
expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores (2 * 
2 = 4) ?

 

Thanks

 

 



yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
noticing the jvm that fires up to allocate the resources, etc... is not
going away after the application master and executors have been allocated.
Instead, it just sits there printing 1 second status updates to the
console. If I kill it, my job still runs (as expected).

Is there an intended way to stop this from happening and just have the
local JVM die when it's done allocating the resources and deploying the
application master?


Re: yarn-cluster spark-submit process not dying

2015-05-28 Thread Sandy Ryza
Hi Corey,

As of this PR https://github.com/apache/spark/pull/5297/files, this can be
controlled with spark.yarn.submit.waitAppCompletion.

-Sandy

On Thu, May 28, 2015 at 11:48 AM, Corey Nolet cjno...@gmail.com wrote:

 I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
 noticing the jvm that fires up to allocate the resources, etc... is not
 going away after the application master and executors have been allocated.
 Instead, it just sits there printing 1 second status updates to the
 console. If I kill it, my job still runs (as expected).

 Is there an intended way to stop this from happening and just have the
 local JVM die when it's done allocating the resources and deploying the
 application master?



Re: PySpark with OpenCV causes python worker to crash

2015-05-28 Thread Davies Liu
Could you try commenting out some lines in
`extract_sift_features_opencv` to find which line causes the crash?

If the bytes that come from sequenceFile() are broken, it's easy to crash a
C library used from Python (OpenCV).

On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote:
 Hi sparkers,

 I am working on a PySpark application which uses the OpenCV library. It runs
 fine when running the code locally, but when I try to run it on Spark on the
 same machine it crashes the worker.

 The code can be found here:
 https://gist.github.com/samos123/885f9fe87c8fa5abf78f

 This is the error message taken from STDERR of the worker log:
 https://gist.github.com/samos123/3300191684aee7fc8013

 I would like pointers or tips on how to debug this further. It would be nice
 to know why the worker crashed.

 Thanks,
 Sam Stoelinga


 org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.EOFException
 at java.io.DataInputStream.readInt(DataInputStream.java:392)
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Value for SPARK_EXECUTOR_CORES

2015-05-28 Thread Ruslan Dautkhanov
It's not only about cores. Keep in mind spark.executor.cores also affects
available memory for each task:

From
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

The memory available to each task is (spark.executor.memory *
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) /
spark.executor.cores. Memory fraction and safety fraction default to 0.2
and 0.8 respectively.

I'd test spark.executor.cores with 2, 4, 8, and 16 and see what makes your job
run faster.


-- 
Ruslan Dautkhanov

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com
wrote:

 My executor has the following spec (lscpu):

 CPU(s): 16
 Core(s) per socket: 4
 Socket(s): 2
 Thread(s) per core: 2

 The CPU count is obviously 4*2*2 = 16. My question is what value is Spark
 expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores
 (2 * 2 = 4) ?

 Thanks



Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Andrew Or
Hi all,

As the author of the dynamic allocation feature I can offer a few insights
here.

Gerard's explanation was both correct and concise: dynamic allocation is
not intended to be used in Spark streaming at the moment (1.4 or before).
This is because of two things:

(1) Number of receivers is necessarily fixed, and these are started in
executors. Since we need a receiver for each InputDStream, if we kill these
receivers we essentially stop the stream, which is not what we want. It
makes little sense to close and restart a stream the same way we kill and
relaunch executors.

(2) Records come in every batch, and when there is data to process your
executors are not idle. If your idle timeout is less than the batch
duration, then you'll end up having to constantly kill and restart
executors. If your idle timeout is greater than the batch duration, then
you'll never kill executors.

Long answer short, with Spark streaming there is currently no
straightforward way to scale the size of your cluster. I had a long
discussion with TD (Spark streaming lead) about what needs to be done to
provide some semblance of dynamic scaling to streaming applications, e.g.
take into account the batch queue instead. We came up with a few ideas that
I will not detail here, but we are looking into this and do intend to
support it in the near future.

-Andrew



2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

 Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK
 – it will be your insurance policy against system crashes due to memory leaks.
 As long as there is free RAM, Spark Streaming (Spark) will NOT resort to disk –
 it only resorts to disk from time to time (i.e. when there is no free RAM),
 taking a performance hit from that, but only until RAM is freed up again.



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Thursday, May 28, 2015 2:34 PM
 *To:* Evo Eftimov
 *Cc:* Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Evo, good points.



 On the dynamic resource allocation, I'm surmising this only works within a
 particular cluster setup.  So it improves the usage of current cluster
 resources but it doesn't make the cluster itself elastic. At least, that's
 my understanding.



 Memory + disk would be good and hopefully it'd take *huge* load on the
 system to start exhausting the disk space too.  I'd guess that falling onto
 disk will make things significantly slower due to the extra I/O.



 Perhaps we'll really want all of these elements eventually.  I think we'd
 want to start with memory only, keeping maxRate low enough not to overwhelm
 the consumers; implement the cluster autoscaling.  We might experiment with
 dynamic resource allocation before we get to implement the cluster
 autoscale.







 On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
 exhausted spark streaming will resort to keeping new RDDs on disk which
 will prevent it from crashing and hence losing them. Then some memory will
 get freed and it will resort back to RAM and so on and so forth





 Sent from Samsung Mobile

  Original message 

 From: Evo Eftimov

 Date:2015/05/28 13:22 (GMT+00:00)

 To: Dmitry Goldenberg

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
 in Kafka or Spark's metrics?



 You can always spin new boxes in the background and bring them into the
 cluster fold when fully operational and time that with job relaunch and
 param change



 Kafka offsets are managed automatically for you by the Kafka clients, which
 keep them in ZooKeeper, so don't worry about that as long as you shut down
 your job gracefully. Besides, managing the offsets explicitly is not a big
 deal if necessary





 Sent from Samsung Mobile



  Original message 

 From: Dmitry Goldenberg

 Date:2015/05/28 13:16 (GMT+00:00)

 To: Evo Eftimov

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
 in Kafka or Spark's metrics?



 Thanks, Evo.  Per the last part of your comment, it sounds like we will
 need to implement a job manager which will be in control of starting the
 jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
 marking them as ones to relaunch, scaling the cluster up/down by
 adding/removing machines, and relaunching the 'suspended' (shut down) jobs.



 I suspect that relaunching the jobs may be tricky since that means keeping
 track of the starter offsets in Kafka 

Batch aggregation by sliding window + join

2015-05-28 Thread igor.berman
Hi,
I have a batch daily job that computes daily aggregate of several counters
represented by some object.
After daily aggregation is done, I want to compute block of 3 days
aggregation(3,7,30 etc)
To do so I need to add new daily aggregation to the current block and then
subtract from current block the daily aggregation of the last day within the
current block(sliding window...)
I've implemented it with something like:
baseBlockRdd.leftjoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
All rdds are keyed by unique id(long). Each rdd is saved in avro files after
the job finishes and loaded when job starts(on next day). baseBlockRdd is
much larger than lastDay and newDay rdds(depends on the size of the block)

Unfortunately the performance is not satisfactory due to many shuffles(I
have parallelism etc) I was looking for the way to improve performance
somehow, to make sure that one task joins same local keys without
reshuffling baseBlockRdd(which is big) each time the job starts(see
https://spark-project.atlassian.net/browse/SPARK-1061 as related issue)
so bottom line - how to join big rdd with smaller rdd without reshuffling
big rdd over and over again?
Once I've saved this big RDD and reloaded it from disk, I want every other RDD
to be partitioned and co-located by the same partitioner (which is absent for a
HadoopRDD), somehow, so that only the small RDDs are sent over the network.

Another idea I had  - somehow split baseBlock into 2 parts with filter by
keys of small rdds and then join, however I'm not sure it's possible to
implement this filter without join.

any ideas would be appreciated,
thanks in advance
Igor



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-28 Thread Abhishek Tripathi
Hi,
I'm using the CDH 5.4.0 quickstart VM and tried to build Spark with Hive
compatibility so that I can run Spark SQL and access a temp table remotely.

I used the command below to build Spark. The build was successful, but when I
tried to access Hive data from Spark SQL, I got an error.

Thanks,
Abhi

---
*mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive
-Phive-thriftserver -DskipTests clean package*

[cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/
[cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
SET spark.sql.hive.version=0.13.1
spark-sql> show tables;
sample_07 false
t1 false
Time taken: 3.901 seconds, Fetched 2 row(s)
spark-sql> select * from t1;
15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
java.lang.VerifyError: class
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto
overrides final method *getUnknownFields*
.()Lcom/google/protobuf/UnknownFieldSet;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)


Twitter Streaming HTTP 401 Error

2015-05-28 Thread tracynj
I am working on the Databricks Reference applications, porting them to my
company's platform, and extending them to emit RDF. I have already gotten
them working with the extension on EC2, and have the Log Analyzer
application working on our platform. But the Twitter Language Classifier
application keeps getting an HTTP 401 Error. This error shows the following
message:
15/05/28 16:30:11 ERROR scheduler.ReceiverTracker: Deregistered receiver for
stream 0: Restarting receiver with delay 2000ms: Error receiving tweets -
401:Authentication credentials (https://dev.twitter.com/pages/auth) were
missing or incorrect. Ensure that you have set valid consumer key/secret,
access token/secret, and the system clock is in sync.
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>Error 401 Unauthorized</title>
</head>
<body>
HTTP ERROR: 401

<p>Problem accessing '/1.1/statuses/sample.json?stall_warnings=true'.
Reason:
<pre>    Unauthorized</pre></p>
</body>
</html>

Relevant discussions can be found on the Internet at:
http://www.google.co.jp/search?q=d0031b0b or
http://www.google.co.jp/search?q=1db75513
TwitterException{exceptionCode=[d0031b0b-1db75513], statusCode=401,
message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
at 
twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:177)
at
twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
at 
twitter4j.internal.http.HttpClientWrapper.get(HttpClientWrapper.java:89)
at 
twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:176)
at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:164)
at
twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)

15/05/28 16:30:13 INFO scheduler.ReceiverTracker: Registered receiver for
stream 0 from akka.tcp://sparkExecutor@mesos-0020-n3:56412

As the message says, the 401 code can be thrown by either bad credentials,
or by unsynchronized clocks. However, I know my credentials work, as I've
tested them both on EC2, and directly on the OAuth Testing Tool on the
Twitter Application Manager. Likewise, all the nodes on our cluster are
running ntpd, so that shouldn't be the problem either. I did some looking at
the API for the codes, and saw that the 401 could also be thrown for any
calls to the now deprecated v1 API endpoints, but given that the Databricks
applications run correctly in other contexts, such as EC2, I also do not
think this is the problem. I am at a loss to know why I am getting this
error, and have run out of ideas and could use some help.
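
For what it's worth, one way to rule out a credentials-plumbing problem is to
set the twitter4j OAuth system properties explicitly on the driver before
creating the stream. A minimal sketch with placeholder values (this just mirrors
the documented twitter4j system-property mechanism; it is not a fix specific to
this error):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterAuthCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder credentials; twitter4j reads these system properties when
    // TwitterUtils.createStream is given no explicit Authorization.
    System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
    System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
    System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")

    val ssc = new StreamingContext(
      new SparkConf().setAppName("twitter-auth-check"), Seconds(2))
    TwitterUtils.createStream(ssc, None).map(_.getText).print()
    ssc.start()
    ssc.awaitTermination()
  }
}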



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-Streaming-HTTP-401-Error-tp23080.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use Eclipse on Windows to build Spark environment?

2015-05-28 Thread Nan Xiao
Hi Somnath,

Is there a step-by-step guide to using Eclipse to develop
Spark applications? I think many people need one. Thanks!
Best Regards
Nan Xiao


On Thu, May 28, 2015 at 3:15 PM, Somnath Pandeya
somnath_pand...@infosys.com wrote:
 Try the Scala Eclipse plugin to Eclipsify the Spark project and import Spark
 as an Eclipse project

 -Somnath

 -Original Message-
 From: Nan Xiao [mailto:xiaonan830...@gmail.com]
 Sent: Thursday, May 28, 2015 12:32 PM
 To: user@spark.apache.org
 Subject: How to use Eclipse on Windows to build Spark environment?

 Hi all,

 I want to use Eclipse on Windows to set up a Spark build environment, but I
 find the reference
 page (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
 doesn't contain any guidance about Eclipse.

 Could anyone give tutorials or links about how to use Eclipse on Windows to
 set up a Spark build environment? Thanks in advance!

 Best Regards
 Nan Xiao

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
 commands, e-mail: user-h...@spark.apache.org


  CAUTION - Disclaimer *
 This e-mail contains PRIVILEGED AND CONFIDENTIAL INFORMATION intended solely
 for the use of the addressee(s). If you are not the intended recipient, please
 notify the sender by e-mail and delete the original message. Further, you are 
 not
 to copy, disclose, or distribute this e-mail or its contents to any other 
 person and
 any such actions are unlawful. This e-mail may contain viruses. Infosys has 
 taken
 every reasonable precaution to minimize this risk, but is not liable for any 
 damage
 you may sustain as a result of any virus in this e-mail. You should carry out 
 your
 own virus checks before opening the e-mail or attachment. Infosys reserves the
 right to monitor and review the content of all messages sent to or from this 
 e-mail
 address. Messages sent to or from this e-mail address may be stored on the
 Infosys e-mail system.
 ***INFOSYS End of Disclaimer INFOSYS***

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Mohit Jaggi
I have used VoltDB and Spark. The use cases for the two are quite different.
VoltDB is intended for transactions and also supports queries on the
same (custom to VoltDB) store. Spark (SQL) is NOT suitable for transactions; it
is designed for querying immutable data (which may exist in several different
forms of stores).

 On May 28, 2015, at 7:48 AM, Ashish Mukherjee ashish.mukher...@gmail.com 
 wrote:
 
 Hello,
 
  I was wondering if there is any documented comparison of Spark SQL with
 in-memory SQL databases like MemSQL/VoltDB. MemSQL etc. also allow queries
 to be run in a clustered environment. What is the major differentiator?
 
 Regards,
 Ashish


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Registering Custom metrics [Spark-Streaming-monitoring]

2015-05-28 Thread Snehal Nagmote
Hello All,

I am using Spark Streaming 1.3. I want to capture a few custom metrics based
on accumulators; I followed an approach somewhat similar to this:

val instrumentation = new SparkInstrumentation("example.metrics")
val numReqs = sc.accumulator(0L)
instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
instrumentation.register()

https://gist.github.com/ibuenros/9b94736c2bad2f4b8e23

After registering the metrics via accumulators, I am not able to see the values
of these metrics on the sink. I tried the console sink, but still no luck.

Do I need to set any properties in metrics.conf to enable this custom
source?
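
For reference, the built-in console sink is normally enabled through Spark's
metrics configuration file (conf/metrics.properties, or whatever file
spark.metrics.conf points to); a minimal sketch:

# Enable the console sink for all instances (driver, executors, ...)
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds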


Also, I noticed accumulators are displayed only on the stage/task pages; it is
difficult to identify on which task/stage in the UI that information will be
available.

Is there a way accumulators can be displayed on the overall job stats page?

Any pointers/examples to achieve this would be helpful; the Spark monitoring
documentation is not very helpful.

Thanks in advance,

- Snehal


Re: Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Ashish Mukherjee
Hi Mohit,

Thanks for your reply.

If my use case is purely querying read-only data (no transaction
scenarios), at what scale is one of them a better option than the other? I
am aware that for scale which can be supported on a single node, VoltDB is
a better choice. However, when the scale grows to a clustered scenario,
which is the right engine at various degrees of scale?

Regards,
Ashish

On Fri, May 29, 2015 at 6:57 AM, Mohit Jaggi mohitja...@gmail.com wrote:

 I have used VoltDB and Spark. The use cases for the two are quite
 different. VoltDB is intended for transactions and also supports queries on
 the same(custom to voltdb) store. Spark(SQL) is NOT suitable for
 transactions; it is designed for querying immutable data (which may exist
 in several different forms of stores).

  On May 28, 2015, at 7:48 AM, Ashish Mukherjee 
 ashish.mukher...@gmail.com wrote:
 
  Hello,
 
  I was wondering if there is any documented comparison of SparkSQL with
 MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. too allow
 queries to be run in a clustered environment. What is  the major
 differentiation?
 
  Regards,
  Ashish




Spark Cassandra

2015-05-28 Thread lucas
Hello, 

I am trying to save data from Spark to Cassandra.

So I have a ScalaEsRDD (because I take data from Elasticsearch) that
contains a lot of key/value pairs like this:

(AU16r4o_kbhIuSky3zFO , Map(@timestamp -> 2015-05-21T21:35:54.035Z,
timestamp -> 2015-05-21 23:35:54,035, loglevel -> INFO, thread ->
ajp-crmprod-fr-003%2F10.2.53.39-8009-178, ID_Echange -> 1432244153901,
SessionID -> -, ProcessID -> 1432244149a998000f024390, IP ->
37.163.216.219, proxy -> 37.163.216.219, ContactID -> 2255701356, Login ->
46711205, messageType -> ))

(AU16r4o_kbhIuSky3zFT , Map(@timestamp -> 2015-05-21T21:35:55.450Z,
timestamp -> 2015-05-21 23:35:55,450, loglevel -> INFO, thread ->
ajp-crmprod-fr-003%2F10.2.53.39-8009-168, ID_Echange -> 1432244155381,
SessionID -> 76c8ad0dc9c86840cf1ff078911a6040, ProcessID ->
1432244154a435000f024680, IP -> 82.240.167.71, proxy ->
82.240.167.71, ContactID -> 2816846047, Login -> 14642027, messageType ->
))

So there is a key and a map as the value.
What I want to do is to put each key/value pair into a Cassandra table. I used
saveToCassandra(), but it is not exactly what I want, because after this I
have a table with the key and the map, and it is not possible to run queries
against a map in Cassandra (so it is useless).
So I want to store the data in a way that lets me query the map's fields.

I thought about creating a user-defined type, but how can I save the map in a
user type for each key/value pair?
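
One option, instead of a user-defined type, is to flatten the map into named
columns before saving, so each field becomes a queryable Cassandra column. A
rough sketch with the spark-cassandra-connector, assuming `esRdd` is the
(id, Map) pair RDD from Elasticsearch, the SparkContext is configured with
spark.cassandra.connection.host, and a matching table my_keyspace.log_entries
already exists:

import com.datastax.spark.connector._

// Hypothetical flat row type; the connector maps camelCase fields to
// snake_case columns (eventTime -> event_time, sessionId -> session_id, ...).
case class LogEntry(id: String, eventTime: String, loglevel: String,
                    sessionId: String, ip: String, login: String)

val rows = esRdd.map { case (id, fields) =>
  // `fields` is assumed to be a Scala Map[String, AnyRef] as returned by the ES RDD.
  def f(k: String) = fields.get(k).map(_.toString).getOrElse("")
  LogEntry(id, f("@timestamp"), f("loglevel"), f("SessionID"), f("IP"), f("Login"))
}

rows.saveToCassandra("my_keyspace", "log_entries")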

I hope this is clear.

Thank you very much for your help.

Best Regards,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cassandra-tp23065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to use Eclipse on Windows to build Spark environment?

2015-05-28 Thread Nan Xiao
Hi all,

I want to use Eclipse on Windows to build Spark environment, but find
the reference 
page(https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
doesn't contain any guide about Eclipse.

Could anyone give tutorials or links about how to using Eclipse on
Windows to build Spark environment? Thanks in advance!

Best Regards
Nan Xiao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to use Eclipse on Windows to build Spark environment?

2015-05-28 Thread Somnath Pandeya
Try scala eclipse plugin to eclipsify spark project and import spark as eclipse 
project

-Somnath

-Original Message-
From: Nan Xiao [mailto:xiaonan830...@gmail.com] 
Sent: Thursday, May 28, 2015 12:32 PM
To: user@spark.apache.org
Subject: How to use Eclipse on Windows to build Spark environment?

Hi all,

I want to use Eclipse on Windows to build Spark environment, but find the 
reference 
page(https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
doesn't contain any guide about Eclipse.

Could anyone give tutorials or links about how to using Eclipse on Windows to 
build Spark environment? Thanks in advance!

Best Regards
Nan Xiao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


 CAUTION - Disclaimer *
This e-mail contains PRIVILEGED AND CONFIDENTIAL INFORMATION intended solely 
for the use of the addressee(s). If you are not the intended recipient, please 
notify the sender by e-mail and delete the original message. Further, you are 
not 
to copy, disclose, or distribute this e-mail or its contents to any other 
person and 
any such actions are unlawful. This e-mail may contain viruses. Infosys has 
taken 
every reasonable precaution to minimize this risk, but is not liable for any 
damage 
you may sustain as a result of any virus in this e-mail. You should carry out 
your 
own virus checks before opening the e-mail or attachment. Infosys reserves the 
right to monitor and review the content of all messages sent to or from this 
e-mail 
address. Messages sent to or from this e-mail address may be stored on the 
Infosys e-mail system.
***INFOSYS End of Disclaimer INFOSYS***
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Adding slaves on spark standalone on ec2

2015-05-28 Thread Akhil Das
I do it this way:

- Launch a new instance by clicking on the slave instance and choosing "Launch
more like this"
- Once it's launched, ssh into it and add the master's public key to
.ssh/authorized_keys
- Add the slave's internal IP to the master's conf/slaves file
- Run sbin/start-all.sh and it will show up along with the other slaves.



Thanks
Best Regards

On Thu, May 28, 2015 at 12:29 PM, nizang ni...@windward.eu wrote:

 hi,

 I'm working on spark standalone system on ec2, and I'm having problems on
 resizing the cluster (meaning - adding or removing slaves).

 In the basic ec2 scripts
 (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only
 script
 for launching the cluster, not for adding slaves to it. On the spark-standalone
 page
 (
 http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
 ),
 I can see only options for stopping and starting slaves, not adding them.

 What I try to do now (as a bad workaround...), is the following:

 1) Go to the ec2 UI, create image from the current slave
 2) Launch a new instance based on this image
 3) Copy the public DNS of this slave
 4) SSH to the master, and edit the file /root/spark-ec2/ec2-variables.sh,
 and add the DNS to the export SLAVES variables
 5) Running the script /root/spark-ec2/setup.sh

 After doing the above steps, I can see the new slave in the UI (8080 port)
 of the master. However, this solution is bad for many reasons:

 1) It requires many manual steps
 2) It requires stopping and starting the cluster
 3) There's no auto-detection in case slave stopped

 and many other reasons...

 Does anybody have another idea on how to add/remove slaves for standalone in
 a simple and safe way?

 thanks, nizan



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Adding slaves on spark standalone on ec2

2015-05-28 Thread nizang
hi,

I'm working on spark standalone system on ec2, and I'm having problems on
resizing the cluster (meaning - adding or removing slaves).

In the basic ec2 scripts
(http://spark.apache.org/docs/latest/ec2-scripts.html), there's only a script
for launching the cluster, not for adding slaves to it. On the spark-standalone
page
(http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts),
I can see only options for stopping and starting slaves, not adding them.

What I try to do now (as a bad workaround...), is the following:

1) Go to the ec2 UI, create image from the current slave
2) Launch a new instance based on this image
3) Copy the public DNS of this slave
4) SSH to the master, and edit the file /root/spark-ec2/ec2-variables.sh,
and add the DNS to the export SLAVES variables
5) Running the script /root/spark-ec2/setup.sh

After doing the above steps, I can see the new slave in the UI (8080 port)
of the master. However, this solution is bad for many reasons:

1) It requires many manual steps
2) It requires stopping and starting the cluster
3) There's no auto-detection in case slave stopped

and many other reasons...

Does anybody have another idea on how to add/remove slaves for standalone in
a simple and safe way?

thanks, nizan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Akhil Das
Hi Zhang,

Could you paste your code in a gist? Not sure what you are doing inside the
code to fill up memory.

Thanks
Best Regards

On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you using the createStream or createDirectStream api? If its the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the later one.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of the Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how to tell?

 I googled how to check the off-heap memory usage; there's a tool
 called pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then
 you can see all JVM params. Are you using the high-level consumer (receiver
 based) for receiving data from Kafka? In that case, if your throughput is
 high and the processing delay exceeds the batch interval, then you will hit
 these memory issues, as data keeps being received and is dumped to memory.
 You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternate will be to use the lowlevel kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or to use the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes up with spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I find
 out that YARN is killing the driver and executor process because of
 excessive use of memory. Here's something I tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but
 the memory just keeps growing and then hits the limit.
 3. This problem is not shown in low-throughput jobs, neither in
 standalone mode.
 4. The test job just receives messages from Kafka, with batch interval
 of 1, do some filtering and aggregation, and then print to executor logs.
 So it's not some 3rd party library that causes the 'leak'.

 Spark 1.3 is built by myself, with correct hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry



Re: Recommended Scala version

2015-05-28 Thread Tathagata Das
Would be great if you guys can test out the Spark 1.4.0 RC2 (RC3 coming out
soon) with Scala 2.11 and report issues.

TD

On Tue, May 26, 2015 at 9:15 AM, Koert Kuipers ko...@tresata.com wrote:

 we are still running into issues with spark-shell not working on 2.11, but
 we are running on somewhat older master so maybe that has been resolved
 already.

 On Tue, May 26, 2015 at 11:48 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the
 Spark project has published maven artifacts that are compiled with 2.11 and
 2.10, although the downloads at http://spark.apache.org/downloads.html
 are still all for 2.10.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh 
 riteshoneinamill...@gmail.com wrote:

 Yes, the recommended version is 2.10, as not all features are supported
 by the 2.11 version. Kafka libraries and JDBC components are yet to be ported
 to the 2.11 version. So if your project doesn't depend on these components,
 you can give v2.11 a try.

 Here's a link
 https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211
  for
 building with 2.11 version.

 Though, you won't be running into any issues if you try v2.10 as of now.
 But then again, the future releases will have to shift to 2.11 version once
 support for v2.10 ends in the long run.


 On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal 
 punya.bis...@gmail.com wrote:

 Dear Spark developers and users,

 Am I correct in believing that the recommended version of Scala to use
 with Spark is currently 2.10? Is there any plan to switch to 2.11 in
 future? Are there any advantages to using 2.11 today?

 Regards,
 Punya







DataFrame nested structure selection limit

2015-05-28 Thread Eugene Morozov
Hi!

I have a json file with some data. I’m able to create a DataFrame out of it, and 
the schema for the particular part of it I’m interested in looks like the following:

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root
 |-- attributes: struct (nullable = true)
 |    |-- Address2: array (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- Zip: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |    |    |-- Zip5: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- value: string (nullable = true)


When I’m trying to just select the deepest field:
json.select("attributes.Address2.value.Zip.value.Zip5").collect()

It gives me a really weird exception. So I wonder if there is a particular
limit on how deeply I’m able to select fields. What should I do to select the
piece I need?

Here is the exception.
org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type 
ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true);
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44)
at 

How to use Eclipse on Windows to build Spark environment?

2015-05-28 Thread Nan Xiao
Hi all,

I want to use Eclipse on Windows to build a Spark environment, but I find
that the reference
page (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup)
doesn't contain any guidance about Eclipse.

Could anyone give tutorials or links about how to use Eclipse on
Windows to build a Spark environment? Thanks in advance!

Best Regards
Nan Xiao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Ji ZHANG
Hi,

I wrote a simple test job, it only does very basic operations. for example:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
val logs = lines.flatMap { line =>
  try {
    Some(parse(line).extract[Impression])
  } catch {
    case _: Exception => None
  }
}

logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parse the json, filter and count the
records, and then print it to logs.

Thanks.


On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing inside
 the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you using the createStream or createDirectStream api? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter one.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how to tell?

 I googled about how to check the off-heap memory usage, there's a tool
 called pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then
 you can see all JVM params. Are you using the highlevel consumer (receiver
 based) for receiving data from Kafka? In that case if your throughput is
 high and the processing delay exceeds batch interval then you will hit 
 this
 memory issues as the data will keep on receiving and is dumped to memory.
 You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternate will be to use the lowlevel kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or to use the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes up with spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I
 find out that YARN is killing the driver and executor process because of
 excessive use of memory. Here's something I tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but
 the memory just keeps growing and then hits the limit.
 3. This problem is not shown in low-throughput jobs, neither in
 standalone mode.
 4. The test job just receives messages from Kafka, with batch
 interval of 1, do some filtering and aggregation, and then print to
 executor logs. So it's not some 3rd party library that causes the 'leak'.

 Spark 1.3 is built by myself, with correct hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry





-- 
Jerry


Re: Adding slaves on spark standalone on ec2

2015-05-28 Thread Nizan Grauer
hi,

thanks for your answer!

I have few more:

1) The file /root/spark/conf/slaves has the full DNS names of servers (
ec2-52-26-7-137.us-west-2.compute.amazonaws.com); did you add the
internal IP there?
2) You call start-all. Isn't that too aggressive? Let's say I have 20
slaves up and I want to add one more; why should we stop the entire
cluster for this?

thanks, nizan

On Thu, May 28, 2015 at 10:19 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I do this way:

 - Launch a new instance by clicking on the slave instance and choose *launch
 more like this *
 *- *Once it's launched, ssh into it and add the master public key to
 .ssh/authorized_keys
 - Add the slaves internal IP to the master's conf/slaves file
 - do sbin/start-all.sh and it will show up along with other slaves.



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 12:29 PM, nizang ni...@windward.eu wrote:

 hi,

 I'm working on spark standalone system on ec2, and I'm having problems on
 resizing the cluster (meaning - adding or removing slaves).

 In the basic ec2 scripts
 (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only
 script
 for launching the cluster, not adding slaves to it. On the spark-standalone
 page
 (
 http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
 ),
 I can see only options for stopping and starting slaves, not adding them.

 What I try to do now (as a bad workaround...), is the following:

 1) Go to the ec2 UI, create image from the current slave
 2) Launch new instance based on this image
 3) Copy the public DNS of this slave
 4) SSH to the master, and edit the file
 /root/spark-ec2/ec2-variables.sh,
 and add the DNS to the export SLAVES variables
 5) Running the script /root/spark-ec2/setup.sh

 After doing the above steps, I can see the new slave in the UI (8080 port)
 of the master. However, this solution is bad for many reasons:

 1) It requires many manual steps
 2) It requires stopping and starting the cluster
 3) There's no auto-detection in case slave stopped

 and many other reasons...

 Does anybody have another idea on how to add/remove slaves for standalone
 on
 a simple and safe way?

 thanks, nizan



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Adding slaves on spark standalone on ec2

2015-05-28 Thread Akhil Das
1. Upto you, you can either add internal ip or the external ip, it won't be
a problem unless they are not in the same network.

2. If you only want to start a particular slave, then you can do like:

sbin/start-slave.sh worker# master-spark-URL



Thanks
Best Regards

On Thu, May 28, 2015 at 1:52 PM, Nizan Grauer ni...@windward.eu wrote:

 hi,

 thanks for your answer!

 I have few more:

 1) The file /root/spark/conf/slaves has the full DNS names of servers (
 ec2-52-26-7-137.us-west-2.compute.amazonaws.com); did you add the
 internal IP there?
 2) You call start-all. Isn't that too aggressive? Let's say I have 20
 slaves up and I want to add one more; why should we stop the entire
 cluster for this?

 thanks, nizan

 On Thu, May 28, 2015 at 10:19 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 I do this way:

 - Launch a new instance by clicking on the slave instance and choose *launch
 more like this *
 *- *Once it's launched, ssh into it and add the master public key to
 .ssh/authorized_keys
 - Add the slaves internal IP to the master's conf/slaves file
 - do sbin/start-all.sh and it will show up along with other slaves.



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 12:29 PM, nizang ni...@windward.eu wrote:

 hi,

 I'm working on spark standalone system on ec2, and I'm having problems on
 resizing the cluster (meaning - adding or removing slaves).

 In the basic ec2 scripts
 (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only
 script
 for launching the cluster, not adding slaves to it. On the
 spark-standalone
 page
 (
 http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
 ),
 I can see only options for stopping and starting slaves, not adding them.

 What I try to do now (as a bad workaround...), is the following:

 1) Go to the ec2 UI, create image from the current slave
 2) Launch new instance based on this image
 3) Copy the public DNS of this slave
 4) SSH to the master, and edit the file
 /root/spark-ec2/ec2-variables.sh,
 and add the DNS to the export SLAVES variables
 5) Running the script /root/spark-ec2/setup.sh

 After doing the above steps, I can see the new slave in the UI (8080
 port)
 of the master. However, this solution is bad for many reasons:

 1) It requires many manual steps
 2) It requires stopping and starting the cluster
 3) There's no auto-detection in case slave stopped

 and many other reasons...

 Does anybody have another idea on how to add/remove slaves for standalone
 on
 a simple and safe way?

 thanks, nizan



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Get all servers in security group in bash(ec2)

2015-05-28 Thread nizang
hi,

Is there any way in bash (from an ec2 spark server) to list all the servers
in my security group (or better - in a given security group)

I tried using:

wget -q -O - http://instance-data/latest/meta-data/security-groups
security_group_xxx

but now, I want all the servers in security group security_group_xxx

thanks, nizan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Get-all-servers-in-security-group-in-bash-ec2-tp23066.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Get all servers in security group in bash(ec2)

2015-05-28 Thread Akhil Das
You can use the python boto library for that; in fact the spark-ec2 script uses it
underneath. Here's the
https://github.com/apache/spark/blob/master/ec2/spark_ec2.py#L706 call
spark-ec2 is making to get all machines under a given security group.

Thanks
Best Regards

On Thu, May 28, 2015 at 2:22 PM, nizang ni...@windward.eu wrote:

 hi,

 Is there any way in bash (from an ec2 spark server) to list all the servers
 in my security group (or better - in a given security group)

 I tried using:

 wget -q -O - http://instance-data/latest/meta-data/security-groups
 security_group_xxx

 but now, I want all the servers in security group security_group_xxx

 thanks, nizan



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Get-all-servers-in-security-group-in-bash-ec2-tp23066.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Gerard Maas
Hi,

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
streaming processes is not supported.


*Longer version.*

I assume that you are talking about Spark Streaming as the discussion is
about handling Kafka streaming data.

Then you have two things to consider: the Streaming receivers and the Spark
processing cluster.

Currently, the receiving topology is static. One receiver is allocated with
each DStream instantiated and it will use 1 core in the cluster. Once the
StreamingContext is started, this topology cannot be changed, therefore the
number of Kafka receivers is fixed for the lifetime of your DStream.
What we do is to calculate the cluster capacity and use that as a fixed
upper bound (with a margin) for the receiver throughput.

There's work in progress to add a reactive model to the receiver, where
backpressure can be applied to handle overload conditions. See
https://issues.apache.org/jira/browse/SPARK-7398

Once the data is received, it will be processed in a 'classical' Spark
pipeline, so previous posts on spark resource scheduling might apply.

Regarding metrics, the standard metrics subsystem of spark will report
streaming job performance. Check the driver's metrics endpoint to peruse
the available metrics:

driver:ui-port/metrics/json
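
For instance, a quick way to pull that JSON programmatically (a sketch; the
host and port below are placeholders for your driver's UI address):

import scala.io.Source

// Placeholder address: replace with your driver host and UI port (4040 by default).
val metricsUrl = "http://driver-host:4040/metrics/json"

// Dump the whole metrics registry as raw JSON; feed it into whatever JSON
// parser or monitoring system drives your scaling decision.
val metricsJson = Source.fromURL(metricsUrl).mkString
println(metricsJson)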

-kr, Gerard.


(*) Spark is a project that moves so fast that statements might be
invalidated by new work every minute.

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 Hi,

 I'm trying to understand if there are design patterns for autoscaling Spark
 (add/remove slave machines to the cluster) based on the throughput.

 Assuming we can throttle Spark consumers, the respective Kafka topics we
 stream data from would start growing.  What are some of the ways to
 generate
 the metrics on the number of new messages and the rate they are piling up?
 This perhaps is more of a Kafka question; I see a pretty sparse javadoc
 with
 the Metric interface and not much else...

 What are some of the ways to expand/contract the Spark cluster? Someone has
 mentioned Mesos...

 I see some info on Spark metrics in  the Spark monitoring guide
 https://spark.apache.org/docs/latest/monitoring.html  .  Do we want to
 perhaps implement a custom sink that would help us autoscale up or down
 based on the throughput?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Akhil Das
Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



Thanks
Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job, it only does very basic operations. for example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parse the json, filter and count the
 records, and then print it to logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing inside
 the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you using the createStream or createDirectStream api? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter one.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how to tell?

 I googled about how to check the off-heap memory usage, there's a tool
 called pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then
 you can see all JVM params. Are you using the highlevel consumer 
 (receiver
 based) for receiving data from Kafka? In that case if your throughput is
 high and the processing delay exceeds batch interval then you will hit 
 this
 memory issues as the data will keep on receiving and is dumped to memory.
 You can set StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternate will be to use the lowlevel kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or to use the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes up with spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I
 find out that YARN is killing the driver and executor process because of
 excessive use of memory. Here's something I tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so
 the extra memory is not used by heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but
 the memory just keeps growing and then hits the limit.
 3. This problem is not shown in low-throughput jobs, neither in
 standalone mode.
 4. The test job just receives messages from Kafka, with batch
 interval of 1, do some filtering and aggregation, and then print to
 executor logs. So it's not some 3rd party library that causes the 
 'leak'.

 Spark 1.3 is built by myself, with correct hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry





 --
 Jerry



Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thank you, Gerard.

We're looking at the receiver-less setup with Kafka Spark streaming so I'm
not sure how to apply your comments to that case (not that we have to use
receiver-less but it seems to offer some advantages over the
receiver-based).
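
(For context, the receiver-less consumer we are looking at is created roughly
like the sketch below; the broker list and topic name are placeholders, and ssc
is the StreamingContext.)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct (receiver-less) Kafka stream: no long-running receiver tasks,
// offsets are tracked by Spark itself per batch.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics).map(_._2)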

As far as the number of Kafka receivers is fixed for the lifetime of your
DStream -- this may be OK to start with. What I'm researching is the
ability to add worker nodes to the Spark cluster when needed and remove
them when no longer needed.  Do I understand correctly that a single
receiver may cause work to be farmed out to multiple 'slave'
machines/worker nodes?  If that's the case, we're less concerned with
multiple receivers; we're concerned with the worker node cluster itself.

If we use the ConsumerOffsetChecker class in Kafka that Rajesh mentioned
and instrument dynamic adding/removal of machines, my subsequent questions
then are, a) will Spark sense the addition of a new node / is it sufficient
that the cluster manager is aware, then work just starts flowing there?
 and  b) what would be a way to gracefully remove a worker node when the
load subsides, so that no currently running Spark job is killed?

- Dmitry

On Thu, May 28, 2015 at 7:36 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi,

 tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
 streaming processes is not supported.


 *Longer version.*

 I assume that you are talking about Spark Streaming as the discussion is
 about handling Kafka streaming data.

 Then you have two things to consider: the Streaming receivers and the
 Spark processing cluster.

 Currently, the receiving topology is static. One receiver is allocated
 with each DStream instantiated and it will use 1 core in the cluster. Once
 the StreamingContext is started, this topology cannot be changed, therefore
 the number of Kafka receivers is fixed for the lifetime of your DStream.
 What we do is to calculate the cluster capacity and use that as a fixed
 upper bound (with a margin) for the receiver throughput.

 There's work in progress to add a reactive model to the receiver, where
 backpressure can be applied to handle overload conditions. See
 https://issues.apache.org/jira/browse/SPARK-7398

 Once the data is received, it will be processed in a 'classical' Spark
 pipeline, so previous posts on spark resource scheduling might apply.

 Regarding metrics, the standard metrics subsystem of spark will report
 streaming job performance. Check the driver's metrics endpoint to peruse
 the available metrics:

 driver:ui-port/metrics/json

 -kr, Gerard.


 (*) Spark is a project that moves so fast that statements might be
 invalidated by new work every minute.

 On Thu, May 28, 2015 at 1:21 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 Hi,

 I'm trying to understand if there are design patterns for autoscaling
 Spark
 (add/remove slave machines to the cluster) based on the throughput.

 Assuming we can throttle Spark consumers, the respective Kafka topics we
 stream data from would start growing.  What are some of the ways to
 generate
 the metrics on the number of new messages and the rate they are piling up?
 This perhaps is more of a Kafka question; I see a pretty sparse javadoc
 with
 the Metric interface and not much else...

 What are some of the ways to expand/contract the Spark cluster? Someone
 has
 mentioned Mesos...

 I see some info on Spark metrics in  the Spark monitoring guide
 https://spark.apache.org/docs/latest/monitoring.html  .  Do we want to
 perhaps implement a custom sink that would help us autoscale up or down
 based on the throughput?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time
and ideally be less than the time of the microbatch window

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means
that you have lost all messages for the DStream RDD processed by that job due
to the previously described spark streaming memory leak condition and
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory”
to see whether it is decreasing relentlessly at a constant rate – if it touches
a predetermined RAM threshold that should be your third metric
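
The first two of these can also be tracked from inside the job itself with a
StreamingListener. Below is a sketch only, assuming a 10 second micro-batch
window and that ssc is your StreamingContext:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Watches scheduling delay and processing time for every completed batch and
// flags batches that exceed the micro-batch window (10s here, as an example).
class BatchHealthListener extends StreamingListener {
  private val batchWindowMs = 10000L  // assumed 10 second batch interval

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val schedulingDelayMs = info.schedulingDelay.getOrElse(0L)
    val processingTimeMs = info.processingDelay.getOrElse(0L)
    if (schedulingDelayMs > batchWindowMs || processingTimeMs > batchWindowMs) {
      println(s"WARN lagging batch: scheduling=${schedulingDelayMs}ms, processing=${processingTimeMs}ms")
    }
  }
}

// ssc.addStreamingListener(new BatchHealthListener)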

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you
can implement one on your own without waiting for Jiras and new features
whenever they might be implemented by the Spark dev team – moreover you can
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine
Learning in your Feedback Loop to make it handle the message consumption rate
more intelligently and benefit from ongoing online learning – BUT this is STILL
about voluntarily sacrificing your performance in the name of keeping your
system stable – it is not about scaling your system/solution

 

In terms of how to scale the Spark Framework Dynamically – even though this is
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handling Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers is fixed for the lifetime of your DStream. 

What we do is to calculate the cluster capacity and use that as a fixed upper 
bound (with a margin) for the receiver throughput.

 

There's work in progress to add a reactive model to the receiver, where 
backpressure can be applied to handle overload conditions. See 
https://issues.apache.org/jira/browse/SPARK-7398

 

Once the data is received, it will be processed in a 'classical' Spark 
pipeline, so previous posts on spark resource scheduling might apply.

 

Regarding metrics, the standard metrics subsystem of spark will report 
streaming job performance. Check the driver's metrics endpoint to peruse the 
available metrics:

 

driver:ui-port/metrics/json

 

-kr, Gerard.

 

 

(*) Spark is a project that moves so fast that statements might be invalidated 
by new work every minute.

 

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg dgoldenberg...@gmail.com wrote:

Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up?
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in  the Spark monitoring guide
https://spark.apache.org/docs/latest/monitoring.html  .  Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: 

Re: why does com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException happen?

2015-05-28 Thread randylu
begs for your help 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-com-esotericsoftware-kryo-KryoException-java-u-til-ConcurrentModificationException-happen-tp23067p23068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



why does com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException happen?

2015-05-28 Thread randylu
  My program runs for 500 iterations, but it usually fails at around 150
iterations.
  It's hard to explain the details of my program, but I think my program is
ok, for it runs successfully sometimes. *I just want to know in which
situations this exception will happen*.

  The detail error information is as follows:

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 32 in stage 43644.0 failed 4 time
s, most recent failure: Lost task 32.3 in stage 43644.0 (TID 72976,
10.194.117.197): com.esotericsoftware.kryo.KryoException: java.u
til.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (org.apache.spark.executor.ExecutorURLClassLoader)
classLoader (com.twitter.chill.KryoBase)
kryo (org.apache.spark.serializer.KryoDeserializationStream)
$outer (org.apache.spark.serializer.DeserializationStream$$anon$1)
delegate (org.apache.spark.InterruptibleIterator)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

...

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:130)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:370)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
at java.util.Vector$Itr.next(Vector.java:1133)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 44 more
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler
.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at

Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thanks, Evo.  Per the last part of your comment, it sounds like we will
need to implement a job manager which will be in control of starting the
jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
marking them as ones to relaunch, scaling the cluster up/down by
adding/removing machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping
track of the starter offsets in Kafka topic(s) from which the jobs started
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
of jobs, coupled with the wait for the new machines to come online may turn
out quite time-consuming which will make for lengthy request times, and our
requests are not asynchronous.  Ideally, the currently running jobs would
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job
scheduler not to send work to the node being taken out, find out when the
last job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine
while the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 @DG; The key metrics should be



 -  Scheduling delay – its ideal state is to remain constant over
 time and ideally be less than the time of the microbatch window

 -  The average job processing time should remain less than the
 micro-batch window

 -  Number of Lost Jobs – even if there is a single Job lost that
 means that you have lost all messages for the DStream RDD processed by that
 job due to the previously described spark streaming memory leak condition
 and subsequent crash – described in previous postings submitted by me



 You can even go one step further and periodically issue “get/check free
 memory” to see whether it is decreasing relentlessly at a constant rate –
 if it touches a predetermined RAM threshold that should be your third
 metric



 Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
 you can implement one on your own without waiting for Jiras and new
 features whenever they might be implemented by the Spark dev team –
 moreover you can avoid using slow mechanisms such as ZooKeeper and even
 incorporate some Machine Learning in your Feedback Loop to make it handle
 the message consumption rate more intelligently and benefit from ongoing
 online learning – BUT this is STILL about voluntarily sacrificing your
 performance in the name of keeping your system stable – it is not about
 scaling your system/solution



 In terms of how to scale the Spark Framework Dynamically – even though
 this is not supported at the moment out of the box I guess you can have a
 sys management framework spin dynamically a few more boxes (spark worker
 nodes), stop dynamically your currently running Spark Streaming Job,
 relaunch it with new params e.g. more Receivers, larger number of
 Partitions (hence tasks), more RAM per executor etc. Obviously this will
 cause some temporary delay in fact interruption in your processing but if
 the business use case can tolerate that then go for it



 *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
 *Sent:* Thursday, May 28, 2015 12:36 PM
 *To:* dgoldenberg
 *Cc:* spark users
 *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate of
 growth in Kafka or Spark's metrics?



 Hi,



 tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
 streaming processes is not supported.





 *Longer version.*



 I assume that you are talking about Spark Streaming as the discussion is
 about handling Kafka streaming data.



 Then you have two things to consider: the Streaming receivers and the
 Spark processing cluster.



 Currently, the receiving topology is static. One receiver is allocated
 with each DStream instantiated and it will use 1 core in the cluster. Once
 the StreamingContext is started, this topology cannot be changed, therefore
 the number of Kafka receivers is fixed for the lifetime of your DStream.

 What we do is to calculate the cluster capacity and use that as a fixed
 upper bound (with a margin) for the receiver throughput.



 There's work in progress to add a reactive model to the receiver, where
 backpressure can be applied to handle overload conditions. See
 https://issues.apache.org/jira/browse/SPARK-7398



 Once the data is received, it will be processed in a 'classical' Spark
 pipeline, so previous posts on spark resource scheduling might apply.



 Regarding metrics, the standard metrics subsystem of spark will report
 streaming job performance. Check the driver's metrics endpoint to peruse
 the available metrics:



 driver:ui-port/metrics/json



 -kr, Gerard.





 (*) Spark is a project that moves so fast that statements might be
 

SPARK STREAMING PROBLEM

2015-05-28 Thread Animesh Baranawal
Hi,

I am trying to extract the filenames from which a Dstream is generated by
parsing the toDebugString method on RDD
I am implementing the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("// directory //")

def g : List[String] = {
   var res = List[String]()
   lines.foreachRDD{ rdd => {
      if(rdd.count > 0){
      val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
      files.foreach{ ms => {
         res = ms.split(" ")(2)::res
      }}   }
   }}
   res
}

g.foreach(x => {println(x); println()})

However when I run the code, nothing gets printed on the console apart from
the logs. Am I doing something wrong?
And is there any better way to extract the file names from DStream ?

Thanks in advance


Animesh


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
You can always spin new boxes in the background and bring them into the cluster
fold when fully operational, and time that with the job relaunch and param change.

Kafka offsets are managed automatically for you by the Kafka clients, which keep
them in ZooKeeper; don't worry about that as long as you shut down your job
gracefully. Besides, managing the offsets explicitly is not a big deal if
necessary.


Sent from Samsung Mobile

-------- Original message --------
From: Dmitry Goldenberg dgoldenberg...@gmail.com
Date: 2015/05/28 13:16 (GMT+00:00)
To: Evo Eftimov evo.efti...@isecc.com
Cc: Gerard Maas gerard.m...@gmail.com, spark users user@spark.apache.org
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Thanks, Evo.  Per the last part of your comment, it sounds like we will 
need to implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote:
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time
and ideally be less than the time of the microbatch window

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means
that you have lost all messages for the DStream RDD processed by that job due
to the previously described spark streaming memory leak condition and
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory”
to see whether it is decreasing relentlessly at a constant rate – if it touches
a predetermined RAM threshold that should be your third metric

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you
can implement one on your own without waiting for Jiras and new features
whenever they might be implemented by the Spark dev team – moreover you can
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine
Learning in your Feedback Loop to make it handle the message consumption rate
more intelligently and benefit from ongoing online learning – BUT this is STILL
about voluntarily sacrificing your performance in the name of keeping your
system stable – it is not about scaling your system/solution

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handling Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers 

Re: SPARK STREAMING PROBLEM

2015-05-28 Thread Sourav Chandra
You must start the StreamingContext by calling ssc.start()
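
For example:

// After all the DStream transformations are wired up:
ssc.start()             // actually starts receiving and processing data
ssc.awaitTermination()  // keeps the application alive while the stream runs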

On Thu, May 28, 2015 at 6:57 PM, Animesh Baranawal 
animeshbarana...@gmail.com wrote:

 Hi,

 I am trying to extract the filenames from which a Dstream is generated by
 parsing the toDebugString method on RDD
 I am implementing the following code in spark-shell:

 import org.apache.spark.streaming.{StreamingContext, Seconds}
 val ssc = new StreamingContext(sc, Seconds(10))
 val lines = ssc.textFileStream("// directory //")

 def g : List[String] = {
    var res = List[String]()
    lines.foreachRDD{ rdd => {
       if(rdd.count > 0){
       val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
       files.foreach{ ms => {
          res = ms.split(" ")(2)::res
       }}   }
    }}
    res
 }

 g.foreach(x => {println(x); println()})

 However when I run the code, nothing gets printed on the console apart
 from the logs. Am I doing something wrong?
 And is there any better way to extract the file names from DStream ?

 Thanks in advance


 Animesh




-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Dataframe Partitioning

2015-05-28 Thread Masf
Hi.

I have 2 dataframes with 1 and 12 partitions respectively. When I do an inner
join between these dataframes, the result contains 200 partitions. *Why?*

df1.join(df2, df1("id") === df2("id"), "Inner") => returns 200 partitions


Thanks!!!
-- 


Regards.
Miguel Ángel


Fwd: SPARK STREAMING PROBLEM

2015-05-28 Thread Animesh Baranawal
I also started the streaming context by running ssc.start() but still apart
from logs nothing of g gets printed.

-- Forwarded message --
From: Animesh Baranawal animeshbarana...@gmail.com
Date: Thu, May 28, 2015 at 6:57 PM
Subject: SPARK STREAMING PROBLEM
To: user@spark.apache.org


Hi,

I am trying to extract the filenames from which a Dstream is generated by
parsing the toDebugString method on RDD
I am implementing the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("// directory //")

def g : List[String] = {
   var res = List[String]()
   lines.foreachRDD{ rdd => {
      if(rdd.count > 0){
      val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
      files.foreach{ ms => {
         res = ms.split(" ")(2)::res
      }}   }
   }}
   res
}

g.foreach(x => {println(x); println()})

However when I run the code, nothing gets printed on the console apart from
the logs. Am I doing something wrong?
And is there any better way to extract the file names from DStream ?

Thanks in advance


Animesh


Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS

2015-05-28 Thread Andrew Otto
 val sqlContext = new HiveContext(sc)
 val schemaRdd = sqlContext.sql("some complex SQL")


It mostly works, but I have been having issues with tables that contain a large
amount of data:

https://issues.apache.org/jira/browse/SPARK-6910 
https://issues.apache.org/jira/browse/SPARK-6910
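
For reference, a minimal sketch of that setup (it assumes a hive-site.xml
pointing at your metastore is on the Spark classpath; the database and table
names below are made up):

import org.apache.spark.sql.hive.HiveContext

// hive-site.xml must be visible to the driver (e.g. placed in conf/) so the
// HiveContext talks to the existing metastore rather than creating a local one.
val hiveContext = new HiveContext(sc)

hiveContext.sql("USE some_db")  // hypothetical database
val joined = hiveContext.sql(
  """SELECT a.id, b.value
    |FROM table_a a
    |JOIN table_b b ON a.id = b.id""".stripMargin)  // hypothetical tables

joined.take(10).foreach(println)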


 On May 27, 2015, at 20:52, Sanjay Subramanian 
 sanjaysubraman...@yahoo.com.INVALID wrote:
 
 hey guys
 
 On the Hive/Hadoop ecosystem we are using Cloudera distribution CDH 5.2.x;
 there are about 300+ hive tables.
 The data is stored as text (moving slowly to Parquet) on HDFS.
 I want to use SparkSQL and point to the Hive metadata and be able to define 
 JOINS etc using a programming structure like this 
 
 import org.apache.spark.sql.hive.HiveContext
 val sqlContext = new HiveContext(sc)
 val schemaRdd = sqlContext.sql("some complex SQL")
 
 
 Is that the way to go ? Some guidance will be great.
 
 thanks
 
 sanjay
 
 
 



Soft distinct on data frames.

2015-05-28 Thread Jan-Paul Bultmann
Hey,
Is there a way to do a distinct operation on each partition only?
My program generates quite a few duplicate tuples and it would be nice to 
remove some of these as an optimisation
without having to reshuffle the data.
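
Something like the following partition-local pass is what I have in mind (a
sketch; df and sqlContext are assumed, and duplicates that live in different
partitions would of course survive):

import org.apache.spark.sql.Row

// De-duplicate within each partition only: no shuffle is triggered.
val locallyDistinct = df.rdd.mapPartitions { iter =>
  val seen = scala.collection.mutable.HashSet[Row]()
  iter.filter(row => seen.add(row))  // keep a row only the first time this partition sees it
}

// Back to a DataFrame with the original schema, if needed.
val dfLocallyDistinct = sqlContext.createDataFrame(locallyDistinct, df.schema)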

I’ve also noticed that plans generated with a unique transformation have this
peculiar form:

== Physical Plan ==
Distinct false
 Exchange (HashPartitioning [_0#347L,_1#348L], 200)
  Distinct true
   PhysicalRDD [_0#347L,_1#348L], MapPartitionsRDD[247] at map at 
SQLContext.scala:394

Does this mean that set semantics are just a flag that can be turned off and on 
for each shuffling operation?
If so, is it possible to do so in general, so that one always uses set 
semantics instead of bag?
Or will the optimiser try to propagate the set semantics?

Cheers Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SPARK STREAMING PROBLEM

2015-05-28 Thread Sourav Chandra
The problem lies in the way you are doing the processing.

Only after the g.foreach(x => {println(x); println()}) are you
doing ssc.start(). It means that until then you have only set up the
computation steps, but Spark has not started any real processing, so when
you do g.foreach what it iterates over is the empty list you are returning
from the g method, hence it does not print anything.
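
A minimal sketch of that ordering, keeping the original logic (the directory path 
and the path filter are placeholders carried over from the original code):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("/some/watched/directory")   // placeholder path

// do the printing inside foreachRDD; it runs only once the context is started
lines.foreachRDD { rdd =>
  if (rdd.count > 0) {
    val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
    files.foreach(ms => println(ms.split(" ")(2)))
  }
}

ssc.start()
ssc.awaitTermination()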

On Thu, May 28, 2015 at 7:02 PM, Animesh Baranawal 
animeshbarana...@gmail.com wrote:

 I also started the streaming context by running ssc.start() but still
 apart from logs nothing of g gets printed.

 -- Forwarded message --
 From: Animesh Baranawal animeshbarana...@gmail.com
 Date: Thu, May 28, 2015 at 6:57 PM
 Subject: SPARK STREAMING PROBLEM
 To: user@spark.apache.org


 Hi,

 I am trying to extract the filenames from which a Dstream is generated by
 parsing the toDebugString method on RDD
 I am implementing the following code in spark-shell:

 import org.apache.spark.streaming.{StreamingContext, Seconds}
 val ssc = new StreamingContext(sc, Seconds(10))
 val lines = ssc.textFileStream(// directory //)

 def g : List[String] = {
    var res = List[String]()
    lines.foreachRDD { rdd =>
      if (rdd.count > 0) {
        val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
        files.foreach { ms =>
          res = ms.split(" ")(2) :: res
        }
      }
    }
    res
 }

 g.foreach(x => { println(x); println() })

 However when I run the code, nothing gets printed on the console apart
 from the logs. Am I doing something wrong?
 And is there any better way to extract the file names from DStream ?

 Thanks in advance


 Animesh





-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


[Streaming] Configure executor logging on Mesos

2015-05-28 Thread Gerard Maas
Hi,

I'm trying to control the verbosity of the logs on the Mesos executors with
no luck so far. The default behaviour is INFO on stderr dump with an
unbounded growth that gets too big at some point.

I noticed that when the executor is instantiated, it locates a default log
configuration in the spark assembly:

I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

So nothing I provide in my job jar files (I also tried
spark.executor.extraClassPath=log4j.properties) takes effect in the
executor's configuration.

How should I configure the log on the executors?

thanks, Gerard.
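
Not a Mesos-verified recipe, but a hedged sketch of the usual approach: ship a 
custom log4j.properties with the job and point the executor JVMs at it (whether 
Mesos places the shipped file in the executor working directory is an assumption 
worth checking). The local path below is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // ship a custom log4j.properties to each executor's working directory
  .set("spark.files", "/local/path/to/log4j.properties")
  // point the executor JVM at the shipped file instead of the assembly default
  .set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j.properties")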


Re: Dataframe Partitioning

2015-05-28 Thread Silvio Fiorito
That’s due to the config setting spark.sql.shuffle.partitions which defaults to 
200
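
A minimal sketch, reusing df1/df2 from the question (12 is picked only to match 
the larger input):

// lower the number of post-shuffle partitions before running the join
sqlContext.setConf("spark.sql.shuffle.partitions", "12")
val joined = df1.join(df2, df1("id") === df2("id"), "inner")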


From: Masf
Date: Thursday, May 28, 2015 at 10:02 AM
To: user@spark.apache.org
Subject: Dataframe Partitioning

Hi.

I have 2 dataframes with 1 and 12 partitions respectively. When I do an inner 
join between these dataframes, the result contains 200 partitions. Why?

df1.join(df2, df1("id") === df2("id"), "Inner") => returns 200 partitions


Thanks!!!
--


Regards.
Miguel Ángel


Spark streaming with kafka

2015-05-28 Thread boci
Hi guys,

I'm using Spark Streaming with Kafka... On my local machine (started as a java
application without using spark-submit) it works: it connects to Kafka and does
the job (*). I tried to put it into a Spark docker container (hadoop 2.6, spark
1.3.1, tried spark-submit with local[5] and yarn-client too) but without
success...

No error on the console (the application started); I see something received
from Kafka but the result is not written out to elasticsearch...

Where can I start debugging? I see two jobs in the Spark console, both 0/1...

Thanks
--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Evo, good points.

On the dynamic resource allocation, I'm surmising this only works within a
particular cluster setup.  So it improves the usage of current cluster
resources but it doesn't make the cluster itself elastic. At least, that's
my understanding.

Memory + disk would be good, and hopefully it'd take a *huge* load on the
system to start exhausting the disk space too.  I'd guess that falling onto
disk will make things significantly slower due to the extra I/O.

Perhaps we'll really want all of these elements eventually.  I think we'd
want to start with memory only, keeping maxRate low enough not to overwhelm
the consumers; implement the cluster autoscaling.  We might experiment with
dynamic resource allocation before we get to implement the cluster
autoscale.
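
For the maxRate part, a hedged sketch assuming the receiver-based API (the value 
is only an example to tune):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // cap each receiver's ingest rate so the consumers are not overwhelmed
  .set("spark.streaming.receiver.maxRate", "1000")   // records per second, example value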



On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
 exhausted spark streaming will resort to keeping new RDDs on disk which
 will prevent it from crashing and hence losing them. Then some memory will
 get freed and it will resort back to RAM and so on and so forth





 Sent from Samsung Mobile

  Original message 

 From: Evo Eftimov

 Date:2015/05/28 13:22 (GMT+00:00)

 To: Dmitry Goldenberg

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
 in Kafka or Spark's metrics?



 You can always spin new boxes in the background and bring them into the
 cluster fold when fully operational and time that with job relaunch and
 param change



 Kafka offsets are managed automatically for you by the Kafka clients, which
 keep them in ZooKeeper, so don't worry about that as long as you shut down your
 job gracefully. Besides, managing the offsets explicitly is not a big deal if
 necessary





 Sent from Samsung Mobile



  Original message 

 From: Dmitry Goldenberg

 Date:2015/05/28 13:16 (GMT+00:00)

 To: Evo Eftimov

 Cc: Gerard Maas ,spark users

 Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
 in Kafka or Spark's metrics?



 Thanks, Evo.  Per the last part of your comment, it sounds like we will
 need to implement a job manager which will be in control of starting the
 jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
 marking them as ones to relaunch, scaling the cluster up/down by
 adding/removing machines, and relaunching the 'suspended' (shut down) jobs.



 I suspect that relaunching the jobs may be tricky since that means keeping
 track of the starter offsets in Kafka topic(s) from which the jobs started
 working on.



 Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
 of jobs, coupled with the wait for the new machines to come online may turn
 out quite time-consuming which will make for lengthy request times, and our
 requests are not asynchronous.  Ideally, the currently running jobs would
 continue to run on the machines currently available in the cluster.



 In the scale-down case, the job manager would want to signal to Spark's
 job scheduler not to send work to the node being taken out, find out when
 the last job has finished running on the node, then take the node out.



 This is somewhat like changing the number of cylinders in a car engine
 while the car is running...



 Sounds like a great candidate for a set of enhancements in Spark...



 On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 @DG; The key metrics should be



 -  Scheduling delay – its ideal state is to remain constant over
 time and ideally be less than the time of the microbatch window

 -  The average job processing time should remain less than the
 micro-batch window

 -  Number of Lost Jobs – even if there is a single Job lost that
 means that you have lost all messages for the DStream RDD processed by that
 job due to the previously described spark streaming memory leak condition
 and subsequent crash – described in previous postings submitted by me



 You can even go one step further and periodically issue “get/check free
 memory” to see whether it is decreasing relentlessly at a constant rate –
 if it touches a predetermined RAM threshold that should be your third
 metric



 Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
 you can implement one on your own without waiting for Jiras and new
 features whenever they might be implemented by the Spark dev team –
 moreover you can avoid using slow mechanisms such as ZooKeeper and even
 incorporate some Machine Learning in your Feedback Loop to make it handle
 the message consumption rate more intelligently 

Spark SQL v MemSQL/Voltdb

2015-05-28 Thread Ashish Mukherjee
Hello,

I was wondering if there is any documented comparison of SparkSQL with
MemSQL/VoltDB kind of in-memory SQL databases. MemSQL etc. also allow
queries to be run in a clustered environment. What is the major
differentiation?

Regards,
Ashish


Best practice to update a MongoDB document from Sparks

2015-05-28 Thread nibiau
Hello,
I'm evaluating Spark/SparkStreaming .
I use SparkStreaming to receive messages from a Kafka topic.
As soon as I have a JavaReceiverInputDStream, I have to treat each message: 
for each one I have to search in MongoDB to find out whether a document exists.
If I find the document I have to update it with some values, else I have to 
create a new one.
I succeeded in creating new documents by using saveAsNewAPIHadoopFiles, but I 
don't know the best practice for updating an existing document from Spark.
In a basic Java program I would use :
   myColl.update(updateQuery, updateCommand);

So, should I use this same approach to update existing documents, skip those 
messages, and let saveAsNewAPIHadoopFiles create the ones that need to be created?
Or is there any better way?

Tks
Nicolas 
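
Not an established best practice, but a hedged sketch (in Scala, against the same 
2.x driver API as myColl.update above) of doing the upsert directly from the 
stream, one connection per partition; the stream name, host, database, collection 
and helper functions are placeholders:

import com.mongodb.{BasicDBObject, MongoClient}

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val client = new MongoClient("mongo-host", 27017)           // placeholder host
    val coll = client.getDB("mydb").getCollection("mycoll")     // placeholder names
    iter.foreach { msg =>
      val query  = new BasicDBObject("_id", extractId(msg))     // extractId is a placeholder
      val update = new BasicDBObject("$set", toDBObject(msg))   // toDBObject is a placeholder
      coll.update(query, update, true, false)                   // upsert = true, multi = false
    }
    client.close()
  }
}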

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK – it 
will be your insurance policy against system crashes due to memory leaks. As long 
as there is free RAM, spark streaming (spark) will NOT resort to disk – it will of 
course resort to disk from time to time (i.e. when there is no free RAM) and 
take a performance hit from that, BUT only while there is no free RAM 
 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo, good points.

 

On the dynamic resource allocation, I'm surmising this only works within a 
particular cluster setup.  So it improves the usage of current cluster 
resources but it doesn't make the cluster itself elastic. At least, that's my 
understanding.

 

Memory + disk would be good, and hopefully it'd take a *huge* load on the system 
to start exhausting the disk space too.  I'd guess that falling onto disk will 
make things significantly slower due to the extra I/O.

 

Perhaps we'll really want all of these elements eventually.  I think we'd want 
to start with memory only, keeping maxRate low enough not to overwhelm the 
consumers; implement the cluster autoscaling.  We might experiment with dynamic 
resource allocation before we get to implement the cluster autoscale.

 

 

 

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:

You can also try Dynamic Resource Allocation

 

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
 

 

Also re the Feedback Loop for automatic message consumption rate adjustment – 
there is a “dumb” solution option – simply set the storage policy for the 
DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark 
streaming will resort to keeping new RDDs on disk which will prevent it from 
crashing and hence losing them. Then some memory will get freed and it will 
resort back to RAM and so on and so forth  

 

 

Sent from Samsung Mobile

 Original message 

From: Evo Eftimov 

Date:2015/05/28 13:22 (GMT+00:00) 

To: Dmitry Goldenberg 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

 

Kafka offsets are managed automatically for you by the Kafka clients, which keep 
them in ZooKeeper, so don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary

 

 

Sent from Samsung Mobile

 

 Original message 

From: Dmitry Goldenberg 

Date:2015/05/28 13:16 (GMT+00:00) 

To: Evo Eftimov 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

Thanks, Evo.  Per the last part of your comment, it sounds like we will need to 
implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

 

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

 

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

 

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

 

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

 

Sounds like a great candidate for a set of enhancements in Spark...

 

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote:

@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in 

Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Ji ZHANG
Hi,

Unfortunately, they're still growing, both driver and executors.

I ran the same job in local mode, and everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job, it only does very basic operations. for
 example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the counts to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing inside
 the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Are you using the createStream or createDirectStream api? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter one.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of the Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how to tell?

 I googled about how to check the off-heap memory usage, there's a
 tool called pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 After submitting the job, if you do a ps aux | grep spark-submit
 then you can see all JVM params. Are you using the highlevel consumer
 (receiver based) for receiving data from Kafka? In that case, if your
 throughput is high and the processing delay exceeds the batch interval, then
 you will hit these memory issues as data keeps being received and is
 dumped to memory. You can set StorageLevel to MEMORY_AND_DISK (but it slows
 things down). Another alternative is to use the lowlevel kafka
 consumer https://github.com/dibbhatt/kafka-spark-consumer or to
 use the non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes up with spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I
 find out that YARN is killing the driver and executor process because 
 of
 excessive use of memory. Here's something I tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so
 the extra memory is not used by heap.
 2. I set the two memoryOverhead params to 1024 (default is 384),
 but the memory just keeps growing and then hits the limit.
 3. This problem is not shown in low-throughput jobs, neither in
 standalone mode.
 4. The test job just receives messages from Kafka, with batch
 interval of 1, do some filtering and aggregation, and then print to
 executor logs. So it's not some 3rd party library that causes the 
 'leak'.

 Spark 1.3 is built by myself, with correct hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry





 --
 Jerry





-- 
Jerry


Re: debug jsonRDD problem?

2015-05-28 Thread Michael Stone

On Wed, May 27, 2015 at 02:06:16PM -0700, Ted Yu wrote:

Looks like the exception was caused by resolved.get(prefix ++ a) returning None:

        a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)

There are three occurrences of resolved.get() in createSchema() - None should
be better handled in these places.

My two cents.


Here's the simplest test case I've come up with:

sqlContext.jsonRDD(sc.parallelize(Array({\'```'\:\\}))).count()

Mike Stone

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark with OpenCV causes python worker to crash

2015-05-28 Thread Sam Stoelinga
Hi sparkers,

I am working on a PySpark application which uses the OpenCV library. It
runs fine when running the code locally but when I try to run it on Spark
on the same Machine it crashes the worker.

The code can be found here:
https://gist.github.com/samos123/885f9fe87c8fa5abf78f

This is the error message taken from STDERR of the worker log:
https://gist.github.com/samos123/3300191684aee7fc8013

Would like pointers or tips on how to debug further? Would be nice to know
the reason why the worker crashed.

Thanks,
Sam Stoelinga


org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)


Loading CSV to DataFrame and saving it into Parquet for speedup

2015-05-28 Thread M Rez
I am using Spark-CSV to load 50GB of around 10,000 CSV files into a couple
of unified DataFrames. Since this process is slow, I have written this snippet: 

targetList.foreach { target =>
  // this is using sqlContext.load by getting the list of files, then loading
  // them according to the schema files that were read before and built into
  // their StructType
  getTrace(target, sqlContext)
    .reduce(_ unionAll _)
    .registerTempTable(target.toUpperCase())
  sqlContext.sql("SELECT * FROM " + target.toUpperCase())
    .saveAsParquetFile(processedTraces + target)
}

to load the CSV files and then union all the CSV files with the same schema
and write them into a single Parquet file with its parts. The problem is that
my CPU (not all cores are busy) and disk (SSD, at most 1MB/s) are barely
utilized. I wonder what I am doing wrong?!

snippet for getTrace: 

def getTrace(target: String, sqlContext: SQLContext): Seq[DataFrame] = {
  logFiles(mainLogFolder + target).map { file =>
    sqlContext.load(
      driver,
      // schemaSelect builds the StructType once
      schemaSelect(schemaFile, target, sqlContext),
      Map("path" -> file, "header" -> "false", "delimiter" -> ","))
  }
}

thanks for any help
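
Not a verified fix, but one hedged thing to try: skip the registerTempTable / 
SELECT * round trip and repartition before writing, so the Parquet write runs 
with more tasks (32 below is only an example to tune to the cluster):

targetList.foreach { target =>
  getTrace(target, sqlContext)
    .reduce(_ unionAll _)
    .repartition(32)   // example value; tune to the number of available cores
    .saveAsParquetFile(processedTraces + target)
}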




-
regards,
mohamad
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-CSV-to-DataFrame-and-saving-it-into-Parquet-for-speedup-tp23071.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org