Problem reading Parquet from 1.2 to 1.3
As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x, I noticed that Spark behaves differently when reading Parquet directories that contain a .metadata directory. It seems that Spark 1.2.x would just ignore the .metadata directory, but now that I'm using Spark 1.3, reading these files causes the following exceptions:
scala> val d = sqlContext.parquetFile("/user/ddrak/parq_dir")
SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.RuntimeException: hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schema.avsc is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [116, 34, 10, 125] parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427) parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275) scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) . . . java.lang.RuntimeException: hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schemas/1.avsc is not a Parquet file. 
expected magic number at tail [80, 65, 82, 49] but found [116, 34, 10, 125] parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427) parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275) scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) . . . java.lang.RuntimeException: hdfs://nameservice1/user/ddrak/parq_dir/.metadata/descriptor.properties is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [117, 101, 116, 10] parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427) parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276) org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275) scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) . . . 
at scala.collection.parallel.package$$anon$1.alongWith(package.scala:87) at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86) at scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650) at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72) at scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
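The byte lists in these errors can be decoded to see what Spark actually read. A Parquet file ends with the 4-byte magic "PAR1", which is [80, 65, 82, 49]; [116, 34, 10, 125] is `t"\n}`, the tail of a JSON .avsc schema, and [117, 101, 116, 10] is `uet\n`, probably the end of a line ending in "parquet" in descriptor.properties. Below is a minimal stdlib-only Python sketch. The helper names are hypothetical and not Spark API. It does two things: it checks whether a file's tail carries the Parquet magic, and it lists which relative paths survive when you skip hidden entries such as .metadata. The hidden-entry rule follows the common Hadoop convention of treating names that start with "." or "_" as hidden. Passing only the surviving data files to Spark, instead of the directory, is one possible workaround. That assumes your Spark version's parquetFile accepts several paths.

```python
import os

PARQUET_MAGIC = b"PAR1"  # == bytes([80, 65, 82, 49])


def has_parquet_tail(path):
    """Return True if the last 4 bytes of the file are the Parquet magic."""
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        if f.tell() < len(PARQUET_MAGIC):
            return False
        f.seek(-len(PARQUET_MAGIC), os.SEEK_END)
        return f.read(len(PARQUET_MAGIC)) == PARQUET_MAGIC


def is_hidden(relative_path):
    """True if any path component starts with '.' or '_' (e.g. .metadata)."""
    parts = relative_path.replace("\\", "/").split("/")
    return any(part.startswith((".", "_")) for part in parts if part)


def visible_files(relative_paths):
    """Keep only paths that are not inside hidden files or directories."""
    return [p for p in relative_paths if not is_hidden(p)]
```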
Managing spark processes via supervisord
Hi All, I am curious to know whether anyone has successfully deployed a Spark cluster using supervisord (http://supervisord.org/). Currently I am using the cluster launch scripts, which are working great. However, every time I reboot my VM or development environment I need to re-launch the cluster. I am considering using supervisord to control all the processes (worker, master, etc.) so that the cluster is up and running after boot-up, although I'd like to understand whether it will cause more issues than it solves. Thanks, Mike.
Re: Managing spark processes via supervisord
Assuming you are talking about a standalone cluster: IMHO, with workers you won't get any problems, and it's straightforward since they are usually foreground processes. With the master it's a bit more complicated: ./sbin/start-master.sh goes to the background, which is not good for supervisor. Anyway, I think it's doable (I'm going to set it up too in a few days). On 3 June 2015 at 21:46, Mike Trienis mike.trie...@orcsol.com wrote: I am considering using supervisord to control all the processes (worker, master, etc.) in order to have the cluster up and running after boot-up; although I'd like to understand if it will cause more issues than it solves.
Python Image Library and Spark
Hi all, I'm playing around with manipulating images via Python and want to utilize Spark for scalability. That said, I'm just learning Spark and my Python is a bit rusty (I've been doing PHP coding for the last few years). I think I have most of the process figured out. However, the script fails on larger images, and for smaller images Spark sends out the following warning: Stage 0 contains a task of very large size (1151 KB). The maximum recommended task size is 100 KB. My code is as follows:
    from PIL import Image  # with Pillow; older PIL installs used "import Image"
    from pyspark import SparkContext
    if __name__ == "__main__":
        imageFile = "sample.jpg"
        outFile = "sample.gray.jpg"
        sc = SparkContext(appName="Grayscale")
        im = Image.open(imageFile)
        # Create an RDD for the data from the image file
        img_data = sc.parallelize(list(im.getdata()))
        # Create an RDD for the grayscale value
        gValue = img_data.map(lambda x: int(x[0]*0.21 + x[1]*0.72 + x[2]*0.07))
        # Put our grayscale value into the RGB channels
        grayscale = gValue.map(lambda x: (x, x, x))
        # Save the output in a new image.
        im.putdata(grayscale.collect())
        im.save(outFile)
Obviously, something is amiss. However, I can't figure out where I'm off track with this. Any help is appreciated! Thanks in advance!!!
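You can check the per-pixel logic before involving Spark by pulling it out into a plain function and testing it locally. This sketch uses the same weights as the script (0.21, 0.72, 0.07). It differs from the original in two ways. It rounds instead of truncating with int(), so floating-point error cannot turn 255.0 into 254. It also assumes RGB or RGBA input and ignores any alpha channel. The function names are hypothetical.

```python
def to_gray(pixel):
    """Map an (R, G, B[, A]) tuple to a gray (v, v, v) tuple."""
    r, g, b = pixel[0], pixel[1], pixel[2]  # any alpha channel is ignored
    v = round(r * 0.21 + g * 0.72 + b * 0.07)
    v = max(0, min(255, v))  # clamp to the valid 8-bit range
    return (v, v, v)


def to_gray_all(pixels):
    """Apply to_gray to every pixel, like the two map() steps in the RDD version."""
    return [to_gray(p) for p in pixels]
```

In the Spark version, the same top-level function could be passed directly to map, e.g. `img_data.map(to_gray)`.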
Re: Problem reading Parquet from 1.2 to 1.3
(bcc: user@spark, cc: cdh-user@cloudera) If you're using CDH, Spark SQL is currently unsupported and mostly untested. I'd recommend not trying to use it in CDH. You could try an upstream version of Spark instead. On Wed, Jun 3, 2015 at 1:39 PM, Don Drake dondr...@gmail.com wrote: As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x I noticed that Spark is behaving differently when reading Parquet directories that contain a .metadata directory.
Re: Python Image Library and Spark
Try using a larger number of partitions in parallelize. On 4 Jun 2015 06:28, Justin Spargur jmspar...@gmail.com wrote: However, the script fails on larger images and Spark is sending out the following warning for smaller images: Stage 0 contains a task of very large size (1151 KB). The maximum recommended task size is 100 KB.
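The warning appears because the pixel list is shipped to the executors inside the tasks, so more slices mean less data per task. pyspark's sc.parallelize takes a second argument, numSlices, for this. The stdlib-only sketch below uses hypothetical helper names and takes its 100 KB target from the warning. It splits a list into contiguous slices and uses pickle size as a rough stand-in for the serialized task size, to estimate how many slices keep each one under the target. Spark's own serialization may differ, so treat the result as a starting point.

```python
import math
import pickle


def slice_evenly(data, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal length."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


def estimated_kb(chunk):
    """Pickled size in KB; only a rough proxy for Spark's serialized task size."""
    return len(pickle.dumps(chunk)) / 1024.0


def slices_for_target(data, target_kb=100.0):
    """Start from total size / target and add slices until each fits the target."""
    num_slices = max(1, math.ceil(estimated_kb(data) / target_kb))
    while (num_slices < len(data) and
           max(estimated_kb(s) for s in slice_evenly(data, num_slices)) > target_kb):
        num_slices += 1
    return num_slices
```

In the script, this would become something like `pixels = list(im.getdata())` followed by `img_data = sc.parallelize(pixels, slices_for_target(pixels))`. Note that the driver still builds the full pixel list and `collect()` brings every pixel back, so very large images may still be limited by driver memory.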
[ANNOUNCE] YARN support in Spark EC2
Hi all, We recently merged support for launching YARN clusters using the Spark EC2 scripts as part of https://issues.apache.org/jira/browse/SPARK-3674. To use this, you can pass --hadoop-major-version=yarn to the spark-ec2 script. This will set up Hadoop 2.4 HDFS, YARN, and Spark built for YARN on the EC2 cluster. Developers who work on YARN-related features might find this useful for testing / benchmarking Spark with YARN. If anyone has questions or feedback, please let me know. Thanks, Shivaram
Standard Scaler taking 1.5hrs
Hello User group, I have an RDD of LabeledPoint composed of sparse vectors, as shown below. In the next step, I am standardizing the columns with the StandardScaler. The data has 2450 columns and ~110M rows. It took 1.5 hrs to complete the standardization with 10 nodes and 80 executors. spark.executor.memory was set to 2g and the driver memory to 5g.
scala> val parsedData = stack_sorted.mapPartitions(partition => partition.map { row => LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength)) }, preservesPartitioning = true).cache()
CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111
(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))
My suspicion is that, because the data is partitioned using a custom partitioner, the StandardScaler is causing a major shuffle operation. Any suggestions on how to improve the performance of this step, and of LinearRegressionWithSGD(), which is also taking a very long time?
scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2)
scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map(row => row.features))
scala> val scaledData = parsedData.mapPartitions(partition => partition.map { row => LabeledPoint(row.label, scaler.transform(row.features)) }).cache()
scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()
scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
scala> val model = algorithm.run(scaledData)
Best, Piero Cinquegrana, Marketing Scientist | MarketShare
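With withMean = false and withStd = true, the scaler needs only one standard deviation per column. Transforming a sparse vector then touches only its non-zero entries, so the vectors stay sparse. The stdlib-only Python sketch below shows the arithmetic this requires: a single pass of count, sum, and sum of squares per column, using the sample (n - 1) variance. Its helpers are hypothetical, and it represents sparse rows as {index: value} dicts. Mapping zero-variance columns to 0.0 is an assumption of the sketch, not a statement about MLlib. It does not explain where the 1.5 hours go. It may help separate the fit, an aggregation over all rows, from the transform, a per-row map that by itself does not need a shuffle.

```python
import math


def column_std(rows, num_cols):
    """Sample standard deviation per column for sparse rows given as {index: value}.

    Missing indices count as 0.0, so only non-zero entries are visited.
    """
    n = 0
    sums = [0.0] * num_cols
    sq_sums = [0.0] * num_cols
    for row in rows:
        n += 1
        for j, v in row.items():
            sums[j] += v
            sq_sums[j] += v * v
    if n < 2:
        return [0.0] * num_cols
    stds = []
    for j in range(num_cols):
        mean = sums[j] / n
        var = (sq_sums[j] - n * mean * mean) / (n - 1)
        stds.append(math.sqrt(max(var, 0.0)))  # guard against tiny negative rounding error
    return stds


def scale_row(row, stds):
    """Divide each non-zero entry by its column std; zero-variance columns become 0.0."""
    return {j: (v / stds[j] if stds[j] != 0.0 else 0.0) for j, v in row.items()}
```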
Re: data localisation in spark
Tasks are scheduled on executors based on data locality. Things work as you would expect in the example you brought up. Through dynamic allocation, the number of executors can change throughout the lifetime of an application. 10 executors (or 5 executors with 2 cores each) are not needed for a reduceByKey with parallelism = 10. If there are fewer slots to run tasks than there are tasks, the tasks will just be run serially. -Sandy On Tue, Jun 2, 2015 at 11:24 AM, Shushant Arora shushantaror...@gmail.com wrote: So in Spark, after acquiring executors from the ClusterManager, are tasks scheduled on executors based on data locality? I mean, suppose an application has 2 jobs and the output of job 1 is used as the input of job 2, and in job 1 I persisted some RDD. While running job 2, will it use the same executors where job 1's output was persisted, or will it acquire executors again, so that data movement happens? And is it true that the number of executors in an application is fixed, acquired at the start of the application, and stays the same throughout? If yes, how does it take care of an explicit number of reducers in some APIs, say rdd.reduceByKey(func, 10)? When converting the DAG to stages, does it calculate the executors required and then acquire executors/worker nodes? On Tue, Jun 2, 2015 at 11:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote: It is not possible with JavaSparkContext either. The API mentioned below currently does not have any effect (we should document this). The primary difference between MR and Spark here is that MR runs each task in its own YARN container, while Spark runs multiple tasks within an executor, which needs to be requested before Spark knows what tasks it will run. Although dynamic allocation improves that last part. -Sandy On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com wrote: Is it possible in JavaSparkContext? 
JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile(args[0]); If yes, is it the programmer's responsibility to first calculate the split locations and then instantiate the Spark context with preferred locations? How is this achieved in MR2 with YARN? Does the Application Master specify split locations to the ResourceManager before acquiring the node managers? On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote: Take a look at the following SparkContext constructor variant that tries to honor data locality in YARN mode. /** * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData } -- bit1...@163.com From: Shushant Arora shushantaror...@gmail.com Date: 2015-05-31 22:54 To: user user@spark.apache.org Subject: data localisation in spark I want to understand how Spark takes care of data localisation in cluster mode when run on YARN. 1. The driver program asks the ResourceManager for executors. Does it tell YARN's RM to check the HDFS blocks of the input data and then allocate executors accordingly? And do executors remain fixed throughout the application, or does the driver program ask for new executors when it submits another job in the same application, since in Spark a new job is created for each action? If executors are fixed, is achieving data localisation for the second job impossible? 2. When executors are done with their processing, are they marked as free in the ResourceManager's resource queue, and do executors tell this to the RM directly instead of via the driver? 
Thanks Shushant
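A little arithmetic makes Sandy's point concrete: when there are fewer slots than tasks, the tasks simply run serially. This is a minimal sketch that assumes one task per core slot and tasks of roughly equal length. Real scheduling, locality waits and dynamic allocation will blur the numbers. task_waves is a hypothetical helper.

```python
import math


def task_waves(num_tasks, num_executors, cores_per_executor):
    """Number of scheduling rounds needed if each core runs one task at a time."""
    slots = num_executors * cores_per_executor
    if slots <= 0:
        raise ValueError("need at least one task slot")
    return math.ceil(num_tasks / slots)
```

For example, reduceByKey(func, 10) on 2 executors with 2 cores each would still run all 10 reduce tasks, in about 3 rounds.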
Re: Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException
Hi Doug, Actually, sqlContext.table does not support a database name in either Spark 1.3 or Spark 1.4. We will support it in a future version. Thanks, Yin On Wed, Jun 3, 2015 at 10:45 AM, Doug Balog doug.sparku...@dugos.com wrote: Hi, sqlContext.table("db.tbl") isn't working for me; I get a NoSuchTableException. But I can access the table via sqlContext.sql("select * from db.tbl"), so I know it has the table info from the metastore. Anyone else see this? I'll keep digging. I compiled via make-distribution -Pyarn -phadoop-2.4 -Phive -Phive-thriftserver. It worked for me in 1.3.1. Cheers, Doug - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ALS Rating Object
Hi Yasemin, If you can convert your user IDs to Integers in pre-processing (if you have fewer than a couple billion users), that would work. Otherwise... In Spark 1.3: you may need to modify ALS to use Long instead of Int. In Spark 1.4: spark.ml.recommendation.ALS (in the Pipeline API) exposes ALS.train as a DeveloperApi to allow users to use Long instead of Int. We're also thinking about better ways to permit Long IDs. Joseph On Wed, Jun 3, 2015 at 5:04 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I want to use Spark's ALS in my project. I have user IDs like 30011397223227125563254, but the Rating object that ALS uses wants an Integer user ID, and my IDs do not fit into a 32-bit Integer. How can I solve that? Thanks. Best, yasemin -- hiç ender hiç
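Note that 30011397223227125563254 has 23 digits, more than even a 64-bit Long can hold (the largest is 9223372036854775807). So a mapping to compact IDs is needed either way. Below is a minimal stdlib-only sketch of that pre-processing step. It uses hypothetical helper names and assumes (user, product, rating) triples. It assigns each distinct raw ID a dense index that fits in a 32-bit Int and keeps the reverse map for translating recommendations back. In Spark, a similar mapping could be built with something like distinct() followed by zipWithIndex() on an RDD of raw IDs.

```python
INT32_MAX = 2**31 - 1


def build_id_index(raw_ids):
    """Assign dense indices 0, 1, 2, ... to distinct IDs in first-seen order."""
    index = {}
    for raw in raw_ids:
        if raw not in index:
            index[raw] = len(index)
    if len(index) - 1 > INT32_MAX:
        raise ValueError("too many distinct IDs for a 32-bit Int")
    reverse = {i: raw for raw, i in index.items()}
    return index, reverse


def encode_ratings(ratings, user_index):
    """Replace the raw user ID in (user, product, rating) triples with its int index."""
    return [(user_index[user], product, rating) for user, product, rating in ratings]
```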
RE: Make HTTP requests from within Spark
The short answer is yes. How you do it depends on a number of factors. Assuming you want to build an RDD from the responses and then analyze the responses using Spark core (not Spark Streaming), here is one simple way to do it: 1) Implement a class or function that connects to a web service and returns a list of responses. This code has no dependency on Spark; it will be the same whether you are using Spark or not. Obviously, you have to take memory and latency requirements into account. 2) Call sc.parallelize on the list obtained in step 1. This is not the most efficient way of doing it, but hopefully it gives you an idea. Mohammed -Original Message- From: kasparfischer [mailto:kaspar.fisc...@dreizak.com] Sent: Wednesday, June 3, 2015 12:49 AM To: user@spark.apache.org Subject: Make HTTP requests from within Spark Hi everybody, I'm new to Spark, apologies if my question is very basic. I need to send millions of requests to a web service and analyse and store the responses in an RDD. I can easily express the analysing part using Spark's filter/map/etc. primitives, but I don't know how to make the requests. Is that something I can do from within Spark? Or Spark Streaming? Or does it conflict with the way Spark works? I've found a similar question but am not sure whether the answer applies here: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html Any clarifications or pointers would be super helpful! Thanks, Kaspar
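Mohammed's approach makes all the requests on the driver. A common alternative, not described in the reply, is to parallelize the list of URLs and make the calls inside mapPartitions. The requests then run on the executors, and one client can be set up per partition. The function below is a stdlib-only sketch of what could be passed to rdd.mapPartitions. `make_fetch` is a hypothetical factory that returns your HTTP call, for example a wrapper around urllib.request.urlopen. It is injected so the logic can be tested without a network. Failures are returned as values rather than raised, so one bad response does not fail the whole task. That is a design choice, not a requirement.

```python
def fetch_partition(urls, make_fetch):
    """Yield (url, ok, payload) for each URL in one partition.

    make_fetch() is called once per partition (e.g. to open one HTTP session);
    exceptions are reported as (url, False, message) instead of failing the task.
    """
    fetch = make_fetch()
    for url in urls:
        try:
            yield (url, True, fetch(url))
        except Exception as exc:  # keep going; record the failure
            yield (url, False, str(exc))
```

In pyspark this might be used as `sc.parallelize(urls, 100).mapPartitions(lambda it: fetch_partition(it, my_make_fetch))`, where `my_make_fetch` and the partition count are placeholders.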
RandomForest - subsamplingRate parameter
When training a RandomForest model, the Strategy class (in mllib.tree.configuration) provides a subsamplingRate parameter. I was hoping to use this to cut down on processing time for large datasets (more than 2MM rows and 9K predictors), but I've found that the runtime stays approximately constant (and sometimes noticeably increases) when I try lowering the value of subsamplingRate. Is this the expected behavior? (And, if so, what is the intended purpose of this parameter?) Of course, I could always just subsample the input dataset prior to running RF, but I was hoping that the subsamplingRate (which ostensibly affects the sampling used during RF bagging) would decrease the amount of data processing without requiring me to entirely ignore large subsets of the data. Thanks, ~ Andrew
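One possible explanation, worth checking against the MLlib source for your version, is that the forest does not physically drop rows when it subsamples. Instead, my understanding is that each row gets a per-tree sample weight, a Poisson draw with mean subsamplingRate when sampling with replacement. Every pass would then still read all rows, and only the weighted statistics would change. The stdlib-only Python sketch below illustrates that idea. poisson and bag_weights are hypothetical helpers, and the Poisson draw uses Knuth's multiplication method.

```python
import math
import random


def poisson(lam, rng):
    """Draw from Poisson(lam) with Knuth's multiplication method (fine for small lam)."""
    limit = math.exp(-lam)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= limit:
            return k
        k += 1


def bag_weights(num_rows, num_trees, rate, seed=42):
    """Per-row, per-tree sample counts; every row is visited even if all its counts are 0."""
    rng = random.Random(seed)
    return [[poisson(rate, rng) for _ in range(num_trees)] for _ in range(num_rows)]
```

With rate = 0.1, roughly 90% of the (row, tree) weights are 0, yet the loop still touches every row. If MLlib works this way, that would fit a runtime that barely changes.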
How to pass system properties in spark ?
Hi, I'm trying to use property substitution in my log4j.properties, so that I can choose where to write Spark logs at runtime. The problem is that the system property passed to the Spark shell doesn't seem to be propagated to log4j. Here is log4j.properties (partial) with a parameter 'spark.log.path':
log4j.appender.logFile=org.apache.log4j.FileAppender
log4j.appender.logFile.File=${spark.log.path}
log4j.appender.logFile.layout=org.apache.log4j.PatternLayout
log4j.appender.logFile.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
Here is how I pass the 'spark.log.path' variable on the command line:
$ spark-shell --conf spark.driver.extraJavaOptions=-Dspark.log.path=/tmp/spark.log
I also tried:
$ spark-shell -Dspark.log.path=/tmp/spark.log
Result: /tmp/spark.log is not created when I run Spark. Any ideas why this is happening? When I enable log4j debug, I see the following:
log4j: Setting property [file] to [].
log4j: setFile called: , true
log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException: (No such file or directory) at java.io.FileOutputStream.open(Native Method)
-- Thanks, Ashwin
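The debug line "Setting property [file] to []" suggests that ${spark.log.path} was replaced by an empty string. As far as I know, log4j 1.x does this when the variable is not defined as a system property in the JVM that loads the configuration. The sketch below imitates that substitution in Python to show why the File option ends up empty. subst_vars is a hypothetical helper, not log4j code, and it ignores log4j's fallback to other configured properties. So the thing to check is whether -Dspark.log.path reaches the driver JVM at startup. For spark-shell, the --driver-java-options flag is one route to try.

```python
import re

_VAR = re.compile(r"\$\{([^}]*)\}")


def subst_vars(value, system_props):
    """Replace ${key} with system_props[key], or with '' when the key is missing."""
    return _VAR.sub(lambda m: system_props.get(m.group(1), ""), value)
```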
Re: Spark Client
Did you try this? Create an sbt project like:
    import org.apache.spark.{SparkConf, SparkContext}
    object Sigmoid {
      def main(args: Array[String]): Unit = {
        // Create your context
        val sconf = new SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
        val sc = new SparkContext(sconf)
        // Do some computations
        sc.parallelize(1 to 1).take(10).foreach(println)
        sc.stop()
        // Now return the exit status (0 here; use whatever status your workflow expects)
        System.exit(0)
      }
    }
Now make your workflow manager trigger sbt run on the project instead of using spark-submit. Thanks Best Regards On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi Akhil, sorry, I may not be conveying the question properly. Actually, we are looking to launch a Spark job from a long-running workflow manager, which invokes the Spark client via SparkSubmit. Unfortunately, upon successful completion of the application the client exits with System.exit(0), or with System.exit(NON_ZERO) when there is a failure. The question is: is there an alternate API through which a Spark application can be launched that returns an exit status to the caller instead of initiating a JVM halt? On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Run it as a standalone application. Create an sbt project and do sbt run? Thanks Best Regards On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi guys, I am new to Spark. I am using SparkSubmit to submit Spark jobs, but for my use case I don't want it to exit with System.exit. Is there any other Spark client, more API-friendly than SparkSubmit, that doesn't exit with System.exit? Please correct me if I am missing something. Thanks in advance -- Regards Pavan Kumar Kolamuri -- Regards Pavan Kumar Kolamuri
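Another way around System.exit is to run spark-submit as a child process. The exit call then ends only the child JVM, and the caller just reads its exit status. The sketch below assumes the workflow manager can start processes. The run_and_get_status helper is hypothetical, and the class name and jar in the usage comment are placeholders. For JVM-based callers, Spark 1.4 also adds org.apache.spark.launcher.SparkLauncher, which starts spark-submit as a child process in a similar way.

```python
import subprocess


def run_and_get_status(cmd):
    """Run cmd as a child process and return its exit status instead of exiting."""
    completed = subprocess.run(cmd)
    return completed.returncode


# Example with placeholder arguments:
# status = run_and_get_status(["spark-submit", "--class", "com.example.Main", "app.jar"])
```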
run spark submit on cloudera cluster
Hi, I want to run my Spark app on a cluster; I use the Cloudera Live single-node VM. How must I build the job for the spark-submit script? And must I upload spark-submit to HDFS? Best regards, Paul
Re: Application is always in process when I check out logs of completed application
Have you called sc.stop()? :) On 3 Jun 2015 14:05, amghost zhengweita...@outlook.com wrote: I run a Spark application in a Spark standalone cluster with client deploy mode. I want to check the logs of my finished application, but I always get a page telling me "Application history not found - Application xxx is still in process". I am pretty sure that the application has indeed completed, because I can see it in the Completed Applications list shown by the Spark Web UI. I have also found the log file with the suffix .inprocess in the directory set by spark.eventLog.dir in my spark-defaults.conf. Oh, BTW, I am using Spark 1.3.0. So, is there anything I missed?
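sc.stop() matters here because the event log is normally finalized only when the SparkContext stops, at which point its in-progress suffix is removed. An application that never stops its context can therefore keep appearing as still in progress in the history view. Below is a small stdlib-only Python sketch of a guard that always calls stop(), even if the job raises. stopping is a hypothetical helper.

```python
from contextlib import contextmanager


@contextmanager
def stopping(sc):
    """Yield sc and call sc.stop() on exit, whether or not the body raised."""
    try:
        yield sc
    finally:
        sc.stop()


# Usage with pyspark (not run here):
# with stopping(SparkContext(appName="MyApp")) as sc:
#     ...
```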
Re: Filter operation to return two RDDs at once.
As far as I know, Spark doesn't support multiple outputs. On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote: Why do you need to do that if the filter and the content of the resulting RDD are exactly the same? You may as well declare them as one RDD. On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I want to do this:
val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId == NULL_VALUE)
This will run two different stages. Can this be done in one stage?
val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.magicFilter(_._2.qualifiedTreatmentId != NULL_VALUE)
-- Deepak -- Best Regards Jeff Zhang
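As Jeff says, each filter call produces one RDD, so the usual approach is two filters. Ideally, call cache() (or persist()) on rawQtSession first, so its lineage is not recomputed for the second filter. Each filter still scans the cached data. For comparison, this is the one-pass split that magicFilter would perform on a local collection. split_by is a hypothetical helper, not an RDD method.

```python
def split_by(pred, items):
    """Split items into (matching, non_matching) lists in a single pass."""
    matching, rest = [], []
    for item in items:
        (matching if pred(item) else rest).append(item)
    return matching, rest
```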
MetaException(message:java.security.AccessControlException: Permission denied
Hi, I was running a Spark job to INSERT OVERWRITE a Hive table and got Permission denied. My question is: why did the Spark job do the insert as user 'hive', and not as me, the user who ran the job? How can I fix the problem?
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
hiveContext.sql("INSERT OVERWRITE table 4dim ... ")
Caused by: MetaException(message:java.security.AccessControlException: Permission denied: user=hive, access=WRITE, inode=/apps/hive/warehouse/wrf_tables/4dim/zone=2/z=1/year=2009/month=1:patcharee:hdfs:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:185) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6795) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6777) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6702) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAccess(FSNamesystem.java:9529) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.checkAccess(NameNodeRpcServer.java:1516) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.checkAccess(ClientNamenodeProtocolServerSideTranslatorPB.java:1433) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) at java.security.AccessController.doPrivileged(Native Method) at 
javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) ) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result$alter_partition_resultStandardScheme.read(ThriftHiveMetastore.java) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result$alter_partition_resultStandardScheme.read(ThriftHiveMetastore.java) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result.read(ThriftHiveMetastore.java) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_partition(ThriftHiveMetastore.java:2033) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_partition(ThriftHiveMetastore.java:2018) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_partition(HiveMetaStoreClient.java:1091) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at com.sun.proxy.$Proxy37.alter_partition(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:469) ... 26 more BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ERROR cluster.YarnScheduler: Lost executor
You need to look into your executor/worker logs to see what's going on. Thanks Best Regards On Wed, Jun 3, 2015 at 12:01 PM, patcharee patcharee.thong...@uni.no wrote: Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee
Re: Filter operation to return two RDDs at once.
I checked RDD#randomSplit; it behaves more like multiple one-to-one transformations than a single one-to-many transformation. I wrote the sample code below, and it generates 3 stages. Although we can use cache here to improve it, if Spark supported multiple outputs, only 2 stages would be needed. (This would be useful for Pig's multi-query execution and Hive's self join.) val data = sc.textFile("/Users/jzhang/a.log").flatMap(line => line.split("\\s")).map(w => (w, 1)) val parts = data.randomSplit(Array(0.2, 0.8)) val joinResult = parts(0).join(parts(1)) println(joinResult.toDebugString) (1) MapPartitionsRDD[8] at join at WordCount.scala:22 [] | MapPartitionsRDD[7] at join at WordCount.scala:22 [] | CoGroupedRDD[6] at join at WordCount.scala:22 [] +-(1) PartitionwiseSampledRDD[4] at randomSplit at WordCount.scala:21 [] | | MapPartitionsRDD[3] at map at WordCount.scala:20 [] | | MapPartitionsRDD[2] at flatMap at WordCount.scala:20 [] | | /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at WordCount.scala:20 [] | | /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 [] +-(1) PartitionwiseSampledRDD[5] at randomSplit at WordCount.scala:21 [] | MapPartitionsRDD[3] at map at WordCount.scala:20 [] | MapPartitionsRDD[2] at flatMap at WordCount.scala:20 [] | /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at WordCount.scala:20 [] | /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 [] On Wed, Jun 3, 2015 at 2:45 PM, Sean Owen so...@cloudera.com wrote: In the sense here, Spark actually does have operations that make multiple RDDs, like randomSplit. However, there is no equivalent of the partition operation, which gives the elements that matched and did not match at once. On Wed, Jun 3, 2015, 8:32 AM Jeff Zhang zjf...@gmail.com wrote: As far as I know, Spark doesn't support multiple outputs. On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote: Why do you need to do that if the filter and the content of the resulting RDD are exactly the same?
You may as well declare them as 1 RDD. On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I want to do this: val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE) val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId == NULL_VALUE) This will run two different stages. Can this be done in one stage? val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.*magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE) -- Deepak -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: ERROR cluster.YarnScheduler: Lost executor
Was a node down, or was the container preempted? You need to check the executor log / NodeManager log for more info. On Wed, Jun 3, 2015 at 2:31 PM, patcharee patcharee.thong...@uni.no wrote: Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee -- Best Regards Jeff Zhang
Re: DataFrames coming in SparkR in Apache Spark 1.4.0
You can build Spark from the 1.4 release branch yourself: https://github.com/apache/spark/tree/branch-1.4 - Daniel Emaasit, Ph.D. Research Assistant Transportation Research Center (TRC) University of Nevada, Las Vegas Las Vegas, NV 89154-4015 Cell: 615-649-2489 www.danielemaasit.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrames-coming-in-SparkR-in-Apache-Spark-1-4-0-tp23116p23131.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Error: Building Spark 1.4.0 from Github-1.4 release branch
I ran into errors while trying to build Spark from the 1.4 release branch: https://github.com/apache/spark/tree/branch-1.4. Any help will be much appreciated. Here is the log file. (FYI, I installed all the dependencies, such as Java 7 and Maven 3.2.5.) C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package [INFO] Scanning for projects... [INFO] [INFO] Reactor Build Order: [INFO] [INFO] Spark Project Parent POM [INFO] Spark Launcher Project [INFO] Spark Project Networking [INFO] Spark Project Shuffle Streaming Service [INFO] Spark Project Unsafe [INFO] Spark Project Core [INFO] Spark Project Bagel [INFO] Spark Project GraphX [INFO] Spark Project Streaming [INFO] Spark Project Catalyst [INFO] Spark Project SQL [INFO] Spark Project ML Library [INFO] Spark Project Tools [INFO] Spark Project Hive [INFO] Spark Project REPL [INFO] Spark Project YARN [INFO] Spark Project Assembly [INFO] Spark Project External Twitter [INFO] Spark Project External Flume Sink [INFO] Spark Project External Flume [INFO] Spark Project External MQTT [INFO] Spark Project External ZeroMQ [INFO] Spark Project External Kafka [INFO] Spark Project Examples [INFO] Spark Project External Kafka Assembly [INFO] Spark Project YARN Shuffle Service [INFO] [INFO] [INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @ spark-parent_2.10 --- [INFO] [INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @ spark-parent_2.10 --- [INFO] [INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @ spark-parent_2.10 --- [INFO] Add Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala [INFO] Add Test Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\test\scala [INFO] [INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources) @ spark-parent_2.10 ---
[INFO] Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala added. [INFO] [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-parent_2.10 --- [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... FAILURE [ 23.528 s] [INFO] Spark Launcher Project . SKIPPED [INFO] Spark Project Networking ... SKIPPED [INFO] Spark Project Shuffle Streaming Service SKIPPED [INFO] Spark Project Unsafe ... SKIPPED [INFO] Spark Project Core . SKIPPED [INFO] Spark Project Bagel SKIPPED [INFO] Spark Project GraphX ... SKIPPED [INFO] Spark Project Streaming SKIPPED [INFO] Spark Project Catalyst . SKIPPED [INFO] Spark Project SQL .. SKIPPED [INFO] Spark Project ML Library ... SKIPPED [INFO] Spark Project Tools SKIPPED [INFO] Spark Project Hive . SKIPPED [INFO] Spark Project REPL . SKIPPED [INFO] Spark Project YARN . SKIPPED [INFO] Spark Project Assembly . SKIPPED [INFO] Spark Project External Twitter . SKIPPED [INFO] Spark Project External Flume Sink .. SKIPPED [INFO] Spark Project External Flume ... SKIPPED [INFO] Spark Project External MQTT SKIPPED [INFO] Spark Project External ZeroMQ .. SKIPPED [INFO] Spark Project External Kafka ... SKIPPED [INFO] Spark Project Examples . SKIPPED [INFO] Spark Project External Kafka Assembly .. SKIPPED [INFO] Spark Project YARN Shuffle Service . SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 24.680 s [INFO] Finished at: 2015-06-03T02:11:35-07:00 [INFO] Final Memory: 27M/224M [INFO] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process (default) on project spark-parent_2.10: Error finding remote resources manifests: C:\Program Files\Apache Software Foundation\spark-branch-1.4\target\maven-shared-archive-resources\META-INF\NOTICE (The system
Re: ERROR cluster.YarnScheduler: Lost executor
Hi again, below is the log from the executor: FetchFailed(BlockManagerId(4, compute-10-0.local, 38594), shuffleId=0, mapId=117, reduceId=117, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to compute-10-0.local/10.10.255.241:38594 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.io.IOException: Failed to connect to compute-10-0.local/10.10.255.241:38594 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) ... 3 more Caused by: java.net.ConnectException: Connection refused: compute-10-0.local/10.10.255.241:38594 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at
Spark Client
Hi guys, I am new to Spark. I am using spark-submit to submit Spark jobs, but for my use case I don't want it to exit with System.exit. Is there any other, more API-friendly Spark client than SparkSubmit that doesn't exit with System.exit? Please correct me if I am missing something. Thanks in advance -- Regards Pavan Kumar Kolamuri
Re: Filter operation to return two RDDs at once.
In the sense here, Spark actually does have operations that make multiple RDDs, like randomSplit. However, there is no equivalent of the partition operation, which gives the elements that matched and did not match at once. On Wed, Jun 3, 2015, 8:32 AM Jeff Zhang zjf...@gmail.com wrote: As far as I know, Spark doesn't support multiple outputs. On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote: Why do you need to do that if the filter and the content of the resulting RDD are exactly the same? You may as well declare them as 1 RDD. On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I want to do this: val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE) val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId == NULL_VALUE) This will run two different stages. Can this be done in one stage? val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.*magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE) -- Deepak -- Best Regards Jeff Zhang
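Sean's point can be illustrated with ordinary Scala collections, where `partition` returns the matching and the non-matching elements from a single traversal. For an RDD there is no such operator in the Spark versions discussed here, so a common workaround (an assumption, not something stated in the thread) is to `cache()` the parent and filter it twice; that RDD variant is shown only in comments because it needs spark-core on the classpath. The `Session` class and the `NullValue` constant are hypothetical stand-ins for Deepak's `rawQtSession` values and `NULL_VALUE`.

```scala
// Hypothetical record type standing in for the values of Deepak's rawQtSession pairs.
case class Session(guid: String, qualifiedTreatmentId: String)

object SplitByPredicate {
  // Hypothetical sentinel playing the role of NULL_VALUE in the thread.
  val NullValue: String = "NULL"

  type Keyed = (String, Session)

  // On a local collection, partition traverses the input once and returns
  // (elements that matched the predicate, elements that did not).
  def splitSessions(sessions: Seq[Keyed]): (Seq[Keyed], Seq[Keyed]) =
    sessions.partition(_._2.qualifiedTreatmentId != NullValue)

  // RDD analogue (not compiled here; needs spark-core on the classpath):
  //   val cached             = rawQtSession.cache()
  //   val qtSessionsWithQt   = cached.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
  //   val guidUidMapSessions = cached.filter(_._2.qualifiedTreatmentId == NULL_VALUE)
  // The two filters are still evaluated separately, but the cached parent is
  // computed once and reused (as long as its cached partitions are not evicted)
  // instead of being recomputed from the source for each filter.
}
```

Caching does not merge the two filters into one pass; it only avoids recomputing the upstream stages, which is usually where the cost lies.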
Re: Scripting with groovy
I think that when you call ssc.stop it will stop your entire application. And by updating a transformation function, do you mean modifying the driver program? In that case, even if you checkpoint your application, it won't be able to recover from its previous state. A simpler approach would be to add certain conditions inside your transformation function and switch between them accordingly, instead of modifying the transformation. Thanks Best Regards On Wed, Jun 3, 2015 at 4:27 AM, Paolo Platter paolo.plat...@agilelab.it wrote: Hi all, Has anyone tried to add scripting capabilities to Spark Streaming using Groovy? I would like to stop the streaming context, update a transformation function written in Groovy (for example, to manipulate JSON), restart the streaming context and obtain a new behavior without resubmitting the application. Is it possible? Do you think it makes sense, or is there a smarter way to accomplish that? Thanks Paolo Sent from my Windows Phone
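Akhil's suggestion, keeping one deployed transformation and switching its behavior through a condition rather than replacing the function, might look like the minimal sketch below. The mode names, the `AtomicReference` switch and the way the mode gets changed (for example by a hypothetical control thread watching a config file) are all assumptions for illustration. In Spark Streaming the lookup would need to happen in driver-side code that runs once per batch, such as the function passed to `DStream.transform`, so that a change takes effect on the next batch.

```scala
import java.util.concurrent.atomic.AtomicReference

object SwitchableTransform {
  // Hypothetical behaviours; in Paolo's case these would be JSON manipulations.
  private val behaviours: Map[String, String => String] = Map(
    "v1" -> ((s: String) => s.trim),
    "v2" -> ((s: String) => s.trim.toUpperCase)
  )

  // Currently selected mode; a control thread (hypothetical) can flip it at runtime.
  private val mode = new AtomicReference[String]("v1")

  // Switches to a known mode; rejects names that have no behaviour registered.
  def switchTo(newMode: String): Unit = {
    require(behaviours.contains(newMode), s"unknown mode: $newMode")
    mode.set(newMode)
  }

  // Returns the behaviour for the current mode. Call it once per batch on the
  // driver (for example inside DStream.transform) and let Spark ship the
  // returned function to the executors inside the task closure.
  def current: String => String = behaviours(mode.get)
}
```

The design choice is that all candidate behaviours are compiled into the application up front, so nothing has to be stopped or resubmitted; the trade-off is that genuinely new logic still needs a redeploy.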
Re: Spark Client
Hi Akhil, sorry, I may not have conveyed the question properly. Actually, we are looking to launch a Spark job from a long-running workflow manager, which invokes the Spark client via SparkSubmit. Unfortunately, the client exits with System.exit(0) upon successful completion of the application, or with System.exit(NON_ZERO) when there is a failure. The question is: is there an alternate API through which a Spark application can be launched that returns an exit status to the caller instead of initiating a JVM halt? On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Run it as a standalone application. Create an sbt project and do sbt run? Thanks Best Regards On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi guys, I am new to Spark. I am using spark-submit to submit Spark jobs, but for my use case I don't want it to exit with System.exit. Is there any other, more API-friendly Spark client than SparkSubmit that doesn't exit with System.exit? Please correct me if I am missing something. Thanks in advance -- Regards Pavan Kumar Kolamuri -- Regards Pavan Kumar Kolamuri
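One way to get an exit status back without halting the caller's JVM is to run `spark-submit` as a separate child process and wait for it, so that its `System.exit` ends only the child. The sketch below uses the standard `scala.sys.process` API; the Spark home, main class, master URL and jar path are placeholders, not values from the thread. Spark 1.4 also adds `org.apache.spark.launcher.SparkLauncher`, whose `launch()` returns a `java.lang.Process` for the same purpose, which may be the cleaner option once you are on that release.

```scala
import scala.sys.process._

object SubmitInChildProcess {
  // Builds a spark-submit command line; every argument value is a placeholder.
  def submitCommand(sparkHome: String, mainClass: String, master: String,
                    appJar: String, appArgs: Seq[String]): Seq[String] =
    Seq(s"$sparkHome/bin/spark-submit",
        "--class", mainClass,
        "--master", master,
        appJar) ++ appArgs

  // Runs the command in a child process, forwards its output, and returns its
  // exit code. A System.exit inside spark-submit terminates only the child.
  def run(command: Seq[String]): Int = {
    val logger = ProcessLogger(
      out => println(s"[spark-submit] $out"),
      err => Console.err.println(s"[spark-submit] $err"))
    Process(command).!(logger)
  }
}
```

For example, `SubmitInChildProcess.run(SubmitInChildProcess.submitCommand("/opt/spark", "com.example.MyJob", "yarn-cluster", "/path/to/my-job.jar", Seq.empty))` would hand the workflow manager whatever code spark-submit exited with (0 on success, non-zero on failure, as Pavan describes). Consuming the child's output through the `ProcessLogger` also keeps its pipes from filling up and blocking it.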
Re: ERROR cluster.YarnScheduler: Lost executor
This is the log I could get: 15/06/02 16:37:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (2/3) for 4 outstanding blocks after 5000 ms 15/06/02 16:37:36 INFO client.TransportClientFactory: Found inactive connection to compute-10-3.local/10.10.255.238:33671, creating a new one. 15/06/02 16:37:36 WARN server.TransportChannelHandler: Exception in connection from /10.10.255.238:35430 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:744) 15/06/02 16:37:36 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1033433133943, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/hdisk3/hadoop/yarn/local/usercache/patcharee/appcache/application_1432633634512_0213/blockmgr-12d59e6b-0895-4a0e-9d06-152d2f7ee855/09/shuffle_0_56_0.data, offset=896, length=1132499356}} to /10.10.255.238:35430; closing connection java.nio.channels.ClosedChannelException 15/06/02 16:37:38 
ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 4 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to compute-10-3.local/10.10.255.238:33671 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.net.ConnectException: Connection refused: compute-10-3.local/10.10.255.238:33671 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) ... 
1 more Best, Patcharee On 3 June 2015 09:21, Akhil Das wrote: You need to look into your executor/worker logs to see what's going on. Thanks Best Regards On Wed, Jun 3, 2015 at 12:01 PM, patcharee patcharee.thong...@uni.no wrote: Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee
Re: Spark Client
Run it as a standalone application. Create an sbt project and do sbt run? Thanks Best Regards On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi guys, I am new to Spark. I am using spark-submit to submit Spark jobs, but for my use case I don't want it to exit with System.exit. Is there any other, more API-friendly Spark client than SparkSubmit that doesn't exit with System.exit? Please correct me if I am missing something. Thanks in advance -- Regards Pavan Kumar Kolamuri
Make HTTP requests from within Spark
Hi everybody, I'm new to Spark; apologies if my question is very basic. I need to send millions of requests to a web service and analyse and store the responses in an RDD. I can easily express the analysing part using Spark's filter/map/etc. primitives, but I don't know how to make the requests. Is that something I can do from within Spark? Or Spark Streaming? Or does it conflict with the way Spark works? I've found a similar question but am not sure whether the answer applies here: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html Any clarifications or pointers would be super helpful! Thanks, Kaspar
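Kaspar's question has no reply in this part of the archive. A common pattern, offered here as an assumption rather than an answer from the list, is to issue the requests inside `mapPartitions`, so that one client is created per partition and reused for every URL in it instead of once per record. The sketch keeps the client abstract (`HttpClientLike` and `makeClient` are hypothetical) so the per-partition logic can run locally; in Spark the same function would be passed to `rdd.mapPartitions(...)`.

```scala
// Hypothetical minimal client interface; a real one would wrap an HTTP library.
trait HttpClientLike {
  def get(url: String): String
  def close(): Unit
}

object PerPartitionRequests {
  // Processes one partition: opens one client, reuses it for every URL,
  // and pairs each URL with its response body.
  // Spark usage (sketch): urlRdd.mapPartitions(part => fetchAll(part, makeClient))
  def fetchAll(urls: Iterator[String],
               makeClient: () => HttpClientLike): Iterator[(String, String)] = {
    val client = makeClient()
    // Materialize the results before closing: Iterator.map is lazy and would
    // otherwise call the client after close() has already run.
    try urls.map(url => url -> client.get(url)).toVector.iterator
    finally client.close()
  }
}
```

Because `makeClient` is invoked inside the task, the client is created on the executor and does not itself need to be serializable; only the factory function does. Buffering a partition's responses with `toVector` keeps the example simple but holds that partition's results in memory, so smaller partitions may be needed for very large inputs.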
Re: Spark 1.4.0-rc3: Actor not found
I tried some other data sources as well, and it actually works for some Parquet sources. There are potentially some specific problems with the first Parquet source I tried, rather than a Spark 1.4 problem. I'll get back to you if I find any new information. Thanks, Anders On Tue, Jun 2, 2015 at 8:45 PM, Yin Huai yh...@databricks.com wrote: Does it happen every time you read a Parquet source? On Tue, Jun 2, 2015 at 3:42 AM, Anders Arpteg arp...@spotify.com wrote: The log is from the log aggregation tool (hortonworks, yarn logs ...), so it covers both the executors and the driver. I'll send you a private mail with the full logs. Also, I tried another job as you suggested, and it actually worked fine. The first job was reading from a Parquet source, and the second from an Avro source. Could there be some issue with the Parquet reader? Thanks, Anders On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote: How about other jobs? Is it an executor log or a driver log? Could you post other logs near this error, please? Thank you. Best Regards, Shixiong Zhu 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com: I just compiled Spark 1.4.0-rc3 for YARN 2.2 and tried running a job that worked fine on Spark 1.3. The job starts on the cluster (yarn-cluster mode) and the initial stage starts, but the job fails with the following error before any task succeeds. Any hints? 
[ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace) Exception in thread main akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/), Path(/user/OutputCommitCoordinator)] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575) at 
akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Re: ERROR cluster.YarnScheduler: Lost executor
Which version of spark? Looks like you are hitting this one https://issues.apache.org/jira/browse/SPARK-4516 Thanks Best Regards On Wed, Jun 3, 2015 at 1:06 PM, patcharee patcharee.thong...@uni.no wrote: This is log I can get 15/06/02 16:37:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (2/3) for 4 outstanding blocks after 5000 ms 15/06/02 16:37:36 INFO client.TransportClientFactory: Found inactive connection to compute-10-3.local/10.10.255.238:33671, creating a new one. 15/06/02 16:37:36 WARN server.TransportChannelHandler: Exception in connection from /10.10.255.238:35430 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:744) 15/06/02 16:37:36 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1033433133943, chunkIndex=1}, 
buffer=FileSegmentManagedBuffer{file=/hdisk3/hadoop/yarn/local/usercache/patcharee/appcache/application_1432633634512_0213/blockmgr-12d59e6b-0895-4a0e-9d06-152d2f7ee855/09/shuffle_0_56_0.data, offset=896, length=1132499356}} to /10.10.255.238:35430; closing connection java.nio.channels.ClosedChannelException 15/06/02 16:37:38 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 4 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to compute-10-3.local/ 10.10.255.238:33671 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.net.ConnectException: Connection refused: compute-10-3.local/10.10.255.238:33671 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) ... 1 more Best, Patcharee On 03 June 2015 09:21, Akhil Das wrote: You need to look into your executor/worker logs to see what's going on. Thanks Best Regards On Wed, Jun 3, 2015 at 12:01 PM, patcharee patcharee.thong...@uni.no wrote: Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee
Re: Spark 1.4.0 build Error on Windows
I ran into errors while trying to build Spark from the 1.4 release branch: https://github.com/apache/spark/tree/branch-1.4. Any help will be much appreciated. Here is the log from my Windows 8.1 PC. (FYI, I installed all the dependencies, such as Java 7 and Maven 3.2.5, and set the environment variables.) C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package [INFO] Scanning for projects... [INFO] [INFO] Reactor Build Order: [INFO] [INFO] Spark Project Parent POM [INFO] Spark Launcher Project [INFO] Spark Project Networking [INFO] Spark Project Shuffle Streaming Service [INFO] Spark Project Unsafe [INFO] Spark Project Core [INFO] Spark Project Bagel [INFO] Spark Project GraphX [INFO] Spark Project Streaming [INFO] Spark Project Catalyst [INFO] Spark Project SQL [INFO] Spark Project ML Library [INFO] Spark Project Tools [INFO] Spark Project Hive [INFO] Spark Project REPL [INFO] Spark Project YARN [INFO] Spark Project Assembly [INFO] Spark Project External Twitter [INFO] Spark Project External Flume Sink [INFO] Spark Project External Flume [INFO] Spark Project External MQTT [INFO] Spark Project External ZeroMQ [INFO] Spark Project External Kafka [INFO] Spark Project Examples [INFO] Spark Project External Kafka Assembly [INFO] Spark Project YARN Shuffle Service [INFO] [INFO] [INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @ spark-parent_2.10 --- [INFO] [INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @ spark-parent_2.10 --- [INFO] [INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @ spark-parent_2.10 --- [INFO] Add Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala [INFO] Add Test Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\test\scala
[INFO] [INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources) @ spark-parent_2.10 --- [INFO] Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala added. [INFO] [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-parent_2.10 --- [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... FAILURE [ 23.528 s] [INFO] Spark Launcher Project . SKIPPED [INFO] Spark Project Networking ... SKIPPED [INFO] Spark Project Shuffle Streaming Service SKIPPED [INFO] Spark Project Unsafe ... SKIPPED [INFO] Spark Project Core . SKIPPED [INFO] Spark Project Bagel SKIPPED [INFO] Spark Project GraphX ... SKIPPED [INFO] Spark Project Streaming SKIPPED [INFO] Spark Project Catalyst . SKIPPED [INFO] Spark Project SQL .. SKIPPED [INFO] Spark Project ML Library ... SKIPPED [INFO] Spark Project Tools SKIPPED [INFO] Spark Project Hive . SKIPPED [INFO] Spark Project REPL . SKIPPED [INFO] Spark Project YARN . SKIPPED [INFO] Spark Project Assembly . SKIPPED [INFO] Spark Project External Twitter . SKIPPED [INFO] Spark Project External Flume Sink .. SKIPPED [INFO] Spark Project External Flume ... SKIPPED [INFO] Spark Project External MQTT SKIPPED [INFO] Spark Project External ZeroMQ .. SKIPPED [INFO] Spark Project External Kafka ... SKIPPED [INFO] Spark Project Examples . SKIPPED [INFO] Spark Project External Kafka Assembly .. SKIPPED [INFO] Spark Project YARN Shuffle Service . SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 24.680 s [INFO] Finished at: 2015-06-03T02:11:35-07:00 [INFO] Final Memory: 27M/224M [INFO] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process (default) on project spark-parent_2.10: Error finding remote resources manifests: C:\Program Files\Apache Software Foundation\spark-branch-1.4
Re: Filter operation to return two RDDs at once.
Why do you need to do that if the filter and the content of the resulting RDD are exactly the same? You may as well declare them as one RDD. On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I want to do this: val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE) val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId == NULL_VALUE) This will run two different stages. Can this be done in one stage? val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.magicFilter(_._2.qualifiedTreatmentId != NULL_VALUE) -- Deepak
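Outside Spark, splitting a collection by one predicate in a single pass is easy; the minimal Java sketch below does it on a local list with `Collectors.partitioningBy`, as an analogy for the requested `magicFilter`. As far as I know, Spark 1.x RDDs offer no built-in one-pass split into two RDDs; a commonly suggested workaround is to `cache()` the parent RDD so the two `filter` calls at least reuse it instead of recomputing it. The `Session` class and its `NULL_VALUE` constant are hypothetical stand-ins for the poster's types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the session values in the original question.
final class Session {
    static final String NULL_VALUE = "NULL"; // hypothetical sentinel value
    final String qualifiedTreatmentId;

    Session(String qualifiedTreatmentId) {
        this.qualifiedTreatmentId = qualifiedTreatmentId;
    }
}

final class SplitByPredicate {
    // Walks the input once and routes each element into the true or false bucket.
    // partitioningBy always returns a map containing both keys.
    static Map<Boolean, List<Session>> split(List<Session> sessions) {
        return sessions.stream()
                .collect(Collectors.partitioningBy(
                        s -> !Session.NULL_VALUE.equals(s.qualifiedTreatmentId)));
    }

    public static void main(String[] args) {
        List<Session> raw = Arrays.asList(
                new Session("qt-1"), new Session(Session.NULL_VALUE), new Session("qt-2"));
        Map<Boolean, List<Session>> parts = split(raw);
        System.out.println(parts.get(true).size() + " with QT, "
                + parts.get(false).size() + " without");
    }
}
```

This only shows the single-pass idea on one machine; on an RDD, the same grouping would still have to be materialized as two separate RDDs.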
ERROR cluster.YarnScheduler: Lost executor
Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ERROR cluster.YarnScheduler: Lost executor
I think you should check the YARN NodeManager log or the other Spark executor logs to see the details. The exception stacks you listed above are just the symptom, not the cause. Normally, one of the following situations leads to a lost executor: 1. It was killed by YARN for exceeding its memory limit, or by preemption. 2. It was killed by Spark itself when dynamic allocation is enabled. 3. It ran into unexpected behavior and lost its connection with the driver. You need to check the executor logs as well as the YARN logs to find any clues. Thanks Saisai 2015-06-03 17:17 GMT+08:00 patcharee patcharee.thong...@uni.no: Hi again, below is the log from the executor: FetchFailed(BlockManagerId(4, compute-10-0.local, 38594), shuffleId=0, mapId=117, reduceId=117, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to compute-10-0.local/10.10.255.241:38594 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused 
by: java.io.IOException: Failed to connect to compute-10-0.local/ 10.10.255.241:38594 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at
RE: Objects serialized before foreachRDD/foreachPartition ?
Dmitry was concerned about the “serialization cost”, NOT the “memory footprint”; hence option a) is still viable, since a broadcast is performed only ONCE for the lifetime of the driver instance. From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, June 3, 2015 2:44 PM To: Evo Eftimov Cc: dgoldenberg; user Subject: Re: Objects serialized before foreachRDD/foreachPartition ? Considering the memory footprint of param, as mentioned by Dmitry, option b seems better. Cheers On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote: Hmmm, a Spark Streaming app's code doesn't execute in the linear fashion assumed in your previous code snippet. To achieve your objectives, you should do something like the following. In terms of your second objective (saving the initialization and serialization of the params), you can: a) broadcast them, or b) have them as a Singleton (initialized from e.g. params in a file on HDFS) on each Executor. messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() { Param param = new Param(); param.initialize(); @Override public Void call(JavaRDD<String> rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); //put this in e.g. the object destructor param.deinitialize(); -----Original Message----- From: dgoldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 1:56 PM To: user@spark.apache.org Subject: Objects serialized before foreachRDD/foreachPartition ? I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically, the takeaway is that all objects passed into the code processing RDDs must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDDs, it would seem I'd need to think twice about the costs of serializing such objects. In the code below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition?
Param param = new Param(); param.initialize(); messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); param.deinitialize(); If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction(), or perhaps in the 'call' method within that function? I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
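Option b) above (a singleton on each executor) is often implemented as a lazily initialized static holder: each executor JVM runs the expensive initialization once, on first use inside `foreachPartition`, and no heavy object is captured in the serialized closure. A minimal plain-Java sketch of that pattern, assuming a hypothetical `ExpensiveParam` class in place of the poster's `Param` and a hypothetical `PartitionWorker.processPartition` standing in for the partition function body:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the poster's Param class.
final class ExpensiveParam {
    // Counts how many times the expensive initialization actually ran.
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    private ExpensiveParam() {
        INIT_COUNT.incrementAndGet(); // imagine loading config or a model from HDFS here
    }

    // Initialization-on-demand holder idiom: Holder is initialized (and INSTANCE
    // built) only on the first call to get(); JVM class initialization is thread-safe.
    private static final class Holder {
        static final ExpensiveParam INSTANCE = new ExpensiveParam();
    }

    static ExpensiveParam get() {
        return Holder.INSTANCE;
    }

    String process(String record) {
        return record.toUpperCase();
    }
}

final class PartitionWorker {
    // What a foreachPartition body could do: fetch the per-JVM instance
    // instead of capturing a Param object in the serialized closure.
    static int processPartition(Iterable<String> partition) {
        ExpensiveParam param = ExpensiveParam.get();
        int n = 0;
        for (String record : partition) {
            param.process(record);
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        processPartition(java.util.Arrays.asList("a", "b"));
        processPartition(java.util.Arrays.asList("c"));
        System.out.println("initializations: " + ExpensiveParam.INIT_COUNT.get());
    }
}
```

If a matching deinitialization step is needed, one option is a JVM shutdown hook (`Runtime.getRuntime().addShutdownHook(...)`), bearing in mind that hooks do not run if the executor process is killed forcibly.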
columnar structure of RDDs from Parquet or ORC files
When Spark reads Parquet files (sqlContext.parquetFile), it creates a DataFrame RDD. I would like to know whether the resulting DataFrame has a columnar structure (many rows of a column coalesced together in memory) or the row-wise structure that a Spark RDD has. The Spark SQL and DataFrames guide (http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory) says you need to call sqlContext.cacheTable(tableName) or df.cache() to make it columnar. What exactly is this columnar structure? To be precise: what does the row represent in the expression df.cache().map{row => ...}? Is it a logical row that maintains an array of columns, where each column in turn is an array of values for batchSize rows? -Kiran
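To make the row-versus-column distinction concrete, here is a toy Java sketch that transposes rows into column arrays, one batch of `batchSize` rows at a time. It only illustrates the general idea of a columnar batch; it is not Spark's actual in-memory columnar format (which can, for instance, also compress each column), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ColumnarBatches {
    // Transposes rows (each an Object[] of column values) into batches;
    // within a batch, element c is an array holding column c for up to batchSize rows.
    static List<List<Object[]>> toColumnBatches(List<Object[]> rows, int numColumns, int batchSize) {
        List<List<Object[]>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            List<Object[]> columns = new ArrayList<>();
            for (int c = 0; c < numColumns; c++) {
                Object[] column = new Object[end - start];
                for (int r = start; r < end; r++) {
                    column[r - start] = rows.get(r)[c];
                }
                columns.add(column);
            }
            batches.add(columns);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {1, "a"});
        rows.add(new Object[] {2, "b"});
        rows.add(new Object[] {3, "c"});
        // With batchSize = 2: batch 0 holds columns [1, 2] and [a, b]; batch 1 holds [3] and [c].
        List<List<Object[]>> batches = toColumnBatches(rows, 2, 2);
        System.out.println(batches.size() + " batches");
    }
}
```

As far as the public API goes, `df.cache().map { row => ... }` still passes your function one logical `Row` at a time; the column batches are an internal storage detail of the cached data.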
Re: Application is always in process when I check out logs of completed application
I had the same issue a couple of days ago. It's a bug in 1.3.0 that is fixed in 1.3.1 and up: https://issues.apache.org/jira/browse/SPARK-6036 In 1.3.0, the event log is moved from in-progress to complete only after the Master has already tried to build the history page for it. The only reason it works at all on occasion in 1.3.0 is that the Master part runs asynchronously while the event log status change is synchronous, so, as a race condition, the Master part sometimes executes afterwards. On Wed, Jun 3, 2015 at 2:17 AM, ayan guha guha.a...@gmail.com wrote: Have you called sc.stop()? :) On 3 Jun 2015 14:05, amghost zhengweita...@outlook.com wrote: I run a Spark application on a Spark standalone cluster in client deploy mode. I want to check out the logs of my finished application, but I always get a page telling me "Application history not found - Application xxx is still in process". I am pretty sure that the application has indeed completed, because I can see it in the Completed Applications list shown by the Spark WebUI, and I have also found the log file with the suffix .inprogress in the directory set by spark.eventLog.dir in my spark-defaults.conf. Oh, BTW, I am using Spark 1.3.0. So, is there anything I missed? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html
Re: Spark Client
I think the short answer to the question is no: there is no alternate API that avoids the System.exit calls. You can craft a workaround like the one being suggested in this thread. For comparison, we do programmatic submission of applications from a long-running client application. To get around these issues, we keep a shadowed copy of some of the Spark code in our application with the System.exit calls removed, so that exceptions bubble up to our application instead. On Wed, Jun 3, 2015 at 7:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try this? Create an sbt project like: // Create your context val sconf = new SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077") val sc = new SparkContext(sconf) // Do some computations sc.parallelize(1 to 1).take(10).foreach(println) // Now return the exit status System.exit(Some number) Now, make your workflow manager trigger sbt run on the project instead of using spark-submit. Thanks Best Regards On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi Akhil, sorry, I may not be conveying the question properly. Actually, we are looking to launch a Spark job from a long-running workflow manager, which invokes the Spark client via SparkSubmit. Unfortunately, the client exits with System.exit(0) upon successful completion of the application, or with System.exit(NON_ZERO) when there is a failure. The question is: is there an alternate API through which a Spark application can be launched that returns an exit status to the caller instead of initiating a JVM halt? On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Run it as a standalone application. Create an sbt project and do sbt run? Thanks Best Regards On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi guys, I am new to Spark. I am using SparkSubmit to submit Spark jobs, but for my use case I don't want it to exit with System.exit.
Is there any other Spark client, more API-friendly than SparkSubmit, that doesn't exit with System.exit? Please correct me if I am missing something. Thanks in advance -- Regards Pavan Kumar Kolamuri -- Regards Pavan Kumar Kolamuri
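One way to get an exit status back without letting `System.exit` end the caller's JVM is to run the submission in a child process and read its exit code. The Java sketch below shows that pattern with `ProcessBuilder`; the `spark-submit` command line in the comment is illustrative only (hypothetical paths and class name), and the runnable demo launches `java -version` so it works without Spark installed.

```java
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

final class ChildProcessSubmit {
    // Runs a command in a separate process and returns its exit code.
    // A System.exit inside the child ends only the child, not this long-running JVM.
    static int runAndWait(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.inheritIO(); // forward the child's stdout/stderr so its output is not lost or blocked
        Process process = pb.start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // A workflow manager might pass something like (hypothetical paths):
        // Arrays.asList("/opt/spark/bin/spark-submit", "--class", "com.example.Job", "job.jar")
        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        int exitCode = runAndWait(Arrays.asList(javaBin, "-version"));
        System.out.println("child exited with " + exitCode);
    }
}
```

The trade-off is an extra JVM per submission, in exchange for not having to shadow or patch Spark's own launcher code.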
Does Apache Spark maintain a columnar structure when creating RDDs from Parquet or ORC files?
When Spark reads Parquet files (sqlContext.parquetFile), it creates a DataFrame RDD. I would like to know whether the resulting DataFrame has a columnar structure (many rows of a column coalesced together in memory) or the row-wise structure that a Spark RDD has. The Spark SQL and DataFrames guide (http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory) says you need to call sqlContext.cacheTable(tableName) or df.cache() to make it columnar. What exactly is this columnar structure? To be precise: what does the row represent in the expression df.cache().map{row => ...}? Is it a logical row that maintains an array of columns, where each column in turn is an array of values for batchSize rows? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Apache-Spark-maintain-a-columnar-structure-when-creating-RDDs-from-Parquet-or-ORC-files-tp23139.html
Equivalent to Storm's 'field grouping' in Spark.
Hi everybody, is there anything in Spark that shares the philosophy of Storm's field grouping? I'd like to manage data partitioning across the workers by sending tuples that share the same key to the very same worker in the cluster, but I did not find any method to do that. Suggestions? :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Equivalent-to-Storm-s-field-grouping-in-Spark-tp23135.html
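The closest Spark analogue is usually hash partitioning a pair RDD by key (for example `partitionBy` with a `HashPartitioner`), so that all records with the same key land in the same partition. Note that this pins keys to partitions, not to specific worker machines. The plain-Java sketch below mirrors the rule `HashPartitioner` applies (a non-negative `key.hashCode()` modulo the partition count, with null keys going to partition 0); treat the `KeyPartitioner` class as a hypothetical illustration, not Spark's code.

```java
final class KeyPartitioner {
    private final int numPartitions;

    KeyPartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Same key -> same partition: hash the key and take a non-negative modulus.
    int partitionFor(Object key) {
        if (key == null) {
            return 0;
        }
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod; // Java's % can be negative
    }

    public static void main(String[] args) {
        KeyPartitioner p = new KeyPartitioner(4);
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + p.partitionFor(key));
        }
    }
}
```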
RE: Objects serialized before foreachRDD/foreachPartition ?
Hmmm, a Spark Streaming app's code doesn't execute in the linear fashion assumed in your previous code snippet. To achieve your objectives, you should do something like the following. In terms of your second objective (saving the initialization and serialization of the params), you can: a) broadcast them, or b) have them as a Singleton (initialized from e.g. params in a file on HDFS) on each Executor. messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() { Param param = new Param(); param.initialize(); @Override public Void call(JavaRDD<String> rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); //put this in e.g. the object destructor param.deinitialize(); -----Original Message----- From: dgoldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 1:56 PM To: user@spark.apache.org Subject: Objects serialized before foreachRDD/foreachPartition ? I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically, the takeaway is that all objects passed into the code processing RDDs must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDDs, it would seem I'd need to think twice about the costs of serializing such objects. In the code below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition? Param param = new Param(); param.initialize(); messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); param.deinitialize(); If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction(), or perhaps in the 'call' method within that function?
I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
ALS Rating Object
Hi, I want to use Spark's ALS in my project. I have user IDs like 30011397223227125563254, but the Rating object that ALS uses expects an Integer user ID, and this ID does not fit into a 32-bit integer. How can I solve that? Thanks. Best, yasemin -- hiç ender hiç
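A common workaround is to map each external ID to a dense `int` index before building `Rating` objects, and to keep a reverse table to translate results back. Note that 30011397223227125563254 does not even fit in a 64-bit `long`, so treating the IDs as strings is the safe choice. Below is a minimal single-machine Java sketch of such a mapping (the `IdIndexer` class is hypothetical); in Spark one would more likely derive the mapping from the distinct IDs, for example with `distinct()` followed by `zipWithIndex()`, which this sketch does not show.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: assigns each distinct external ID a dense int index (0, 1, 2, ...).
final class IdIndexer {
    private final Map<String, Integer> toIndex = new HashMap<>();
    private final List<String> toId = new ArrayList<>();

    // Returns the existing index for a known ID, or assigns the next free one.
    int indexOf(String externalId) {
        Integer existing = toIndex.get(externalId);
        if (existing != null) {
            return existing;
        }
        int next = toId.size();
        toIndex.put(externalId, next);
        toId.add(externalId);
        return next;
    }

    // Reverse lookup, e.g. to translate ALS recommendations back to original IDs.
    String idOf(int index) {
        return toId.get(index);
    }

    public static void main(String[] args) {
        IdIndexer users = new IdIndexer();
        int u = users.indexOf("30011397223227125563254");
        System.out.println(u + " -> " + users.idOf(u));
    }
}
```

The number of distinct IDs must stay below `Integer.MAX_VALUE` for this to work with an `int`-based `Rating`.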
Re: in GraphX,program with Pregel runs slower and slower after several iterations
I think you're exactly right. I once had 100 iterations in a single Pregel call and ran into the lineage problem right there. I had to modify the Pregel function and checkpoint both the graph and the newVerts RDD there to cut off the lineage. If you draw out the dependency graph among g, the newVerts RDD and the messages RDD inside the Pregel loop, you will find that we need to checkpoint two things to really cut off the lineage: the graph itself and one of newVerts or messages. This is how I did it inside the Pregel loop: ... prevG = g g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) } g.cache() if (i % 50 == 0) { g.checkpoint newVerts.checkpoint } ... Also note: checkpointing is only effective before the RDD is materialized. If you checkpoint outside of Pregel, the graph is already materialized (by the mapReduceTriplets call), so nothing will happen. You can verify that by looking at RDD.toDebugString. Therefore, I had to apply the following workaround: val clonedGraph = graph.mapVertices((vid, vd) => vd).mapEdges{edge => edge.attr} clonedGraph.checkpoint graph = clonedGraph -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/in-GraphX-program-with-Pregel-runs-slower-and-slower-after-several-iterations-tp23121p23133.html
Objects serialized before foreachRDD/foreachPartition ?
I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically, the takeaway is that all objects passed into the code processing RDDs must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDDs, it would seem I'd need to think twice about the costs of serializing such objects. In the code below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition? Param param = new Param(); param.initialize(); messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); param.deinitialize(); If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction(), or perhaps in the 'call' method within that function? I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
Re: Objects serialized before foreachRDD/foreachPartition ?
Considering the memory footprint of param, as mentioned by Dmitry, option b seems better. Cheers On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote: Hmmm, a Spark Streaming app's code doesn't execute in the linear fashion assumed in your previous code snippet. To achieve your objectives, you should do something like the following. In terms of your second objective (saving the initialization and serialization of the params), you can: a) broadcast them, or b) have them as a Singleton (initialized from e.g. params in a file on HDFS) on each Executor. messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() { Param param = new Param(); param.initialize(); @Override public Void call(JavaRDD<String> rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); //put this in e.g. the object destructor param.deinitialize(); -----Original Message----- From: dgoldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 1:56 PM To: user@spark.apache.org Subject: Objects serialized before foreachRDD/foreachPartition ? I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically, the takeaway is that all objects passed into the code processing RDDs must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDDs, it would seem I'd need to think twice about the costs of serializing such objects. In the code below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition?
Param param = new Param(); param.initialize(); messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> rdd) throws Exception { ProcessPartitionFunction func = new ProcessPartitionFunction(param); rdd.foreachPartition(func); return null; } }); param.deinitialize(); If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction(), or perhaps in the 'call' method within that function? I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Great. Regarding “You should monitor vital performance / job clogging stats of the Spark Streaming Runtime, not Kafka topics”: is there anything specific you were thinking of? On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov evo.efti...@isecc.com wrote: Makes sense, especially if you have a cloud with “infinite” resources / nodes, which allows you to double, triple, etc. the resources of the currently running cluster in the background / in parallel. I was thinking more about the scenario where you have e.g. 100 boxes and want to / can add e.g. 20 more. From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 4:46 PM To: Evo Eftimov Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Evo, one of the ideas is to shadow the current cluster. This way there's no extra latency incurred due to shutting down the consumers. If two sets of consumers are running, potentially processing the same data, that is OK. We phase out the older cluster and gradually flip over to the new one, ensuring no downtime or extra latency. Thoughts? On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com wrote: You should monitor vital performance / job clogging stats of the Spark Streaming Runtime, not “Kafka topics”. You should be able to bring new worker nodes online and make them contact and register with the Master without bringing down the Master (or any of the currently running worker nodes). Then just shut down your currently running Spark Streaming job/app and restart it with new params to take advantage of the larger cluster. From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 4:14 PM To: Cody Koeninger Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Would it be possible to implement Spark autoscaling somewhat along these lines? 1. If we sense that a new machine is needed, by watching the data load in the Kafka topic(s), then 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and get a machine); 3. Create a shadow/mirror Spark master running alongside the initial version, which talks to N machines. The new mirror version is aware of N+1 machines (or N+M if we had decided we needed M new boxes). 4. The previous version of the Spark runtime is quiesced/decommissioned. We possibly get both clusters working on the same data, which may actually be OK (at least for our specific use cases). 5. Now the new Spark cluster is running. Similarly, the decommissioning of M unused boxes would happen via this notion of a mirror Spark runtime. How feasible would it be to create such a mirror-like setup, especially programmatically? Especially point #3. The other idea we'd entertained was to bring in a new machine, quiesce all currently running workers by telling them to process their current batch and then shut down, and then restart the consumers now that Spark is aware of the modified cluster. This has the drawback of downtime that may not be tolerable in terms of latency for the system's clients, which wait for their responses synchronously. Thanks. On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote: I'm not sure that points 1 and 2 really apply to the Kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is. On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote: Hi all, As the author of the dynamic allocation feature, I can offer a few insights here. Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark Streaming at the moment (1.4 or before).
This is because of two things: (1) Number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors. (2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up having to constantly kill and restart executors. If your idle timeout is greater than the batch duration, then you'll never kill executors. Long answer short, with Spark streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. take into account the batch queue instead. We came up with a
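Andrew's point (2) can be made concrete with a small model. This is a hypothetical simplification, not Spark's actual allocation logic: it assumes each batch keeps an executor busy for a fixed processing time, after which the executor sits idle until the next batch starts, and that it is released once that idle gap reaches the idle timeout (the related setting is `spark.dynamicAllocation.executorIdleTimeout`). The class and method names are illustrative only.

```java
// Hypothetical model of Andrew Or's point (2): per batch, an executor is busy for
// processingMs and then idle until the next batch starts.
final class IdleTimeoutModel {

    /** Idle gap between the end of one batch's work and the start of the next batch. */
    static long idleGapMs(long batchMs, long processingMs) {
        // If processing overruns the batch interval, the executor is never idle.
        return Math.max(0L, batchMs - processingMs);
    }

    /**
     * True if, under this model, the executor reaches the idle timeout in every batch,
     * i.e. it would be released and then re-requested over and over.
     */
    static boolean removedEveryBatch(long batchMs, long processingMs, long idleTimeoutMs) {
        return idleGapMs(batchMs, processingMs) >= idleTimeoutMs;
    }

    public static void main(String[] args) {
        long batchMs = 10_000;     // 10 s batch interval
        long processingMs = 4_000; // 4 s of work per batch, so a 6 s idle gap
        // Timeout shorter than the idle gap: executor churn on every batch.
        System.out.println(removedEveryBatch(batchMs, processingMs, 5_000));  // true
        // Timeout longer than the whole batch interval: never released.
        System.out.println(removedEveryBatch(batchMs, processingMs, 60_000)); // false
    }
}
```

Under this model, a timeout between the idle gap and the batch interval (for example 8 s with the numbers above) also never releases the executor, so the quantity that matters is the idle gap rather than the full batch duration.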
Re: Spark 1.4.0 build Error on Windows
I got the same error message when using Maven 3.3.

On Jun 3, 2015 8:58 AM, Ted Yu yuzhih...@gmail.com wrote:

I used the same command on Linux but didn't reproduce the error. Can you include the -X switch on your command line? Also consider upgrading Maven to 3.3.x.

Cheers

On Wed, Jun 3, 2015 at 2:36 AM, Daniel Emaasit daniel.emaa...@gmail.com wrote:

I ran into errors while trying to build Spark from the 1.4 release branch: https://github.com/apache/spark/tree/branch-1.4. Any help will be much appreciated. Here is the log file from my Windows 8.1 PC. (FYI, I installed all the dependencies like Java 7 and Maven 3.2.5 and set the environment variables.)

C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
[INFO] Scanning for projects...
[INFO]
[INFO] Reactor Build Order:
[INFO]
[INFO] Spark Project Parent POM
[INFO] Spark Launcher Project
[INFO] Spark Project Networking
[INFO] Spark Project Shuffle Streaming Service
[INFO] Spark Project Unsafe
[INFO] Spark Project Core
[INFO] Spark Project Bagel
[INFO] Spark Project GraphX
[INFO] Spark Project Streaming
[INFO] Spark Project Catalyst
[INFO] Spark Project SQL
[INFO] Spark Project ML Library
[INFO] Spark Project Tools
[INFO] Spark Project Hive
[INFO] Spark Project REPL
[INFO] Spark Project YARN
[INFO] Spark Project Assembly
[INFO] Spark Project External Twitter
[INFO] Spark Project External Flume Sink
[INFO] Spark Project External Flume
[INFO] Spark Project External MQTT
[INFO] Spark Project External ZeroMQ
[INFO] Spark Project External Kafka
[INFO] Spark Project Examples
[INFO] Spark Project External Kafka Assembly
[INFO] Spark Project YARN Shuffle Service
[INFO]
[INFO]
[INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT
[INFO]
[INFO]
[INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @ spark-parent_2.10 ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @ spark-parent_2.10 ---
[INFO]
[INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @ spark-parent_2.10 ---
[INFO] Add Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala
[INFO] Add Test Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\test\scala
[INFO]
[INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources) @ spark-parent_2.10 ---
[INFO] Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala added.
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-parent_2.10 ---
[INFO]
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... FAILURE [ 23.528 s]
[INFO] Spark Launcher Project . SKIPPED
[INFO] Spark Project Networking ... SKIPPED
[INFO] Spark Project Shuffle Streaming Service SKIPPED
[INFO] Spark Project Unsafe ... SKIPPED
[INFO] Spark Project Core . SKIPPED
[INFO] Spark Project Bagel SKIPPED
[INFO] Spark Project GraphX ... SKIPPED
[INFO] Spark Project Streaming SKIPPED
[INFO] Spark Project Catalyst . SKIPPED
[INFO] Spark Project SQL .. SKIPPED
[INFO] Spark Project ML Library ... SKIPPED
[INFO] Spark Project Tools SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project YARN . SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Flume Sink .. SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External MQTT SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] Spark Project External Kafka Assembly .. SKIPPED
[INFO] Spark Project YARN Shuffle Service . SKIPPED
[INFO]
[INFO] BUILD FAILURE
[INFO]
Re: Objects serialized before foreachRDD/foreachPartition ?
So Evo, option b is to make Param a singleton, as in your modified snippet, i.e. instantiate it once per RDD. But if I understand correctly, option a) is broadcast, meaning instantiation happens in the Driver once, before any transformations and actions, correct? That's where my serialization cost concerns were. There's Kryo serialization, but Param might still be too heavy. If some of its member variables are lazily loaded we may be OK. But then it seems the lazy initialization would have to happen on every worker node to load these lazily loaded resources into Param?

public class Param {
    // == potentially a very hefty resource to load
    private Map<String, String> dictionary = new HashMap<String, String>();
    ...
}

I'm grokking that Spark will serialize Param right before the call to foreachRDD, if we're to broadcast...

On Wed, Jun 3, 2015 at 9:58 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Dmitry was concerned about the “serialization cost”, NOT the “memory footprint”, hence option a) is still viable, since a Broadcast is performed only ONCE for the lifetime of the Driver instance.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, June 3, 2015 2:44 PM
To: Evo Eftimov
Cc: dgoldenberg; user
Subject: Re: Objects serialized before foreachRDD/foreachPartition ?

Considering the memory footprint of param as mentioned by Dmitry, option b seems better.

Cheers

On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Hmmm, Spark Streaming app code doesn't execute in the linear fashion assumed in your previous code snippet. To achieve your objectives you should do something like the following. In terms of your second objective, saving the initialization and serialization of the params, you can:

a) broadcast them
b) have them as a Singleton (initialized from e.g. params in a file on HDFS) on each Executor

messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
    Param param = new Param();
    { param.initialize(); } // instance initializer: runs once when this Function object is created

    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(param);
        rdd.foreachPartition(func);
        return null;
    }
});
// put this in e.g. the object destructor: param.deinitialize();

-----Original Message-----
From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Wednesday, June 3, 2015 1:56 PM
To: user@spark.apache.org
Subject: Objects serialized before foreachRDD/foreachPartition ?

I'm looking at https://spark.apache.org/docs/latest/tuning.html. Basically the takeaway is that all objects passed into the code processing RDDs must be serializable. So if I've got a few objects that I'd rather initialize once and deinitialize once outside of the logic processing the RDDs, it would seem I'd need to think twice about the costs of serializing such objects.

In the code below, does the Spark serialization happen before calling foreachRDD or before calling foreachPartition?

final Param param = new Param();
param.initialize();
messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(param);
        rdd.foreachPartition(func);
        return null;
    }
});
param.deinitialize();

If param gets initialized to a significant memory footprint, are we better off creating/initializing it before calling new ProcessPartitionFunction(), or perhaps in the 'call' method within that function? I'm trying to avoid calling expensive init()/deinit() methods while balancing against the serialization costs.

Thanks.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
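The serialization-cost question in this thread can be checked locally. The sketch below does not use Spark; it measures how many bytes plain Java serialization (which Spark uses for task closures by default) produces for an eager `Param`-like object versus one whose dictionary is `transient` and rebuilt lazily on first access. `EagerParam`, `LazyParam`, and the generated dictionary contents are hypothetical stand-ins for the `Param` class above. A transient field is rebuilt once per deserialized copy, which in Spark usually means once per task; keeping the loaded dictionary in a static field instead gives one copy per executor JVM, which is essentially the singleton option b).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class ParamSerializationSketch {

    /** Eager variant: the dictionary is built on the driver and serialized with the object. */
    static final class EagerParam implements Serializable {
        private static final long serialVersionUID = 1L;
        final Map<String, String> dictionary = new HashMap<>();

        EagerParam(int entries) {
            for (int i = 0; i < entries; i++) {
                dictionary.put("key" + i, "value" + i);
            }
        }
    }

    /** Lazy variant: only a small descriptor travels; the dictionary is rebuilt on first use. */
    static final class LazyParam implements Serializable {
        private static final long serialVersionUID = 1L;
        final int entries; // hypothetical stand-in for, e.g., the HDFS path of the dictionary
        private transient Map<String, String> dictionary; // skipped by Java serialization

        LazyParam(int entries) {
            this.entries = entries;
        }

        Map<String, String> dictionary() {
            if (dictionary == null) { // runs once per deserialized copy of this object
                dictionary = new HashMap<>();
                for (int i = 0; i < entries; i++) {
                    dictionary.put("key" + i, "value" + i);
                }
            }
            return dictionary;
        }
    }

    /** Number of bytes plain Java serialization produces for obj. */
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("eager: " + serializedSize(new EagerParam(10_000)) + " bytes");
        System.out.println("lazy:  " + serializedSize(new LazyParam(10_000)) + " bytes");
    }
}
```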
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
If we have a hand-off between the older consumer and the newer consumer, I wonder if we need to manually manage the offsets in Kafka so as not to miss any messages while the hand-off is happening. Or we could let the new consumer run for a bit, then let the old consumer know the 'new guy is in town', and then the old consumer can be shut off. Some overlap is OK...
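One way to reason about the hand-off question is to start the new consumer slightly behind the old consumer's last committed offsets, so that no gap is possible and the cost is a bounded number of re-processed records, which the thread says is acceptable. The sketch assumes a snapshot of per-partition offsets taken from the old consumer (how it is obtained is left open); `OffsetHandoff` and its methods are hypothetical, and plain integers stand in for Kafka topic-partitions. The Kafka direct stream in Spark 1.3+ can be started from explicit per-partition offsets, which is where such a map would be used. Because the old consumer keeps running after the snapshot, records arriving during the overlap window are read by both consumers, on top of the rewound amount.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetHandoff {

    /**
     * Starting offsets for the new consumer: rewind each partition by `overlap` records
     * (never below 0) so that no message can fall into a gap during the hand-off.
     *
     * @param committed partition id -> next offset the old consumer would read (hypothetical snapshot)
     * @param overlap   already-processed records per partition we accept re-processing
     */
    static Map<Integer, Long> startingOffsets(Map<Integer, Long> committed, long overlap) {
        if (overlap < 0) {
            throw new IllegalArgumentException("overlap must be >= 0");
        }
        Map<Integer, Long> start = new HashMap<>();
        for (Map.Entry<Integer, Long> e : committed.entrySet()) {
            start.put(e.getKey(), Math.max(0L, e.getValue() - overlap));
        }
        return start;
    }

    /** Records in the snapshot that the new consumer will process a second time. */
    static long duplicates(Map<Integer, Long> committed, Map<Integer, Long> start) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : committed.entrySet()) {
            total += e.getValue() - start.get(e.getKey());
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 1_000L);
        committed.put(1, 50L);
        Map<Integer, Long> start = startingOffsets(committed, 100);
        System.out.println(start);                        // {0=900, 1=0}
        System.out.println(duplicates(committed, start)); // 150
    }
}
```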
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Would it be possible to implement Spark autoscaling somewhat along these lines?

1. If we sense that a new machine is needed, by watching the data load in Kafka topic(s), then
2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and get a machine);
3. Create a shadow/mirror Spark master running alongside the initial version, which talks to N machines. The new mirror version is aware of N+1 machines (or N+M if we had decided we needed M new boxes).
4. The previous version of the Spark runtime is quiesced/decommissioned. We possibly get both clusters working on the same data, which may actually be OK (at least for our specific use cases).
5. Now the new Spark cluster is running.

Similarly, the decommissioning of M unused boxes would happen via this notion of a mirror Spark runtime.

How feasible would it be for such a mirror-like setup to be created, especially programmatically? Especially point #3.

The other idea we'd entertained was to bring in a new machine, quiesce all currently running workers by telling them to process their current batch and then shut down, then restart the consumers now that Spark is aware of a modified cluster. This has the drawback of downtime that may not be tolerable in terms of latency for the system's clients, who wait for their responses synchronously.

Thanks.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote:

I'm not sure that points 1 and 2 really apply to the Kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is.

On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

Hi all,

As the author of the dynamic allocation feature I can offer a few insights here. Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark Streaming at the moment (1.4 or before). This is because of two things:

(1) The number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors.

(2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up having to constantly kill and restart executors. If your idle timeout is greater than the batch duration, then you'll never kill executors.

Long answer short, with Spark Streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark Streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. take into account the batch queue instead. We came up with a few ideas that I will not detail here, but we are looking into this and do intend to support it in the near future.

-Andrew

2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

You should probably ALWAYS keep the RDD storage policy at MEMORY_AND_DISK; it will be your insurance policy against system crashes due to memory leaks. As long as there is free RAM, Spark Streaming (Spark) will NOT resort to disk. It will, of course, resort to disk from time to time (i.e. when there is no free RAM) and take a performance hit from that, BUT only while there is no free RAM.

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Evo, good points.

On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources, but it doesn't make the cluster itself elastic. At least, that's my understanding.

Memory + disk would be good, and hopefully it would take a *huge* load on the system to start exhausting the disk space too. I'd guess that falling back to disk will make things significantly slower due to the extra I/O.

Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers, and then implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implementing the cluster autoscaling.

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:

You can also try Dynamic Resource Allocation:

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation

Also, regarding the feedback loop for automatic message consumption rate adjustment, there is a “dumb” solution option: simply set the storage policy for the DStream RDDs to MEMORY_AND_DISK; when the
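Dmitry's plan of keeping the rate low enough not to overwhelm the consumers can be sized with simple arithmetic. The sketch assumes the Kafka direct stream's `spark.streaming.kafka.maxRatePerPartition` setting, read as records per second per partition applied over each batch interval; the throughput figure and the headroom factor are hypothetical inputs you would measure or choose. Requiring a batch's processing time (records / throughput) to stay within headroom × batch interval gives rate ≤ throughput × headroom / partitions, independent of the batch length.

```java
// Back-of-the-envelope sizing for a per-partition rate cap (assumed semantics:
// records per second per partition, applied over each batch interval).
final class RateCapSketch {

    /** Upper bound on records pulled from one partition in one batch. */
    static long maxRecordsPerPartitionPerBatch(long maxRatePerPartition, long batchMs) {
        return maxRatePerPartition * batchMs / 1000; // batch interval converted from ms to s
    }

    /** Upper bound on records in one batch across all partitions. */
    static long maxRecordsPerBatch(long maxRatePerPartition, int partitions, long batchMs) {
        return maxRecordsPerPartitionPerBatch(maxRatePerPartition, batchMs) * partitions;
    }

    /**
     * Largest per-partition rate that keeps processing within headroom * batch interval,
     * given a measured cluster throughput in records/s (hypothetical input).
     */
    static long maxSafeRatePerPartition(double throughputPerSec, int partitions, double headroom) {
        return (long) Math.floor(throughputPerSec * headroom / partitions);
    }

    public static void main(String[] args) {
        // 8 partitions, 1,000 records/s/partition, 5 s batches
        System.out.println(maxRecordsPerBatch(1_000, 8, 5_000));       // 40000
        // cluster processes ~20,000 records/s; keep batches at <= 80% of the interval
        System.out.println(maxSafeRatePerPartition(20_000.0, 8, 0.8)); // 2000
    }
}
```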
RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Makes sense, especially if you have a cloud with “infinite” resources / nodes, which allows you to double, triple, etc. the resources of the currently running cluster in the background / in parallel. I was thinking more about the scenario where you have, e.g., 100 boxes and want to (or can) add, e.g., 20 more.
Re: Spark 1.4.0 build Error on Windows
I used the same command on Linux but didn't reproduce the error. Can you include the -X switch on your command line? Also consider upgrading Maven to 3.3.x.

Cheers

On Wed, Jun 3, 2015 at 2:36 AM, Daniel Emaasit daniel.emaa...@gmail.com wrote:

I ran into errors while trying to build Spark from the 1.4 release branch: https://github.com/apache/spark/tree/branch-1.4. Any help will be much appreciated. Here is the log file from my Windows 8.1 PC. (FYI, I installed all the dependencies like Java 7 and Maven 3.2.5 and set the environment variables.)

C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
[INFO] Scanning for projects...
[INFO]
[INFO] Reactor Build Order:
[INFO]
[INFO] Spark Project Parent POM
[INFO] Spark Launcher Project
[INFO] Spark Project Networking
[INFO] Spark Project Shuffle Streaming Service
[INFO] Spark Project Unsafe
[INFO] Spark Project Core
[INFO] Spark Project Bagel
[INFO] Spark Project GraphX
[INFO] Spark Project Streaming
[INFO] Spark Project Catalyst
[INFO] Spark Project SQL
[INFO] Spark Project ML Library
[INFO] Spark Project Tools
[INFO] Spark Project Hive
[INFO] Spark Project REPL
[INFO] Spark Project YARN
[INFO] Spark Project Assembly
[INFO] Spark Project External Twitter
[INFO] Spark Project External Flume Sink
[INFO] Spark Project External Flume
[INFO] Spark Project External MQTT
[INFO] Spark Project External ZeroMQ
[INFO] Spark Project External Kafka
[INFO] Spark Project Examples
[INFO] Spark Project External Kafka Assembly
[INFO] Spark Project YARN Shuffle Service
[INFO]
[INFO]
[INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT
[INFO]
[INFO]
[INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @ spark-parent_2.10 ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @ spark-parent_2.10 ---
[INFO]
[INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @ spark-parent_2.10 ---
[INFO] Add Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala
[INFO] Add Test Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\test\scala
[INFO]
[INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources) @ spark-parent_2.10 ---
[INFO] Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala added.
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-parent_2.10 ---
[INFO]
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... FAILURE [ 23.528 s]
[INFO] Spark Launcher Project . SKIPPED
[INFO] Spark Project Networking ... SKIPPED
[INFO] Spark Project Shuffle Streaming Service SKIPPED
[INFO] Spark Project Unsafe ... SKIPPED
[INFO] Spark Project Core . SKIPPED
[INFO] Spark Project Bagel SKIPPED
[INFO] Spark Project GraphX ... SKIPPED
[INFO] Spark Project Streaming SKIPPED
[INFO] Spark Project Catalyst . SKIPPED
[INFO] Spark Project SQL .. SKIPPED
[INFO] Spark Project ML Library ... SKIPPED
[INFO] Spark Project Tools SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project YARN . SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Flume Sink .. SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External MQTT SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] Spark Project External Kafka Assembly .. SKIPPED
[INFO] Spark Project YARN Shuffle Service . SKIPPED
[INFO]
[INFO] BUILD FAILURE
[INFO]
[INFO] Total time: 24.680 s
[INFO] Finished at: 2015-06-03T02:11:35-07:00
[INFO] Final
Re: Example Page Java Function2
Yes, I think you're right. Since this is a change to the ASF-hosted site, I can make this change to the .md / .html directly rather than go through the usual PR.

On Wed, Jun 3, 2015 at 6:23 PM, linkstar350 . tweicomepan...@gmail.com wrote:

Hi, I'm Taira.

I noticed that this example page may contain a mistake.

https://spark.apache.org/examples.html

Word Count (Java)

JavaRDD<String> textFile = spark.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
    public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");

Function2 should have three generic type arguments, but there are only two.

I hope you will consider this.

Taira
Example Page Java Function2
Hi, I'm Taira.

I noticed that this example page may contain a mistake.

https://spark.apache.org/examples.html

Word Count (Java)

JavaRDD<String> textFile = spark.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
    public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");

Function2 should have three generic type arguments, but there are only two.

I hope you will consider this.

Taira
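For reference, here is the reducer with all three type arguments, `Function2<Integer, Integer, Integer>` (two argument types and one result type). So that it runs without Spark on the classpath, the sketch declares a local interface with the same shape as Spark's `org.apache.spark.api.java.function.Function2` and imitates `reduceByKey` with a `HashMap`; `WordCountSketch` is a hypothetical name.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Local stand-in mirroring the shape of Spark's Java API interface
// org.apache.spark.api.java.function.Function2<T1, T2, R>: two argument types and a result type.
interface Function2<T1, T2, R> extends Serializable {
    R call(T1 v1, T2 v2) throws Exception;
}

final class WordCountSketch {

    // The reducer from the example page, with all three type arguments supplied:
    // (Integer, Integer) -> Integer.
    static final Function2<Integer, Integer, Integer> SUM = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    };

    /** Local imitation of mapToPair(word -> (word, 1)) followed by reduceByKey(SUM). */
    static Map<String, Integer> wordCount(String line) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : line.split(" ")) {
            Integer previous = counts.get(word);
            try {
                counts.put(word, previous == null ? 1 : SUM.call(previous, 1));
            } catch (Exception e) {
                // call() declares Exception, as Spark's Function2 does
                throw new IllegalStateException(e);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount("to be or not to be").get("to")); // 2
    }
}
```

With Java 8, the same reducer can be written as the lambda `(a, b) -> a + b`, and the type arguments are then inferred from `reduceByKey`'s signature.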
StreamingListener, anyone?
Hi,

I've got a Spark Streaming driver job implemented, and in it I register a streaming listener, like so:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(params.getBatchDurationMillis()));
jssc.addStreamingListener(new JobListener(jssc));

where JobListener is defined like so:

private static class JobListener implements StreamingListener {
    private JavaStreamingContext jssc;

    JobListener(JavaStreamingContext jssc) {
        this.jssc = jssc;
    }

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        System.out.println("Batch completed.");
        jssc.stop(true);
        System.out.println("The job has been stopped.");
    }
}

onBatchCompleted does not seem to be triggered. Am I doing something wrong?

In this particular case, I was trying to implement a bulk-ingest type of logic where the first batch is all we're interested in (reading out of a Kafka topic with offset reset set to smallest).

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html
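The thread does not include an answer, but one thing worth ruling out (an assumption, not confirmed here) is calling `jssc.stop(...)` from inside `onBatchCompleted`: listener callbacks run on the thread that delivers listener events, and stopping the context also shuts that event delivery down, so the call can block or be rejected depending on the Spark version. A common pattern is for the listener only to signal, and for the driver's main thread to do the stopping. The sketch shows that hand-off with plain `java.util.concurrent` classes; `BatchListener` is a hypothetical stand-in for `StreamingListener`, and a single-thread executor stands in for the event thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class StopAfterFirstBatch {

    /** Hypothetical stand-in for a streaming listener callback. */
    interface BatchListener {
        void onBatchCompleted(long batchTimeMs);
    }

    /**
     * Delivers one "batch completed" event on a separate event thread and waits on the
     * calling (driver main) thread until the listener has signalled it.
     */
    static boolean awaitFirstBatch(long timeoutSeconds) {
        CountDownLatch firstBatchDone = new CountDownLatch(1);

        // The listener only signals; it never stops the context itself.
        BatchListener listener = batchTimeMs -> firstBatchDone.countDown();

        // Stand-in for the thread that delivers listener events.
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        try {
            eventThread.execute(() -> listener.onBatchCompleted(System.currentTimeMillis()));
            return firstBatchDone.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            eventThread.shutdown();
        }
    }

    public static void main(String[] args) {
        if (awaitFirstBatch(10)) {
            // In the real driver, this is where jssc.stop(...) would be called,
            // on the main thread rather than inside onBatchCompleted.
            System.out.println("first batch completed; stopping from the main thread");
        } else {
            System.out.println("timed out waiting for the first batch");
        }
    }
}
```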
Re: Broadcast variables can be rebroadcast?
I am pasting some of the exchanges I had on this topic via the mailing list directly, so they may help someone else too. (I don't know why those responses don't show up here.)
---
Thanks Imran. It does help clarify. I believe I had it right all along, then, but was confused by the documentation talking about never changing broadcast variables. I've tried it in a local-mode process so far, and it does seem to work as intended. When (and if!) we start running on a real cluster, I hope this holds up. Thanks NB
On Tue, May 19, 2015 at 6:25 AM, Imran Rashid iras...@cloudera.com wrote: Hmm, I guess it depends on the way you look at it. In a way, I'm saying that Spark does *not* have any built-in auto-re-broadcast if you try to mutate a broadcast variable. Instead, you should create something new and just broadcast it separately. Then just have all the code operating on your RDDs look at the new broadcast variable. But I guess there is another way to look at it: you are creating new broadcast variables each time, but they all point to the same underlying mutable data structure. So in a way, you are rebroadcasting the same underlying data structure. Let me expand my example from earlier a little bit more:
    def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
      ...
    }
    // this is a val, because the data structure itself is mutable
    val myMutableDataStructure = ...
    // this is a var, because you will create new broadcasts
    var myBroadcast = sc.broadcast(myMutableDataStructure)
    (0 to 20).foreach { iteration =>
      oneIteration(myRDD, myBroadcast)
      // update your mutable data structure in place
      myMutableDataStructure.update(...)
      // ... but that doesn't affect the broadcast variables living out on the cluster, so we need to
      // create a new one
      // this line is not required -- the broadcast var will automatically get unpersisted when a gc
      // cleans up the old broadcast on the driver, but I'm including this here for completeness,
      // in case you want to more proactively clean up old blocks if you are low on space
      myBroadcast.unpersist()
      // now we create a new broadcast which has the updated data in our mutable data structure
      myBroadcast = sc.broadcast(myMutableDataStructure)
    }
Hope this clarifies things! Imran
On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote: Hi Imran, If I understood you correctly, you are suggesting simply calling broadcast again from the driver program. This is exactly what I am hoping will work, as I have the broadcast data wrapped up and I am indeed (re)broadcasting the wrapper again when the underlying data changes. However, the documentation seems to suggest that one cannot re-broadcast. Is my understanding accurate? Thanks NB
On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com wrote: Rather than updating the broadcast variable, can't you simply create a new one? When the old one can be gc'ed in your program, it will also get gc'ed from Spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add another layer of indirection for which broadcast variable to use, but not too bad. E.g., instead of
    var myBroadcast = sc.broadcast(...)
    (0 to 20).foreach { iteration =>
      // ... some rdd operations that involve myBroadcast ...
      myBroadcast.update(...) // wrong! don't update a broadcast variable
    }
do something like:
    def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
      ...
    }
    var myBroadcast = sc.broadcast(...)
    (0 to 20).foreach { iteration =>
      oneIteration(myRDD, myBroadcast)
      myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it
    }
On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote: Thanks Ayan. Can we rebroadcast after updating in the driver? Thanks NB.
On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote: Hi, broadcast variables are shipped to the executors used by a transformation the first time they are accessed in that transformation. They will NOT be updated subsequently, even if the value has changed. However, a new value will be shipped to any new executor that comes into play after the value has changed. For this reason, changing the value of a broadcast variable is not a good idea, as it can create inconsistency within the cluster. From the documentation: "In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable."
On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB
On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer.
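Imran's "create something new instead of mutating" advice can be pictured as publishing immutable snapshots. The plain-Java sketch below is only an analogy, not Spark code. SnapshotPublisher is a hypothetical class, and publish() plays the role of sc.broadcast(...) by handing out a frozen copy. Later driver-side updates are therefore invisible to earlier snapshots until a new one is published.

There is also a caveat about the "it works in local mode" observation. In local mode the driver and the tasks share one JVM, so a broadcast value may be the very same object the driver holds. In-place mutation can then appear to propagate there, even though it would not on a real cluster.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Analogy only (hypothetical class): the driver keeps a mutable map, and each
// publish() returns an immutable copy, like each sc.broadcast(...) shipping a separate value.
class SnapshotPublisher {
    private final Map<String, Integer> driverState = new HashMap<>();

    void update(String key, int value) {
        driverState.put(key, value); // mutate the driver-side structure in place
    }

    Map<String, Integer> publish() {
        // Copy first, then wrap, so later updates cannot leak into this snapshot.
        return Collections.unmodifiableMap(new HashMap<>(driverState));
    }
}
```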
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
I think what we'd want to do is track the ingestion rate in the consumer(s) via Spark's aggregation functions and such. If we're at a critical level (load too high / load too low), then we issue a request to our Provisioning Component to add/remove machines. Once it comes back with an OK, each consumer can finish its current batch, then terminate itself and restart with a new context. The new context would be aware of the updated cluster - correct? Therefore the refreshed consumer would restart on the updated cluster. Could we even terminate the consumer immediately upon sensing a critical event? When it restarts, could it resume right where it left off?
On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov evo.efti...@isecc.com wrote: Makes sense, especially if you have a cloud with “infinite” resources / nodes which allows you to double, triple etc. the resources of the currently running cluster in the background / in parallel. I was thinking more about the scenario where you have e.g. 100 boxes and want to / can add e.g. 20 more.
*From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] *Sent:* Wednesday, June 3, 2015 4:46 PM *To:* Evo Eftimov *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Evo, One of the ideas is to shadow the current cluster. This way there's no extra latency incurred due to shutting down the consumers. If two sets of consumers are running, potentially processing the same data, that is OK. We phase out the older cluster and gradually flip over to the new one, ensuring no downtime or extra latency. Thoughts?
On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com wrote: You should monitor the vital performance / job-clogging stats of the Spark Streaming runtime, not “kafka topics”. You should be able to bring new worker nodes online and have them contact and register with the Master without bringing down the Master (or any of the currently running worker nodes). Then just shut down your currently running Spark Streaming job/app and restart it with new params to take advantage of the larger cluster.
*From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] *Sent:* Wednesday, June 3, 2015 4:14 PM *To:* Cody Koeninger *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Would it be possible to implement Spark autoscaling somewhat along these lines?
1. If we sense that a new machine is needed, by watching the data load in Kafka topic(s), then
2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and get a machine);
3. Create a shadow/mirror Spark master running alongside the initial version, which talks to N machines. The new mirror version is aware of N+1 machines (or N+M if we had decided we needed M new boxes).
4. The previous version of the Spark runtime is quiesced/decommissioned. We possibly get both clusters working on the same data, which may actually be OK (at least for our specific use cases).
5. Now the new Spark cluster is running.
Similarly, the decommissioning of M unused boxes would happen via this notion of a mirror Spark runtime. How feasible would it be for such a mirror-like setup to be created, especially programmatically? Especially point #3. The other idea we'd entertained was to bring in a new machine, quiesce all currently running workers by telling them to process their current batch and then shut down, and then restart the consumers now that Spark is aware of a modified cluster.
This has the drawback of downtime that may not be tolerable in terms of latency for the system's clients, which wait for their responses synchronously. Thanks.
On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote: I'm not sure that points 1 and 2 really apply to the Kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is.
On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote: Hi all, As the author of the dynamic allocation feature, I can offer a few insights here. Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark Streaming at the moment (1.4 or before). This is because of two things: (1) The number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors. (2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch
Re: Behavior of the spark.streaming.kafka.maxRatePerPartition config param?
The default of 0 means no limit. Each batch will grab as much as is available, i.e. a range of offsets spanning from the end of the previous batch to the highest available offsets on the leader. If you set spark.streaming.kafka.maxRatePerPartition > 0, the number you set is the maximum number of messages per partition per second. If you have a reproducible case that behaves differently, please share it.
On Tue, Jun 2, 2015 at 5:28 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Could someone explain the behavior of the spark.streaming.kafka.maxRatePerPartition parameter? The doc says "An important (configuration) is spark.streaming.kafka.maxRatePerPartition which is the maximum rate at which each Kafka partition will be read by (the) direct API." What is the default behavior for this parameter? From some testing, it appears that when it is not set, the RDD size tends to be quite low. With it set, we're seeing the consumer pick up items off the topic much more actively, e.g. with -Dspark.streaming.kafka.maxRatePerPartition=1000 in --driver-java-options. Does this parameter set the RDD size to a very low value? It seems to default to 0... but what's the effect of that?
    protected val maxMessagesPerPartition: Option[Long] = {
      val ratePerSec = context.sparkContext.getConf.getInt(
        "spark.streaming.kafka.maxRatePerPartition", 0)
      if (ratePerSec > 0) {
        val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
        Some((secsPerBatch * ratePerSec).toLong)
      } else {
        None
      }
    }
    // limits the maximum number of messages per partition
    protected def clamp(
        leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
      maxMessagesPerPartition.map { mmp =>
        leaderOffsets.map { case (tp, lo) =>
          tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
        }
      }.getOrElse(leaderOffsets)
    }
What would we limit by default? And once Spark Streaming does pick up messages, would it be at the maximum value? Does it ever fall below max even if there are max or more than max messages in the topic? Thanks.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Behavior-of-the-spark-streaming-kafka-maxRatePerPartition-config-param-tp23117.html
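Cody's description and the quoted maxMessagesPerPartition/clamp code boil down to a small per-partition calculation. The Java sketch below re-expresses that logic under the semantics shown in the quoted Scala:

- A rate of 0 means no cap.
- Otherwise the cap is batch seconds times the rate, truncated.

KafkaRateClampSketch and its methods are hypothetical names, not Spark APIs. The value -1 stands in for Scala's None.

```java
// Re-expression of the quoted Scala logic for a single partition; names are hypothetical.
class KafkaRateClampSketch {
    // Returns -1 when there is no cap (ratePerSec <= 0), mirroring the Scala None.
    static long maxMessagesPerPartition(long batchDurationMs, int ratePerSec) {
        if (ratePerSec > 0) {
            double secsPerBatch = batchDurationMs / 1000.0;
            return (long) (secsPerBatch * ratePerSec); // truncates like .toLong
        }
        return -1;
    }

    // End offset for the next batch: the leader's latest offset, clamped if a cap is set.
    static long untilOffset(long currentOffset, long leaderOffset, long batchDurationMs, int ratePerSec) {
        long mmp = maxMessagesPerPartition(batchDurationMs, ratePerSec);
        return mmp < 0 ? leaderOffset : Math.min(currentOffset + mmp, leaderOffset);
    }
}
```

Take -Dspark.streaming.kafka.maxRatePerPartition=1000 with, for example, a 2-second batch. Each partition then contributes at most 2000 messages to a batch. It contributes fewer whenever less than that is available between the current offset and the leader's latest offset.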
Adding new Spark workers on AWS EC2 - access error
I have an existing, running Spark cluster that was launched with the spark-ec2 script. I'm trying to add a new slave by following these instructions:
1. Stop the cluster.
2. In the AWS console, use "Launch More Like This" on one of the slaves.
3. Start the cluster.
Although the new instance is added to the same security group and I can successfully SSH to it with the same private key, the spark-ec2 ... start call can't access this machine for some reason:
Running setup-slave on all cluster nodes to mount filesystems, etc... [1] 00:59:59 [FAILURE] ec2-52-25-53-64.us-west-2.compute.amazonaws.com Exited with error code 255 Stderr: Permission denied (publickey).
This is, obviously, followed by tons of other errors while trying to deploy Spark stuff on this instance.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Adding-new-Spark-workers-on-AWS-EC2-access-error-tp23143.html
Re: Make HTTP requests from within Spark
Try something like the following. Create a function to make the HTTP call, e.g. using Apache HttpComponents' HttpClient (the org.apache.http classes below):
    def getUrlAsString(url: String): String = {
      val client = new org.apache.http.impl.client.DefaultHttpClient()
      val request = new org.apache.http.client.methods.HttpGet(url)
      val response = client.execute(request)
      val handler = new org.apache.http.impl.client.BasicResponseHandler()
      handler.handleResponse(response).trim
    }
Then build up your set of URLs and pass them as a parameter to your HTTP function. This is not the most basic example, but it includes some logic to handle paging for a REST API. It also controls the number of concurrent threads, for when you want fewer than the number of CPUs.
    val (max, batchSize, threads) = (1500, 200, 20)
    val calls = sc.parallelize(
      (0 to max by batchSize).map(page => s"https://some.url/jsonapi?_start=${page}&_limit=${batchSize}"),
      threads)
    if (debug) {
      def partMapper(index: Int, iter: Iterator[String]): Iterator[Map[String, Any]] = {
        iter.toList.map(callString => Map("thread" -> index, "call" -> callString)).iterator
      }
      calls.mapPartitionsWithIndex(partMapper).collect.foreach(println)
    }
    val callRDD = calls.map(getUrlAsString(_))
    val yourDataFrame = sqlContext.jsonRDD(callRDD)
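Here is a Spark-free Java sketch of how the paging URLs and the `threads` argument interact. It builds the same list as `(0 to max by batchSize)`. It then splits that list the way, to my understanding, `sc.parallelize(seq, numSlices)` slices a generic collection: into contiguous ranges with start = i * length / numSlices.

The URL format, including the `&` between parameters, is carried over from the example above. PagedCallsSketch is a hypothetical name. With max = 1500 and batchSize = 200 there are only 8 URLs. At most 8 of the 20 partitions therefore receive any work, which caps concurrency at 8 requests.

```java
import java.util.ArrayList;
import java.util.List;

class PagedCallsSketch {
    // Same values as Scala's (0 to max by batchSize): the upper bound is inclusive.
    static List<String> pageUrls(int max, int batchSize) {
        List<String> urls = new ArrayList<>();
        for (int page = 0; page <= max; page += batchSize) {
            urls.add("https://some.url/jsonapi?_start=" + page + "&_limit=" + batchSize);
        }
        return urls;
    }

    // Contiguous slicing: slice i covers [i*n/numSlices, (i+1)*n/numSlices).
    static List<List<String>> slice(List<String> items, int numSlices) {
        List<List<String>> slices = new ArrayList<>();
        long n = items.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((i * n) / numSlices);
            int end = (int) (((i + 1) * n) / numSlices);
            slices.add(items.subList(start, end));
        }
        return slices;
    }

    // Partitions that actually get URLs, i.e. the effective request concurrency.
    static long nonEmptySlices(List<List<String>> slices) {
        return slices.stream().filter(s -> !s.isEmpty()).count();
    }
}
```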
Spark 1.4 HiveContext fails to initialise with native libs
Hi all, Trying out Spark 1.4 RC4 on MapR4/Hadoop 2.5.1 running in yarn-client mode with Hive support.
Build command:
    ./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests -e -X
When trying to run a Hive query in the Spark shell (sqlContext.sql("show tables")), I get the following exception:
    scala> sqlContext.sql("show tables")
    15/06/04 04:33:16 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 0.13.1 using Spark classes.
    java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:323)
        at com.mapr.fs.ShimLoader.load(ShimLoader.java:198)
        at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:59)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:274)
        at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1857)
        at org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2072)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2282)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2234)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2151)
        at org.apache.hadoop.conf.Configuration.set(Configuration.java:1002)
        at org.apache.hadoop.conf.Configuration.set(Configuration.java:974)
        at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:518)
        at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:536)
        at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:430)
        at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:1366)
        at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:1332)
        at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:99)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:166)
        at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212)
        at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175)
        at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:370)
        at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:370)
        at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:369)
        at org.apache.spark.sql.hive.HiveContext$$anon$1.<init>(HiveContext.scala:382)
        at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:382)
        at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:381)
        at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:901)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $line37.$read$$iwC$$iwC.<init>(<console>:36)
        at $line37.$read$$iwC.<init>(<console>:38)
        at $line37.$read.<init>(<console>:40)
        at $line37.$read$.<init>(<console>:44)
        at $line37.$read$.<clinit>(<console>)
        at $line37.$eval$.<init>(<console>:7)
        at $line37.$eval$.<clinit>(<console>)
        at $line37.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at
Re: ALS Rating Object
Hi Joseph, I thought about converting the IDs, but there would be a birthday problem. The probability of a hash collision (http://preshing.com/20110504/hash-collision-probabilities/) matters to me because of the number of users. I don't know how I can modify ALS to use Integer. yasemin
2015-06-04 2:28 GMT+03:00 Joseph Bradley jos...@databricks.com: Hi Yasemin, If you can convert your user IDs to Integers in pre-processing (if you have fewer than a couple billion users), that would work. Otherwise... In Spark 1.3: You may need to modify ALS to use Long instead of Int. In Spark 1.4: spark.ml.recommendation.ALS (in the Pipeline API) exposes ALS.train as a DeveloperApi to allow users to use Long instead of Int. We're also thinking about better ways to permit Long IDs. Joseph
On Wed, Jun 3, 2015 at 5:04 AM, Yasemin Kaya godo...@gmail.com wrote: Hi, I want to use Spark's ALS in my project. I have user IDs like 30011397223227125563254, but the Rating object that ALS uses requires an Integer user ID, and my IDs do not fit into a 32-bit Integer. How can I solve that? Thanks. Best, yasemin
-- hiç ender hiç
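Two points from this exchange can be checked numerically. First, the ID in the question has 23 digits, which exceeds even Long.MAX_VALUE (19 digits). Switching ALS to Long would therefore not be enough on its own. Second, the birthday-problem risk Yasemin mentions can be estimated with the standard approximation p = 1 - exp(-n^2 / (2N)), for n distinct IDs hashed uniformly into N buckets.

The Java sketch below computes that estimate. It also shows a collision-free alternative: give each distinct ID a dense index. DenseIdSketch is a hypothetical class. In Spark, one assumed way to do the same would be `distinct().zipWithIndex()` on the raw IDs followed by a join, but that wiring is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: dense, collision-free int IDs plus a birthday-bound estimate.
class DenseIdSketch {
    private final Map<String, Integer> ids = new HashMap<>();

    // Returns a stable int for each distinct external ID: 0, 1, 2, ... in first-seen order.
    int idFor(String externalId) {
        Integer existing = ids.get(externalId);
        if (existing != null) {
            return existing;
        }
        int next = ids.size();
        ids.put(externalId, next);
        return next;
    }

    // Approximate probability of at least one collision when hashing
    // n distinct IDs uniformly into 2^bits buckets.
    static double collisionProbability(long n, int bits) {
        double buckets = Math.pow(2, bits);
        double x = (double) n * n / (2 * buckets);
        return -Math.expm1(-x); // 1 - exp(-x), accurate even for small x
    }
}
```

For example, a million users hashed into a 32-bit Int gives an estimate that is effectively 1, which supports the concern about hashing. A dense mapping avoids collisions entirely as long as there are fewer than about 2.1 billion distinct users.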
Re: Example Page Java Function2
Thank you. I have checked the page.
2015-06-04 1:35 GMT+09:00 Sean Owen so...@cloudera.com: Yes, I think you're right. Since this is a change to the ASF-hosted site, I can make this change to the .md / .html directly rather than go through the usual PR.
On Wed, Jun 3, 2015 at 6:23 PM, linkstar350 . tweicomepan...@gmail.com wrote: Hi, I'm Taira. I noticed that this example page may contain a mistake. https://spark.apache.org/examples.html Word Count (Java):
    JavaRDD<String> textFile = spark.textFile("hdfs://...");
    JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
    });
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
    });
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });
    counts.saveAsTextFile("hdfs://...");
Function2 should have three generic type arguments, but there are only two. I hope you will consider this. Taira
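Spark's Java `Function2<T1, T2, R>` takes two argument types and a result type. The reducer in the example therefore needs `new Function2<Integer, Integer, Integer>() { ... }`. The sketch below is a Spark-free analogy, not the Spark API. It performs the word count's reduceByKey step with `java.util.function.BiFunction<Integer, Integer, Integer>`, which has the same three-type-argument shape. WordCountSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

class WordCountSketch {
    // Two argument types and one result type, like Function2<Integer, Integer, Integer>.
    static final BiFunction<Integer, Integer, Integer> SUM = (a, b) -> a + b;

    static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : text.split(" ")) {
            // merge() applies SUM when the key already exists, much like reduceByKey's combine step.
            counts.merge(w, 1, SUM);
        }
        return counts;
    }
}
```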
Re: Standard Scaler taking 1.5hrs
The fit part is very slow, transform not at all. The number of partitions was 210 vs. 80 executors. Spark 1.4 sounds great, but as my company is using Qubole, we are dependent on them to upgrade from version 1.3.1. Until that happens, can you think of any other reasons why it could be slow? Sparse vectors? An excessive number of columns? Sent from my mobile device. Please excuse any typos.
TreeReduce Functionality in Spark
I am trying to understand what the treeReduce function for an RDD does, and how it is different from the normal reduce function. My current understanding is that treeReduce tries to split up the reduce into multiple steps: we do a partial reduce on different nodes, and then a final reduce is done to get the final result. Is this correct? If so, what I am curious about is how Spark decides how many nodes will be on each level, and how many partitions will be sent to a given node. The bulk of the implementation is within this function:
    partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
      .getOrElse(throw new UnsupportedOperationException("empty collection"))
The above function is expanded to:
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.
    while (numPartitions > scale + numPartitions / scale) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
I am completely lost about what is happening in this function. I would greatly appreciate some sort of explanation.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
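One way to follow the loop is to trace only the partition counts. The Java sketch below re-implements just that arithmetic from the quoted code. TreeAggregateLevels is a hypothetical name.

The steps it traces are:

- `scale` is roughly the depth-th root of the partition count, and at least 2.
- Each pass divides the number of partitions by `scale`. It sends partition i to key i % curNumPartitions and combines with reduceByKey.
- The loop stops once another level would no longer be smaller than sending the partial results straight to the driver, i.e. once numPartitions <= scale + numPartitions / scale.
- The final `reduce` then runs over whatever partitions remain.

```java
import java.util.ArrayList;
import java.util.List;

class TreeAggregateLevels {
    // Partition count after each intermediate reduceByKey level; an empty list
    // means the driver reduces the per-partition results directly.
    static List<Integer> levels(int numPartitions, int depth) {
        int scale = Math.max((int) Math.ceil(Math.pow(numPartitions, 1.0 / depth)), 2);
        List<Integer> result = new ArrayList<>();
        int n = numPartitions;
        // Same stopping rule as the quoted Scala: add a level only while it still helps.
        while (n > scale + n / scale) {
            n /= scale;
            result.add(n);
        }
        return result;
    }
}
```

For example, take 210 partitions with treeReduce's default depth of 2. That gives scale = 15, one intermediate level of 14 partitions, and then a final reduce over 14 values. With depth 3 and 10000 partitions, scale is 22 and the levels are 454 and then 20 partitions.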
Re: Standard Scaler taking 1.5hrs
Which part of StandardScaler is slow? Fit or transform? Fit has a shuffle, but a very small one, and transform doesn't shuffle at all. I guess you don't have enough partitions, so please repartition your input dataset to a number at least larger than the number of executors you have. In Spark 1.4's new ML pipeline API, we have linear regression with elastic net. In that version we use quasi-Newton optimization, so it will be much faster than the SGD implementation. Also, in that implementation StandardScaler is not required, since we implicitly do this for you when computing the loss function. https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef Please try this out and give us feedback. Thanks.
On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote: Hello User group, I have an RDD of LabeledPoint composed of sparse vectors like the ones shown below. In the next step, I am standardizing the columns with the StandardScaler. The data has 2450 columns and ~110M rows. It took 1.5 hrs to complete the standardization with 10 nodes and 80 executors. spark.executor.memory was set to 2g and the driver memory to 5g.
    scala> val parsedData = stack_sorted.mapPartitions(partition =>
      partition.map { row => LabeledPoint(row._2._1.getDouble(4),
        sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength)) },
      preservesPartitioning = true).cache()
    CategoriesIdx: Array[Int] = Array(3, 8, 12)
    InteractionIds: Array[(Int, Int)] = Array((13,12))
    vecLength: Int = 2450
    parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111
    (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
    (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
    (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
    (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
    (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))
My suspicion is that, because the data is partitioned using a custom partitioner, the StandardScaler is causing a major shuffle operation. Any suggestions on how to improve the performance of this step, and of LinearRegressionWithSGD(), which is also taking a very long time?
    scala> parsedData.partitioner
    res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2)
    scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map(row => row.features))
    scala> val scaledData = parsedData.mapPartitions(partition => partition.map { row => LabeledPoint(row.label, scaler.transform(row.features)) }).cache()
    scala> val numIterations = 100
    scala> val stepSize = 0.1
    scala> val miniBatchFraction = 0.1
    scala> val algorithm = new LinearRegressionWithSGD()
    scala> algorithm.setIntercept(false)
    scala> algorithm.optimizer.setNumIterations(numIterations)
    scala> algorithm.optimizer.setStepSize(stepSize)
    scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
    scala> val model = algorithm.run(scaledData)
Best, Piero Cinquegrana Marketing Scientist | MarketShare 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025 P: 310.914.5677 x242 M: 323.377.9197 www.marketshare.com http://www.marketsharepartners.com/ twitter.com/marketsharep
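Here is the fit/transform split for the withMean = false, withStd = true configuration Piero uses. Conceptually:

- Fitting is one aggregation pass that needs only each column's sum and sum of squares, plus the row count.
- Transforming is a per-row map that divides each stored non-zero by its column's standard deviation. Sparse vectors stay sparse and no shuffle is involved.

The Java sketch below shows the idea on in-memory sparse rows. It assumes the corrected sample standard deviation (n - 1 denominator) and maps zero-variance columns to 0. That is my understanding of MLlib's behavior, not a copy of its code. SparseRow and SparseStdScalerSketch are hypothetical names.

```java
// Sparse row as parallel index/value arrays; all other entries are implicitly zero.
class SparseRow {
    final int[] indices;
    final double[] values;

    SparseRow(int[] indices, double[] values) {
        this.indices = indices;
        this.values = values;
    }
}

class SparseStdScalerSketch {
    // "Fit": one pass over the stored non-zeros; implicit zeros add nothing to the sums.
    static double[] columnStd(SparseRow[] rows, int numCols) {
        double[] sum = new double[numCols];
        double[] sumSq = new double[numCols];
        for (SparseRow r : rows) {
            for (int k = 0; k < r.indices.length; k++) {
                sum[r.indices[k]] += r.values[k];
                sumSq[r.indices[k]] += r.values[k] * r.values[k];
            }
        }
        long n = rows.length;
        double[] std = new double[numCols];
        for (int j = 0; j < numCols; j++) {
            double mean = sum[j] / n;
            double variance = (sumSq[j] - n * mean * mean) / (n - 1); // sample variance
            std[j] = Math.sqrt(Math.max(variance, 0.0));
        }
        return std;
    }

    // "Transform": scale only the stored non-zeros, so sparsity is preserved.
    static SparseRow scale(SparseRow row, double[] std) {
        double[] scaled = new double[row.values.length];
        for (int k = 0; k < row.values.length; k++) {
            double s = std[row.indices[k]];
            scaled[k] = s != 0.0 ? row.values[k] / s : 0.0;
        }
        return new SparseRow(row.indices.clone(), scaled);
    }
}
```

Because both steps are single passes, the 1.5 hours is more likely explained by the partitioning and executor-memory factors DB Tsai mentions than by the arithmetic itself. That is an inference, not a measurement.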
Re: Make HTTP requests from within Spark
Hi Kaspar,

This is definitely doable, but in my opinion it's important to remember that, at its core, Spark is based around a functional programming paradigm: you take input sets of data and, by applying various transformations, end up with a dataset that represents your answer. Without knowing more about your use case, and keeping in mind that I'm very new to Spark, here are a few things I would want to think about if I were writing this as a non-Streaming Spark application:

1. What is your starting dataset? Do you have an initial set of parameters or a data source that is used to define each of the millions of requests? If so, then that should comprise your first RDD, and you can perform subsequent transformations to prepare your HTTP requests (e.g., start with the information that drives the generation of the requests, and use map/flatMap to create an RDD that has the full list of requests you want to run).

2. Are the HTTP requests read-only and/or idempotent (are you only looking up data, or are you performing requests that cause some sort of side effect)? Spark operations against RDDs work by defining a lineage graph, and transformations will be re-run if a partition in the lineage needs to be recalculated for any reason. If your HTTP requests cause side effects that should not be repeated, then Spark may not be the best fit for that portion of the job; you might want to use something else, write the results into HDFS, and then analyze those using Spark.

3. If your web service requests are lookups or are idempotent, then we're on the right track. Keep in mind that your web service probably will not scale as well as the Spark job; a naive first-pass implementation could easily overwhelm many services, particularly if/when partitions need to be recalculated.
There are a few mechanisms you can use to mitigate this. One is to use mapPartitions rather than map when transforming the set of requests into the set of results: initialize an HTTP connection for each partition, and transform the data that defines each request into your desired dataset by invoking the web service. Using mapPartitions allows you to limit the number of concurrent HTTP connections to one per partition (although this may be too slow if your service is slow; there is obviously a bit of analysis, testing and profiling that would need to be done on the entire job). Another consideration would be to persist or cache the intermediate results after you've successfully retrieved them from the service, to reduce the likelihood of hitting the web service more than necessary.

4. I just realized you might be looking for help invoking an HTTP service programmatically from Scala/Spark. If so, you might want to look at the spray-client library (http://spray.io/documentation/1.2.3/spray-client/).

5. With millions of web service requests, it's highly likely some will fail, for a variety of reasons. Look into using Scala's Try (http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Try) or Either (http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Either) monads to encode success/failure, and treat failed requests as first-class citizens in your RDD of results (by retrying them, filtering them, logging them, etc., based on your specific needs and use case). Make sure you set reasonable timeouts on your service calls to prevent the Spark job from getting stuck if the service turns into a black hole.

As I said above, I'm pretty new to Spark, so others may have better advice, or may even tell you to ignore mine completely (no hard feelings, I promise; this is all very new to me). Good luck!
Regards, Will

On Wed, Jun 3, 2015 at 3:49 AM, kasparfischer kaspar.fisc...@dreizak.com wrote: Hi everybody, I'm new to Spark, so apologies if my question is very basic. I need to send millions of requests to a web service and analyse and store the responses in an RDD. I can easily express the analysing part using Spark's filter/map/etc. primitives, but I don't know how to make the requests. Is that something I can do from within Spark? Or Spark Streaming? Or does it conflict with the way Spark works? I've found a similar question but am not sure whether the answer applies here: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html Any clarifications or pointers would be super helpful! Thanks, Kaspar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail:
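Will's points 3 and 5 can be sketched in plain Java as the function you would run once per partition. Those points are: use one connection per partition via mapPartitions, encode failures as values, and set reasonable timeouts. Everything here is hypothetical. Client stands in for whatever HTTP library you use (Will mentions spray-client). Result plays the role of Scala's Try. The retry count is an assumption. The Spark wiring itself, handing this logic to mapPartitions, is not shown.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class PartitionRequestsSketch {

    /** Hypothetical client; a real one would wrap an HTTP connection with connect/read timeouts set. */
    @FunctionalInterface
    public interface Client extends AutoCloseable {
        String call(String request) throws Exception;

        @Override
        default void close() {
            // release the underlying connection here
        }
    }

    /** Success-or-failure outcome per request, standing in for Scala's Try/Either. */
    public static final class Result {
        public final String request;
        public final String response; // null on failure
        public final String error;    // null on success
        public final int attempts;

        private Result(String request, String response, String error, int attempts) {
            this.request = request;
            this.response = response;
            this.error = error;
            this.attempts = attempts;
        }

        private static Result success(String req, String resp, int attempts) {
            return new Result(req, resp, null, attempts);
        }

        private static Result failure(String req, String err, int attempts) {
            return new Result(req, null, err, attempts);
        }

        public boolean isSuccess() {
            return error == null;
        }
    }

    /**
     * Body of a mapPartitions-style function: open ONE client for the partition, reuse it for every
     * request, retry failures up to maxAttempts, and keep the final failures as data.
     */
    public static List<Result> processPartition(Iterator<String> requests, Supplier<Client> openClient, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        List<Result> out = new ArrayList<>();
        try (Client client = openClient.get()) {
            while (requests.hasNext()) {
                String req = requests.next();
                Result result = null;
                for (int attempt = 1; attempt <= maxAttempts && (result == null || !result.isSuccess()); attempt++) {
                    try {
                        result = Result.success(req, client.call(req), attempt);
                    } catch (Exception e) {
                        result = Result.failure(req, e.toString(), attempt);
                    }
                }
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Simulated service: fails for requests containing "bad", upper-cases everything else.
        Supplier<Client> open = () -> req -> {
            if (req.contains("bad")) {
                throw new IllegalStateException("service error for " + req);
            }
            return req.toUpperCase();
        };
        for (Result r : processPartition(List.of("a", "bad-1", "b").iterator(), open, 3)) {
            System.out.println(r.request + " -> " + (r.isSuccess() ? r.response : "FAILED: " + r.error)
                    + " after " + r.attempts + " attempt(s)");
        }
    }
}
```

Keeping failures in the output, rather than throwing, means a single bad request does not fail the whole task. This matters because Spark would re-run a failed task, and the re-run would hit the service again. Downstream steps can filter or log the failed results.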
Re: Spark Cluster Benchmarking Frameworks
Hi Jonathan, Maybe you can try BigDataBench (http://prof.ict.ac.cn/BigDataBench/). It provides lots of workloads, including both Hadoop- and Spark-based workloads. Zhen Jia

hodgesz wrote: Hi Spark Experts, I am curious what people are using to benchmark their Spark clusters. We are about to start a build (bare metal) vs. buy (AWS/Google Cloud/Qubole) project to determine our Hadoop and Spark deployment selection. On the Hadoop side we will test live workloads as well as simulated ones with frameworks like TestDFSIO, TeraSort, MRBench, GridMix, etc. Do any equivalent benchmarking frameworks exist for Spark? A quick Google search yielded https://github.com/databricks/spark-perf, which looks pretty interesting. It would be great to hear what others are doing here. Thanks for the help! Jonathan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cluster-Benchmarking-Frameworks-tp12699p23146.html
Re: Standard Scaler taking 1.5hrs
Can you do a count() before fit to force the RDD to be materialized? I think something before fit is slow.

On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote: The fit part is very slow; transform is not slow at all. The number of partitions was 210 vs. 80 executors. Spark 1.4 sounds great, but as my company is using Qubole, we depend on them to upgrade from version 1.3.1. Until that happens, can you think of any other reasons why it could be slow? Sparse vectors? An excessive number of columns? Sent from my mobile device. Please excuse any typos.

On Jun 3, 2015, at 9:53 PM, DB Tsai dbt...@dbtsai.com wrote: Which part of StandardScaler is slow? Fit or transform? [...]
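DB's suggestion relies on lazy evaluation. cache() only marks parsedData for caching, so the first action that touches it (here, fit) also pays for computing the whole upstream mapPartitions chain. Calling count() first moves that cost into count, so the time measured afterwards for fit reflects fit alone. The plain-Java model below illustrates the idea. CachedDataset and the sleep-based upstream step are hypothetical stand-ins, not Spark classes.

```java
import java.util.List;
import java.util.function.Supplier;

public class MaterializeBeforeFitSketch {

    /** Hypothetical stand-in for a cached RDD: nothing runs until the first action, then results are reused. */
    public static final class CachedDataset {
        private final Supplier<List<Double>> upstream;
        private List<Double> materialized;
        private int upstreamRuns = 0;

        public CachedDataset(Supplier<List<Double>> upstream) {
            this.upstream = upstream;
        }

        private List<Double> materialize() {
            if (materialized == null) {
                upstreamRuns++;
                materialized = upstream.get(); // the expensive part: parsing, mapPartitions, ...
            }
            return materialized;
        }

        /** Like rdd.count(): an action whose main effect here is to force materialization. */
        public long count() {
            return materialize().size();
        }

        /** Stand-in for fit: an action that reads every element (here it just sums them). */
        public double fit() {
            double total = 0.0;
            for (double v : materialize()) {
                total += v;
            }
            return total;
        }

        public int upstreamRuns() {
            return upstreamRuns;
        }
    }

    static long millis(Runnable action) {
        long start = System.nanoTime();
        action.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        CachedDataset data = new CachedDataset(() -> {
            try {
                Thread.sleep(300); // simulated slow upstream work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return List.of(1.0, 2.0, 3.0);
        });
        System.out.println("count took " + millis(data::count) + " ms"); // includes the upstream work
        System.out.println("fit took " + millis(data::fit) + " ms");     // reuses the materialized data
        System.out.println("upstream ran " + data.upstreamRuns() + " time(s)");
    }
}
```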
Re: Spark Client
Thanks Akhil, Richard, Oleg for your quick responses. @Oleg we have actually tried the same thing, but unfortunately, when we throw the exception, the Akka framework catches all exceptions, thinks the job failed, and reruns the Spark jobs infinitely. Since Akka's OneForOneStrategy has the maximum number of retries set to infinite, is there any way to configure this value? Or else, is there any other way to solve this problem? If we don't throw an exception in checkExit(), the JVM will exit, right? Is there a way to stop the JVM from exiting?

On Wed, Jun 3, 2015 at 9:01 PM, Oleg Zhurakousky oleg.zhurakou...@gmail.com wrote: I am not sure why Spark relies on System.exit; hopefully someone will be able to provide a technical justification for it (very curious to hear it). But for your use case you can easily trap the System.exit call before the JVM exits, with a simple implementation of SecurityManager and a try/catch. Here are more details (extracted from some of the code I am using to deal with the same problem in Hadoop processes, so it's Java, but you'll get the point).

1. Create a simple implementation of SecurityManager:

import java.security.Permission;

public class SystemExitDisallowingSecurityManager extends SecurityManager {
    @Override
    public void checkPermission(Permission perm) {
        // allow everything
    }

    @Override
    public void checkPermission(Permission perm, Object context) {
        // allow everything
    }

    @Override
    public void checkExit(int status) {
        // veto System.exit(status) by throwing instead of letting the JVM halt
        throw new SystemExitException();
    }
}

2. Create SystemExitException:

public class SystemExitException extends RuntimeException {
    public SystemExitException() {
    }
}

3. When instantiating your workflow engine, register the aforementioned SecurityManager (e.g., in the constructor):

System.setSecurityManager(new SystemExitDisallowingSecurityManager());

3.1. In your workflow engine, invoke the process in a try/catch block:

try {
    // invoke the spark-submit process
} catch (SystemExitException e) {
    // log some message but allow to continue
}

When Spark triggers System.exit, the SecurityManager will throw SystemExitException, which you simply ignore. Or you can even avoid throwing SystemExitException altogether, essentially ignoring all the calls to System.exit. Cheers Oleg

On Wed, Jun 3, 2015 at 10:55 AM, Richard Marscher rmarsc...@localytics.com wrote: I think the short answer to the question is no, there is no alternate API that avoids the System.exit calls. You can craft a workaround like the one being suggested in this thread. For comparison, we do programmatic submission of applications in a long-running client application. To get around these issues, we made a shadowed version of some of the Spark code in our application with the System.exit calls removed, so that exceptions bubble up to our application instead.

On Wed, Jun 3, 2015 at 7:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try this? Create an sbt project like:

import org.apache.spark.{SparkConf, SparkContext}

// Create your context
val sconf = new SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
val sc = new SparkContext(sconf)
// Do some computations
sc.parallelize(1 to 1).take(10).foreach(println)
// Now return the exit status (0 is only a placeholder for "some number")
System.exit(0)

Now make your workflow manager trigger *sbt run* on the project instead of using spark-submit. Thanks Best Regards

On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi Akhil, sorry, I may not be conveying the question properly. Actually, we are looking to launch a Spark job from a long-running workflow manager, which invokes the Spark client via SparkSubmit. Unfortunately, upon successful completion of the application the client exits with System.exit(0), or with System.exit(NON_ZERO) when there is a failure.
The question is: is there an alternate API through which a Spark application can be launched that returns an exit status to the caller, as opposed to initiating a JVM halt? On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Run it as a standalone application. Create an sbt project and do sbt run? Thanks Best Regards On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri pavan.kolam...@gmail.com wrote: Hi guys, I am new to Spark. I am using SparkSubmit to submit Spark jobs, but for my use case I don't want it to exit with System.exit. Is there any other Spark client, more API-friendly than SparkSubmit, that doesn't exit with System.exit? Please correct me if I am missing something. Thanks in advance -- Regards Pavan Kumar Kolamuri
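Akhil's suggestion amounts to running the job in its own JVM. A System.exit inside the Spark client then ends only that child process, and the workflow manager simply reads the exit status. The plain-Java sketch below shows the launcher side with ProcessBuilder. The spark-submit path, class name, and jar in the comment are hypothetical placeholders. The demo launches the current JVM's own java binary so that it runs anywhere.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

public class ChildProcessLauncherSketch {

    /**
     * Starts the command as a separate OS process and blocks until it ends.
     * A System.exit(n) inside the child terminates only the child; the caller receives n.
     */
    public static int runAndGetExitCode(List<String> command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true);                       // merge stderr into stdout
        pb.redirectOutput(ProcessBuilder.Redirect.DISCARD); // Java 9+; a real launcher would log this
        try {
            return pb.start().waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException("could not start " + command, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + command, e);
        }
    }

    public static void main(String[] args) {
        // With Spark this might be, hypothetically:
        // List.of("/path/to/spark/bin/spark-submit", "--class", "com.example.MyJob", "my-job.jar")
        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        int ok = runAndGetExitCode(List.of(javaBin, "-version"));
        int failed = runAndGetExitCode(List.of(javaBin, "NoSuchMainClassForThisSketch"));
        System.out.println("exit codes: " + ok + ", " + failed); // the caller's JVM keeps running either way
    }
}
```

The trade-off versus Oleg's in-JVM SecurityManager approach is that the caller only gets an exit code back. Any richer results have to come through files, HDFS, or similar.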
Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException
Hi, sqlContext.table("db.tbl") isn't working for me; I get a NoSuchTableException. But I can access the table via sqlContext.sql("select * from db.tbl"), so I know it has the table info from the metastore. Has anyone else seen this? I'll keep digging. I compiled via make-distribution -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver. It worked for me in 1.3.1. Cheers, Doug
Re: Equivalent to Storm's 'field grouping' in Spark.
This happens automatically when you use the byKey operations, e.g. reduceByKey, updateStateByKey, etc. Spark Streaming keeps the state for a given set of keys on a specific node and sends new tuples with that key to that node. Matei

On Jun 3, 2015, at 6:31 AM, allonsy luke1...@gmail.com wrote: Hi everybody, is there anything in Spark that shares the philosophy of Storm's field grouping? I'd like to manage data partitioning across the workers by sending tuples that share the same key to the very same worker in the cluster, but I did not find any method to do that. Suggestions? :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Equivalent-to-Storm-s-field-grouping-in-Spark-tp23135.html
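Matei's answer relies on the byKey operations partitioning data by key. With Spark's default HashPartitioner, used when no other partitioner is in play, the target partition is the key's hashCode taken modulo the number of partitions and made non-negative. Null keys go to partition 0. Equal keys therefore always meet in the same partition, much like Storm's fields grouping. The plain-Java model below reproduces only that routing rule. Which node ends up hosting a given partition is decided by Spark's scheduler and is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupingSketch {

    /** Same routing rule as a hash partitioner: null keys go to 0, otherwise non-negative hashCode mod n. */
    public static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        int raw = key.hashCode() % numPartitions;
        return raw < 0 ? raw + numPartitions : raw; // Java's % can return a negative remainder
    }

    /** Routes each key to its partition; every occurrence of a key ends up in the same list. */
    public static Map<Integer, List<String>> route(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String key : keys) {
            partitions.computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>()).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> tuples = List.of("user-1", "user-2", "user-1", "user-3", "user-2", "user-1");
        route(tuples, 4).forEach((p, ks) -> System.out.println("partition " + p + ": " + ks));
    }
}
```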