issue creating spark context with CDH 5.3.1
Hi, I am using CDH 5.3.1 and I am getting the error below; the Spark context is not even getting created. I am submitting my job like this:

spark-submit --jars ./analiticlibs/utils-common-1.0.0.jar,./analiticlibs/mysql-connector-java-5.1.17.jar,./analiticlibs/log4j-1.2.17.jar,./analiticlibs/ant-launcher-1.9.1.jar,./analiticlibs/antlr-2.7.7.jar,./analiticlibs/antlr-runtime-3.4.jar,./analiticlibs/avro-1.7.6-cdh5.3.1.jar,./analiticlibs/datanucleus-api-jdo-3.2.6.jar,./analiticlibs/datanucleus-core-3.2.10.jar,./analiticlibs/datanucleus-rdbms-3.2.9.jar,./analiticlibs/derby-10.10.1.1.jar,./analiticlibs/hive-ant-0.13.1-cdh5.3.1.jar,./analiticlibs/hive-contrib-0.13.1-cdh5.3.1.jar,./analiticlibs/hive-exec-0.13.1-cdh5.3.1.jar,./analiticlibs/hive-jdbc-0.13.1-cdh5.3.1.jar,./analiticlibs/hive-metastore-0.13.1-cdh5.3.1.jar,./analiticlibs/hive-service-0.13.1-cdh5.3.1.jar,./analiticlibs/libfb303-0.9.0.jar,./analiticlibs/libthrift-0.9.0-cdh5-2.jar,./analiticlibs/tachyon-0.5.0.jar,./analiticlibs/zookeeper.jar --master yarn --class mycom.java.analitics.SparkEngineTest sparkanalitics-1.0.0.jar

Even if I do not specify the jars explicitly, I get the same exception:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
at org.apache.spark.sql.hive.api.java.JavaHiveContext.init(JavaHiveContext.scala:30)
at mycom.java.analitics.core.SparkAnaliticEngine.getJavaHiveContext(SparkAnaliticEngine.java:103)
at mycom.java.analitics.core.SparkAnaliticTable.evmycomate(SparkAnaliticTable.java:106)
at mycom.java.analitics.core.SparkAnaliticEngine.evmycomateAnaliticTable(SparkAnaliticEngine.java:55)
at mycom.java.analitics.core.SparkAnaliticEngine.evmycomateAnaliticTable(SparkAnaliticEngine.java:65)
at mycom.java.analitics.SparkEngineTest.main(SparkEngineTest.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 13 more
Re: issue creating spark context with CDH 5.3.1
I have copied hive-site.xml to the Spark conf folder:

cp /etc/hive/conf/hive-site.xml /usr/lib/spark/conf
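For the classpath side of the same error, a hedged sketch of a submit command that also puts the Hive jars on the driver classpath and ships hive-site.xml with the job; the /usr/lib/hive/lib location is an assumption about the CDH layout, so adjust paths to your installation:

spark-submit --master yarn \
  --driver-class-path "/usr/lib/hive/lib/*" \
  --files /etc/hive/conf/hive-site.xml \
  --class mycom.java.analitics.SparkEngineTest \
  sparkanalitics-1.0.0.jar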
Re: issue creating spark context with CDH 5.3.1
This one is CDH-specific and is already answered in the forums, so I'd go there instead. Ex: http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-sql-and-Hive-tables/td-p/22051

On Mon, Mar 9, 2015 at 12:33 PM, sachin Singh sachin.sha...@gmail.com wrote: Hi, I am using CDH 5.3.1 and I am getting the error below; the Spark context is not even getting created. ...
Read Parquet file from scala directly
Hi All, I have a lot of Parquet files, and I would like to open them directly instead of loading them into an RDD in the driver (so I can optimize some performance through special logic). But I have done some research online and can't find any example of accessing Parquet directly from Scala. Has anyone done this before? Regards, Shuai
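A rough sketch of one option, reading a file with the parquet-avro reader outside of Spark. This assumes parquet-avro and its Hadoop dependencies are on the classpath; the path and column name are placeholders, and depending on your Parquet version the package may be org.apache.parquet.avro instead of parquet.avro:

import org.apache.hadoop.fs.Path
import org.apache.avro.generic.GenericRecord
import parquet.avro.AvroParquetReader

val reader = new AvroParquetReader[GenericRecord](new Path("hdfs:///data/part-00000.parquet"))
var record = reader.read()
while (record != null) {
  // each record is an Avro GenericRecord; pull fields out by name
  println(record.get("column1"))
  record = reader.read()
}
reader.close()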
Re: Solve least square problem of the form min norm(A x - b)^2 + lambda * n * norm(x)^2 ?
Hi Jaonary, The RowPartitionedMatrix is a special case of the BlockMatrix, where colsPerBlock = nCols. I hope that helps. Burak

On Mar 6, 2015 9:13 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Shivaram, Thank you for the link. I'm trying to figure out how I can port this to mllib. Maybe you can help me understand how the pieces fit together. Currently, in mllib there are different types of distributed matrix: BlockMatrix, CoordinateMatrix, IndexedRowMatrix and RowMatrix. Which one should correspond to RowPartitionedMatrix in ml-matrix?

On Tue, Mar 3, 2015 at 8:02 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: There are a couple of solvers that I've written that are part of the AMPLab ml-matrix repo [1,2]. These aren't part of MLLib yet though, and if you are interested in porting them I'd be happy to review it. Thanks Shivaram
[1] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/TSQR.scala
[2] https://github.com/amplab/ml-matrix/blob/master/src/main/scala/edu/berkeley/cs/amplab/mlmatrix/NormalEquations.scala

On Tue, Mar 3, 2015 at 9:01 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Is there a least squares solver based on DistributedMatrix that we can use out of the box in the current (or the master) version of Spark? It seems that the only least squares solver available in Spark is private to the recommender package. Cheers, Jao
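To make the colsPerBlock = nCols point concrete, a small sketch assuming Spark 1.3's mllib.linalg.distributed API and a toy RDD of entries (the values are made up for illustration):

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries = sc.parallelize(Seq(
  MatrixEntry(0, 0, 1.0), MatrixEntry(1, 1, 2.0), MatrixEntry(2, 0, 3.0)))
val coordMat = new CoordinateMatrix(entries)
// colsPerBlock == numCols gives blocks that each span every column,
// i.e. the row-partitioned layout of ml-matrix's RowPartitionedMatrix
val rowPartitioned = coordMat.toBlockMatrix(rowsPerBlock = 2, colsPerBlock = coordMat.numCols().toInt)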
Re: what are the types of tasks when running ALS iterations
+user On Mar 9, 2015 8:47 AM, Burak Yavuz brk...@gmail.com wrote: Hi, In the web UI, you don't see every single task. You see the name of the last task before the stage boundary (which is a shuffle like a groupByKey), which in your case is a flatMap. Therefore you only see flatMap in the UI. The groupByKey and the flatMap that follows form a single stage. Please take a look at http://www.slideshare.net/mobile/pwendell/tuning-and-debugging-in-apache-spark for further reference. Burak

On Mar 8, 2015 11:44 PM, lisendong lisend...@163.com wrote: you see, the core of ALS 1.0.0 is the following code: there should be flatMap and groupByKey tasks when running ALS iterations, right? but when I run the ALS iteration, there are ONLY flatMap tasks... do you know why?

private def updateFeatures(
    products: RDD[(Int, Array[Array[Double]])],
    productOutLinks: RDD[(Int, OutLinkBlock)],
    userInLinks: RDD[(Int, InLinkBlock)],
    partitioner: Partitioner,
    rank: Int,
    lambda: Double,
    alpha: Double,
    YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
  val numBlocks = products.partitions.size
  productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
    val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
    for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
      if (outLinkBlock.shouldSend(p)(userBlock)) {
        toSend(userBlock) += factors(p)
      }
    }
    toSend.zipWithIndex.map { case (buf, idx) => (idx, (bid, buf.toArray)) }
  }.groupByKey(new HashPartitioner(numBlocks)) // the ALS code in 1.0.0 has a bug here: that version used the passed-in partitioner, which had no effect and caused data skew
    .join(userInLinks)
    .mapValues { case (messages, inLinkBlock) =>
      updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
    }
}
Re: failure to display logs on YARN UI with log aggregation on
See http://search-hadoop.com/m/JW1q5AneoE1 Cheers On Mon, Mar 9, 2015 at 7:29 AM, rok rokros...@gmail.com wrote: I'm using log aggregation on YARN with Spark and I am not able to see the logs through the YARN web UI after the application completes: Failed redirect for container_1425390894284_0066_01_01 Failed while trying to construct the redirect url to the log server. Log Server url may not be configured java.lang.Exception: Unknown container. Container either has not started or has already completed or doesn't belong to this node at all. I tried setting log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log in log4j.properties as suggested in the documentation, but didn't seem to change anything. Note that I can see the logs fine using yarn logs from the command line, so aggregation is working properly. For regular mapreduce jobs, the YARN UI displays the logs correctly as well. Is there a spark configuration option that needs to be set if aggregation is used? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/failure-to-display-logs-on-YARN-UI-with-log-aggregation-on-tp21974.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
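For reference, the "Log Server url may not be configured" message usually means yarn.log.server.url is not set, so YARN cannot redirect you to aggregated logs once the container is gone. A sketch of the yarn-site.xml entry, assuming the MapReduce JobHistory server runs on historyserver.example.com with the default 19888 port (adjust host/port to your cluster):

<property>
  <name>yarn.log.server.url</name>
  <value>http://historyserver.example.com:19888/jobhistory/logs</value>
</property>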
How to preserve/preset partition information when load time series data?
Hi All, I have a set of time series data files in Parquet format. The data for each day are stored with a naming convention, but I will not know how many files there are for one day:

20150101a.parq
20150101b.parq
20150102a.parq
20150102b.parq
20150102c.parq
.
201501010a.parq
.

Now I am trying to write a program to process the data, and I want to make sure each day's data ends up in one partition. Of course I can load everything into one big RDD and then partition it, but that will be very slow. Since I already know the time series from the file names, is it possible for me to load the data into the RDD and preserve the partitioning? I know I can preserve one partition per file, but is there any way for me to load the RDD and preserve partitions based on a set of files: one partition, multiple files? I think it should be possible, because when I load an RDD from 100 files (assume they cross 100 days), I will have 100 partitions (if I disable file splitting when loading). Then I could use a special coalesce to repartition the RDD? But I don't know whether that is possible in Spark today. Regards, Shuai
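One hedged sketch of the idea, assuming Spark 1.3's DataFrame API and that each day's files can be addressed as a group (a per-day directory is assumed here for simplicity; the paths and day list are placeholders):

val days = Seq("20150101", "20150102", "20150103")
val perDay = days.map { d =>
  // load only that day's files and collapse them into a single partition
  sqlContext.parquetFile(s"/data/$d").rdd.coalesce(1)
}
val byDay = sc.union(perDay) // one partition per day, in the order of the days list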
distcp problems on ec2 standalone spark cluster
I got past the issues with the cluster-not-started problem by adding yarn to mapreduce.framework.name. But when I try to run distcp: if I use a URI with an s3:// path to my bucket, I get "invalid path" even though the bucket exists. If I use s3n:// it just hangs. Did anyone else face anything like that? I also noticed that this script puts in the Cloudera Hadoop image. Does it matter? Thanks -R
saveAsTextFile extremely slow near finish
I'm basically running a sort using Spark. The Spark program reads from HDFS, sorts on composite keys, and then saves the partitioned result back to HDFS. The pseudo code is like this:

input = sc.textFile
pairs = input.mapToPair
sorted = pairs.sortByKey
values = sorted.values
values.saveAsTextFile

Input size is ~160G, and I made 1000 partitions specified in JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job is split into two stages: saveAsTextFile and mapToPair. mapToPair finished in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress and the last few tasks just take forever and never finish.

Cluster setup: 8 nodes, on each node: 15gb memory, 8 cores
running parameters: --executor-memory 12G --conf spark.cores.max=60

Thank you for any help.
java.lang.RuntimeException: Couldn't find function Some
Hi, In my Spark application I queried a Hive table and tried to take only one record, but got java.lang.RuntimeException: Couldn't find function Some

val rddCoOrd = sql("SELECT date, x, y FROM coordinate where order by date limit 1")
val resultCoOrd = rddCoOrd.take(1)(0)

Any ideas? I tested the same code on the spark shell and it worked. Best, Patcharee
GraphX Snapshot Partitioning
Hello, I am working on a project where we want to split graphs of data into snapshots across partitions and I was wondering what would happen if one of the snapshots we had was too large to fit into a single partition. Would the snapshot be split over the two partitions equally, for example, and how is a single snapshot spread over multiple partitions? Thank You, Matthew Bucci -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Snapshot-Partitioning-tp21977.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Top, takeOrdered, sortByKey
From: Saba Sehrish ssehr...@fnal.gov Date: March 9, 2015 at 4:11:07 PM CDT To: user-...@spark.apache.org Subject: Using top, takeOrdered, sortByKey

I am using Spark for a template matching problem. We have 77 million events in the template library, and we compare the energy of each input event with each of the template events and return a score. In the end we return the best matches with the lowest scores; a score of 0 is a perfect match. I down-sampled the problem to use only 50k events. For a single event matched across all the events in the template (50k) I see 150-200ms for the score calculation on 25 cores (using a YARN cluster), but after that, when I perform either a top or takeOrdered or even sortByKey, the time reaches 25-50s. So far I am not able to figure out why there is such a huge gap going from a list of scores to a list of the top 1000 scores, and why sorting or getting the best X matches is dominant by such a large factor. One thing I have noticed is that it doesn't matter how many elements I return; the time range is the same 25-50s for 10 - 1 elements. Any suggestions? Am I not using the API properly?

scores is a JavaPairRDD<Integer, Double>, and I do something like the following, where numbestmatches is 10, 100, 1 or any number:

List<Tuple2<Integer, Double>> bestscores_list = scores.takeOrdered(numbestmatches, new TupleComparator());
Or
List<Tuple2<Integer, Double>> bestscores_list = scores.top(numbestmatches, new TupleComparator());
Or
List<Tuple2<Integer, Double>> bestscores_list = scores.sortByKey();
RE: sc.textFile() on windows cannot access UNC path
This is a Java problem, not really Spark. From this page: http://stackoverflow.com/questions/18520972/converting-java-file-url-to-file-path-platform-independent-including-u you can see that using java.nio.* on JDK 7 will fix this issue. But the Path class in Hadoop uses java.io.*, instead of java.nio. You need to manually mount your Windows remote share as a local drive, like Z:, then it should work. Yong

From: ningjun.w...@lexisnexis.com To: user@spark.apache.org Subject: sc.textFile() on windows cannot access UNC path Date: Mon, 9 Mar 2015 21:09:38 +
I am running Spark on windows 2008 R2. I use sc.textFile() to load a text file using a UNC path, it does not work. ...
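To illustrate the java.nio point, a rough workaround sketch that reads the UNC file on the driver with java.nio (which understands UNC paths) and then parallelizes the lines. This assumes the file is small enough to fit in driver memory; the path is the one from the question:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Files/Paths handle the UNC form directly, unlike Hadoop's java.io-based Path
val lines = Files.readAllLines(
  Paths.get("""\\10.196.119.230\folder1\abc.txt"""), StandardCharsets.UTF_8).asScala
val rdd = sc.parallelize(lines)
println(rdd.count())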
error on training with logistic regression sgd
Hi, I was launching a spark cluster with 4 work nodes, each work nodes contains 8 cores and 56gb ram, and I was testing my logistic regression problem. The training set is around 1.2 million records.When I was using 2**10 (1024) features, the whole program works fine, but when I use 2**14 features, the program has encountered the error: Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at
yarn + spark deployment issues (high memory consumption and task hung)
Yarn + Spark: I am running my Spark job (on YARN) on a 6 data node cluster of 512GB each. I was having a tough time configuring it, since the job would hang in one or more tasks on any of the executors for an indefinite time. The stage can be as simple as an rdd count, and the bottleneck point is not always the same, so there must be something goofy in my configuration which might be causing the deadlock in any of the stages. I do multiple transformations on the input rdd, and I see the following log message where it consumed ~36GB in less than 1 hour. After a 2-3 hour run, the executor runs OOM, the container gets killed and a new one gets created which continues to work fine till the issue repeats. I configured executor failures to a high number, so the application never fails.

2015-03-09 14:11:17,261 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 1800 for container-id container_1425683313223_0026_01_02: 35.7 GB of 85 GB physical memory used; 91.3 GB of 178.5 GB virtual memory used

./spark-submit --conf spark.storage.memoryFraction=0.6 --conf spark.eventLog.overwrite=true --conf spark.driver.maxResultSize=5g --conf spark.yarn.executor.memoryOverhead=5120 --conf spark.akka.frameSize=512 --conf spark.eventLog.enabled=true --master yarn-cluster --num-executors 6 --executor-memory 80G --driver-memory 40G --executor-cores 20 --class /tmp/main-all.jar

Here are the questions which would help me a great deal:
1. Is it common for executors to get filled up so fast? I mean, I am not explicitly doing RDD.persist or unpersist. I had tried to do so in the past, but it didn't yield me anything. Is it common for containers to get killed and new ones get spawned in a Spark job run?
2. Whenever a stage is hung processing a task, on the Yarn+Spark UI I sometimes see CANNOT FIND ADDRESS in the executor column, or sometimes the executor is mentioned, but the task size is 0 and all the tasks on the executor remain in the running state. How can we debug this? Having trace enabled also didn't yield any good evidence of what is going wrong.
3. I read about the RDD cleanup process, but still don't completely understand how these RDDs get purged on their own. I set the memory fraction to 0.6, which is quite substantial, but the RDD size itself may vary depending upon the content. I would not need an RDD once I complete all transformations; how can I make sure it gets purged and my executors don't run into an OOM situation?

Thank you, Regards
sc.textFile() on windows cannot access UNC path
I am running Spark on windows 2008 R2. I use sc.textFile() to load text file using UNC path, it does not work. sc.textFile(rawfile:10.196.119.230/folder1/abc.txt, 4).count() Input path does not exist: file:/10.196.119.230/folder1/abc.txt org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/10.196.119.230/tar/Enron/enron-207-short.load at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1328) at org.apache.spark.rdd.RDD.count(RDD.scala:910) at ltn.analytics.tests.IndexTest$$anonfun$3.apply$mcV$sp(IndexTest.scala:104) at ltn.analytics.tests.IndexTest$$anonfun$3.apply(IndexTest.scala:103) at ltn.analytics.tests.IndexTest$$anonfun$3.apply(IndexTest.scala:103) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) at org.scalatest.FunSuite.runTest(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) at 
ltn.analytics.tests.IndexTest.org$scalatest$BeforeAndAfterAll$$super$run(IndexTest.scala:15) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at ltn.analytics.tests.IndexTest.run(IndexTest.scala:15) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) at
Re: MLlib/kmeans newbie question(s)
You need to change `== 1` to `== i`. `println(t)` happens on the workers, which may not be what you want. Try the following: noSets.filter(t = model.predict(Utils.featurize(t)) == i).collect().foreach(println) -Xiangrui On Sat, Mar 7, 2015 at 3:20 PM, Pierce Lamb richard.pierce.l...@gmail.com wrote: Hi all, I'm very new to machine learning algorithms and Spark. I'm follow the Twitter Streaming Language Classifier found here: http://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/README.html Specifically this code: http://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/scala/src/main/scala/com/databricks/apps/twitter_classifier/ExamineAndTrain.scala Except I'm trying to run it in batch mode on some tweets it pulls out of Cassandra, in this case 200 total tweets. As the example shows, I am using this object for vectorizing a set of tweets: object Utils{ val numFeatures = 1000 val tf = new HashingTF(numFeatures) /** * Create feature vectors by turning each tweet into bigrams of * characters (an n-gram model) and then hashing those to a * length-1000 feature vector that we can pass to MLlib. * This is a common way to decrease the number of features in a * model while still getting excellent accuracy (otherwise every * pair of Unicode characters would potentially be a feature). */ def featurize(s: String): Vector = { tf.transform(s.sliding(2).toSeq) } } Here is my code which is modified from ExaminAndTrain.scala: val noSets = rawTweets.map(set = set.mkString(\n)) val vectors = noSets.map(Utils.featurize).cache() vectors.count() val numClusters = 5 val numIterations = 30 val model = KMeans.train(vectors, numClusters, numIterations) for (i - 0 until numClusters) { println(s\nCLUSTER $i) noSets.foreach { t = if (model.predict(Utils.featurize(t)) == 1) { println(t) } } } This code runs and each Cluster prints Cluster 0 Cluster 1 etc with nothing printing beneath. If i flip models.predict(Utils.featurize(t)) == 1 to models.predict(Utils.featurize(t)) == 0 the same thing happens except every tweet is printed beneath every cluster. Here is what I intuitively think is happening (please correct my thinking if its wrong): This code turns each tweet into a vector, randomly picks some clusters, then runs kmeans to group the tweets (at a really high level, the clusters, i assume, would be common topics). As such, when it checks each tweet to see if models.predict == 1, different sets of tweets should appear under each cluster (and because its checking the training set against itself, every tweet should be in a cluster). Why isn't it doing this? Either my understanding of what kmeans does is wrong, my training set is too small or I'm missing a step. Any help is greatly appreciated - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming input data source list
Spark Streaming has StreamingContext.socketStream() http://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/StreamingContext.html#socketStream(java.lang.String, int, scala.Function1, org.apache.spark.storage.StorageLevel, scala.reflect.ClassTag) TD On Mon, Mar 9, 2015 at 11:37 AM, Cui Lin cui@hds.com wrote: Dear all, Could you send me a list for input data source that spark streaming could support? My list is HDFS, Kafka, textfile?… I am wondering if spark streaming could directly read data from certain port (443 e.g.) that my devices directly send to? Best regards, Cui Lin
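A minimal sketch of the text variant of that API; the host and port are placeholders. Note that the receiver connects out to the given host:port and reads newline-delimited text, so if your devices push to a port on your side you would need a custom receiver that listens instead (see the custom receiver guide linked later in this thread):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("device-gateway.example.com", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()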
Re: Can't cache RDD of collaborative filtering on MLlib
cache() is lazy. The data is stored into memory after the first time it gets materialized. So the first time you call `predict` after you load the model back from HDFS, it still takes time to load the actual data. The second time will be much faster. Or you can call `userJavaRDD.count()` and `productJavaRDD.count()` explicitly to load both into memory before you create the model. -Xiangrui

On Sun, Mar 8, 2015 at 9:43 AM, Yuichiro Sakamoto ks...@muc.biglobe.ne.jp wrote: Hello. I created a collaborative filtering program using Spark, but I have trouble with its calculation speed. I want to implement a recommendation program using ALS (MLlib), which runs as a separate process from Spark. But the access speed of the MatrixFactorizationModel object on HDFS is slow, so I want to cache it, but I can't. There are 2 processes:

process A:
1. Create MatrixFactorizationModel by ALS
2. Save the following objects to HDFS
- MatrixFactorizationModel (on RDD)
- MatrixFactorizationModel#userFeatures (RDD)
- MatrixFactorizationModel#productFeatures (RDD)

process B:
1. Load the model information saved by process A.
# In process B, Master of SparkContext is set to local

==
// Read Model
JavaRDD<MatrixFactorizationModel> modelRDD = sparkContext.objectFile(HDFS path);
MatrixFactorizationModel preModel = modelData.first();
// Read Model's RDD
JavaRDD<Tuple2<Object, double[]>> productJavaRDD = sparkContext.objectFile(HDFS path);
JavaRDD<Tuple2<Object, double[]>> userJavaRDD = sparkContext.objectFile(HDFS path);
// Create Model
MatrixFactorizationModel model = new MatrixFactorizationModel(preModel.rank(), JavaRDD.toRDD(userJavaRDD), JavaRDD.toRDD(productJavaRDD));
==

2. Call the predict method of the above MatrixFactorizationModel object. At number 2 of process B, it is slow because objects are read from HDFS every time.
# I confirmed that the result of the recommendation is correct.

So, I tried to cache productJavaRDD and userJavaRDD as follows, but there was no response from the predict method.

==
// Read Model
JavaRDD<MatrixFactorizationModel> modelRDD = sparkContext.objectFile(HDFS path);
MatrixFactorizationModel preModel = modelData.first();
// Read Model's RDD
JavaRDD<Tuple2<Object, double[]>> productJavaRDD = sparkContext.objectFile(HDFS path);
JavaRDD<Tuple2<Object, double[]>> userJavaRDD = sparkContext.objectFile(HDFS path);
// Cache
productJavaRDD.cache();
userJavaRDD.cache();
// Create Model
MatrixFactorizationModel model = new MatrixFactorizationModel(preModel.rank(), JavaRDD.toRDD(userJavaRDD), JavaRDD.toRDD(productJavaRDD));
==

I could not understand why the predict method was frozen. Could you please help me understand how to cache the object? Thank you.
From Spark web ui, how to prove the parquet column pruning working
Hi, Currently most of the data in our production is stored using Avro + Snappy. I want to test the benefits if we store the data in Parquet format. I changed our ETL to generate Parquet instead of Avro, and want to test a simple SQL query in Spark SQL to verify the benefits from Parquet. I generated the same dataset in both Avro and Parquet in HDFS, and loaded them both in Spark SQL. Now when I run the same query, like select column1 from src_table_avro/parquet where column2=xxx, I can see that for the Parquet data format the job runs much faster. The test file sizes for both formats are around 930M. The Avro job generated 8 tasks to read the data with 21s as the median duration, vs the Parquet job, which generated 7 tasks to read the data with 0.4s as the median duration. Since the dataset has more than 100 columns, I can see the Parquet file really does come with fast reads. But my question is: from the Spark UI, both jobs show 900M as the input size, and 0 for the rest. In this case, how do I know column pruning really works? I think that is why the Parquet file can be read so fast, but is there any statistic that can prove that to me in the Spark UI? Something like: the total input file size is 900M, but only 10M was really read due to column pruning? That way, in case column pruning does not work in Parquet for some kind of SQL query, I can identify it in the first place. Thanks Yong
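One way to sanity-check pruning from the plan rather than the UI: print the physical plan and look at the columns listed on the Parquet scan node. A minimal sketch, assuming Spark 1.3's DataFrame API (the table and column names match the example query above):

val df = sqlContext.sql("select column1 from src_table_parquet where column2 = 'xxx'")
// the Parquet scan node in the physical plan should list only the pruned set of columns
df.explain(true)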
Joining data using Latitude, Longitude
Hi, I am trying to join data based on the latitude and longitude. I have reference data which has city information with their latitude and longitude. I have a data source with user information with their latitude and longitude. I want to find the nearest city to the user's latitude and longitude. I had initially planned to use $near operator in mongo in the map function for every user but it does not scale well. Is there any other option? Can I join the data in two files without loading to mongo? Thanks Ankur
Spark Streaming input data source list
Dear all, Could you send me a list for input data source that spark streaming could support? My list is HDFS, Kafka, textfile?… I am wondering if spark streaming could directly read data from certain port (443 e.g.) that my devices directly send to? Best regards, Cui Lin
Re: Process time series RDD after sortByKey
Does a code flow similar to the following work for you? It processes each partition of an RDD sequentially:

while (iterPartition < RDD.partitions.length) {
  val res = sc.runJob(this, (it: Iterator[T]) => someFunc, iterPartition, allowLocal = true)
  // Some other function after processing one partition.
  iterPartition += 1
}

You can refer to RDD.take for an example. Thanks. Zhan Zhang

On Mar 9, 2015, at 3:41 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am processing some time series data. For one day, it might be 500GB; for each hour, it is around 20GB of data. ...
Process time series RDD after sortByKey
Hi All, I am processing some time series data. For one day, it might be 500GB; for each hour, it is around 20GB of data. I need to sort the data before I start processing. Assume I can sort them successfully with dayRDD.sortByKey, but after that I might have thousands of partitions (to make the sort succeed), maybe 1000 partitions. Then I try to process the data by hour (it does not need to be exactly one hour, just some similar time frame). And I can't just repartition to 24, because then one partition might be too big to fit into memory (if it is 20GB). So is there any way for me to process the underlying partitions in a certain order? Basically I want to call mapPartitionsWithIndex with a range of indexes. Any way to do it? Hope I described my issue clearly. :) Regards, Shuai
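A sketch of the mapPartitionsWithIndex idea: keep only a contiguous range of partition indexes and return empty iterators for the rest, so each pass over the sorted RDD touches roughly one hour of data. The names sortedRdd and processRecord and the index bounds are placeholders:

val lo = 0
val hi = 41 // partitions assumed to cover roughly the first hour of sorted data
val oneHour = sortedRdd.mapPartitionsWithIndex { (idx, iter) =>
  if (idx >= lo && idx <= hi) iter else Iterator.empty
}
oneHour.foreachPartition(part => part.foreach(processRecord))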
sparse vector operations in Python
Hi, Sorry to ask this, but how do I compute the sum of 2 (or more) mllib SparseVectors in Python? Thanks, Ron - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is there any problem in having a long opened connection to spark sql thrift server
I have some applications developed using PHP, and currently we have a problem connecting these applications to the Spark SQL thrift server. (Here is the problem I am talking about: http://apache-spark-user-list.1001560.n3.nabble.com/Connection-PHP-application-to-Spark-Sql-thrift-server-td21925.html) Until I find a solution to this problem, there is a suggestion to build a little Java application that connects to the Spark SQL thrift server and provides an API for the PHP applications to execute the required queries. From my little experience, opening a connection and closing it for each query is not recommended (I am talking from my experience working with CRUD applications that deal with some kind of database). 1- Does the same recommendation apply to working with the Spark SQL thrift server? 2- If yes, is there any problem in having one connection open for a long time with the Spark SQL thrift server?
Re: Optimizing SQL Query
Please find the query plan:

scala> sqlContext.sql("SELECT dw.DAY_OF_WEEK, dw.HOUR, avg(dw.SDP_USAGE) AS AVG_SDP_USAGE FROM (SELECT sdp.WID, DAY_OF_WEEK, HOUR, SUM(INTERVAL_VALUE) AS SDP_USAGE FROM (SELECT * FROM date_d AS dd JOIN interval_f AS intf ON intf.DATE_WID = dd.WID WHERE intf.DATE_WID >= 20150101 AND intf.DATE_WID <= 20150110 AND CAST(INTERVAL_END_TIME AS STRING) >= '2015-01-01 00:00:00.000' AND CAST(INTERVAL_END_TIME AS STRING) <= '2015-01-10 00:00:00.000' AND MEAS_WID = 3) AS test JOIN sdp_d AS sdp on test.SDP_WID = sdp.WID where sdp.UDC_ID = 'SP-168451834' group by sdp.WID, DAY_OF_WEEK, HOUR) AS dw group by dw.DAY_OF_WEEK, dw.HOUR")

q2: org.apache.spark.sql.SchemaRDD = SchemaRDD[36] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Aggregate false, [DAY_OF_WEEK#3,HOUR#43L], [DAY_OF_WEEK#3,HOUR#43L,(CAST(SUM(PartialSum#133), DoubleType) / CAST(SUM(PartialCount#134L), DoubleType)) AS AVG_SDP_USAGE#126]
 Exchange (HashPartitioning [DAY_OF_WEEK#3,HOUR#43L], 200)
  Aggregate true, [DAY_OF_WEEK#3,HOUR#43L], [DAY_OF_WEEK#3,HOUR#43L,COUNT(SDP_USAGE#130) AS PartialCount#134L,SUM(SDP_USAGE#130) AS PartialSum#133]
   Project [DAY_OF_WEEK#3,HOUR#43L,SDP_USAGE#130]
    Aggregate false, [WID#49,DAY_OF_WEEK#3,HOUR#43L], [WID#49,DAY_OF_WEEK#3,HOUR#43L,SUM(PartialSum#136) AS SDP_USAGE#130]
     Exchange (HashPartitioning [WID#49,DAY_OF_WEEK#3,HOUR#43L], 200)
      Aggregate true, [WID#49,DAY_OF_WEEK#3,HOUR#43L], [...
Re: How to use the TF-IDF model?
Hi, well, it really depends on what you want to do ;) TF-IDF is a measure that originates in the information retrieval context and that can be used to judge the relevancy of a document in the context of a given search term. It's also often used for text-related machine learning tasks. E.g. have a look at topic extraction using non-negative matrix factorization. Regards, Jeff

2015-03-09 7:39 GMT+01:00 Xi Shen davidshe...@gmail.com: Hi, I read this page, http://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html. But I am wondering, how do I use this TF-IDF RDD? What does this TF-IDF vector look like? Can someone provide me some guidance? Thanks, Xi Shen
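For the mechanics, a small sketch along the lines of the mllib feature-extraction guide referenced above; the input path is a placeholder and whitespace tokenization is just for illustration:

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

val documents: RDD[Seq[String]] = sc.textFile("hdfs:///docs.txt").map(_.split(" ").toSeq)
val tf: RDD[Vector] = new HashingTF().transform(documents)
tf.cache()
val idf = new IDF().fit(tf)
// one sparse vector of term weights per document, indexed by hashed term
val tfidf: RDD[Vector] = idf.transform(tf)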
Top rows per group
I have a SchemaRDD that I want to group by a given field F1, but I want the result to be not a single row per group but multiple rows per group, where only the rows that have the N top F2 field values are kept. The issue is that the groupBy operation is an aggregation of multiple rows into a single one. Any suggestion or hint will be appreciated. Best,
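A hedged sketch of doing this with the underlying RDD API (group by F1, then sort each group locally and keep the first N). The column positions for F1 and F2 are assumptions, and groupByKey assumes each group fits in memory on an executor:

val n = 3
val topN = schemaRDD
  .map(row => (row.getString(0), row)) // key by F1, assumed to be column 0
  .groupByKey()
  .flatMap { case (_, rows) =>
    // keep the rows with the N largest F2 values (F2 assumed to be column 1)
    rows.toSeq.sortBy(r => -r.getDouble(1)).take(n)
  }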
Spark History server default conf values
Hi All, What are the default values for the following conf properties if we don't set them in the conf file?
# spark.history.fs.updateInterval 10
# spark.history.retainedApplications 500
Regards, Srini.
Re: GraphX Snapshot Partitioning
Hi, Vertices are simply hash-partitioned by their 64-bit IDs, so they are evenly spread over partitions. As for edges, GraphLoader#edgeListFile builds edge partitions through hadoopFile(), so the initial partitions depend on the InputFormat#getSplits implementation (e.g., partitions mostly equal 64MB blocks for HDFS). Edges can be re-partitioned by PartitionStrategy; a graph is partitioned considering graph structure, and a source ID and a destination ID are used as partition keys. The partitions might suffer from skewness depending on graph properties (hub nodes, or something). Thanks, takeshi

On Tue, Mar 10, 2015 at 2:21 AM, Matthew Bucci mrbucci...@gmail.com wrote: Hello, I am working on a project where we want to split graphs of data into snapshots across partitions and I was wondering what would happen if one of the snapshots we had was too large to fit into a single partition. ...

--
---
Takeshi Yamamuro
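To make the PartitionStrategy point concrete, a small sketch of re-partitioning edges after loading; the strategy choice and edge-list path are just examples:

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

val graph = GraphLoader.edgeListFile(sc, "hdfs:///edges.txt")
// re-assign edges to partitions using (srcId, dstId) as the partition key
val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)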
Re: Spark Streaming input data source list
Link to the custom receiver guide: https://spark.apache.org/docs/latest/streaming-custom-receivers.html

On Mon, Mar 9, 2015 at 5:55 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Lin, AFAIK, currently there's no built-in receiver API for RDBMs, but you can customize your own receiver to get data from RDBMs; for the details you can refer to the docs. ...
RE: Spark Streaming input data source list
Hi Lin, AFAIK, currently there's no built-in receiver API for RDBMSs, but you can customize your own receiver to get data from an RDBMS; for the details you can refer to the docs. Thanks Jerry

From: Cui Lin [mailto:cui@hds.com] Sent: Tuesday, March 10, 2015 8:36 AM To: Tathagata Das Cc: user@spark.apache.org Subject: Re: Spark Streaming input data source list

Tathagata, Thanks for your quick response. The link is helpful to me. Do you know of any API for streaming data from an RDBMS? Best regards, Cui Lin

From: Tathagata Das t...@databricks.com Date: Monday, March 9, 2015 at 11:28 AM Subject: Re: Spark Streaming input data source list
Spark Streaming has StreamingContext.socketStream() ...
RE: sc.textFile() on windows cannot access UNC path
Hi Yong Thanks for the reply. Yes it works with local drive letter. But I really need to use UNC path because the path is input from at runtime. I cannot dynamically assign a drive letter to arbitrary UNC path at runtime. Is there any work around that I can use UNC path for sc.textFile(...)? Ningjun From: java8964 [mailto:java8...@hotmail.com] Sent: Monday, March 09, 2015 5:33 PM To: Wang, Ningjun (LNG-NPV); user@spark.apache.org Subject: RE: sc.textFile() on windows cannot access UNC path This is a Java problem, not really Spark. From this page: http://stackoverflow.com/questions/18520972/converting-java-file-url-to-file-path-platform-independent-including-u You can see that using Java.nio.* on JDK 7, it will fix this issue. But Path class in Hadoop will use java.io.*, instead of java.nio. You need to manually mount your windows remote share a local driver, like Z:, then it should work. Yong From: ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com To: user@spark.apache.orgmailto:user@spark.apache.org Subject: sc.textFile() on windows cannot access UNC path Date: Mon, 9 Mar 2015 21:09:38 + I am running Spark on windows 2008 R2. I use sc.textFile() to load text file using UNC path, it does not work. sc.textFile(rawfile:10.196.119.230/folder1/abc.txtfile:///\\10.196.119.230\folder1\abc.txt, 4).count() Input path does not exist: file:/10.196.119.230/folder1/abc.txt org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/10.196.119.230/tar/Enron/enron-207-short.load at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1328) at org.apache.spark.rdd.RDD.count(RDD.scala:910) at ltn.analytics.tests.IndexTest$$anonfun$3.apply$mcV$sp(IndexTest.scala:104) at ltn.analytics.tests.IndexTest$$anonfun$3.apply(IndexTest.scala:103) at ltn.analytics.tests.IndexTest$$anonfun$3.apply(IndexTest.scala:103) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) at 
org.scalatest.FunSuite.runTest(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at
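A minimal sketch of the mount-at-runtime workaround suggested above, assuming the driver runs on Windows where the "net use" command is available and that the mapped drive is visible wherever the tasks run (e.g. local mode). The server, share, drive letter and file name below are placeholders, not taken from the original post:

    import scala.sys.process._

    // Map the UNC share to a drive letter for the duration of the job (placeholder values).
    val unc = """\\10.196.119.230\folder1"""
    val drive = "Z:"
    val mapped = Seq("cmd", "/c", "net", "use", drive, unc).!   // returns the process exit code
    require(mapped == 0, s"net use failed with exit code $mapped")

    // Hadoop's Path handles the mapped drive letter where it rejects the raw UNC path.
    val count = sc.textFile(s"file:///$drive/abc.txt", 4).count()
    println(count)

    // Unmap the drive when finished.
    Seq("cmd", "/c", "net", "use", drive, "/delete").!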
RE: A strange problem in spark sql join
No, I don't have two master instances. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: March 9, 2015 15:03 To: Dai, Kevin Cc: user@spark.apache.org Subject: Re: A strange problem in spark sql join Make sure you don't have two master instances running on the same machine. It could happen if you were running the job, tried to stop the cluster part-way through (which didn't completely stop it), and then did a start-all again; you would end up with 2 master instances running, and the former one would still have your data computed/cached somewhere in memory. Thanks Best Regards On Mon, Mar 9, 2015 at 11:45 AM, Dai, Kevin yun...@ebay.com wrote: Hi guys, I've run into a strange problem: I joined two tables (both Parquet files) and then did a groupBy. The groupBy took 19 hours to finish. However, after I killed this job twice in the groupBy stage, the third try succeeded and finished in 15 minutes. What's wrong with it? Best Regards, Kevin.
How to use the TF-IDF model?
Hi, I read this page: http://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html. But I am wondering, how do I use this TF-IDF RDD? What does a TF-IDF vector look like? Can someone provide some guidance? Thanks, Xi Shen (http://about.me/davidshen)
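In case a concrete example helps, here is a minimal sketch following that 1.2.0 feature-extraction guide; the input path and whitespace tokenisation are assumptions, not part of the original question. Each resulting vector is a sparse vector indexed by hashed term, with IDF-weighted term frequencies, one per document, and can be passed to any MLlib algorithm that takes RDD[Vector]:

    import org.apache.spark.mllib.feature.{HashingTF, IDF}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // One document per line, tokenised by whitespace (assumed input format).
    val docs: RDD[Seq[String]] = sc.textFile("hdfs:///path/to/corpus.txt").map(_.split(" ").toSeq)

    val hashingTF = new HashingTF()
    val tf: RDD[Vector] = hashingTF.transform(docs)   // term-frequency vectors, indexed by hashed term
    tf.cache()

    val idfModel = new IDF().fit(tf)
    val tfidf: RDD[Vector] = idfModel.transform(tf)   // one sparse TF-IDF vector per document

    // For example, cluster the documents:
    // val model = org.apache.spark.mllib.clustering.KMeans.train(tfidf, 10, 20)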
A strange problem in spark sql join
Hi guys, I've run into a strange problem: I joined two tables (both Parquet files) and then did a groupBy. The groupBy took 19 hours to finish. However, after I killed this job twice in the groupBy stage, the third try succeeded and finished in 15 minutes. What's wrong with it? Best Regards, Kevin.
How to load my ML model?
Hi, I used the method from http://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/train.html to save my k-means model. But now I have no idea how to load it back... I tried sc.objectFile("/path/to/data/file/directory/") but I got this error: org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:997) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1417) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object; at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88) at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1339) at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:1339) at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:993) ... 12 more Any suggestions? Thanks, Xi Shen (http://about.me/davidshen)
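A possible fix, sketched under the assumption that the model was saved the way that reference application does (roughly sc.makeRDD(model.clusterCenters).saveAsObjectFile(dir)): give sc.objectFile an explicit element type so the centers come back as Vector instead of a plain Object array, which may be what triggers the ArrayStoreException above. The path is the placeholder from the original message:

    import org.apache.spark.mllib.clustering.KMeansModel
    import org.apache.spark.mllib.linalg.Vector

    // Read the saved cluster centers back with an explicit type parameter.
    val centers: Array[Vector] = sc.objectFile[Vector]("/path/to/data/file/directory/").collect()

    // Rebuild the model from the centers.
    val model = new KMeansModel(centers)
    println(model.k)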
Re: A strange problem in spark sql join
Make sure you don't have two master instances running on the same machine. It could happen if you were running the job, tried to stop the cluster part-way through (which didn't completely stop it), and then did a start-all again; you would end up with 2 master instances running, and the former one would still have your data computed/cached somewhere in memory. Thanks Best Regards On Mon, Mar 9, 2015 at 11:45 AM, Dai, Kevin yun...@ebay.com wrote: Hi guys, I've run into a strange problem: I joined two tables (both Parquet files) and then did a groupBy. The groupBy took 19 hours to finish. However, after I killed this job twice in the groupBy stage, the third try succeeded and finished in 15 minutes. What's wrong with it? Best Regards, Kevin.
Re: No executors allocated on yarn with latest master branch
You would have needed to configure it by setting yarn.scheduler.capacity.resource-calculator to something ending in DominantResourceCalculator (a sample capacity-scheduler.xml entry is sketched after this thread). If you haven't configured it, there's a high probability that the recently committed https://issues.apache.org/jira/browse/SPARK-6050 will fix your problem. On Wed, Feb 25, 2015 at 1:36 AM, Anders Arpteg arp...@spotify.com wrote: We're using the capacity scheduler, to the best of my knowledge. Unsure if multi-resource scheduling is used, but if you know of an easy way to figure that out, then let me know. Thanks, Anders On Sat, Feb 21, 2015 at 12:05 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you using the capacity scheduler or the FIFO scheduler without multi-resource scheduling, by any chance? On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg arp...@spotify.com wrote: The NM logs only seem to contain entries similar to the following. Nothing else in the same time range. Any help? 2015-02-12 20:47:31,245 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_02 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_12 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_22 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_32 2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_42 2015-02-12 21:24:30,515 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: FINISH_APPLICATION sent to absent application application_1422406067005_0053 On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote: It seems unlikely to me that it would be a 2.2 issue, though not entirely impossible. Are you able to find any of the container logs? Is the NodeManager launching containers and reporting some exit code? -Sandy On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote: No, not submitting from Windows, from a Debian distribution. Had a quick look at the RM logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think? Thanks, Anders On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: This is tricky to debug. Check the logs of the YARN NodeManager and ResourceManager to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before the containers are launched). If I still didn't get a clue, I had to check the script generated by YARN to execute the container and even run it manually to trace at which line the error occurred. BTW, are you submitting the job from Windows? 
On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote: Interesting to hear that it works for you. Are you using YARN 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. It does not seem to work in yarn-client mode either, failing with the exception below. Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.init(SparkContext.scala:370) at com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8) at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42) at com.spotify.analytics.DataSampler.main(DataSampler.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at
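For reference, the capacity-scheduler setting mentioned at the top of this thread would look roughly like the following in capacity-scheduler.xml on the ResourceManager; the exact file location and surrounding configuration depend on the Hadoop distribution, so treat this as a sketch rather than a drop-in change:

    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>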
what are the types of tasks when running ALS iterations
You see, the core of ALS 1.0.0 is the following code. There should be flatMap and groupByKey tasks when running ALS iterations, right? But when I run the ALS iterations, there are ONLY flatMap tasks... do you know why?

private def updateFeatures(
    products: RDD[(Int, Array[Array[Double]])],
    productOutLinks: RDD[(Int, OutLinkBlock)],
    userInLinks: RDD[(Int, InLinkBlock)],
    partitioner: Partitioner,
    rank: Int,
    lambda: Double,
    alpha: Double,
    YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
  val numBlocks = products.partitions.size
  productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
    val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
    for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
      if (outLinkBlock.shouldSend(p)(userBlock)) {
        toSend(userBlock) += factors(p)
      }
    }
    toSend.zipWithIndex.map { case (buf, idx) => (idx, (bid, buf.toArray)) }
  }.groupByKey(new HashPartitioner(numBlocks)) // the ALS code in 1.0.0 has a bug here: that version used the partitioner passed in, which does not take effect and causes data skew
    .join(userInLinks)
    .mapValues { case (messages, inLinkBlock) =>
      updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
    }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-are-the-types-of-tasks-when-running-ALS-iterations-tp21966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
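One likely explanation for seeing only "flatMap" tasks, illustrated with a toy example rather than the ALS code itself: groupByKey only introduces a shuffle boundary, so its map side runs in the same stage as the preceding flatMap, and that shuffle-map stage is typically named after the flatMap call site; the shuffle read then happens in the following stage together with join/mapValues. A small spark-shell sketch (assumed session, not the poster's code):

    // Toy illustration of the stage split around groupByKey.
    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(1 to 100).flatMap(i => Seq((i % 4, i), (i % 3, i)))
    val grouped = pairs.groupByKey(new HashPartitioner(4))
    println(grouped.toDebugString)  // a ShuffledRDD sits on top of the flatMapped RDD: two stages
    grouped.count()                 // the shuffle-map stage is typically labelled "flatMap at ..."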
Re: A way to share RDD directly using Tachyon?
Did you try something like: myRDD.saveAsObjectFile("tachyon://localhost:19998/Y") val newRDD = sc.objectFile[MyObject]("tachyon://localhost:19998/Y") Thanks Best Regards On Sun, Mar 8, 2015 at 3:59 PM, Yijie Shen henry.yijies...@gmail.com wrote: Hi, I would like to share an RDD between several Spark applications, i.e., create one in application A, publish its ID somewhere, and get the RDD back directly using that ID in application B. I know I can use Tachyon just as a filesystem and do something like s.saveAsTextFile("tachyon://localhost:19998/Y"). But getting an RDD directly from Tachyon instead of from a file can sometimes avoid parsing the same file repeatedly in different apps, I think. What am I supposed to do in order to share RDDs and get better performance? — Best Regards! Yijie Shen
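A slightly fuller sketch of that suggestion, assuming both applications can reach the same Tachyon master and agree on the path (MyObject and the path are placeholders). Note that this shares the RDD's serialized data, not a live RDD: application B still deserializes the records, but it avoids re-parsing the original text files:

    // Application A: write the RDD's contents to Tachyon.
    case class MyObject(id: Long, payload: String)
    val rdd = sc.parallelize(Seq(MyObject(1L, "a"), MyObject(2L, "b")))
    rdd.saveAsObjectFile("tachyon://localhost:19998/shared/Y")

    // Application B: rebuild an RDD from the same path (the "published ID" is just the path).
    val newRDD = sc.objectFile[MyObject]("tachyon://localhost:19998/shared/Y")
    println(newRDD.count())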
Ensuring data locality when opening files
Hi, We wrote a Spark Streaming app that receives file names on HDFS from Kafka and opens them using Hadoop's libraries. The problem with this method is that I'm not benefiting from data locality, because any worker might open any file without giving precedence to locality. I can't open the files using the SparkContext because it is only available on the driver. Is there a way I could open files at runtime and benefit from data locality? Thanks, Daniel
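One workaround sketch, not from the original poster: since the file names arrive in DStream batches, they can be collected on the driver inside foreachRDD and handed to sc.textFile, which accepts a comma-separated list of paths; the resulting HadoopRDD computes splits with HDFS block locations, so tasks get the usual data locality. fileNameStream and ssc are assumed names here:

    // fileNameStream: DStream[String] of HDFS paths coming from Kafka (assumed).
    fileNameStream.foreachRDD { rdd =>
      val paths = rdd.collect()                        // file names land on the driver
      if (paths.nonEmpty) {
        // textFile takes a comma-separated path list; splits carry block locations.
        val data = ssc.sparkContext.textFile(paths.mkString(","))
        println(s"read ${data.count()} lines from ${paths.length} files")
      }
    }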
How to build Spark and run examples using Intellij ?
Hi, I am trying to run the Spark examples (master branch from git) from IntelliJ (14.0.2) but am facing errors. These are the steps I followed:
1. git clone the master branch of Apache Spark.
2. Build it using mvn -DskipTests clean install
3. In IntelliJ, select Import Project and choose the pom.xml of the Spark root folder (Auto Import enabled).
4. Then I tried to run the SparkPi program, but I get the following errors:
Information: 9/3/15 3:46 PM - Compilation completed with 44 errors and 0 warnings in 5 sec
/usr/local/spark-1.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
Error:(314, 109) polymorphic expression cannot be instantiated to expected type; found : [T(in method apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)] required: org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method functionToUdfBuilder)] implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]): ScalaUdfBuilder[T] = ScalaUdfBuilder(func)
I am able to run the examples of this built version of Spark from the terminal using the ./bin/run-example script. Could someone please help me with this issue? Thanks and Regards, Meethu M
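This does not address the catalyst compile errors above, but one thing that commonly trips people up when launching an example class directly from the IDE (as opposed to ./bin/run-example, which supplies the master URL): a master has to be provided, e.g. via -Dspark.master=local[*] in the Run Configuration's VM options, or explicitly in code as in the hypothetical snippet below:

    // Hypothetical snippet for running an example locally from the IDE.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("SparkPi").setMaster("local[*]")
    val sc = new SparkContext(conf)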