Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields
Probably a naive question: can you try the same query in the Hive CLI and see if your SQL works? This looks like a Hive issue to me, as Spark is faithfully delegating the query to Hive.

On 29 May 2015 03:22, Abhishek Tripathi trackissue...@gmail.com wrote:

Hi, I'm using the CDH 5.4.0 quickstart VM and tried to build Spark with Hive compatibility so that I can run Spark SQL and access temp tables remotely. I used the command below to build Spark. The build was successful, but when I try to access Hive data from Spark SQL, I get an error. Thanks, Abhi
---
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive -Phive-thriftserver -DskipTests clean package

[cloudera@quickstart spark-1.3.1]$ export HADOOP_CONF_DIR=/etc/hive/conf/
[cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
SET spark.sql.hive.version=0.13.1
spark-sql> show tables;
sample_07 false
t1 false
Time taken: 3.901 seconds, Fetched 2 row(s)
spark-sql> select * from t1;
15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
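A note on the VerifyError itself: a final getUnknownFields override failing verification is the classic symptom of mixing protobuf 2.4.x and 2.5.x on the classpath (the Hadoop 2.6 / CDH5 protocol classes are compiled against 2.5.0). One hedged way to check whether the build is pulling in a conflicting protobuf, assuming that clash is the cause:

```sh
# Hedged check: list which protobuf artifacts the chosen profiles resolve.
# A mix of 2.4.x and 2.5.x here would explain the VerifyError.
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive -Phive-thriftserver \
    dependency:tree -Dincludes=com.google.protobuf
```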
Spark MLlib variance analysis
I have a simple problem: I have the mean number of people at one place by hour (time-series-like), and now I want to know whether the weather conditions have an impact on that mean. I would do this with variance analysis, like ANOVA in SPSS, or by analysing the resulting regression model summary. How is this possible with Apache Spark MLlib? Thanks
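MLlib (as of 1.3) has no built-in ANOVA, but one common approximation is to one-hot encode the weather condition and fit a linear regression; conditions whose coefficients are near zero have little effect on the mean. A minimal sketch under that assumption (the input triples and column layout are hypothetical):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// Hypothetical input: (hour, weatherCondition, meanPeopleCount)
val raw = sc.parallelize(Seq(
  (9, "sunny", 120.0), (9, "rain", 80.0), (10, "sunny", 140.0), (10, "rain", 90.0)))

// One-hot encode the weather condition so each condition gets its own coefficient
val index = raw.map(_._2).distinct().collect().zipWithIndex.toMap

val data = raw.map { case (hour, weather, mean) =>
  val dummies = Array.fill(index.size)(0.0)
  dummies(index(weather)) = 1.0
  LabeledPoint(mean, Vectors.dense(hour.toDouble +: dummies))
}.cache()

// SGD is sensitive to feature scale; scale hour/counts first on real data
val model = LinearRegressionWithSGD.train(data, 100) // numIterations = 100
println(model.weights) // a large weight on a condition's dummy => weather matters
```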
Re: Batch aggregation by sliding window + join
Which version of Spark? In 1.4, window queries will cover this kind of scenario. One thing I can suggest: keep the daily aggregates materialised, partitioned by key and sorted by the key-day combination, using the repartitionAndSortWithinPartitions method. It allows you to use a custom partitioner and a custom sort. Best, Ayan

On 29 May 2015 03:31, igor.berman igor.ber...@gmail.com wrote:

Hi, I have a daily batch job that computes a daily aggregate of several counters, represented by some object. After the daily aggregation is done, I want to compute blocks of 3-day aggregations (3, 7, 30, etc.). To do so I need to add the new daily aggregation to the current block and then subtract from the current block the daily aggregation of the last day within the block (a sliding window...). I've implemented it with something like:

baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)

All RDDs are keyed by a unique id (long). Each RDD is saved in Avro files after the job finishes and loaded when the job starts (on the next day). baseBlockRdd is much larger than the lastDay and newDay RDDs (depending on the size of the block). Unfortunately the performance is not satisfactory due to many shuffles (I have tuned parallelism etc.). I was looking for a way to improve performance, to make sure that one task joins the same local keys without reshuffling baseBlockRdd (which is big) each time the job starts (see https://spark-project.atlassian.net/browse/SPARK-1061 as a related issue).

So, bottom line: how do I join a big RDD with a smaller RDD without reshuffling the big RDD over and over again? As soon as I've saved this big RDD and reloaded it from disk, I want every other RDD to be partitioned and collocated by the same partitioner (which is absent for a HadoopRDD)... somehow, so that only the small RDDs are sent over the network. Another idea I had: somehow split baseBlock into 2 parts with a filter by the keys of the small RDDs, and then join; however, I'm not sure it's possible to implement this filter without a join. Any ideas would be appreciated. Thanks in advance, Igor
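To make that suggestion concrete, a minimal sketch (partitioner choice and the subtract/add functions are assumptions) that pays the big shuffle once and co-partitions the small daily RDDs so only they move at join time:

```scala
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(256) // size to your cluster

// Pay the shuffle for the big block once; keys stay sorted within partitions.
val base = baseBlockRdd.repartitionAndSortWithinPartitions(partitioner)

// Give the small RDDs the SAME partitioner: the joins then only move them.
val lastDay = lastDayRdd.partitionBy(partitioner)
val newDay  = newDayRdd.partitionBy(partitioner)

val updated = base
  .leftOuterJoin(lastDay)  // co-partitioned: no re-shuffle of `base`
  .mapValues { case (block, oldDay) => subtract(block, oldDay) } // user-defined
  .fullOuterJoin(newDay)   // Option handling in add() elided for brevity
  .mapValues { case (maybeBlock, maybeNew) => add(maybeBlock, maybeNew) }
```

Note the partitioner is lost when the RDD is written to Avro and reloaded the next day, so the big shuffle still happens once per job run; avoiding it entirely would need a partition-aware on-disk layout.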
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Which would imply that if there were a load-manager type of service, it could signal to the driver(s) that they need to acquiesce, i.e. process what's at hand and terminate. Then bring up a new machine, then restart the driver(s)... Same deal with removing machines from the cluster: send a signal for the drivers to pipe down and terminate, then restart them.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote:

I'm not sure that points 1 and 2 really apply to the Kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is.

On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

Hi all, as the author of the dynamic allocation feature I can offer a few insights here. Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark Streaming at the moment (1.4 or before). This is because of two things:

(1) The number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors.

(2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up having to constantly kill and restart executors. If your idle timeout is greater than the batch duration, then you'll never kill executors.

Long story short, with Spark Streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark Streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. taking the batch queue into account. We came up with a few ideas that I will not detail here, but we are looking into this and do intend to support it in the near future. -Andrew

2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK – it will be your insurance policy against system crashes due to memory leaks. As long as there is free RAM, Spark Streaming (Spark) will NOT resort to disk – it resorts to disk from time to time (i.e. when there is no free RAM) and takes a performance hit from that, but only while there is no free RAM.

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Evo, good points. On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources, but it doesn't make the cluster itself elastic. At least, that's my understanding. Memory + disk would be good, and hopefully it'd take a *huge* load on the system to start exhausting the disk space too. I'd guess that falling onto disk will make things significantly slower due to the extra I/O. Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers, and implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implementing the cluster autoscale.
On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:

You can also try Dynamic Resource Allocation: https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation

Also, re the feedback loop for automatic message-consumption-rate adjustment – there is a "dumb" solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK. When the memory gets exhausted, Spark Streaming will resort to keeping new RDDs on disk, which will prevent it from crashing and hence losing them. Then some memory will get freed and it will resort back to RAM, and so on and so forth.

Sent from Samsung Mobile

Original message
From: Evo Eftimov
Date: 2015/05/28 13:22 (GMT+00:00)
To: Dmitry Goldenberg
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

You can always spin new boxes up in the background and bring them into the cluster fold when fully operational, and time that with job relaunch and param change. Kafka offsets are managed automatically for you by the Kafka clients, which keep them in ZooKeeper, so don't worry about that as long as you shut down your job gracefully. Besides, managing the offsets explicitly is not a big deal if necessary.

Sent from Samsung Mobile

Original message
From:
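For reference, the "dumb" MEMORY AND DISK option Evo describes is a one-liner with the receiver-based Kafka API; a sketch with placeholder ZooKeeper, group, and topic names:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based stream whose blocks spill to disk instead of being
// dropped when executor memory fills up.
val stream = KafkaUtils.createStream(
  ssc, "zk1:2181", "my-consumer-group", Map("events" -> 2),
  StorageLevel.MEMORY_AND_DISK_SER_2)
```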
Re: yarn-cluster spark-submit process not dying
Thanks Sandy - I was digging through the code in deploy.yarn.Client and literally found that property right before I saw your reply. I'm on 1.2.x right now, which doesn't have the property. I guess I need to update sooner rather than later.

On Thu, May 28, 2015 at 3:56 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Corey, as of this PR https://github.com/apache/spark/pull/5297/files, this can be controlled with spark.yarn.submit.waitAppCompletion. -Sandy

On Thu, May 28, 2015 at 11:48 AM, Corey Nolet cjno...@gmail.com wrote:

I am submitting jobs to my YARN cluster via the yarn-cluster mode, and I'm noticing that the JVM that fires up to allocate the resources, etc., is not going away after the application master and executors have been allocated. Instead, it just sits there printing one-second status updates to the console. If I kill it, my job still runs (as expected). Is there an intended way to stop this from happening, and just have the local JVM die when it's done allocating the resources and deploying the application master?
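For anyone landing here later, the property can be passed straight on the command line once on 1.3.1+ (class and jar names below are placeholders):

```sh
# Launcher JVM exits as soon as the app is submitted to YARN,
# instead of polling the application state every second.
spark-submit --master yarn-cluster \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --class com.example.MyApp myapp.jar
```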
UDF accessing hive struct array fails with buffer underflow from kryo
Hi all, I'm using Spark 1.3.1 with Hive 0.13.1. When running a UDF that accesses a Hive struct array, the query fails with:

Caused by: com.esotericsoftware.kryo.KryoException: Buffer underflow.
Serialization trace:
fieldName (org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector$MyField)
fields (org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector)
listElementObjectInspector (org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector)
argStructArrayOI (com.groupon.hive.udf.filter.StructStringMemberFilterUDF)
    at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
    at com.esotericsoftware.kryo.io.Input.readAscii_slow(Input.java:580)
    at com.esotericsoftware.kryo.io.Input.readAscii(Input.java:558)
    at com.esotericsoftware.kryo.io.Input.readString(Input.java:436)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
    at org.apache.hadoop.hive.ql.exec.Utilities.deserializeObjectByKryo(Utilities.java:918)
    ... 102 more

Anyone seen anything similar? argStructArrayOI is a Hive ListObjectInspector. The field argStructArrayOI is accessing looks like:

array<struct<order_by_id:bigint,subscription_id:bigint,unsubscribe_hash:string,country_id:int,optin_hash:string,city_part_id:bigint,subscription_type:string,locale:string>>

The table is a Hive table, and running the same query on Hive works... what's going on here? Any suggestions on how to debug this?
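One workaround worth trying (an assumption, not a confirmed fix): the serialization trace shows Kryo failing on the ObjectInspector held inside the UDF instance, so keeping the inspectors out of Kryo's reach by marking them @transient may help; Spark re-runs initialize() lazily after deserializing the function, which repopulates them on the executor. A sketch of the shape, with the actual filter logic elided:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.{ListObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class StructStringMemberFilterUDF extends GenericUDF {
  // @transient keeps Kryo from serializing the inspector tree
  // (the "argStructArrayOI" in the trace above).
  @transient private var argStructArrayOI: ListObjectInspector = _

  override def initialize(args: Array[ObjectInspector]): ObjectInspector = {
    argStructArrayOI = args(0).asInstanceOf[ListObjectInspector]
    PrimitiveObjectInspectorFactory.javaBooleanObjectInspector
  }

  override def evaluate(args: Array[DeferredObject]): AnyRef = {
    // ... walk argStructArrayOI over args(0).get() and return a Boolean
    java.lang.Boolean.TRUE
  }

  override def getDisplayString(children: Array[String]): String =
    "struct_string_member_filter(" + children.mkString(", ") + ")"
}
```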
spark java.io.FileNotFoundException: /user/spark/applicationHistory/application
Hi, suddenly Spark jobs started failing with the following error:

Exception in thread main java.io.FileNotFoundException: /user/spark/applicationHistory/application_1432824195832_1275.inprogress (No such file or directory)

Full trace here:

[21:50:04 x...@hadoop-client01.dev:~]$ spark-submit --class org.apache.spark.examples.SparkPi --master yarn /usr/lib/spark/lib/spark-examples.jar 10
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/parquet/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/avro/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/05/28 21:55:21 INFO SparkContext: Running Spark version 1.3.0
15/05/28 21:55:21 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/05/28 21:55:21 WARN SparkConf: SPARK_JAVA_OPTS was detected (set to ' -Dspark.local.dir=/srv/tmp/xyz '). This is deprecated in Spark 1.0+. Please instead use:
 - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
 - ./spark-submit with --driver-java-options to set -X options for a driver
 - spark.executor.extraJavaOptions to set -X options for executors
 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
15/05/28 21:55:21 WARN SparkConf: Setting 'spark.executor.extraJavaOptions' to ' -Dspark.local.dir=/srv/tmp/xyz ' as a work-around.
15/05/28 21:55:21 WARN SparkConf: Setting 'spark.driver.extraJavaOptions' to ' -Dspark.local.dir=/srv/tmp/xyz ' as a work-around.
15/05/28 21:55:22 INFO SecurityManager: Changing view acls to: xyz
15/05/28 21:55:22 INFO SecurityManager: Changing modify acls to: xyz
15/05/28 21:55:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(xyz); users with modify permissions: Set(xyz)
15/05/28 21:55:22 INFO Slf4jLogger: Slf4jLogger started
15/05/28 21:55:22 INFO Remoting: Starting remoting
15/05/28 21:55:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@hadoop-client01.abc.com:51876]
15/05/28 21:55:22 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkdri...@hadoop-client01.abc.com:51876]
15/05/28 21:55:22 INFO Utils: Successfully started service 'sparkDriver' on port 51876.
15/05/28 21:55:22 INFO SparkEnv: Registering MapOutputTracker
15/05/28 21:55:22 INFO SparkEnv: Registering BlockManagerMaster
15/05/28 21:55:22 INFO DiskBlockManager: Created local directory at /srv/tmp/xyz/spark-1e66e6eb-7ad6-4f62-87fc-f0cfaa631e36/blockmgr-61f866b8-6475-4a11-88b2-792d2ba22662
15/05/28 21:55:22 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/05/28 21:55:23 INFO HttpFileServer: HTTP File server directory is /srv/tmp/xyz/spark-2b676170-3f88-44bf-87a3-600de1b7ee24/httpd-b84f76d5-26c7-4c63-9223-f6c5aa3899f0
15/05/28 21:55:23 INFO HttpServer: Starting HTTP Server
15/05/28 21:55:23 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/28 21:55:23 INFO AbstractConnector: Started SocketConnector@0.0.0.0:41538
15/05/28 21:55:23 INFO Utils: Successfully started service 'HTTP file server' on port 41538.
15/05/28 21:55:23 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/28 21:55:23 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/28 21:55:23 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/05/28 21:55:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/28 21:55:23 INFO SparkUI: Started SparkUI at http://hadoop-client01.abc.com:4040
15/05/28 21:55:23 INFO SparkContext: Added JAR file:/usr/lib/spark/lib/spark-examples.jar at http://10.0.3.62:41538/jars/spark-examples.jar with timestamp 1432850123523
15/05/28 21:55:24 INFO Client: Requesting a new application from cluster with 16 NodeManagers
15/05/28 21:55:24 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (61440 MB per container)
15/05/28 21:55:24 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/05/28 21:55:24 INFO Client: Setting up container launch context for our AM
15/05/28 21:55:24 INFO Client: Preparing resources for our AM container
15/05/28 21:55:24 INFO Client: Setting up the launch environment for our AM container
15/05/28 21:55:24 INFO SecurityManager: Changing view acls to: xyz
15/05/28 21:55:24 INFO SecurityManager: Changing modify acls to: xyz
15/05/28 21:55:24 INFO SecurityManager: SecurityManager:
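Two things commonly behind this error (both assumptions about this particular setup): the event-log directory was removed, or spark.eventLog.dir lacks a scheme and resolves against the local filesystem instead of HDFS. Making the location explicit and ensuring it exists usually clears it:

```sh
# Hedged fix: recreate the history directory on HDFS (path taken from the error)
hdfs dfs -mkdir -p /user/spark/applicationHistory
hdfs dfs -chown spark /user/spark/applicationHistory
```

In addition, pinning spark.eventLog.dir in spark-defaults.conf to an explicit HDFS URI (e.g. hdfs:///user/spark/applicationHistory) avoids the path being resolved against the local filesystem on hosts where the Hadoop configuration is missing.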
Adding an indexed column
Assuming that I have the following data frame:

flag | price
-----|-----------------
   1 | 47.808764653746
   1 | 47.808764653746
   1 | 31.9869279512204
   1 | 47.7907893713564
   1 | 16.7599200038239
   1 | 16.7599200038239
   1 | 20.3916014172137

how can I create a data frame with an extra indexed column, like the following one?

flag | price            | index
-----|------------------|------
   1 | 47.808764653746  | 0
   1 | 47.808764653746  | 1
   1 | 31.9869279512204 | 2
   1 | 47.7907893713564 | 3
   1 | 16.7599200038239 | 4
   1 | 16.7599200038239 | 5
   1 | 20.3916014172137 | 6

--
Cesar Flores
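One way to do this in Spark 1.3 (a sketch; the `df` and `sqlContext` names are assumed): use zipWithIndex on the underlying RDD, then rebuild the DataFrame with an extended schema:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append the index produced by zipWithIndex to each row.
val rowsWithIndex = df.rdd.zipWithIndex().map {
  case (row, idx) => Row.fromSeq(row.toSeq :+ idx)
}

val schemaWithIndex = StructType(
  df.schema.fields :+ StructField("index", LongType, nullable = false))

val indexed = sqlContext.createDataFrame(rowsWithIndex, schemaWithIndex)
indexed.show()
```

zipWithIndex assigns indices in partition order, so the numbering follows the RDD's current ordering; sort first if a specific order matters.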
Re: Value for SPARK_EXECUTOR_CORES
Thanks for the valuable information. The blog states: "The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time." So, I guess the max number of executor cores I can assign is the CPU count (which includes the number of threads per core), not just the number of physical cores. I just want to be sure of the "cores" term Spark is using. Thanks

On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov dautkha...@gmail.com wrote:

It's not only about cores. Keep in mind spark.executor.cores also affects the available memory for each task. From http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/: The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively. I'd test spark.executor.cores with 2, 4, 8 and 16 and see what makes your job run faster. -- Ruslan Dautkhanov

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote:

My executor has the following spec (lscpu):

CPU(s): 16
Core(s) per socket: 4
Socket(s): 2
Thread(s) per core: 2

The CPU count is obviously 4*2*2 = 16. My question is: what value is Spark expecting in SPARK_EXECUTOR_CORES? The CPU count (16) or the total # of physical cores (4 * 2 = 8)? Thanks
Fwd: [Streaming] Configure executor logging on Mesos
-- Forwarded message --
From: Tim Chen t...@mesosphere.io
Date: Thu, May 28, 2015 at 10:49 AM
Subject: Re: [Streaming] Configure executor logging on Mesos
To: Gerard Maas gerard.m...@gmail.com

Hi Gerard, the log line you referred to is not Spark logging but Mesos' own logging, which uses glog. Our own executor logs should only contain very few lines, though. Most of the log lines you'll see are from Spark, and they can be controlled by specifying a log4j.properties to be downloaded with your Mesos task. Alternatively, if you are downloading the Spark executor via spark.executor.uri, you can include a log4j.properties in that tarball. I think we probably need some more configuration options for the Spark scheduler to pick up extra files to be downloaded into the sandbox. Tim

On Thu, May 28, 2015 at 6:46 AM, Gerard Maas gerard.m...@gmail.com wrote:

Hi, I'm trying to control the verbosity of the logs on the Mesos executors, with no luck so far. The default behaviour is an INFO dump on stderr with unbounded growth that gets too big at some point. I noticed that when the executor is instantiated, it locates a default log configuration in the Spark assembly:

I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave 20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

So nothing I provide in my job jar files (I also tried spark.executor.extraClassPath=log4j.properties) takes effect in the executor's configuration. How should I configure the logs on the executors? thanks, Gerard.
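For completeness, this is the kind of log4j.properties Tim means, dialing executors down to WARN; it mirrors Spark's bundled template, so treat it as a sketch:

```properties
# Quieter executor logging: only WARN and above reach stderr
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```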
Hyperthreading
Hi guys, does SPARK_EXECUTOR_CORES assume hyperthreading? For example, if I have 4 cores with 2 threads per core, should SPARK_EXECUTOR_CORES be 4*2 = 8, or just 4? Thanks,
RE: Value for SPARK_EXECUTOR_CORES
I don't think the number of CPU cores controls the "number of parallel tasks". The number of tasks corresponds first and foremost to the number of (DStream) RDD partitions. The Spark documentation doesn't say what is meant by "task" in terms of standard multithreading terminology, i.e. a thread or a process, so your point is a good one.

PS: time and time again, every product, dev team, and company invents its own terminology, so 50% of the time using a product is spent on deciphering and reinventing the wheel.

From: Mulugeta Mammo [mailto:mulugeta.abe...@gmail.com]
Sent: Thursday, May 28, 2015 7:24 PM
To: Ruslan Dautkhanov
Cc: user
Subject: Re: Value for SPARK_EXECUTOR_CORES

Thanks for the valuable information. The blog states: "The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time." So, I guess the max number of executor cores I can assign is the CPU count (which includes the number of threads per core), not just the number of physical cores. I just want to be sure of the "cores" term Spark is using. Thanks

On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov dautkha...@gmail.com wrote:

It's not only about cores. Keep in mind spark.executor.cores also affects the available memory for each task. From http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/: The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively. I'd test spark.executor.cores with 2, 4, 8 and 16 and see what makes your job run faster. -- Ruslan Dautkhanov

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote:

My executor has the following spec (lscpu):

CPU(s): 16
Core(s) per socket: 4
Socket(s): 2
Thread(s) per core: 2

The CPU count is obviously 4*2*2 = 16. My question is: what value is Spark expecting in SPARK_EXECUTOR_CORES? The CPU count (16) or the total # of physical cores (4 * 2 = 8)? Thanks
yarn-cluster spark-submit process not dying
I am submitting jobs to my YARN cluster via the yarn-cluster mode, and I'm noticing that the JVM that fires up to allocate the resources, etc., is not going away after the application master and executors have been allocated. Instead, it just sits there printing one-second status updates to the console. If I kill it, my job still runs (as expected). Is there an intended way to stop this from happening, and just have the local JVM die when it's done allocating the resources and deploying the application master?
Re: yarn-cluster spark-submit process not dying
Hi Corey, as of this PR https://github.com/apache/spark/pull/5297/files, this can be controlled with spark.yarn.submit.waitAppCompletion. -Sandy

On Thu, May 28, 2015 at 11:48 AM, Corey Nolet cjno...@gmail.com wrote:

I am submitting jobs to my YARN cluster via the yarn-cluster mode, and I'm noticing that the JVM that fires up to allocate the resources, etc., is not going away after the application master and executors have been allocated. Instead, it just sits there printing one-second status updates to the console. If I kill it, my job still runs (as expected). Is there an intended way to stop this from happening, and just have the local JVM die when it's done allocating the resources and deploying the application master?
Re: PySpark with OpenCV causes python worker to crash
Could you try commenting out some lines in `extract_sift_features_opencv` to find which line causes the crash? If the bytes that come from sequenceFile() are broken, it's easy to crash a C library (OpenCV) from Python.

On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote:

Hi sparkers, I am working on a PySpark application which uses the OpenCV library. It runs fine when running the code locally, but when I try to run it on Spark on the same machine it crashes the worker. The code can be found here: https://gist.github.com/samos123/885f9fe87c8fa5abf78f

This is the error message taken from the STDERR of the worker log: https://gist.github.com/samos123/3300191684aee7fc8013

I would like pointers or tips on how to debug further. It would be nice to know the reason why the worker crashed. Thanks, Sam Stoelinga

org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
    at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
Re: Value for SPARK_EXECUTOR_CORES
It's not only about cores. Keep in mind spark.executor.cores also affects the available memory for each task. From http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/:

The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively.

I'd test spark.executor.cores with 2, 4, 8 and 16 and see what makes your job run faster.

--
Ruslan Dautkhanov

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote:

My executor has the following spec (lscpu):

CPU(s): 16
Core(s) per socket: 4
Socket(s): 2
Thread(s) per core: 2

The CPU count is obviously 4*2*2 = 16. My question is: what value is Spark expecting in SPARK_EXECUTOR_CORES? The CPU count (16) or the total # of physical cores (4 * 2 = 8)? Thanks
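A quick worked example of that formula, with hypothetical numbers:

```scala
// Hypothetical executor: 16 GB heap, default fractions, 8 concurrent tasks.
val executorMemoryGb      = 16.0
val shuffleMemoryFraction = 0.2  // spark.shuffle.memoryFraction default
val shuffleSafetyFraction = 0.8  // spark.shuffle.safetyFraction default
val executorCores         = 8

val shuffleMemoryPerTaskGb =
  executorMemoryGb * shuffleMemoryFraction * shuffleSafetyFraction / executorCores
// 16 * 0.2 * 0.8 / 8 = 0.32 GB of shuffle memory per concurrent task,
// so doubling executorCores halves what each task gets.
```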
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Hi all, as the author of the dynamic allocation feature I can offer a few insights here. Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark Streaming at the moment (1.4 or before). This is because of two things:

(1) The number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors.

(2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up having to constantly kill and restart executors. If your idle timeout is greater than the batch duration, then you'll never kill executors.

Long story short, with Spark Streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark Streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. taking the batch queue into account. We came up with a few ideas that I will not detail here, but we are looking into this and do intend to support it in the near future. -Andrew

2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK – it will be your insurance policy against system crashes due to memory leaks. As long as there is free RAM, Spark Streaming (Spark) will NOT resort to disk – it resorts to disk from time to time (i.e. when there is no free RAM) and takes a performance hit from that, but only while there is no free RAM.

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Evo, good points. On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources, but it doesn't make the cluster itself elastic. At least, that's my understanding. Memory + disk would be good, and hopefully it'd take a *huge* load on the system to start exhausting the disk space too. I'd guess that falling onto disk will make things significantly slower due to the extra I/O. Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers, and implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implementing the cluster autoscale.

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:

You can also try Dynamic Resource Allocation: https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation

Also, re the feedback loop for automatic message-consumption-rate adjustment – there is a "dumb" solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK. When the memory gets exhausted, Spark Streaming will resort to keeping new RDDs on disk, which will prevent it from crashing and hence losing them.
Then some memory will get freed and it will resort back to RAM, and so on and so forth.

Sent from Samsung Mobile

Original message
From: Evo Eftimov
Date: 2015/05/28 13:22 (GMT+00:00)
To: Dmitry Goldenberg
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

You can always spin new boxes up in the background and bring them into the cluster fold when fully operational, and time that with job relaunch and param change. Kafka offsets are managed automatically for you by the Kafka clients, which keep them in ZooKeeper, so don't worry about that as long as you shut down your job gracefully. Besides, managing the offsets explicitly is not a big deal if necessary.

Sent from Samsung Mobile

Original message
From: Dmitry Goldenberg
Date: 2015/05/28 13:16 (GMT+00:00)
To: Evo Eftimov
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starter offsets in Kafka
Batch aggregation by sliding window + join
Hi, I have a daily batch job that computes a daily aggregate of several counters, represented by some object. After the daily aggregation is done, I want to compute blocks of 3-day aggregations (3, 7, 30, etc.). To do so I need to add the new daily aggregation to the current block and then subtract from the current block the daily aggregation of the last day within the block (a sliding window...). I've implemented it with something like:

baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)

All RDDs are keyed by a unique id (long). Each RDD is saved in Avro files after the job finishes and loaded when the job starts (on the next day). baseBlockRdd is much larger than the lastDay and newDay RDDs (depending on the size of the block). Unfortunately the performance is not satisfactory due to many shuffles (I have tuned parallelism etc.). I was looking for a way to improve performance, to make sure that one task joins the same local keys without reshuffling baseBlockRdd (which is big) each time the job starts (see https://spark-project.atlassian.net/browse/SPARK-1061 as a related issue).

So, bottom line: how do I join a big RDD with a smaller RDD without reshuffling the big RDD over and over again? As soon as I've saved this big RDD and reloaded it from disk, I want every other RDD to be partitioned and collocated by the same partitioner (which is absent for a HadoopRDD)... somehow, so that only the small RDDs are sent over the network. Another idea I had: somehow split baseBlock into 2 parts with a filter by the keys of the small RDDs, and then join; however, I'm not sure it's possible to implement this filter without a join. Any ideas would be appreciated. Thanks in advance, Igor
Spark1.3.1 build issue with CDH5.4.0 getUnknownFields
Hi, I'm using the CDH 5.4.0 quickstart VM and tried to build Spark with Hive compatibility so that I can run Spark SQL and access temp tables remotely. I used the command below to build Spark. The build was successful, but when I try to access Hive data from Spark SQL, I get an error. Thanks, Abhi
---
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive -Phive-thriftserver -DskipTests clean package

[cloudera@quickstart spark-1.3.1]$ export HADOOP_CONF_DIR=/etc/hive/conf/
[cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
SET spark.sql.hive.version=0.13.1
spark-sql> show tables;
sample_07 false
t1 false
Time taken: 3.901 seconds, Fetched 2 row(s)
spark-sql> select * from t1;
15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
Twitter Streaming HTTP 401 Error
I am working on the Databricks reference applications, porting them to my company's platform and extending them to emit RDF. I have already gotten them working with the extension on EC2, and have the Log Analyzer application working on our platform. But the Twitter Language Classifier application keeps getting an HTTP 401 error, with the following message:

15/05/28 16:30:11 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error receiving tweets - 401:Authentication credentials (https://dev.twitter.com/pages/auth) were missing or incorrect. Ensure that you have set valid consumer key/secret, access token/secret, and the system clock is in sync.

Error 401 Unauthorized - HTTP ERROR: 401 - Problem accessing '/1.1/statuses/sample.json?stall_warnings=true'. Reason: Unauthorized

Relevant discussions can be found on the Internet at:
http://www.google.co.jp/search?q=d0031b0b or http://www.google.co.jp/search?q=1db75513
TwitterException{exceptionCode=[d0031b0b-1db75513], statusCode=401, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=3.0.3}
    at twitter4j.internal.http.HttpClientImpl.request(HttpClientImpl.java:177)
    at twitter4j.internal.http.HttpClientWrapper.request(HttpClientWrapper.java:61)
    at twitter4j.internal.http.HttpClientWrapper.get(HttpClientWrapper.java:89)
    at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:176)
    at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:164)
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:462)
15/05/28 16:30:13 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka.tcp://sparkExecutor@mesos-0020-n3:56412

As the message says, the 401 code can be thrown either by bad credentials or by unsynchronized clocks. However, I know my credentials work, as I've tested them both on EC2 and directly with the OAuth testing tool in the Twitter application manager. Likewise, all the nodes on our cluster run ntpd, so that shouldn't be the problem either. Looking at the API docs, I saw that a 401 can also be thrown by calls to the now-deprecated v1 API endpoints, but given that the Databricks applications run correctly in other contexts, such as EC2, I don't think that is the problem either. I am at a loss as to why I am getting this error, have run out of ideas, and could use some help.
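One pattern worth checking (an assumption about this setup, given that it works on EC2 but not on the Mesos cluster): if the credentials are set as twitter4j.oauth.* system properties on the driver, the receiver running in a worker JVM never sees them and gets a 401. Passing an explicit Authorization sidesteps that; the credential variables below are assumed to be in scope:

```scala
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// Build the OAuth credentials on the driver; OAuthAuthorization is
// serializable, so it travels with the receiver to the worker.
val auth = new OAuthAuthorization(new ConfigurationBuilder()
  .setOAuthConsumerKey(consumerKey)
  .setOAuthConsumerSecret(consumerSecret)
  .setOAuthAccessToken(accessToken)
  .setOAuthAccessTokenSecret(accessTokenSecret)
  .build())

val tweets = TwitterUtils.createStream(ssc, Some(auth))
```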
Re: How to use Eclipse on Windows to build Spark environment?
Hi Somnath, are there step-by-step instructions on using Eclipse to develop Spark applications? I think many people need them. Thanks!

Best Regards
Nan Xiao

On Thu, May 28, 2015 at 3:15 PM, Somnath Pandeya somnath_pand...@infosys.com wrote:

Try the Scala Eclipse plugin to eclipsify the Spark project and import Spark as an Eclipse project. -Somnath

-----Original Message-----
From: Nan Xiao [mailto:xiaonan830...@gmail.com]
Sent: Thursday, May 28, 2015 12:32 PM
To: user@spark.apache.org
Subject: How to use Eclipse on Windows to build Spark environment?

Hi all, I want to use Eclipse on Windows to build a Spark environment, but the reference page (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup) doesn't contain any guide for Eclipse. Could anyone give tutorials or links about how to use Eclipse on Windows to build a Spark environment? Thanks in advance!

Best Regards
Nan Xiao
Re: Spark SQL v MemSQL/Voltdb
I have used VoltDB and Spark. The use cases for the two are quite different. VoltDB is intended for transactions, and also supports queries on that same (VoltDB-specific) store. Spark (SQL) is NOT suitable for transactions; it is designed for querying immutable data (which may exist in several different forms of stores).

On May 28, 2015, at 7:48 AM, Ashish Mukherjee ashish.mukher...@gmail.com wrote:

Hello, I was wondering if there is any documented comparison of Spark SQL with in-memory SQL databases like MemSQL/VoltDB. MemSQL etc. also allow queries to be run in a clustered environment. What is the major differentiation? Regards, Ashish
Registering Custom metrics [Spark-Streaming-monitoring]
Hello all, I am using Spark Streaming 1.3. I want to capture a few custom metrics based on accumulators. I followed an approach similar to this one (https://gist.github.com/ibuenros/9b94736c2bad2f4b8e23):

val instrumentation = new SparkInstrumentation("example.metrics")
val numReqs = sc.accumulator(0L)
instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
instrumentation.register()

After registering the metrics via the accumulator, I am not able to see the values of these metrics on the sink. I tried the console sink, but still no luck. Do I need to set any properties in metrics.conf to enable this custom source?

Also, I noticed accumulators are displayed only in the task stages; it is difficult to identify on the UI which task/stage the information is available under. Is there a way accumulators can be displayed on the overall job stats page?

Any pointers/examples to achieve this would be helpful; the Spark monitoring documentation is not very helpful. Thanks in advance, Snehal
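In case it helps to see the shape of what the gist does: the Source trait is private[spark], which is why the gist declares its instrumentation inside an org.apache.spark package; a trimmed sketch of that idea (names and the registration details are assumptions):

```scala
package org.apache.spark.metrics.source // Source is private[spark]

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.Accumulator

class AccumulatorSource(name: String, acc: Accumulator[Long]) extends Source {
  override val sourceName = "example.metrics"
  override val metricRegistry = new MetricRegistry
  metricRegistry.register(MetricRegistry.name(name), new Gauge[Long] {
    override def getValue: Long = acc.value // polled by each configured sink
  })
}

// Register on the driver, after the SparkContext exists:
// SparkEnv.get.metricsSystem.registerSource(new AccumulatorSource("numReqs", numReqs))
```

And yes, the sink still has to be enabled in metrics.conf, e.g. *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink, for the gauge to show up anywhere.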
Re: Spark SQL v MemSQL/Voltdb
Hi Mohit, thanks for your reply. If my use case is purely querying read-only data (no transaction scenarios), at what scale is one of them a better option than the other? I am aware that for a scale that can be supported on a single node, VoltDB is a better choice. However, when the scale grows to a clustered scenario, which is the right engine at various degrees of scale? Regards, Ashish

On Fri, May 29, 2015 at 6:57 AM, Mohit Jaggi mohitja...@gmail.com wrote:

I have used VoltDB and Spark. The use cases for the two are quite different. VoltDB is intended for transactions, and also supports queries on that same (VoltDB-specific) store. Spark (SQL) is NOT suitable for transactions; it is designed for querying immutable data (which may exist in several different forms of stores).

On May 28, 2015, at 7:48 AM, Ashish Mukherjee ashish.mukher...@gmail.com wrote:

Hello, I was wondering if there is any documented comparison of Spark SQL with in-memory SQL databases like MemSQL/VoltDB. MemSQL etc. also allow queries to be run in a clustered environment. What is the major differentiation? Regards, Ashish
Spark Cassandra
Hello, I am trying to save data from Spark to Cassandra. I have a ScalaEsRDD (because I take the data from Elasticsearch) that contains a lot of key/value pairs like this:

(AU16r4o_kbhIuSky3zFO, Map(@timestamp -> 2015-05-21T21:35:54.035Z, timestamp -> 2015-05-21 23:35:54,035, loglevel -> INFO, thread -> ajp-crmprod-fr-003%2F10.2.53.39-8009-178, ID_Echange -> 1432244153901, SessionID -> -, ProcessID -> 1432244149a998000f024390, IP -> 37.163.216.219, proxy -> 37.163.216.219, ContactID -> 2255701356, Login -> 46711205, messageType -> ))
(AU16r4o_kbhIuSky3zFT, Map(@timestamp -> 2015-05-21T21:35:55.450Z, timestamp -> 2015-05-21 23:35:55,450, loglevel -> INFO, thread -> ajp-crmprod-fr-003%2F10.2.53.39-8009-168, ID_Echange -> 1432244155381, SessionID -> 76c8ad0dc9c86840cf1ff078911a6040, ProcessID -> 1432244154a435000f024680, IP -> 82.240.167.71, proxy -> 82.240.167.71, ContactID -> 2816846047, Login -> 14642027, messageType -> ))

So there is a key and a map as the value. What I want to do is put each key/value pair in a Cassandra table. I used saveToCassandra(), but it is not exactly what I want: after this I have a table with the key and the map, but it is not possible to query on a map in Cassandra (so it is useless). I want to store the data in a way that lets me query the map. I thought about creating a user-defined type, but how can I save the map into a user type for each key/value pair? I hope this is clear. Thank you very much for your help. Best Regards,
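One alternative to a user type: flatten each map into (id, field, value) rows and save them to a table clustered by field, which makes the individual entries queryable. A sketch assuming the spark-cassandra-connector and a pre-created table (keyspace, table, and column names are placeholders):

```scala
import com.datastax.spark.connector._

// rdd: RDD[(String, Map[String, String])] coming from Elasticsearch.
// Target table, created beforehand in cqlsh:
//   CREATE TABLE logs.events_by_field (
//     id text, field text, value text,
//     PRIMARY KEY (id, field));
val flattened = rdd.flatMap { case (id, fields) =>
  fields.map { case (field, value) => (id, field, value) }
}

flattened.saveToCassandra("logs", "events_by_field",
  SomeColumns("id", "field", "value"))
```

Queries like SELECT value FROM events_by_field WHERE id = ? AND field = 'Login' then work without any map indexing.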
How to use Eclipse on Windows to build Spark environment?
Hi all, I want to use Eclipse on Windows to build a Spark environment, but the reference page (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup) doesn't contain any guide for Eclipse. Could anyone give tutorials or links about how to use Eclipse on Windows to build a Spark environment? Thanks in advance!

Best Regards
Nan Xiao
RE: How to use Eclipse on Windows to build Spark environment?
Try the Scala Eclipse plugin to eclipsify the Spark project and import Spark as an Eclipse project. -Somnath

-----Original Message-----
From: Nan Xiao [mailto:xiaonan830...@gmail.com]
Sent: Thursday, May 28, 2015 12:32 PM
To: user@spark.apache.org
Subject: How to use Eclipse on Windows to build Spark environment?

Hi all, I want to use Eclipse on Windows to build a Spark environment, but the reference page (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup) doesn't contain any guide for Eclipse. Could anyone give tutorials or links about how to use Eclipse on Windows to build a Spark environment? Thanks in advance!

Best Regards
Nan Xiao
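A concrete version of that suggestion, assuming an sbt build with the sbteclipse plugin (the plugin version here is an assumption):

```scala
// project/plugins.sbt
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
```

Then run sbt eclipse and import the generated project via File > Import > Existing Projects into Workspace.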
Re: Adding slaves on spark standalone on ec2
I do it this way:

- Launch a new instance by clicking on the slave instance and choosing "launch more like this"
- Once it's launched, SSH into it and add the master's public key to .ssh/authorized_keys
- Add the slave's internal IP to the master's conf/slaves file
- Run sbin/start-all.sh, and it will show up along with the other slaves

Thanks
Best Regards

On Thu, May 28, 2015 at 12:29 PM, nizang ni...@windward.eu wrote:

hi, I'm working with Spark standalone on EC2, and I'm having problems resizing the cluster (meaning adding or removing slaves). In the basic EC2 scripts (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only a script for launching the cluster, not for adding slaves to it. On the Spark standalone page (http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts), I can see only options for stopping and starting slaves, not adding them. What I try to do now (as a bad workaround...) is the following:

1) Go to the EC2 UI and create an image from the current slave
2) Launch a new instance based on this image
3) Copy the public DNS of this slave
4) SSH to the master, edit the file /root/spark-ec2/ec2-variables.sh, and add the DNS to the export SLAVES variable
5) Run the script /root/spark-ec2/setup.sh

After doing the above steps, I can see the new slave in the UI (port 8080) of the master. However, this solution is bad for many reasons:

1) It requires many manual steps
2) It requires stopping and starting the cluster
3) There's no auto-detection in case a slave stopped

and many other reasons... Does anybody have another idea of how to add/remove slaves for standalone in a simple and safe way? thanks, nizan
Adding slaves on spark standalone on ec2
hi, I'm working with Spark standalone on EC2, and I'm having problems resizing the cluster (meaning adding or removing slaves). In the basic EC2 scripts (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only a script for launching the cluster, not for adding slaves to it. On the Spark standalone page (http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts), I can see only options for stopping and starting slaves, not adding them. What I try to do now (as a bad workaround...) is the following:

1) Go to the EC2 UI and create an image from the current slave
2) Launch a new instance based on this image
3) Copy the public DNS of this slave
4) SSH to the master, edit the file /root/spark-ec2/ec2-variables.sh, and add the DNS to the export SLAVES variable
5) Run the script /root/spark-ec2/setup.sh

After doing the above steps, I can see the new slave in the UI (port 8080) of the master. However, this solution is bad for many reasons:

1) It requires many manual steps
2) It requires stopping and starting the cluster
3) There's no auto-detection in case a slave stopped

and many other reasons... Does anybody have another idea of how to add/remove slaves for standalone in a simple and safe way? thanks, nizan
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi Zhang, could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory.

Thanks
Best Regards

On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

Hi, yes, I'm using createStream, but the storageLevel param is by default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I don't think Kafka messages will be cached in the driver.

On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Are you using the createStream or createDirectStream api? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down though). Another way would be to try the latter one.

Thanks
Best Regards

On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

Hi Akhil, thanks for your reply. According to the Streaming tab of the Web UI, the Processing Time is around 400ms, and there's no Scheduling Delay, so I suppose it's not the Kafka messages that eat up the off-heap memory. Or maybe it is, but how to tell? I googled how to check off-heap memory usage; there's a tool called pmap, but I don't know how to interpret the results.

On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

After submitting the job, if you do a ps aux | grep spark-submit you can see all the JVM params. Are you using the high-level consumer (receiver based) for receiving data from Kafka? In that case, if your throughput is high and the processing delay exceeds the batch interval, then you will hit these memory issues, as the data keeps being received and dumped to memory. You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down). Another alternative is to use the low-level Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer) or the non-receiver-based directStream (https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers) that comes with Spark.

Thanks
Best Regards

On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote:

Hi, I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find that YARN is killing the driver and executor processes because of excessive use of memory. Here's something I tried:

1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the extra memory is not used by the heap.
2. I set the two memoryOverhead params to 1024 (the default is 384), but the memory just keeps growing and then hits the limit.
3. This problem does not show up in low-throughput jobs, nor in standalone mode.
4. The test job just receives messages from Kafka, with a batch interval of 1, does some filtering and aggregation, and then prints to the executor logs. So it's not some 3rd-party library that causes the 'leak'.

Spark 1.3 is built by myself, with the correct Hadoop versions. Any ideas will be appreciated. Thanks. -- Jerry
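To make the directStream suggestion concrete, a minimal sketch of the Spark 1.3 direct Kafka API (the broker address and topic name are placeholders):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receiver: batch sizes are known at the driver and nothing is
// buffered in executor memory ahead of processing.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))
```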
Re: Recommended Scala version
Would be great if you guys could test out the Spark 1.4.0 RC2 (RC3 coming out soon) with Scala 2.11 and report issues. TD

On Tue, May 26, 2015 at 9:15 AM, Koert Kuipers ko...@tresata.com wrote:

We are still running into issues with spark-shell not working on 2.11, but we are running on a somewhat older master, so maybe that has been resolved already.

On Tue, May 26, 2015 at 11:48 AM, Dean Wampler deanwamp...@gmail.com wrote:

Most of the 2.11 issues are being resolved in Spark 1.4. For a while, the Spark project has published Maven artifacts that are compiled with 2.11 and 2.10, although the downloads at http://spark.apache.org/downloads.html are still all for 2.10.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (http://shop.oreilly.com/product/0636920033073.do) (O'Reilly)
Typesafe (http://typesafe.com)
@deanwampler (http://twitter.com/deanwampler)
http://polyglotprogramming.com

On Tue, May 26, 2015 at 10:33 AM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote:

Yes, the recommended version is 2.10, as not all features are supported by the 2.11 version. The Kafka libraries and JDBC components are yet to be ported to 2.11. So if your project doesn't depend on these components, you can give v2.11 a try. Here's a link for building with 2.11: https://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211. You won't run into any issues if you use v2.10 for now, but future releases will have to shift to 2.11 once support for 2.10 ends in the long run.

On Tue, May 26, 2015 at 8:21 PM, Punyashloka Biswal punya.bis...@gmail.com wrote:

Dear Spark developers and users, am I correct in believing that the recommended version of Scala to use with Spark is currently 2.10? Is there any plan to switch to 2.11 in future? Are there any advantages to using 2.11 today? Regards, Punya
DataFrame nested structure selection limit
Hi! I have a json file with some data. I'm able to create a DataFrame out of it, and the schema for the particular part of it I'm interested in looks like the following:

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root
 |-- attributes: struct (nullable = true)
 |    |-- Address2: array (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- Zip: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |    |    |-- Zip5: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- value: string (nullable = true)

When I try to just select the deepest field:

json.select("attributes.Address2.value.Zip.value.Zip5").collect()

it gives me a really weird exception. So I wonder if there is a particular limit to how deeply I'm able to select fields. What should I do to select the piece I need? Here is the exception:

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true);
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44)
at
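A workaround that usually applies here (a sketch, assuming a HiveContext so that LATERAL VIEW explode is available, and assuming the DataFrame has been registered as a temp table) is to unnest each array level explicitly instead of dotting through it:

json.registerTempTable("entities")
val zip5 = sqlc.sql("""
  SELECT z.value
  FROM entities
  LATERAL VIEW explode(attributes.Address2) a AS addr
  LATERAL VIEW explode(addr.value.Zip) zs AS zipElem
  LATERAL VIEW explode(zipElem.value.Zip5) z5 AS z
""")
zip5.collect()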
How to use Eclipse on Windows to build Spark environment?
Hi all, I want to use Eclipse on Windows to set up a Spark build environment, but I find that the reference page (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-IDESetup) doesn't contain any guidance about Eclipse. Could anyone share tutorials or links on how to use Eclipse on Windows to build Spark? Thanks in advance! Best Regards Nan Xiao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
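One route that is often suggested (a sketch, not an official Spark recipe: it assumes the third-party sbteclipse plugin, and the version number shown is an assumption) is to generate Eclipse project files from the sbt build and import them:

    addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")  // in project/plugins.sbt

    build/sbt eclipse   # from the Spark source root; generates .project/.classpath per module

After that, File > Import > Existing Projects into Workspace in Eclipse should pick up the modules.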
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi, I wrote a simple test job, it only does very basic operations. For example:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
val logs = lines.flatMap { line =>
  try {
    Some(parse(line).extract[Impression])
  } catch {
    case _: Exception => None
  }
}
logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the count to the logs. Thanks.
On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Zhang, Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory. Thanks Best Regards
-- Jerry
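For reference, a minimal sketch of the receiver-less approach suggested earlier in the thread (Spark 1.3's experimental direct API; the broker list and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// no receiver: offsets are tracked by Spark Streaming itself, per batch
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic")).map(_._2)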
Re: Adding slaves on spark standalone on ec2
hi, thanks for your answer! I have a few more questions: 1) The file /root/spark/conf/slaves has the full DNS names of the servers (ec2-52-26-7-137.us-west-2.compute.amazonaws.com); did you add the internal IP there? 2) You call start-all. Isn't that too aggressive? Let's say I have 20 slaves up and I want to add one more; why should we stop the entire cluster for this? thanks, nizan
On Thu, May 28, 2015 at 10:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: I do it this way:
- Launch a new instance by clicking on the slave instance and choosing *launch more like this*
- Once it's launched, ssh into it and add the master's public key to .ssh/authorized_keys
- Add the slave's internal IP to the master's conf/slaves file
- Do sbin/start-all.sh and it will show up along with the other slaves.
Thanks Best Regards
On Thu, May 28, 2015 at 12:29 PM, nizang ni...@windward.eu wrote: hi, I'm working with Spark standalone on EC2, and I'm having problems resizing the cluster (meaning - adding or removing slaves). In the basic ec2 scripts (http://spark.apache.org/docs/latest/ec2-scripts.html), there's only a script for launching the cluster, not adding slaves to it. On the spark-standalone page ( http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts ), I can see only options for stopping and starting slaves, not adding them. What I try to do now (as a bad workaround...) is the following: 1) Go to the ec2 UI, create an image from the current slave 2) Launch a new instance based on this image 3) Copy the public DNS of this slave 4) SSH to the master, edit the file /root/spark-ec2/ec2-variables.sh, and add the DNS to the export SLAVES variable 5) Run the script /root/spark-ec2/setup.sh After doing the above steps, I can see the new slave in the UI (port 8080) of the master. However, this solution is bad for many reasons: 1) It requires many manual steps 2) It requires stopping and starting the cluster 3) There's no auto-detection in case a slave stopped and many other reasons... Does anybody have another idea on how to add/remove slaves for standalone in a simple and safe way? thanks, nizan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Adding-slaves-on-spark-standalone-on-ec2-tp23064.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Adding slaves on spark standalone on ec2
1. Up to you; you can add either the internal IP or the external IP. It won't be a problem unless they are not in the same network. 2. If you only want to start a particular slave, then you can do: sbin/start-slave.sh worker# master-spark-URL Thanks Best Regards
On Thu, May 28, 2015 at 1:52 PM, Nizan Grauer ni...@windward.eu wrote: hi, thanks for your answer! I have a few more questions: 1) The file /root/spark/conf/slaves has the full DNS names of the servers (ec2-52-26-7-137.us-west-2.compute.amazonaws.com); did you add the internal IP there? 2) You call start-all. Isn't that too aggressive? Let's say I have 20 slaves up and I want to add one more; why should we stop the entire cluster for this? thanks, nizan
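Putting the two answers together, adding a single worker without restarting the rest of the cluster looks roughly like this (a sketch; the master URL uses the thread's example host, and the worker number argument follows the 1.3-era start-slave.sh usage quoted above):

    # run on the new slave box, from the Spark install directory
    sbin/start-slave.sh 1 spark://ec2-52-26-7-137.us-west-2.compute.amazonaws.com:7077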
Get all servers in security group in bash(ec2)
hi, Is there any way in bash (from an EC2 Spark server) to list all the servers in my security group (or better, in a given security group)? I tried using: wget -q -O - http://instance-data/latest/meta-data/security-groups which returns security_group_xxx, but now I want all the servers in the security group security_group_xxx thanks, nizan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Get-all-servers-in-security-group-in-bash-ec2-tp23066.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Get all servers in security group in bash(ec2)
You can use the Python boto library for that; in fact the spark-ec2 script uses it underneath. Here's the https://github.com/apache/spark/blob/master/ec2/spark_ec2.py#L706 call spark-ec2 makes to get all machines under a given security group. Thanks Best Regards
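Since the original ask was for bash, the AWS CLI offers an equivalent one-liner (a sketch; it assumes the AWS CLI is installed and credentials/region are configured, and uses the group name from the question):

    aws ec2 describe-instances \
      --filters "Name=instance.group-name,Values=security_group_xxx" \
      --query "Reservations[].Instances[].PublicDnsName" \
      --output text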
Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Hi, tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark streaming processes is not supported.
*Longer version.* I assume that you are talking about Spark Streaming, as the discussion is about handling Kafka streaming data. Then you have two things to consider: the Streaming receivers and the Spark processing cluster.
Currently, the receiving topology is static. One receiver is allocated with each DStream instantiated and it will use 1 core in the cluster. Once the StreamingContext is started, this topology cannot be changed, therefore the number of Kafka receivers is fixed for the lifetime of your DStream. What we do is to calculate the cluster capacity and use that as a fixed upper bound (with a margin) for the receiver throughput. There's work in progress to add a reactive model to the receiver, where backpressure can be applied to handle overload conditions. See https://issues.apache.org/jira/browse/SPARK-7398
Once the data is received, it will be processed in a 'classical' Spark pipeline, so previous posts on spark resource scheduling might apply.
Regarding metrics, the standard metrics subsystem of spark will report streaming job performance. Check the driver's metrics endpoint to peruse the available metrics: driver:ui-port/metrics/json
-kr, Gerard.
(*) Spark is a project that moves so fast that statements might be invalidated by new work every minute.
On Thu, May 28, 2015 at 1:21 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm trying to understand if there are design patterns for autoscaling Spark (add/remove slave machines to the cluster) based on the throughput. Assuming we can throttle Spark consumers, the respective Kafka topics we stream data from would start growing. What are some of the ways to generate the metrics on the number of new messages and the rate they are piling up? This perhaps is more of a Kafka question; I see a pretty sparse javadoc with the Metric interface and not much else... What are some of the ways to expand/contract the Spark cluster? Someone has mentioned Mesos... I see some info on Spark metrics in the Spark monitoring guide https://spark.apache.org/docs/latest/monitoring.html . Do we want to perhaps implement a custom sink that would help us autoscale up or down based on the throughput? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
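For the metrics pointer above, the JSON endpoint can be polled directly (a sketch; 4040 is the default driver UI port, and the host name is a placeholder):

    curl http://driver-host:4040/metrics/json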
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))

Thanks Best Regards
Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Thank you, Gerard. We're looking at the receiver-less setup with Kafka Spark Streaming, so I'm not sure how to apply your comments to that case (not that we have to use receiver-less, but it seems to offer some advantages over the receiver-based approach).
As far as "the number of Kafka receivers is fixed for the lifetime of your DStream": this may be OK to start with. What I'm researching is the ability to add worker nodes to the Spark cluster when needed and remove them when no longer needed. Do I understand correctly that a single receiver may cause work to be farmed out to multiple 'slave' machines/worker nodes? If that's the case, we're less concerned with multiple receivers; we're concerned with the worker node cluster itself.
If we use the ConsumerOffsetChecker class in Kafka that Rajesh mentioned and instrument dynamic adding/removal of machines, my subsequent questions are: a) will Spark sense the addition of a new node, or is it sufficient that the cluster manager is aware, and work just starts flowing there? and b) what would be a way to gracefully remove a worker node when the load subsides, so that no currently running Spark job is killed? - Dmitry
RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
@DG; The key metrics should be:
- Scheduling delay – its ideal state is to remain constant over time, and ideally it should be less than the time of the micro-batch window
- The average job processing time should remain less than the micro-batch window
- Number of Lost Jobs – even if there is a single Job lost, that means you have lost all messages for the DStream RDD processed by that job, due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me

You can even go one step further and periodically issue "get/check free memory" to see whether it is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold, that should be your third metric

Re the "back pressure" mechanism – this is a Feedback Loop mechanism and you can implement one on your own without waiting for Jiras and new features whenever they might be implemented by the Spark dev team – moreover you can avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine Learning in your Feedback Loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning – BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable – it is not about scaling your system/solution

In terms of how to scale the Spark Framework dynamically – even though this is not supported at the moment out of the box, I guess you can have a sys management framework spin up a few more boxes (spark worker nodes) dynamically, stop your currently running Spark Streaming Job, and relaunch it with new params, e.g. more Receivers, a larger number of Partitions (hence tasks), more RAM per executor etc. Obviously this will cause some temporary delay, in fact an interruption, in your processing, but if the business use case can tolerate that then go for it
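A minimal sketch of tracking the first two metrics above from inside the application (it assumes Spark 1.3's StreamingListener API; the threshold value and class name are illustrative):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// flags batches whose scheduling delay or processing time exceeds the batch window
class DelayMonitor(batchWindowMs: Long) extends StreamingListener {
  override def onBatchCompleted(completed: StreamingListenerBatchCompleted): Unit = {
    val info = completed.batchInfo
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)
    val processingTime = info.processingDelay.getOrElse(0L)
    if (schedulingDelay > batchWindowMs || processingTime > batchWindowMs) {
      println(s"Falling behind: schedulingDelay=${schedulingDelay}ms processingTime=${processingTime}ms")
    }
  }
}

// ssc.addStreamingListener(new DelayMonitor(batchWindowMs = 10000))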
Re: why does com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException happen?
Begging for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-com-esotericsoftware-kryo-KryoException-java-u-til-ConcurrentModificationException-happen-tp23067p23068.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
why does com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException happen?
My program runs for 500 iterations, but it almost always fails at about 150 iterations. It's hard to explain the details of my program, but I think my program is OK, since it sometimes runs successfully. *I just want to know in which situations this exception can happen.* The detailed error information is as follows:

Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 43644.0 failed 4 times, most recent failure: Lost task 32.3 in stage 43644.0 (TID 72976, 10.194.117.197): com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (org.apache.spark.executor.ExecutorURLClassLoader)
classLoader (com.twitter.chill.KryoBase)
kryo (org.apache.spark.serializer.KryoDeserializationStream)
$outer (org.apache.spark.serializer.DeserializationStream$$anon$1)
delegate (org.apache.spark.InterruptibleIterator)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
...
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:130)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:370)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
at java.util.Vector$Itr.next(Vector.java:1133)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 44 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
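One clue in the serialization trace: Kryo is serializing a DeserializationStream/InterruptibleIterator, i.e. a live partition iterator (and, through it, a classloader whose internal Vector is being modified concurrently) has ended up inside the shuffle data. A hypothetical minimal reproduction of that pattern (illustration only, not the poster's actual code) and the usual fix:

val rdd = sc.parallelize(1 to 100)

// BAD: emits the partition iterator itself, so a later shuffle asks Kryo to
// serialize the live iterator together with the classloader graph it references
val bad = rdd.mapPartitions(iter => Iterator(iter))

// OK: materialize the partition contents into a plain collection before emitting
val ok = rdd.mapPartitions(iter => Iterator(iter.toVector))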
Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
I suspect that relaunching the jobs may be tricky, since that means keeping track of the starting offsets in the Kafka topic(s) from which the jobs started working. Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online, may turn out quite time-consuming, which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster.
In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark...
SPARK STREAMING PROBLEM
Hi, I am trying to extract the filenames from which a DStream is generated, by parsing the output of the toDebugString method on the RDDs. I am running the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream(// directory //)

def g: List[String] = {
  var res = List[String]()
  lines.foreachRDD { rdd =>
    if (rdd.count > 0) {
      val files = rdd.toDebugString.split("\n").filter(_.contains(":"))
      files.foreach { ms =>
        res = ms.split(" ")(2) :: res
      }
    }
  }
  res
}

g.foreach(x => { println(x); println() })

However when I run the code, nothing gets printed on the console apart from the logs. Am I doing something wrong? And is there any better way to extract the file names from a DStream? Thanks in advance Animesh
Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
You can always spin up new boxes in the background and bring them into the cluster fold when fully operational, and time that with the job relaunch and param change. Kafka offsets are managed automatically for you by the kafka clients, which keep them in ZooKeeper, so don't worry about that as long as you shut down your job gracefully. Besides, managing the offsets explicitly is not a big deal if necessary. Sent from Samsung Mobile
Re: SPARK STREAMING PROBLEM
You must start the StreamingContext by calling ssc.start(). -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Dataframe Partitioning
Hi. I have 2 dataframes with 1 and 12 partitions respectively. When I do an inner join between these dataframes, the result contains 200 partitions. *Why?*

df1.join(df2, df1("id") === df2("id"), "inner")  // returns 200 partitions

Thanks!!! -- Regards. Miguel Ángel
Fwd: SPARK STREAMING PROBLEM
I also started the streaming context by running ssc.start(), but still, apart from the logs, nothing from g gets printed.
Re: Pointing SparkSQL to existing Hive Metadata with data file locations in HDFS
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

It mostly works, but we have been having issues with tables that contain a large amount of data: https://issues.apache.org/jira/browse/SPARK-6910
On May 27, 2015, at 20:52, Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID wrote: hey guys On the Hive/Hadoop ecosystem we are using the Cloudera distribution CDH 5.2.x; there are about 300+ hive tables. The data is stored as text (moving slowly to Parquet) on HDFS. I want to use SparkSQL to point to the Hive metadata and be able to define JOINS etc using a programming structure like this:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val schemaRdd = sqlContext.sql("some complex SQL")

Is that the way to go? Some guidance will be great. thanks sanjay
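A fleshed-out sketch of the pattern (the table and column names are hypothetical; it assumes the Hive metastore's hive-site.xml is on Spark's classpath, so the HiveContext sees the existing tables):

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
// joins run directly over the tables registered in the Hive metastore
val joined = sqlContext.sql(
  """SELECT a.id, b.name
    |FROM some_db.events a
    |JOIN some_db.users b ON a.user_id = b.id""".stripMargin)
joined.take(10).foreach(println)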
Soft distinct on data frames.
Hey, Is there a way to do a distinct operation on each partition only? My program generates quite a few duplicate tuples, and it would be nice to remove some of these as an optimisation without having to reshuffle the data. I've also noticed that plans generated with a distinct transformation have this peculiar form:

== Physical Plan ==
Distinct false
 Exchange (HashPartitioning [_0#347L,_1#348L], 200)
  Distinct true
   PhysicalRDD [_0#347L,_1#348L], MapPartitionsRDD[247] at map at SQLContext.scala:394

Does this mean that set semantics are just a flag that can be turned off and on for each shuffling operation? If so, is it possible to do so in general, so that one always uses set semantics instead of bag? Or will the optimiser try to propagate the set semantics? Cheers Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
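For the partition-local distinct itself, a sketch at the RDD level (it mimics the "Distinct true" pre-aggregation step by hand and keeps the existing partitioning, so no shuffle occurs; it assumes Row equality behaves sensibly for your schema):

val partitionLocallyDistinct =
  df.rdd.mapPartitions(iter => iter.toSet.iterator, preservesPartitioning = true)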
Re: SPARK STREAMING PROBLEM
The problem lies in the way you are doing the processing. After the g.foreach(x => { println(x); println() }), are you calling ssc.start()? Up to that point, all you have done is set up the computation steps; Spark has not started any real processing. So when you call g.foreach, it iterates over the empty list you are returning from the g method, hence it does not print anything.
-- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
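A minimal sketch of a working variant (directory path and filter are illustrative; it prints the file names inside foreachRDD as each batch arrives, instead of reading a driver-side list before any batch has run):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc2 = new StreamingContext(sc, Seconds(10))
val stream = ssc2.textFileStream("/some/dir")
stream.foreachRDD { rdd =>
  if (rdd.count > 0) {
    // each file-backed RDD shows its path in the debug string; crude but workable
    rdd.toDebugString.split("\n").filter(_.contains("/")).foreach(println)
  }
}
ssc2.start()
ssc2.awaitTermination()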
[Streaming] Configure executor logging on Mesos
Hi, I'm trying to control the verbosity of the logs on the Mesos executors, with no luck so far. The default behaviour is an INFO-level dump on stderr with unbounded growth that gets too big at some point. I noticed that when the executor is instantiated, it locates a default log configuration in the spark assembly:

I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave 20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

So nothing I provide in my job jar files (I also tried spark.executor.extraClassPath=log4j.properties) takes effect in the executor's configuration. How should I configure the log on the executors? thanks, Gerard.
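One approach worth trying (a sketch; the flags exist in Spark 1.3, but whether --files distribution reaches the Mesos executors this way is exactly what this thread is probing, so treat it as an assumption):

    spark-submit \
      --files /local/path/to/log4j.properties \
      --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
      ... rest of the submit arguments ...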
Re: Dataframe Partitioning
That's due to the config setting spark.sql.shuffle.partitions, which defaults to 200. From: Masf Date: Thursday, May 28, 2015 at 10:02 AM To: user@spark.apache.org Subject: Dataframe Partitioning Hi. I have 2 dataframes with 1 and 12 partitions respectively. When I do an inner join between these dataframes, the result contains 200 partitions. Why? df1.join(df2, df1("id") === df2("id"), "inner") // returns 200 partitions Thanks!!! -- Regards. Miguel Ángel
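The setting can be changed per SQLContext (the value below is illustrative; pick something proportional to your data and cluster):

    sqlContext.setConf("spark.sql.shuffle.partitions", "24")
    // or at submission time: spark-submit --conf spark.sql.shuffle.partitions=24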
Spark streaming with kafka
Hi guys, I'm using Spark Streaming with Kafka... On my local machine (started as a Java application, without using spark-submit) it works, connects to Kafka and does the job (*). I tried to put it into a Spark Docker container (Hadoop 2.6, Spark 1.3.1, tried spark-submit with local[5] and yarn-client too) but without success... There is no error on the console (the application starts), and I see something received from Kafka, but the result is not written out to Elasticsearch... Where can I start debugging? I see two jobs in the Spark console, both 0/1... Thanks -- Skype: boci13, Hangout: boci.b...@gmail.com
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Evo, good points. On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources but it doesn't make the cluster itself elastic. At least, that's my understanding. Memory + disk would be good and hopefully it'd take *huge* load on the system to start exhausting the disk space too. I'd guess that falling onto disk will make things significantly slower due to the extra I/O. Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers; implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implement the cluster autoscale. On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote: You can also try Dynamic Resource Allocation https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation Also re the Feedback Loop for automatic message consumption rate adjustment – there is a “dumb” solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark streaming will resort to keeping new RDDs on disk which will prevent it from crashing and hence loosing them. Then some memory will get freed and it will resort back to RAM and so on and so forth Sent from Samsung Mobile Original message From: Evo Eftimov Date:2015/05/28 13:22 (GMT+00:00) To: Dmitry Goldenberg Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? You can always spin new boxes in the background and bring them into the cluster fold when fully operational and time that with job relaunch and param change Kafka offsets are mabaged automatically for you by the kafka clients which keep them in zoomeeper dont worry about that ad long as you shut down your job gracefuly. Besides msnaging the offsets explicitly is not a big deal if necessary Sent from Samsung Mobile Original message From: Dmitry Goldenberg Date:2015/05/28 13:16 (GMT+00:00) To: Evo Eftimov Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starter offsets in Kafka topic(s) from which the jobs started working on. Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online may turn out quite time-consuming which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster. In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark... 
On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote: @DG; the key metrics should be: - Scheduling delay – its ideal state is to remain constant over time, and ideally it should stay below the micro-batch window - The average job processing time should remain less than the micro-batch window - Number of lost jobs – even a single lost job means you have lost all messages of the DStream RDD processed by that job, due to the previously described Spark Streaming memory leak condition and subsequent crash, described in previous postings submitted by me. You can even go one step further and periodically issue a “get/check free memory” call to see whether free memory is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold, that should be your third metric. Re the “back pressure” mechanism – this is a feedback loop mechanism, and you can implement one on your own without waiting for JIRAs and new features, whenever they might be implemented by the Spark dev team. Moreover, you can avoid using slow mechanisms such as ZooKeeper, and even incorporate some machine learning into your feedback loop to make it handle the message consumption rate more intelligently.
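[Editorial note] For concreteness, a minimal sketch of the two knobs discussed above – the receiver rate cap and dynamic resource allocation – assuming Spark 1.3 property names; the numeric values are made-up placeholders:

import org.apache.spark.SparkConf

// Cap per-receiver ingestion and let Spark scale executors within the
// existing cluster (dynamic allocation does not add machines).
val conf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "10000")  // records/sec per receiver (placeholder budget)
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.shuffle.service.enabled", "true")      // required for dynamic allocation on YARN

And a sketch of how the scheduling-delay and processing-time metrics Evo lists could be watched from inside the application via the StreamingListener API; BatchHealthListener and batchWindowMs are invented names, and the threshold logic is only illustrative:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchHealthListener(batchWindowMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val schedDelay = batch.batchInfo.schedulingDelay.getOrElse(0L)
    val procTime   = batch.batchInfo.processingDelay.getOrElse(0L)
    // Both metrics should stay below the micro-batch window; if not,
    // feed that signal into your own feedback loop or autoscaler.
    if (schedDelay > batchWindowMs || procTime > batchWindowMs) {
      println(s"Falling behind: schedulingDelay=${schedDelay}ms processingTime=${procTime}ms")
    }
  }
}

// Registered on an existing StreamingContext:
// ssc.addStreamingListener(new BatchHealthListener(batchWindowMs = 1000))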
Spark SQL vs MemSQL/VoltDB
Hello, I was wondering whether there is any documented comparison of Spark SQL with in-memory SQL databases such as MemSQL/VoltDB. MemSQL and similar systems also allow queries to be run in a clustered environment. What is the major differentiator? Regards, Ashish
Best practice to update a MongoDB document from Spark
Hello, I'm evaluating Spark / Spark Streaming. I use Spark Streaming to receive messages from a Kafka topic. As soon as I have a JavaReceiverInputDStream, I have to process each message; for each one I have to search MongoDB to see whether a matching document already exists. If I find the document I have to update it with some values, otherwise I have to create a new one. I have succeeded in creating new documents using saveAsNewAPIHadoopFiles, but I don't know the best practice for updating an existing document from Spark. In a basic Java program I would use: myColl.update(updateQuery, updateCommand); So, should I update documents the same way, skip those messages, and let saveAsNewAPIHadoopFiles create only the ones that need to be created? Or is there a better way? Tks Nicolas
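[Editorial note] A common pattern here – sketched below under assumptions, not a vetted recipe – is to let MongoDB's upsert make the update-or-create decision itself, with one connection per partition inside foreachRDD. The host, database/collection names, and the (docId, payload) record shape are all invented for illustration; the Mongo calls are the same 2.x Java driver API the poster mentions:

import com.mongodb.{BasicDBObject, MongoClient}
import org.apache.spark.streaming.dstream.DStream

def upsertToMongo(messages: DStream[(String, String)]): Unit = {
  messages.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // Create the client on the executor: connections are not serializable,
      // so they cannot be built on the driver and shipped out.
      val client = new MongoClient("mongo-host")
      try {
        val coll = client.getDB("mydb").getCollection("events")
        partition.foreach { case (id, payload) =>
          val query  = new BasicDBObject("_id", id)
          val update = new BasicDBObject("$set", new BasicDBObject("payload", payload))
          // upsert = true: update the document if it exists, insert otherwise
          coll.update(query, update, true, false)
        }
      } finally {
        client.close()
      }
    }
  }
}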
RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK – it will be your insurance policy against system crashes due to memory leaks. As long as there is free RAM, Spark Streaming will NOT resort to disk; it resorts to disk only from time to time (i.e. when there is no free RAM), taking a performance hit from that, but only until RAM is freed again.
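[Editorial note] For reference, pinning the receiver's storage policy the way Evo suggests is a one-argument change with the Spark 1.3 receiver-based Kafka API; the connection details below are placeholders:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc is an existing StreamingContext; when RAM fills up, new receiver
// blocks spill to disk instead of being dropped.
val lines = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK).map(_._2)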
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi, Unfortunately, they're still growing, both driver and executors. I run the same job in local mode and everything is fine. On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you replace your counting part with this? logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count())) Thanks Best Regards On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I wrote a simple test job; it only does very basic operations, for example: val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2) val logs = lines.flatMap { line => try { Some(parse(line).extract[Impression]) } catch { case _: Exception => None } } logs.filter(_.s_id > 0).count.foreachRDD { rdd => rdd.foreachPartition { iter => iter.foreach(count => logger.info(count.toString)) } } It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the count to the logs. Thanks. On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Zhang, Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory. Thanks Best Regards On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote: Hi, Yes, I'm using createStream, but the storageLevel param is by default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I don't think Kafka messages would be cached in the driver. On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you using the createStream or createDirectStream API? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down though). Another way would be to try the latter. Thanks Best Regards On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi Akhil, Thanks for your reply. According to the Streaming tab of the Web UI, the Processing Time is around 400 ms and there's no Scheduling Delay, so I suppose it's not the Kafka messages that eat up the off-heap memory. Or maybe it is, but how to tell? I googled how to check off-heap memory usage; there's a tool called pmap, but I don't know how to interpret the results. On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com wrote: After submitting the job, if you do a ps aux | grep spark-submit you can see all the JVM params. Are you using the high-level consumer (receiver based) for receiving data from Kafka? In that case, if your throughput is high and the processing delay exceeds the batch interval, you will hit these memory issues, as data keeps being received and is dumped to memory. You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down). Another alternative is to use the low-level Kafka consumer https://github.com/dibbhatt/kafka-spark-consumer or the non-receiver based directStream https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers that comes with Spark. Thanks Best Regards On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I'm using Spark Streaming 1.3 on CDH 5.1 in yarn-cluster mode. I find that YARN is killing the driver and executor processes because of excessive use of memory. Here's something I tried: 1. Xmx is set to 512M and the GC looks fine (one YGC per 10s), so the extra memory is not used by the heap. 2. I set the two memoryOverhead params to 1024 (default is 384), but the memory just keeps growing and then hits the limit. 3.
This problem does not show up in low-throughput jobs, nor in standalone mode. 4. The test job just receives messages from Kafka, with a batch interval of 1, does some filtering and aggregation, and then prints to the executor logs. So it's not some 3rd-party library that causes the 'leak'. I built Spark 1.3 myself, with the correct Hadoop versions. Any ideas will be appreciated. Thanks. -- Jerry
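[Editorial note] For completeness, a sketch of the receiver-less direct stream Akhil mentions, as it looks in Spark 1.3; the broker list and topic name are placeholders. With no long-running receiver, incoming data is not buffered in executor memory between batches, and offsets are tracked by Spark itself:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  Map("metadata.broker.list" -> "broker1:9092"), // placeholder broker list
  Set("my-topic")
).map(_._2)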
Re: debug jsonRDD problem?
On Wed, May 27, 2015 at 02:06:16PM -0700, Ted Yu wrote: Looks like the exception was caused by resolved.get(prefix ++ a) returning None: a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true) There are three occurrences of resolved.get() in createSchema() – None should be better handled in these places. My two cents. Here's the simplest test case I've come up with: sqlContext.jsonRDD(sc.parallelize(Array("{\"'```'\":\"\"}"))).count() Mike Stone
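[Editorial note] To illustrate Ted's point about handling None: the crash comes from calling .get on an empty Option. A hedged sketch of the safer shape is below; the Map[Seq[String], DataType] signature is an assumption made for the sketch, not the actual internal type, and this is illustrative rather than a patch:

import org.apache.spark.sql.types.{DataType, StructField}

// Before (from the snippet above): .get on a missing key throws NoSuchElementException
//   a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)
// A defensive variant that names the unresolved field instead:
def safeField(resolved: Map[Seq[String], DataType], prefix: Seq[String], a: Seq[String]): StructField = {
  val dataType = resolved.getOrElse(prefix ++ a,
    sys.error("Could not resolve JSON field: " + (prefix ++ a).mkString(".")))
  StructField(a.head, dataType, nullable = true)
}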
PySpark with OpenCV causes python worker to crash
Hi sparkers, I am working on a PySpark application which uses the OpenCV library. It runs fine when I run the code locally, but when I try to run it on Spark on the same machine it crashes the worker. The code can be found here: https://gist.github.com/samos123/885f9fe87c8fa5abf78f This is the error message taken from the STDERR of the worker log: https://gist.github.com/samos123/3300191684aee7fc8013 I would like pointers or tips on how to debug further; it would be nice to know the reason why the worker crashed. Thanks, Sam Stoelinga org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172) at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
Loading CSV to DataFrame and saving it into Parquet for speedup
I am using Spark-CSV to load 50 GB of around 10,000 CSV files into a couple of unified DataFrames. Since this process is slow, I wrote this snippet: targetList.foreach { target => // this is using sqlContext.load, getting the list of files and loading them according to schema files that // were read beforehand and built into their StructType getTrace(target, sqlContext) .reduce(_ unionAll _) .registerTempTable(target.toUpperCase()) sqlContext.sql("SELECT * FROM " + target.toUpperCase()) .saveAsParquetFile(processedTraces + target) } This loads the CSV files, unions all the files with the same schema, and writes them into a single Parquet file with its part files. The problem is that my CPU (not all cores are busy) and disk (SSD, 1 MB/s at most) are barely utilized. I wonder what I am doing wrong. Snippet for getTrace: def getTrace(target: String, sqlContext: SQLContext): Seq[DataFrame] = { logFiles(mainLogFolder + target).map { file => sqlContext.load( driver, // schemaSelect builds the StructType once schemaSelect(schemaFile, target, sqlContext), Map("path" -> file, "header" -> "false", "delimiter" -> ",")) } } Thanks for any help. Regards, mohamad
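[Editorial note] One thing that may be worth trying – a sketch, under the assumption that spark-csv resolves glob paths the way the underlying Hadoop input does: load each target's whole directory in a single call rather than one load plus unionAll per file, so all files become partitions of one job and more cores stay busy. loadTarget is an invented name; mainLogFolder and schema stand in for the poster's own values:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

// driver would be "com.databricks.spark.csv"; schema comes from schemaSelect.
def loadTarget(target: String, mainLogFolder: String,
               schema: StructType, sqlContext: SQLContext): DataFrame =
  sqlContext.load(
    "com.databricks.spark.csv",
    schema,
    Map("path"      -> (mainLogFolder + target + "/*"), // glob: all files in one load
        "header"    -> "false",
        "delimiter" -> ","))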