Re: Database operations on executor nodes
Totally depends on your database. If it's a NoSQL database like MongoDB or HBase, then you can use the native .saveAsNewAPIHadoopFile or .saveAsHadoopDataset etc. For SQL databases, I think people usually put the overhead on the driver like you did.

Thanks
Best Regards

On Wed, Mar 18, 2015 at 10:52 PM, Praveen Balaji prav...@soundhound.com wrote:

I was wondering what people generally do about database operations from executor nodes. I'm (at least for now) avoiding database updates from executor nodes to avoid a proliferation of database connections on the cluster. The general pattern I adopt is to collect queries (or tuples) on the executors and write to the database on the driver.

    // Executes on the executor
    rdd.foreach(s => {
      val query = s"insert into ${s}"
      accumulator += query
    })

    // Executes on the driver
    accumulator.value.foreach(query => {
      // get connection
      // update database
    })

I'm obviously trading database connections for driver heap. How do other Spark users do it?

Cheers
Praveen

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
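A common alternative to sending everything through the driver is one connection per partition rather than one per record, which in Spark means doing the writes inside rdd.foreachPartition. The sketch below is plain Scala so it runs without a cluster or a database: FakeConnection is a hypothetical stand-in for a JDBC connection, partitions are modelled as a Seq of Seqs, and the table name t is a placeholder. Under those assumptions it shows that the number of connections follows the number of partitions, not the number of rows.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a real database connection (e.g. java.sql.Connection),
// so the sketch runs without a database or a cluster.
final class FakeConnection(sink: ArrayBuffer[String]) {
  def execute(statement: String): Unit = sink += statement
  def close(): Unit = ()
}

object PerPartitionWrites {
  // Writes every row of ONE partition through a single connection and always
  // closes it. In Spark, this body would sit inside rdd.foreachPartition { rows => ... }.
  // A real version should use a PreparedStatement instead of string interpolation.
  def writePartition(rows: Iterator[String], open: () => FakeConnection): Unit = {
    val conn = open()
    try rows.foreach(r => conn.execute(s"insert into t values ('$r')"))
    finally conn.close()
  }

  // Simulates running writePartition over each partition in turn and reports
  // (statements executed, connections opened).
  def run(partitions: Seq[Seq[String]]): (Int, Int) = {
    val sink = ArrayBuffer.empty[String]
    var opened = 0
    val open = () => { opened += 1; new FakeConnection(sink) }
    partitions.foreach(p => writePartition(p.iterator, open))
    (sink.size, opened)
  }

  def main(args: Array[String]): Unit = {
    val (rows, connections) = run(Seq(Seq("a", "b"), Seq("c"), Seq("d", "e")))
    println(s"rows written: $rows, connections opened: $connections")
  }
}
```

Running main prints "rows written: 5, connections opened: 3". Whether this beats collecting on the driver depends on how many partitions write concurrently and how many connections the database can tolerate.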
Re: Spark + Kafka
Thanks, Khanderao.

On Wed, Mar 18, 2015 at 7:18 PM, Khanderao Kand Gmail khanderao.k...@gmail.com wrote:

I have used various versions of Spark (1.0, 1.2.1) without any issues. Though I have not significantly used Kafka with 1.3.0, preliminary testing revealed no issues.
- khanderao

On Mar 18, 2015, at 2:38 AM, James King jakwebin...@gmail.com wrote:

Hi All,
Which build of Spark is best when using Kafka?
Regards
jk
Error while Insert data into hive table via spark
Hi,

I have configured Apache Spark 1.3.0 with Hive 1.0.0 and Hadoop 2.6.0. I am able to create a table and retrieve data from Hive tables with the following commands, but I am not able to insert data into the table.

    scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS newtable (key INT)")
    scala> sqlContext.sql("select * from newtable").collect
    15/03/19 02:10:20 INFO parse.ParseDriver: Parsing command: select * from newtable
    15/03/19 02:10:20 INFO parse.ParseDriver: Parse Completed
    15/03/19 02:10:35 INFO scheduler.DAGScheduler: Job 0 finished: collect at SparkPlan.scala:83, took 13.826402 s
    res2: Array[org.apache.spark.sql.Row] = Array([1])

But I am not able to insert data into this table via the Spark shell, even though the same command runs perfectly fine from the Hive shell.

    scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@294fa094

    // scala> sqlContext.sql("INSERT INTO TABLE newtable SELECT 1")
    scala> sqlContext.sql("INSERT INTO TABLE newtable values(1)")
    15/03/19 02:03:14 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
    15/03/19 02:03:14 INFO metastore.ObjectStore: ObjectStore, initialize called
    15/03/19 02:03:14 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
    15/03/19 02:03:14 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
    15/03/19 02:03:14 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    15/03/19 02:03:15 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    15/03/19 02:03:16 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
    15/03/19 02:03:18 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
    15/03/19 02:03:18 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
    15/03/19 02:03:18 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
    15/03/19 02:03:18 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
    15/03/19 02:03:18 INFO DataNucleus.Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing
    15/03/19 02:03:18 INFO metastore.ObjectStore: Initialized ObjectStore
    15/03/19 02:03:19 INFO metastore.HiveMetaStore: Added admin role in metastore
    15/03/19 02:03:19 INFO metastore.HiveMetaStore: Added public role in metastore
    15/03/19 02:03:19 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
    15/03/19 02:03:20 INFO session.SessionState: No Tez session required at this point. hive.execution.engine=mr.
    15/03/19 02:03:20 INFO parse.ParseDriver: Parsing command: INSERT INTO TABLE newtable values(1)
    NoViableAltException(26@[])
        at org.apache.hadoop.hive.ql.parse.HiveParser_SelectClauseParser.selectClause(HiveParser_SelectClauseParser.java:742)
        at org.apache.hadoop.hive.ql.parse.HiveParser.selectClause(HiveParser.java:40171)
        at org.apache.hadoop.hive.ql.parse.HiveParser.singleSelectStatement(HiveParser.java:38048)
        at org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:37754)
        at org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:37654)
        at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:36898)
        at org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:36774)
        at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1338)
        at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1036)
        at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:199)
        at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
        at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227)
        at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:241)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
RE: configure number of cached partition in memory on SparkSQL
Thanks, Cheng, for replying. I meant to ask how to change the number of partitions of a cached table; it doesn't need to be re-adjusted after caching.

To provide more context: what I am seeing on my dataset is that we have a large number of tasks. Since it appears each task is mapped to a partition, I want to see whether matching the number of partitions to the available core count will make it faster.

I'll give your suggestion a try to see if it helps. Experimenting is a great way to learn more about Spark internals.

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Monday, March 16, 2015 5:41 AM
To: Judy Nash; user@spark.apache.org
Subject: Re: configure number of cached partition in memory on SparkSQL

Hi Judy,

In the case of HadoopRDD and NewHadoopRDD, the partition number is actually decided by the InputFormat used. And spark.sql.inMemoryColumnarStorage.batchSize is not related to the partition number; it controls the in-memory columnar batch size within a single partition.

Also, what do you mean by "change the number of partitions after caching the table"? Are you trying to re-cache an already cached table with a different partition number?

Currently, I don't see a super intuitive pure SQL way to set the partition number in this case. Maybe you can try this (assuming table t has a column s which is expected to be sorted):

    SET spark.sql.shuffle.partitions = 10;
    CACHE TABLE cached_t AS SELECT * FROM t ORDER BY s;

In this way, we introduce a shuffle by sorting a column, and zoom in/out the partition number at the same time. This might not be the best way out there, but it's the first one that jumped into my head.

Cheng

On 3/5/15 3:51 AM, Judy Nash wrote:

Hi,

I am tuning a Hive dataset on Spark SQL deployed via the Thrift server. How can I change the number of partitions created by caching the table on the Thrift server? I have tried the following but am still getting the same number of partitions after caching:

    spark.default.parallelism
    spark.sql.inMemoryColumnarStorage.batchSize

Thanks,
Judy
MLlib Spam example gets stuck in Stage X
Hello Everyone,

I am trying to run this MLlib example from Learning Spark:
https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48

Things I'm doing differently:
1) Using the Spark shell instead of an application
2) Instead of their spam.txt and normal.txt, I have text files with 3700 and 2700 words... nothing huge at all, just plain text
3) I've used numFeatures = 100, 1000 and 10,000

Error: I keep getting stuck when I try to run the model:

    val model = new LogisticRegressionWithSGD().run(trainingData)

It will freeze on something like this:

    [Stage 1:==(1 + 0) / 4]

Sometimes it's Stage 1, 2 or 3. I am not sure what I am doing wrong... any help is much appreciated, thank you!

-Su
Need some help on the Spark performance on Hadoop Yarn
Dear Spark experts,

I would appreciate it if you could look into my problem and give me some help and suggestions here. Thank you!

I have a simple Spark application to parse and analyze logs, and I can run it on my Hadoop YARN cluster. The problem is that it runs quite slowly on the cluster, even slower than running it on a single Spark machine.

This is my application sketch:
1) Read in the log file and use mapToPair to transform the raw logs into my object, Tuple2<String, LogEntry>. I use a string as the key so that I can later aggregate by key.
2) Persist the RDD transformed in step 1; let me call it logObjects.
3) Use aggregateByKey to calculate the sum and average value for each key. The reason I use aggregateByKey instead of reduceByKey is that the output object is different.
4) Persist the RDD from step 3; let me call it aggregatedObjects.
5) Run several takeOrdered calls to get the top X values that I'm interested in.

What surprised me is that even with persist (MEMORY_ONLY_SER) on the two major RDDs I manipulate later, the processing speed is not improved. It's even slower than without persisting them... Any idea on that? I logged some data to stdout and found that the two major actions take more than 1 minute. It's just 1 GB of logs, though...

Another problem I'm seeing is that it seems to use only two of the DataNodes in my Hadoop YARN cluster, but I actually have three. Is there any configuration here that matters?

I attached the stderr output here; please help me analyze it and suggest where I can improve the speed. Thank you so much!

(See attached file: applicationLog.txt)

Best Regards
Yi Ming Huang(黄毅铭)
ICS Performance
IBM Collaboration Solutions, China Development Lab, Shanghai
huang...@cn.ibm.com
(86-21)60922771
Addr: 5F, Building 10, No 399, Keyuan Road, Zhangjiang High Tech Park

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/tmp/hadoop-root/nm-local-dir/usercache/root/filecache/17/spark-assembly-1.2.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.4.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/03/19 21:13:46 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
15/03/19 21:13:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/19 21:13:47 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1426685857620_0005_01
15/03/19 21:13:48 INFO spark.SecurityManager: Changing view acls to: root
15/03/19 21:13:48 INFO spark.SecurityManager: Changing modify acls to: root
15/03/19 21:13:48 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/19 21:13:48 INFO yarn.ApplicationMaster: Starting the user JAR in a separate Thread
15/03/19 21:13:48 INFO yarn.ApplicationMaster: Waiting for spark context initialization
15/03/19 21:13:48 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 0
15/03/19 21:13:48 INFO spark.SecurityManager: Changing view acls to: root
15/03/19 21:13:48 INFO spark.SecurityManager: Changing modify acls to: root
15/03/19 21:13:48 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/19 21:13:48 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/03/19 21:13:48 INFO Remoting: Starting remoting
15/03/19 21:13:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@datanode03:42687]
15/03/19 21:13:48 INFO util.Utils: Successfully started service 'sparkDriver' on port 42687.
15/03/19 21:13:48 INFO spark.SparkEnv: Registering MapOutputTracker
15/03/19 21:13:48 INFO spark.SparkEnv: Registering BlockManagerMaster
15/03/19 21:13:48 INFO storage.DiskBlockManager: Created local directory at /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1426685857620_0005/spark-local-20150319211348-756f
15/03/19 21:13:48 INFO storage.MemoryStore: MemoryStore started with capacity 257.8 MB
15/03/19 21:13:48 INFO spark.HttpFileServer: HTTP File server directory is /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1426685857620_0005/container_1426685857620_0005_01_01/tmp/spark-8288d778-2bca-4afa-a805-cb3807c40f9f
15/03/19 21:13:48 INFO spark.HttpServer: Starting HTTP Server
15/03/19 21:13:49 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/19 21:13:49 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54693
15/03/19 21:13:49 INFO util.Utils: Successfully started service 'HTTP file server' on port 54693.
15/03/19 21:13:49 INFO ui.JettyUtils:
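Step 3 of the application sketch uses aggregateByKey because the result type (a running sum and count) differs from the value type. The plain-Scala sketch below imitates its semantics without a cluster: a zero value, a seqOp applied within each partition, and a combOp that merges the per-partition results. The LogEntry fields are hypothetical (just a key and a numeric value) and partitions are modelled as a Seq of Seqs; in Spark the same three pieces would be passed as rdd.aggregateByKey(zero)(seqOp, combOp).

```scala
object AggregateByKeySketch {
  // Hypothetical minimal record; the real LogEntry in the application has more fields.
  final case class LogEntry(key: String, value: Double)

  // The accumulator type differs from the value type: (running sum, count).
  type Acc = (Double, Long)
  val zero: Acc = (0.0, 0L)

  // seqOp folds one value into an accumulator inside a single partition.
  def seqOp(acc: Acc, v: Double): Acc = (acc._1 + v, acc._2 + 1)

  // combOp merges accumulators produced by different partitions.
  def combOp(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)

  // Local imitation of rdd.aggregateByKey(zero)(seqOp, combOp).
  def aggregateByKey(partitions: Seq[Seq[(String, Double)]]): Map[String, Acc] = {
    // Phase 1: fold each partition independently with seqOp.
    val perPartition: Seq[Map[String, Acc]] = partitions.map { part =>
      part.foldLeft(Map.empty[String, Acc]) { case (m, (k, v)) =>
        m.updated(k, seqOp(m.getOrElse(k, zero), v))
      }
    }
    // Phase 2: merge the per-partition maps with combOp.
    perPartition.foldLeft(Map.empty[String, Acc]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, a)) =>
        acc.updated(k, acc.get(k).map(combOp(_, a)).getOrElse(a))
      }
    }
  }

  // Sum and average per key, computed only after all partitions are merged.
  def averages(partitions: Seq[Seq[LogEntry]]): Map[String, Double] =
    aggregateByKey(partitions.map(_.map(e => (e.key, e.value))))
      .map { case (k, (sum, n)) => k -> sum / n }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Seq(LogEntry("GET", 10.0), LogEntry("POST", 4.0)),
      Seq(LogEntry("GET", 20.0))
    )
    println(averages(parts)) // averages: GET = 15.0, POST = 4.0
  }
}
```

Keeping (sum, count) rather than a running average is what lets combOp merge partial results from different partitions correctly; an average of averages would be wrong when partitions hold different numbers of records.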
Re: MLlib Spam example gets stuck in Stage X
Can you see where exactly it is spending time? Since, as you said, it gets to Stage 2, you should be able to see how much time it spent on Stage 1. Check whether it's GC time; if so, try increasing the level of parallelism or repartitioning the data, e.g. to sc.defaultParallelism * 3 partitions.

Thanks
Best Regards
Reliable method/tips to solve dependency issues?
Do people have a reliable, repeatable method for solving dependency issues, or any tips? The current world of spark-hadoop-hbase-parquet-... is very challenging given the huge footprint of dependent packages, and we may be pushing against the limits of how many packages can be combined into one environment...

The process of searching the web to pick at incompatibilities one at a time is at best tedious and at worst non-converging. It makes me wonder if there is (or ought to be) a page cataloging in one place the conflicts that Spark users have hit and what was done to solve them.

Eugene Yokota wrote an interesting blog post about current sbt dependency management in sbt 0.13.7 that includes nice improvements for working with dependencies:
https://typesafe.com/blog/improved-dependency-management-with-sbt-0137

After reading that, I refreshed my knowledge of the sbt documentation and found the show update command. It gives very extensive information.

For reference, there was an extensive discussion thread about sbt and Maven last year that touches on a lot of topics:
http://mail-archives.apache.org/mod_mbox/incubator-spark-dev/201402.mbox/%3ccabpqxsukhd4qsf5dg9ruhn7wvonxfm+y5b1k5d8g7h6s9bh...@mail.gmail.com%3E
Cloudant as Spark SQL External Datastore on Spark 1.3.0
Check this out: https://github.com/cloudant/spark-cloudant. It supports both the DataFrame and SQL approaches for reading data from Cloudant and saving data to it. Looking forward to your feedback on the project.

Yang
FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded
I get two types of errors:

    org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded

Spark keeps retrying to submit the code and keeps getting this error. The file on which I am finding the sliding-window strings is 500 MB, and I am doing it with length = 150. It works fine up to length 100.

This is my code:

    val hgfasta = sc.textFile(args(0)) // read the fasta file
    // every window of length args(2) in each line
    val kCount = hgfasta.flatMap(r => r.sliding(args(2).toInt))
    val kmerCount = kCount.map(x => (x, 1))
      .reduceByKey(_ + _)
      .map { case (x, y) => (y, x) } // swap to (count, kmer) so sortByKey orders by count
      .sortByKey(false)
      .map { case (i, j) => (j, i) } // swap back to (kmer, count)
    val filtered = kmerCount.filter(kv => kv._2 > 5)
    filtered.map(kv => kv._1 + "," + kv._2.toLong).saveAsTextFile(args(1))

It gets stuck at flatMap and saveAsTextFile and throws:

    org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

and

    org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
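The k-mer pipeline can be checked on a tiny input with plain Scala collections before running it on the cluster, and the numbers in the error message can be worked through the same way: 12716268407 bytes is roughly 5.9 times the 2147483647-byte (Int.MaxValue) frame limit, so that single shuffle block would have to be split at least 6 ways to fit. A frequently suggested mitigation, not something confirmed in this thread, is to give the shuffle more partitions (reduceByKey and sortByKey both accept a numPartitions argument) so each block is smaller. In the sketch below, kmerCounts, minCount and minSplits are names introduced for illustration; minCount plays the role of the threshold in the filter step.

```scala
object KmerSketch {
  // Local (non-Spark) version of the same steps: sliding windows of length k,
  // count per k-mer, keep counts above minCount, order by count descending.
  // Ties are broken alphabetically so the output is deterministic.
  def kmerCounts(lines: Seq[String], k: Int, minCount: Int): Seq[(String, Int)] =
    lines
      .flatMap(_.sliding(k))
      .groupBy(identity)
      .map { case (kmer, occurrences) => (kmer, occurrences.size) }
      .filter { case (_, count) => count > minCount }
      .toSeq
      .sortBy { case (kmer, count) => (-count, kmer) }

  // Smallest number of equal pieces so that each piece fits under `limit`
  // bytes (ceiling division). The default limit is Int.MaxValue, the figure
  // quoted in the FetchFailedException message.
  def minSplits(totalBytes: Long, limit: Long = Int.MaxValue.toLong): Long =
    (totalBytes + limit - 1) / limit

  def main(args: Array[String]): Unit = {
    println(kmerCounts(Seq("ACGTACGTACGT"), 4, 1))
    println(minSplits(12716268407L)) // 6
  }
}
```

For "ACGTACGTACGT" with k = 4 the nine windows give ACGT three times and CGTA, GTAC, TACG twice each, which makes it easy to confirm the swap-sort-swap logic before scaling up.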
Catching InvalidClassException in sc.objectFile
Hello,

I have persisted an RDD[T] to disk through saveAsObjectFile. Then I changed the implementation of T. When I read the file with sc.objectFile using the new binary, I got a java.io.InvalidClassException, which is expected.

I tried to catch this error via SparkException in the driver program. However, both getCause() and getSuppressed() are empty.

What is the recommended way of catching this exception?

Thanks.
Justin
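One hedged workaround, assuming (as described above) that the driver-side SparkException arrives without a cause because the deserialization failed inside a task: walk whatever cause chain exists, and fall back to looking for the exception's class name in the message text, which for task failures typically carries the remote exception's description. The helper below is plain Scala; isInvalidClass is a name introduced for this sketch, and the simulated message in main is made up for illustration.

```scala
import java.io.InvalidClassException

object ObjectFileErrors {
  private val marker = classOf[InvalidClassException].getName

  // True if `t` or anything in its cause chain is an InvalidClassException, or
  // (fallback for driver-side exceptions that carry no cause) if any message in
  // the chain mentions the class name. The chain walk is capped to avoid cycles.
  def isInvalidClass(t: Throwable): Boolean =
    Iterator
      .iterate(t)(_.getCause)
      .takeWhile(_ != null)
      .take(32)
      .exists { e =>
        e.isInstanceOf[InvalidClassException] ||
          Option(e.getMessage).exists(_.contains(marker))
      }

  def main(args: Array[String]): Unit = {
    // Simulated driver-side failure: no cause attached, only a message.
    val wrapped = new RuntimeException(
      "Job aborted due to stage failure: java.io.InvalidClassException: T; local class incompatible")
    println(isInvalidClass(wrapped)) // true
  }
}
```

Because sc.objectFile is lazy, the exception only surfaces when an action such as count or collect runs, so the try/catch has to wrap that action, for example with a guard like case e: org.apache.spark.SparkException if ObjectFileErrors.isInvalidClass(e). Matching on message text is brittle and is shown only as a fallback.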
Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues
Hi Akhil,

Thank you for your help. I just found that the problem was related to my local Spark application: I ran it in IntelliJ and didn't reload the project after recompiling the jar via Maven. Without a reload, it uses some locally cached data to run the application, which leads to two different versions. After I reloaded the project and reran it, it ran fine on v1.1.1 and I no longer saw the class incompatibility issues.

However, I now encounter a new issue starting from v1.2.0 and above:

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    15/03/19 01:10:17 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
    15/03/19 01:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/03/19 01:10:17 INFO SecurityManager: Changing view acls to: hduser,eason.hu
    15/03/19 01:10:17 INFO SecurityManager: Changing modify acls to: hduser,eason.hu
    15/03/19 01:10:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser, eason.hu); users with modify permissions: Set(hduser, eason.hu)
    15/03/19 01:10:18 INFO Slf4jLogger: Slf4jLogger started
    15/03/19 01:10:18 INFO Remoting: Starting remoting
    15/03/19 01:10:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@hduser-07:59122]
    15/03/19 01:10:18 INFO Utils: Successfully started service 'driverPropsFetcher' on port 59122.
    15/03/19 01:10:21 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@192.168.1.53:65001] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://sparkDriver@192.168.1.53:65001]].
    15/03/19 01:10:48 ERROR UserGroupInformation: PriviledgedActionException as:eason.hu (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
    Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        ... 4 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:144)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
        ... 7 more

Do you have any clues why it happens only on v1.2.0 and above? Nothing else changes.
Thanks,
Eason

On Tue, Mar 17, 2015 at 8:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

It's clearly saying:

    java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729

Version incompatibility; can you double-check your version?

On 18 Mar 2015 06:08, Eason Hu eas...@gmail.com wrote:

Hi Akhil,

sc.parallelize(1 to 1).collect() in the Spark shell on Spark v1.2.0 runs fine. However, if I do the following remotely, it throws an exception:

    val sc : SparkContext = new SparkContext(conf)
    val NUM_SAMPLES = 10
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
      val x = Math.random()
      val y = Math.random()
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

Exception:

    15/03/17 17:33:52 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on hcompute32228.sjc9.service-now.com: remote Akka client disassociated
    15/03/17 17:33:52 INFO scheduler.TaskSetManager: Re-queueing tasks
Timeout Issues from Spark 1.2.0+
Hi all,

I'm trying to run the sample Spark application on v1.2.0 and above. However, I've encountered a weird issue, shown below. The issue is only seen on v1.2.0 and above; v1.1.0 and v1.1.1 are fine.

The sample code:

    val sc : SparkContext = new SparkContext(conf)
    val NUM_SAMPLES = 10
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
      val x = Math.random()
      val y = Math.random()
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

The exception:

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    15/03/19 01:10:17 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
    15/03/19 01:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/03/19 01:10:17 INFO SecurityManager: Changing view acls to: hduser,eason.hu
    15/03/19 01:10:17 INFO SecurityManager: Changing modify acls to: hduser,eason.hu
    15/03/19 01:10:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser, eason.hu); users with modify permissions: Set(hduser, eason.hu)
    15/03/19 01:10:18 INFO Slf4jLogger: Slf4jLogger started
    15/03/19 01:10:18 INFO Remoting: Starting remoting
    15/03/19 01:10:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@hduser-07:59122]
    15/03/19 01:10:18 INFO Utils: Successfully started service 'driverPropsFetcher' on port 59122.
    15/03/19 01:10:21 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@192.168.1.53:65001] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://sparkDriver@192.168.1.53:65001]].
    15/03/19 01:10:48 ERROR UserGroupInformation: PriviledgedActionException as:eason.hu (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
    Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        ... 4 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:144)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
        ... 7 more

Do you have any clues why it happens only on v1.2.0 and above? How can I resolve this issue?
Thank you very much,
Eason

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Timeout-Issues-from-Spark-1-2-0-tp22150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: MLlib Spam example gets stuck in Stage X
Hi Akhil,

1) How could I see how much time it is spending on Stage 1? Or what if, as above, it doesn't get past Stage 1?
2) How could I check whether it's GC time, and where would I increase the parallelism for the model?

I have a Spark master and 2 workers running on CDH 5.3... what would the default spark-shell level of parallelism be? I thought it would be 3.

Thank you for the help!
-Su
how to specify multiple masters in sbin/start-slaves.sh script?
Hey guys,

Not sure if I'm the only one who has hit this. We are building a highly available standalone Spark environment, using ZooKeeper with 3 masters in the cluster. However, sbin/start-slaves.sh calls start-slave.sh for each member in the conf/slaves file and specifies the master using $SPARK_MASTER_IP and $SPARK_MASTER_PORT:

    exec $sbin/slaves.sh cd $SPARK_HOME \; $sbin/start-slave.sh 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT

But if I want to specify more than one master node, I have to use the format

    spark://host1:port1,host2:port2,host3:port3

In this case, it seems the original sbin/start-slaves.sh can't do the trick. Does everyone need to modify the script in order to build an HA cluster, or is there something I missed?

Thanks
Re: SparkSQL 1.3.0 JDBC data source issues
JIRA and PR for the first issue:
https://issues.apache.org/jira/browse/SPARK-6408
https://github.com/apache/spark/pull/5087

On Thu, Mar 19, 2015 at 12:20 PM, Pei-Lun Lee pl...@appier.com wrote:

Hi,

I am trying the JDBC data source in Spark SQL 1.3.0 and found some issues.

First, the syntax where str_col='value' gives an error for both PostgreSQL and MySQL:

    psql> create table foo(id int primary key,name text,age int);
    bash> SPARK_CLASSPATH=postgresql-9.4-1201-jdbc41.jar spark/bin/spark-shell
    scala> sqlContext.load("jdbc", Map("url" -> "jdbc:postgresql://XXX", "dbtable" -> "foo")).registerTempTable("foo")
    scala> sql("select * from foo where name='bar'").collect
    org.postgresql.util.PSQLException: ERROR: operator does not exist: text = bar
      Hint: No operator matches the given name and argument type(s). You might need to add explicit type casts.
      Position: 40
    scala> sql("select * from foo where name like '%foo'").collect

    bash> SPARK_CLASSPATH=mysql-connector-java-5.1.34.jar spark/bin/spark-shell
    scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo")
    scala> sql("select * from foo where name='bar'").collect
    com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'bar' in 'where clause'

Second, a PostgreSQL table with the json data type does not work:

    psql> create table foo(id int primary key, data json);
    scala> sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://XXX", "dbtable" -> "foo")).registerTempTable("foo")
    java.sql.SQLException: Unsupported type

Not sure whether these are bugs in Spark SQL or in JDBC. I can file a JIRA ticket if needed.

Thanks,
--
Pei-Lun
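The MySQL error (Unknown column 'bar' in 'where clause') suggests the string literal reached the database unquoted, i.e. as name = bar rather than name = 'bar'. Below is a plain-Scala sketch of rendering an equality filter with the string value single-quoted and embedded quotes doubled. compileValue and compileEquals are names introduced for this illustration, not the code from the linked PR, and bind parameters would be the safer choice in production.

```scala
object JdbcFilterSketch {
  // Renders a literal for a WHERE clause: strings are wrapped in single quotes
  // with embedded single quotes doubled (standard SQL escaping); other values
  // fall back to toString. Dialect-specific escaping (e.g. MySQL backslashes)
  // is not handled here.
  def compileValue(value: Any): String = value match {
    case s: String => "'" + s.replace("'", "''") + "'"
    case other     => other.toString
  }

  // Builds an equality predicate such as name = 'bar'.
  def compileEquals(column: String, value: Any): String =
    s"$column = ${compileValue(value)}"

  def main(args: Array[String]): Unit = {
    // Without quoting this would become "name = bar", which MySQL reads as a column.
    println("SELECT * FROM foo WHERE " + compileEquals("name", "bar"))
  }
}
```

Running main prints SELECT * FROM foo WHERE name = 'bar'; a value such as O'Brien becomes 'O''Brien', and numeric values such as 30 are left unquoted.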
Re: LZO configuration can not affect
How did you generate the Hadoop-lzo jar? Thanks On Mar 17, 2015, at 2:36 AM, 唯我者 878223...@qq.com wrote: hi, everybody: I have configured the environment for LZO like this: [two screenshots, not preserved in the archive] But when I execute code with spark-shell, an error still comes out like this: scala> val hdfsfile=sc.textFile("/xiaoming/gps_info") scala> hdfsfile.map(_.split(",")) scala> res0.collect java.lang.RuntimeException: Error in configuring object at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:184) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:197) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.collect(RDD.scala:797) at $iwC$$iwC$$iwC$$iwC.init(console:17) at $iwC$$iwC$$iwC.init(console:22) at $iwC$$iwC.init(console:24) at $iwC.init(console:26) at init(console:28) at .init(console:32) at .clinit(console) at
.init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at 
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
RE: Column Similarity using DIMSUM
Hi Reza, Behavior: · I tried running the job with different thresholds: 0.1, 0.5, 5, 20 and 100. Every time, the job got stuck at mapPartitionsWithIndex at RowMatrix.scala:522 with all workers running at 100% CPU. There is hardly any shuffle read/write happening. After some time, “ERROR YarnClientClusterScheduler: Lost executor” starts showing (maybe because the nodes are running at 100% CPU). · For thresholds of 200+ (tried up to 1000) it gave an error (the value in the message differed for each threshold): Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Oversampling should be greater than 1: 0. at scala.Predef$.require(Predef.scala:233) at org.apache.spark.mllib.linalg.distributed.RowMatrix.columnSimilaritiesDIMSUM(RowMatrix.scala:511) at org.apache.spark.mllib.linalg.distributed.RowMatrix.columnSimilarities(RowMatrix.scala:492) at EntitySimilarity$.runSimilarity(EntitySimilarity.scala:241) at EntitySimilarity$.main(EntitySimilarity.scala:80) at EntitySimilarity.main(EntitySimilarity.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) · If I get rid of frequently occurring attributes and keep only those attributes which occur in fewer than 2% of entities, then the job doesn't get stuck or fail. Data environment: · RowMatrix of size 43345 X 56431 · In the matrix there are a couple of rows whose value is the same in up to 50% of the columns (frequently occurring attributes).
· I am running this on one of our Dev clusters running CDH 5.3.0: 5 data nodes (each 4-core and 16GB RAM). My question: do you think this is a hardware size issue and we should test it on larger machines? Regards, Manish From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Wednesday, March 18, 2015 11:20 PM To: Reza Zadeh Cc: user@spark.apache.org Subject: RE: Column Similarity using DIMSUM Hi Reza, I have tried thresholds only in the range of 0 to 1. I was not aware that the threshold can be set above 1. Will try and update. Thank You - Manish From: Reza Zadeh [mailto:r...@databricks.com] Sent: Wednesday, March 18, 2015 10:55 PM To: Manish Gupta 8 Cc: user@spark.apache.org Subject: Re: Column Similarity using DIMSUM Hi Manish, Did you try calling columnSimilarities(threshold) with different threshold values? Try threshold values of 0.1, 0.5, 1, 20, and higher. Best, Reza On Wed, Mar 18, 2015 at 10:40 AM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, I am running Column Similarity (All Pairs Similarity using DIMSUM) in Spark on a dataset that looks like (Entity, Attribute, Value), after transforming it to a row-oriented dense matrix format (one line per Attribute, one column per Entity, each cell with a normalized value between 0 and 1). It computes similarities between Entities extremely fast in most cases, but if there is even a single attribute which occurs frequently across the entities (say in 30% of entities), the job falls apart. The whole job gets stuck and worker nodes start running at 100% CPU without making any progress on the job stage. If the dataset is very small (in the range of 1000 Entities X 500 attributes, some frequently occurring) the job finishes but takes too long (sometimes it gives GC errors too).
If none of the attributes occurs frequently (all < 2%), then the job runs lightning fast (even for 100 Entities X 1 attributes) and the results are very accurate. I am running Spark 1.2.0-cdh5.3.0 on an 11-node cluster, each node having 4 cores and 16GB of RAM. My question is: is this behavior expected for datasets where some attributes occur frequently? Thanks, Manish Gupta
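The "Oversampling should be greater than 1" failure at thresholds of 200+ can be reproduced arithmetically. A minimal sketch, assuming (as in the Spark 1.2-era source of `RowMatrix.columnSimilarities(threshold)`, restated here rather than quoted) that the oversampling factor is gamma = 10 * ln(numCols) / threshold and that `columnSimilaritiesDIMSUM` requires gamma > 1. The pair count is an upper bound on the column pairs a single row can contribute, which grows quadratically with the row's non-zeros; this may help explain why rows that are non-zero in a large share of the columns are so expensive.

```python
import math


def dimsum_gamma(num_cols, threshold):
    """Oversampling factor for a similarity threshold (assumed formula:
    gamma = 10 * ln(num_cols) / threshold; threshold ~ 0 disables sampling)."""
    if threshold < 0:
        raise ValueError("threshold cannot be negative")
    if threshold < 1e-6:
        return math.inf
    return 10 * math.log(num_cols) / threshold


def passes_oversampling_check(num_cols, threshold):
    # Mirrors the assumed requirement gamma > 1 behind the reported error.
    return dimsum_gamma(num_cols, threshold) > 1.0


def candidate_pairs(nonzeros_in_row):
    """Upper bound on column pairs emitted for one row: nnz * (nnz - 1) / 2."""
    return nonzeros_in_row * (nonzeros_in_row - 1) // 2
```

With 56431 columns, ln(56431) is about 10.94, so a threshold of 100 gives gamma of about 1.09 (accepted) while 200 gives about 0.55 (rejected), matching the cutoff reported above. A row that is non-zero in half of those columns (28215 entries) can contribute up to 398,029,005 pairs.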
OutOfMemoryError during reduce tasks
Hi, I am trying to evaluate performance aspects of Spark with respect to various memory settings. What makes it more difficult is that I'm new to Python, but the problem at hand doesn't seem to originate from that. I'm running a wordcount script [1] with different amounts of input data. There is always an OutOfMemoryError at the end of the reduce tasks [2] when I'm using 1g of input, while 100m of data causes no problem. Spark is v1.2.1 (but with v1.3 I'm having the same problem) and it runs on a VM with Ubuntu 14.04, 8G RAM and 4 vCPUs. (If something else is of interest, please ask.) On http://spark.apache.org/docs/1.2.1/tuning.html#memory-usage-of-reduce-tasks it's suggested to increase the parallelism, which I've tried (even with over 4000 tasks), but nothing changed. Other efforts with spark.executor.memory, spark.python.worker.memory and extraJavaOptions with -Xmx4g (see code below) didn't solve the problem either. What do you suggest to stop the Java heap from filling up completely? Thanks Balazs

[1] Wordcount script

import sys
import time
from operator import add
from pyspark import SparkContext, SparkConf
from signal import signal, SIGPIPE, SIG_DFL

def encode(text):
    """For printing unicode characters to the console."""
    return text.encode('utf-8')

if __name__ == "__main__":
    start_time = time.time()
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)
    conf = (SparkConf()
            .setMaster("local")
            .setAppName("PythonWordCount")
            .set("spark.executor.memory", "6g")
            .set("spark.python.worker.memory", "6g")
            .set("spark.default.parallelism", "120")
            .set("spark.driver.extraJavaOptions", "-Xmx4g"))
    sc = SparkContext(conf = conf)
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    # output would take too long and the important thing is the processing time
    #for (word, count) in output:
    #    print encode("%s: %i" % (word, count))
    #print("%f seconds" % (time.time() - start_time))
    sc.stop()
    print("%f seconds" % (time.time() - start_time))

[2] OutOfMemoryError at reduce tasks ... 15/03/19 07:58:52 INFO ShuffleBlockFetcherIterator: Getting 30 non-empty blocks out of 30 blocks 15/03/19 07:58:52 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 15/03/19 07:58:52 INFO TaskSetManager: Finished task 99.0 in stage 1.0 (TID 129) in 1096 ms on localhost (100/120) 15/03/19 07:58:52 INFO PythonRDD: Times: total = 351, boot = -530, init = 534, finish = 347 15/03/19 07:58:52 ERROR Executor: Exception in task 100.0 in stage 1.0 (TID 130) java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:164) at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply$mcV$sp(TaskResult.scala:48) at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1137) at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/19 07:58:52 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main] ...
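The trace above shows the heap filling up inside DirectTaskResult.writeExternal, which suggests the memory goes into serializing task results that are shipped back for collect(), rather than into the reduce itself. The plain-Python sketch below (no Spark; function names are hypothetical) mimics what reduceByKey(add) computes, with a per-partition combine followed by a merge, and then keeps only the top k words. In PySpark the analogous calls would be `counts.takeOrdered(k, key=lambda kv: -kv[1])` or `counts.saveAsTextFile(out_dir)` instead of `counts.collect()`; whether that alone resolves this particular OOM is an assumption to verify. It may also be worth checking that the heap settings actually take effect: with setMaster("local") the tasks run inside the driver JVM, whose heap is fixed when that JVM starts.

```python
import heapq
from collections import Counter


def word_count(partitions):
    """Mimic reduceByKey(add) over (word, 1) pairs: combine counts inside
    each partition first (map-side combine), then merge partial results."""
    partials = []
    for lines in partitions:
        # Same tokenisation as the script: split on single spaces.
        partials.append(Counter(w for line in lines for w in line.split(' ')))
    total = Counter()
    for partial in partials:
        total.update(partial)  # adds counts key by key
    return total


def top_k(counts, k):
    """Return only the k most frequent (word, count) pairs."""
    return heapq.nlargest(k, counts.items(), key=lambda kv: (kv[1], kv[0]))
```

For example, partitions `[["a b a"], ["b c", "a"]]` give counts a=3, b=2, c=1, and `top_k(..., 2)` returns `[("a", 3), ("b", 2)]`.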
Re: Apache Spark User List: people's responses not showing in the browser view
There might be some delay: http://search-hadoop.com/m/JW1q5mjZUy/Spark+people%2527s+responsessubj=Apache+Spark+User+List+people+s+responses+not+showing+in+the+browser+view On Mar 18, 2015, at 4:47 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Ted. Well, so far even there I'm only seeing my post and not, for example, your response. On Wed, Mar 18, 2015 at 7:28 PM, Ted Yu yuzhih...@gmail.com wrote: Was this one of the threads you participated in? http://search-hadoop.com/m/JW1q5w0p8x1 You should be able to find your posts on search-hadoop.com On Wed, Mar 18, 2015 at 3:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Sorry if this is a total noob question, but is there a reason why I'm only seeing folks' responses to my posts in emails but not in the browser view under apache-spark-user-list.1001560.n3.nabble.com? Is this a matter of setting your preferences such that your responses only go to email and never to the browser-based view of the list? I don't seem to see such a preference...
Re: MLlib Spam example gets stuck in Stage X
To get these metrics, you need to open the driver UI running on port 4040. There you will see the Stages information, and for each stage you can see how much time it is spending on GC etc. In your case, the parallelism seems to be 4; the higher the parallelism, the more tasks you will see. Thanks Best Regards On Thu, Mar 19, 2015 at 1:15 PM, Su She suhsheka...@gmail.com wrote: Hi Akhil, 1) How could I see how much time it is spending on stage 1? Or what if, like above, it doesn't get past stage 1? 2) How could I check if it's GC time? And where would I increase the parallelism for the model? I have a Spark Master and 2 Workers running on CDH 5.3... what would the default spark-shell level of parallelism be... I thought it would be 3? Thank you for the help! -Su On Thu, Mar 19, 2015 at 12:32 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you see where exactly it is spending time? Like you said, it goes to Stage 2, so you will be able to see how much time it spent on Stage 1. See if it's GC time; if so, try increasing the level of parallelism or repartitioning, e.g. to sc.defaultParallelism * 3 partitions. Thanks Best Regards On Thu, Mar 19, 2015 at 12:15 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I am trying to run this MLlib example from Learning Spark: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48 Things I'm doing differently: 1) Using spark shell instead of an application 2) instead of their spam.txt and normal.txt I have text files with 3700 and 2700 words... nothing huge at all and just plain text 3) I've used numFeatures = 100, 1000 and 10,000 Error: I keep getting stuck when I try to run the model: val model = new LogisticRegressionWithSGD().run(trainingData) It will freeze on something like this: [Stage 1:==(1 + 0) / 4] Sometimes it's Stage 1, 2 or 3. I am not sure what I am doing wrong... any help is much appreciated, thank you! -Su
Re: Null pointer exception reading Parquet
How are you running the application? Can you try running the same inside spark-shell? Thanks Best Regards On Wed, Mar 18, 2015 at 10:51 PM, sprookie cug12...@gmail.com wrote: Hi All, I am using Spark version 1.2 running locally. When I try to read a Parquet file I get the exception below; what might be the issue? Any help will be appreciated. This is the simplest operation/action on a Parquet file. //code snippet// val sparkConf = new SparkConf().setAppName("Testing").setMaster("local[10]") val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") import sqlContext._ val temp = "local path to file" val temp2 = sqlContext.parquetFile(temp) temp2.printSchema //end code snippet //Exception trace Exception in thread "main" java.lang.NullPointerException at parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249) at parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543) at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:389) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:457) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:457) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:457) at org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:477) at org.apache.spark.sql.parquet.ParquetRelation.init(ParquetRelation.scala:65) at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:165) //End Exception trace
Re: iPython Notebook + Spark + Accumulo -- best practice?
Yes, I would suggest spark-notebook too. It's very simple to set up and it's growing pretty fast. Paolo From: Irfan Ahmad ir...@cloudphysics.com Sent: 19/03/2015 04:05 To: davidh dav...@annaisystems.com Cc: user@spark.apache.org Subject: Re: iPython Notebook + Spark + Accumulo -- best practice? I forgot to mention that there are also Zeppelin and jove-notebook, but I haven't got any experience with those yet. Irfan Ahmad CTO | Co-Founder | CloudPhysics http://www.cloudphysics.com Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Wed, Mar 18, 2015 at 8:01 PM, Irfan Ahmad ir...@cloudphysics.com wrote: Hi David, W00t indeed and great questions. On the notebook front, there are two options depending on what you are looking for. You can either go with iPython 3 with Spark-kernel as a backend or you can use spark-notebook. Both have interesting tradeoffs. If you are looking for a single notebook platform for your data scientists that has R and Python as well as a Spark shell, you'll likely want to go with iPython + Spark-kernel. Downsides of the spark-kernel project are that data visualization isn't quite there yet, and it's early days for documentation, blogs, etc. The upside is that R and Python work beautifully and that the iPython committers are super-helpful. If you are OK with a primarily Spark/Scala experience, then I suggest you go with spark-notebook. Upsides are that the project is a little further along, visualization support is better than in spark-kernel (though not as good as iPython with Python), and the committer is awesome at helping. The downside is that you won't get R and Python. FWIW: I'm using both at the moment! Hope that helps.
On Wed, Mar 18, 2015 at 5:45 PM, davidh dav...@annaisystems.com wrote: Hi all, I've been DDGing, Stack Overflowing, Twittering, RTFMing, and scanning through this archive with only moderate success. In other words, this is my way of saying sorry if this is answered somewhere obvious and I missed it :-) I've been tasked with figuring out how to connect Notebook, Spark, and Accumulo together. The end user will do her work via a notebook. Thus far, I've successfully set up a Vagrant image containing Spark, Accumulo, and Hadoop. I was able to use some of the Accumulo example code to create a table populated with data, and to create a simple program in Scala that, when fired off to Spark via spark-submit, connects to Accumulo and prints the first ten rows of data in the table. So w00t on that, but now I'm left with more questions: 1) I'm still stuck on what's considered 'best practice' in terms of hooking all this together. Let's say Sally, a user, wants to do some analytic work on her data. She pecks the appropriate commands into the notebook and fires them off. How does this get wired together on the back end? Do I, from the notebook, use spark-submit to send a job to Spark and let Spark worry about hooking into Accumulo, or is it preferable to create some kind of open stream between the two? 2) If I want to extend Spark's API, do I need to first submit an endless job via spark-submit that does something like what this gentleman describes http://blog.madhukaraphatak.com/extending-spark-api ? Is there an alternative (other than refactoring Spark's source) that doesn't involve extending the API via a job submission? Ultimately, what I'm looking for is help locating docs, blogs, etc. that may shed some light on this. t/y in advance!
d
Spark 1.2.0 | Spark job fails with MetadataFetchFailedException
I have a job that sorts data and runs a combineByKey operation and it sometimes fails with the following error. The job is running on spark 1.2.0 cluster with yarn-client deployment mode. Any clues on how to debug the error? org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Reading a text file into RDD[Char] instead of RDD[String]
Hi, I’m struggling to think of the best way to read a text file into an RDD[Char] rather than an RDD[String]. I can do sc.textFile(….), which gives me the RDD[String]. Can anyone suggest the most efficient way to create the RDD[Char]? I’m sure I’ve missed something simple… Regards, Mike
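One common approach is to flatten each line into its characters: in PySpark, `sc.textFile(path).flatMap(list)`; in Scala, something like `sc.textFile(path).flatMap(_.toSeq)` should give an RDD[Char]. Note that textFile strips line terminators, so newline characters will not appear in the result. The plain-Python sketch below (no Spark; the helper name is hypothetical) shows the same flattening on a local list of lines. Each character becomes a separate element, so per-element overhead is much higher than with RDD[String].

```python
from itertools import chain


def to_chars(lines):
    """Flatten an iterable of strings into a flat list of characters,
    the local analogue of rdd.flatMap(list) on an RDD of strings."""
    return list(chain.from_iterable(lines))
```

For example, `to_chars(["ab", "c"])` returns `["a", "b", "c"]`.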
calculating TF-IDF for large 100GB dataset problems
Hi, I am trying to vectorize, on a YARN cluster, a corpus of texts (about 500K texts in 13 files, 100GB in total) located in HDFS. This process has already taken about 20 hours on a 3-node cluster with 6 cores and 20GB RAM on each node. In my opinion it's too long :-) I started the task with the following command: spark-submit --master yarn --num-executors 9 --executor-memory 5GB --executor-cores 2 --driver-memory 5GB weight.py

weight.py:

from pyspark import SparkConf, SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import Normalizer

conf = SparkConf() \
    .set("spark.hadoop.validateOutputSpecs", "false") \
    .set("spark.yarn.executor.memoryOverhead", "900")
sc = SparkContext(conf=conf)

# reading files from directory 'in/texts.txt' in HDFS
texts = sc.textFile('in/texts.txt') \
    .map(lambda line: line.split())

hashingTF = HashingTF()
tf = hashingTF.transform(texts)

tf.cache()
idf = IDF(minDocFreq=100).fit(tf)
tfidf = idf.transform(tf)

n = Normalizer()

normalized = n.transform(tfidf)

def x2((vec, num)):
    triples = []
    for id, weight in zip(vec.indices, vec.values):
        triples.append((num, id, weight))
    return triples

# I use zipWithIndex to enumerate documents
normalized.zipWithIndex() \
    .flatMap(x2) \
    .map(lambda t: '{}\t{}\t{}'.format(t[0], t[1], t[2])) \
    .saveAsTextFile('out/weights.txt')

1) What could be the bottleneck? Unfortunately I don't have access to the web UI. In the log file I see stages 0, 1, 2, 3: Stage 0, MapPartitionsRDD[6] at mapPartitionsWithIndex at RDDFunctions.scala:108, with 584 tasks, completed very quickly. Stage 1, MappedRDD[8] at values at RDDFunctions.scala:110 (23 tasks), was quick too. Stage 2, zipWithIndex (584 tasks), was long (17 hours). Stage 3, saveAsTextFile (584 tasks), is long too (still executing after about 2 hours). I don't understand the boundaries between stages 0, 1, ..., and I don't understand why I see numbers like 584 or 23 tasks for the stages. 2) On a previous run of this task I saw a lot of "executor lost" errors from the YARN scheduler.
Later I added the .set("spark.yarn.executor.memoryOverhead", "900") setting in the code, and now I see only a few such messages. Could that be a reason for the poor performance? Please advise! Any explanations appreciated! Serg.
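To make the pipeline above easier to reason about, here is a plain-Python sketch (no Spark) of what HashingTF + IDF(minDocFreq) compute, ending in (doc_id, feature_index, weight) triples like those x2() emits; the Normalizer step is omitted. Assumptions: the IDF formula log((m + 1) / (df + 1)), with weight 0 for terms in fewer than minDocFreq documents, restates MLlib's documented behaviour; zlib.crc32 is a stand-in hash, so the indices will NOT match MLlib's; 2^20 is MLlib HashingTF's default feature count.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 1 << 20  # MLlib HashingTF's default size (2^20)


def term_index(term, num_features=NUM_FEATURES):
    # Stand-in stable hash; MLlib uses its own hash, so indices differ.
    return zlib.crc32(term.encode("utf-8")) % num_features


def hashing_tf(doc, num_features=NUM_FEATURES):
    """Sparse term-frequency vector for one tokenised document, as {index: count}."""
    return dict(Counter(term_index(t, num_features) for t in doc))


def fit_idf(tf_vectors, min_doc_freq=0):
    """IDF per index: log((m + 1) / (df + 1)), or 0 when df < min_doc_freq."""
    m = len(tf_vectors)
    df = Counter()
    for vec in tf_vectors:
        df.update(vec.keys())  # each document counts once per index
    return {j: (math.log((m + 1.0) / (d + 1.0)) if d >= min_doc_freq else 0.0)
            for j, d in df.items()}


def tfidf_triples(docs, min_doc_freq=0, num_features=NUM_FEATURES):
    """(doc_id, feature_index, tf * idf) triples, one per non-zero entry."""
    tfs = [hashing_tf(d, num_features) for d in docs]
    idf = fit_idf(tfs, min_doc_freq)
    triples = []
    for doc_id, vec in enumerate(tfs):
        for j, count in sorted(vec.items()):
            triples.append((doc_id, j, count * idf[j]))
    return triples
```

On the stage questions, two hedged observations. textFile typically creates one partition per HDFS block or input split, which is probably where the 584 tasks come from. Also, zipWithIndex on an RDD with more than one partition first runs a separate job to count the elements of each partition; since `normalized` is not cached, that job may recompute the IDF transform and normalization over the whole corpus (and re-read the input if the cached `tf` does not fit in memory), and saveAsTextFile then computes them again. Caching `normalized`, or using zipWithUniqueId (no extra job, but non-consecutive ids), may be worth trying.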
Re: Does the newly-released LDA (Latent Dirichlet Allocation) algorithm support n-grams?
Heszak, I have only glanced at it, but you should be able to incorporate tokens approximating n-grams yourself, say by using the Lucene ShingleAnalyzerWrapper API http://lucene.apache.org/core/4_9_0/analyzers-common/org/apache/lucene/analysis/shingle/ShingleAnalyzerWrapper.html You might also take a glance at http://www.mimno.org/articles/phrases/ C On Wed, Mar 18, 2015 at 5:37 PM, heszak hzakerza...@collabware.com wrote: I would like to know whether the newly-released LDA (Latent Dirichlet Allocation) algorithm only supports unigrams or whether it also supports bigrams/trigrams. If it can, can someone help me with how to use them? -- Charles
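The shingling idea in the reply can be sketched without Lucene: append contiguous n-grams, joined into single tokens, to each document's unigrams before building the term-count vectors that LDA consumes. This is a plain-Python stand-in, not the ShingleAnalyzerWrapper API; the helper name and the "_" separator are assumptions. Expect the vocabulary to grow considerably, so pruning rare shingles is probably needed in practice.

```python
def add_shingles(tokens, max_n=2, sep="_"):
    """Return the unigrams followed by every contiguous n-gram for
    2 <= n <= max_n, each joined with `sep` so it acts as one token."""
    out = list(tokens)
    for n in range(2, max_n + 1):
        for i in range(len(tokens) - n + 1):
            out.append(sep.join(tokens[i:i + n]))
    return out
```

For example, `add_shingles(["new", "york", "city"], 3)` returns the three unigrams plus `new_york`, `york_city` and `new_york_city`.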
RE: Spark SQL Self join with aggregate
Not sure of your intention, but something like SELECT src, dst, sum(val1), sum(val2) FROM table GROUP BY src, dst ? -Original Message- From: Shailesh Birari [mailto:sbirar...@gmail.com] Sent: Friday, March 20, 2015 9:31 AM To: user@spark.apache.org Subject: Spark SQL Self join with aggregate Hello, I want to use Spark SQL to aggregate some columns of the data. E.g. I have huge data with columns such as: time, src, dst, val1, val2 I want to calculate sum(val1) and sum(val2) for all unique pairs of src and dst. I tried forming the SQL query SELECT a.time, a.src, a.dst, sum(a.val1), sum(a.val2) from table a, table b where a.src = b.src and a.dst = b.dst I know I am doing something wrong here. Can you please let me know whether it is doable, and how? Thanks, Shailesh
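The reply's point is that per-pair sums need only GROUP BY, not a self-join. A minimal sketch, using Python's built-in sqlite3 as a stand-in for Spark SQL (assumptions: the table is renamed `t` because TABLE is a reserved word in SQL, and the sample rows are made up):

```python
import sqlite3


def pair_sums(rows):
    """Sum val1 and val2 for every distinct (src, dst) pair.

    `rows` are (time, src, dst, val1, val2) tuples."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute('CREATE TABLE t ("time" INTEGER, src TEXT, dst TEXT, val1 REAL, val2 REAL)')
        conn.executemany("INSERT INTO t VALUES (?, ?, ?, ?, ?)", rows)
        # GROUP BY collapses each (src, dst) pair into one output row.
        return conn.execute(
            "SELECT src, dst, SUM(val1), SUM(val2) FROM t "
            "GROUP BY src, dst ORDER BY src, dst"
        ).fetchall()
    finally:
        conn.close()
```

For rows `(1, "a", "b", 1.0, 2.0)`, `(2, "a", "b", 3.0, 4.0)` and `(3, "a", "c", 5.0, 6.0)`, this returns `[("a", "b", 4.0, 6.0), ("a", "c", 5.0, 6.0)]`. The time column is left out of the SELECT because it is neither grouped nor aggregated.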
Re: LZO configuration can not affect
jeanlyn92: I was not very clear in my previous reply: I meant to refer to /home/hadoop/mylib/hadoop-lzo-SNAPSHOT.jar But it looks like the distro includes hadoop-lzo-0.4.15.jar Cheers On Thu, Mar 19, 2015 at 6:26 PM, jeanlyn92 jeanly...@gmail.com wrote: That's not enough. The config must point to the specific jar instead of the folder. 2015-03-19 21:27 GMT+08:00 Ted Yu yuzhih...@gmail.com: If I read the screenshot correctly, the Hadoop LZO jar is under /home/hadoop/mylib Cheers On Mar 19, 2015, at 5:37 AM, jeanlyn92 jeanly...@gmail.com wrote: You should configure it as follows: export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native:$HADOOP_HOME/share/hadoop/common/lib/hadoop-lzo-0.4.15.jar On 03/19/2015 05:25 PM, Ted Yu wrote: How did you generate the Hadoop-lzo jar? Thanks On Mar 17, 2015, at 2:36 AM, 唯我者 878223...@qq.com wrote: hi, everybody: I have configured the environment for LZO like this: [two screenshots, not preserved in the archive] But when I execute code with spark-shell, an error still comes out like this: scala> val hdfsfile=sc.textFile("/xiaoming/gps_info") scala> hdfsfile.map(_.split(",")) scala> res0.collect java.lang.RuntimeException: Error in configuring object at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:184) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:197) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at
scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.collect(RDD.scala:797) at $iwC$$iwC$$iwC$$iwC.init(console:17) at $iwC$$iwC$$iwC.init(console:22) at $iwC$$iwC.init(console:24) at $iwC.init(console:26) at init(console:28) at .init(console:32) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at
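jeanlyn92's point that the configuration must name the jar rather than the folder matches how the JVM classpath works: a plain directory entry only exposes loose .class files and resources under it, not the jars inside it (a dir/* wildcard entry is needed for that). If several jars live under /home/hadoop/mylib, one way to produce an explicit list is a small helper like the hypothetical jarClasspath below. Where the resulting string should go (driver/executor classpath settings, or the environment variable quoted above) depends on the setup and is not settled in this thread.

```scala
import java.io.File

// Hypothetical helper (not part of Spark or Hadoop): collect every .jar directly inside `dir`
// and join their absolute paths with the platform's classpath separator (':' on Linux).
def jarClasspath(dir: File): String = {
  // listFiles() returns null when `dir` does not exist or is not a directory
  val entries = Option(dir.listFiles()).getOrElse(Array.empty[File])
  entries
    .filter(f => f.isFile && f.getName.endsWith(".jar"))
    .map(_.getAbsolutePath)
    .sorted
    .mkString(File.pathSeparator)
}
```

For example, jarClasspath(new File("/home/hadoop/mylib")) would return the absolute path of each jar in that folder, and an empty string if the folder is missing.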
Spark SQL self-join with aggregate
Hello, I want to use Spark SQL to aggregate some columns of my data. For example, I have a huge dataset with the columns time, src, dst, val1 and val2, and I want to calculate sum(val1) and sum(val2) for every unique (src, dst) pair. I tried the SQL query SELECT a.time, a.src, a.dst, sum(a.val1), sum(a.val2) from table a, table b where a.src = b.src and a.dst = b.dst but I know I am doing something wrong here. Can you please let me know whether this is doable, and how? Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Self-join-with-agreegate-tp22151.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
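Since the goal is one row of sums per unique (src, dst) pair, a GROUP BY rather than a self-join is probably what's wanted, along the lines of SELECT src, dst, SUM(val1), SUM(val2) FROM t GROUP BY src, dst (t standing in for the real table name). Note that a.time can't be selected as-is in such a query; it would need its own aggregate (e.g. MIN or MAX) or be dropped. The sketch below reproduces the same grouping with plain Scala collections so the semantics can be checked locally; Record and sumByPair are hypothetical names, and no Spark API is involved.

```scala
// One row of the table described in the question (field names taken from the post).
case class Record(time: Long, src: String, dst: String, val1: Double, val2: Double)

// Local equivalent of: SELECT src, dst, SUM(val1), SUM(val2) FROM t GROUP BY src, dst
def sumByPair(rows: Seq[Record]): Map[(String, String), (Double, Double)] =
  rows.groupBy(r => (r.src, r.dst)).map { case (pair, group) =>
    (pair, (group.map(_.val1).sum, group.map(_.val2).sum))
  }
```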
RE: Column Similarity using DIMSUM
Thanks Reza. It makes perfect sense. Regards, Manish From: Reza Zadeh [mailto:r...@databricks.com] Sent: Thursday, March 19, 2015 11:58 PM To: Manish Gupta 8 Cc: user@spark.apache.org Subject: Re: Column Similarity using DIMSUM Hi Manish, With 56431 columns, the output can be as large as 56431 x 56431 ~= 3bn. When a single row is dense, that can end up overwhelming a machine. You can push that up with more RAM, but note that DIMSUM is meant for tall and skinny matrices: it scales linearly, and across the cluster, with the number of rows, but still quadratically with the number of columns. I will be updating the documentation to make this clear. Best, Reza On Thu, Mar 19, 2015 at 3:46 AM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi Reza, Behavior: • I tried running the job with different thresholds: 0.1, 0.5, 5, 20 and 100. Every time, the job got stuck at mapPartitionsWithIndex at RowMatrix.scala:522 with all workers running at 100% CPU. There is hardly any shuffle read/write happening. After some time, “ERROR YarnClientClusterScheduler: Lost executor” messages start showing (maybe because the nodes are running at 100% CPU). • For thresholds of 200+ (tried up to 1000) it gave an error (the number here was different for different thresholds): Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Oversampling should be greater than 1: 0. 
at scala.Predef$.require(Predef.scala:233) at org.apache.spark.mllib.linalg.distributed.RowMatrix.columnSimilaritiesDIMSUM(RowMatrix.scala:511) at org.apache.spark.mllib.linalg.distributed.RowMatrix.columnSimilarities(RowMatrix.scala:492) at EntitySimilarity$.runSimilarity(EntitySimilarity.scala:241) at EntitySimilarity$.main(EntitySimilarity.scala:80) at EntitySimilarity.main(EntitySimilarity.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) • If I get rid of the frequently occurring attributes and keep only those attributes that occur in at most 2% of entities, then the job doesn't get stuck or fail. Data environment: • RowMatrix of size 43345 x 56431 • In the matrix there are a couple of rows whose value is the same in up to 50% of the columns (frequently occurring attributes). • I am running this on one of our dev clusters running CDH 5.3.0: 5 data nodes (each 4-core with 16GB RAM). My question: do you think this is a hardware size issue and we should test it on larger machines? Regards, Manish From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Wednesday, March 18, 2015 11:20 PM To: Reza Zadeh Cc: user@spark.apache.org Subject: RE: Column Similarity using DIMSUM Hi Reza, I have only tried thresholds in the range of 0 to 1. I was not aware that the threshold can be set above 1. Will try and update. 
Thank You - Manish From: Reza Zadeh [mailto:r...@databricks.com] Sent: Wednesday, March 18, 2015 10:55 PM To: Manish Gupta 8 Cc: user@spark.apache.org Subject: Re: Column Similarity using DIMSUM Hi Manish, Did you try calling columnSimilarities(threshold) with different threshold values? Try threshold values of 0.1, 0.5, 1, and 20, and higher. Best, Reza On Wed, Mar 18, 2015 at 10:40 AM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, I am running Column Similarity (All Pairs Similarity using DIMSUM) in Spark on a dataset that looks like (Entity, Attribute, Value), after transforming it into a row-oriented dense matrix format (one line per Attribute, one column per Entity, each cell holding a normalized value between 0 and 1). It computes similarities between Entities extremely fast in most cases, but if there is even a single attribute that occurs frequently across the entities (say in 30% of entities), the job falls apart. The whole job gets stuck and the worker nodes start running at 100% CPU without making any progress on the job stage. If the dataset is very small (in the range of 1000 Entities x 500 attributes, some frequently occurring), the job finishes but takes too long (sometimes it gives GC errors too). If none of the attributes is frequently occurring
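Reza's size estimate, and the threshold errors reported in this thread, can be checked with a little arithmetic. For n = 56431 columns, n² = 3,184,457,761 (his "~3bn"), and the number of distinct column pairs i < j is n(n-1)/2 = 1,592,200,665. For the thresholds: assuming, from my reading of the MLlib 1.x source (worth verifying against RowMatrix.scala for your version), that columnSimilarities(threshold) sets the oversampling factor to gamma = 10·ln(n)/threshold and that columnSimilaritiesDIMSUM requires gamma > 1, the cut-off for n = 56431 is a threshold of about 109. That would be consistent with 100 running while 200 and above fail with "Oversampling should be greater than 1". The function names below are illustrative only.

```scala
// Worked numbers for the RowMatrix in this thread: 43345 rows x 56431 columns.
val numCols: Long = 56431L

// Upper bound on similarity entries if every ordered column pair were emitted (Reza's "~3bn").
def allEntries(n: Long): Long = n * n

// Distinct column pairs with i < j (the strictly upper-triangular part).
def upperTrianglePairs(n: Long): Long = n * (n - 1) / 2

// Assumption: oversampling follows MLlib 1.x columnSimilarities(threshold),
// gamma = 10 * ln(numCols) / threshold, and DIMSUM requires gamma > 1.
def oversampling(n: Long, threshold: Double): Double = 10 * math.log(n.toDouble) / threshold

// Threshold at which gamma drops to exactly 1 under that assumption (about 109.4 here).
def maxThreshold(n: Long): Double = 10 * math.log(n.toDouble)
```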
Re: Software stack for recommendation engine with Spark MLlib
Hi, Just two follow-up questions; please suggest. 1. Is there any commercial recommendation engine, apart from the open source tools (Mahout, Spark), that anybody can suggest? 2. In this case only the purchase transactions are captured. There are no ratings, no feedback, and no page views calculated by the application, so how effective will the recommendation engine be at recommending similar products to a user? What features should be available in order to create a robust recommendation engine, e.g. product views? Please kindly suggest a few features that should be available. Is purchase data enough? Thanks in advance. On Sun, Mar 15, 2015 at 6:16 PM, Sean Owen so...@cloudera.com wrote: I think you're assuming that you will pre-compute recommendations and store them in Mongo. That's one way to go, with certain tradeoffs. You can precompute offline easily, and serve results at large scale easily, but you are forced to precompute everything -- lots of wasted effort, and not completely up to date. The front-end part of the stack looks right. Spark would do the model building; you'd have to write a process to score recommendations and store the result. Mahout is the same thing, really. 500K items isn't all that large. Your requirements aren't driven just by items, though. The number of users and latent features matter too. It also matters how often you want to build the model. I'm guessing you could get away with a handful of modern machines for a problem this size. In a way, what you describe reminds me of Wibidata, since it built recommender-like solutions on top of data and published results to a NoSQL store. You might glance at the related OSS project Kiji (http://kiji.org/) for ideas about how to manage the schema. 
You should have a look at things like Nick's architecture for Graphflow; however, it's more concerned with computing recommendations on the fly, and describes a shift from an architecture originally built around something like a NoSQL store: http://spark-summit.org/wp-content/uploads/2014/07/Using-Spark-and-Shark-to-Power-a-Realt-time-Recommendation-and-Customer-Intelligence-Platform-Nick-Pentreath.pdf This is also the kind of ground the Oryx project is intended to cover, something I've worked on personally: https://github.com/OryxProject/oryx -- a layer on and around the core model building in Spark + Spark Streaming to provide a whole recommender (for example), down to the REST API. On Sun, Mar 15, 2015 at 10:45 AM, Shashidhar Rao raoshashidhar...@gmail.com wrote: Hi, Can anyone who has developed a recommendation engine suggest what the software stack for such an application could be? I am basically new to recommendation engines; I just found out about Mahout and Spark MLlib, which are available. I am thinking of the software stack below. 1. The user is going to use an Android app. 2. The Android app sends a REST API request to the app server to get recommendations. 3. Spark MLlib as the core recommendation engine. 4. A MongoDB database backend. I would like to know more about the cluster configuration part (how many nodes, etc.) of Spark for calculating recommendations for 500,000 items. These items include products for day care, etc. Other software stack suggestions would also be very useful. It has to run on multiple vendor machines. Please suggest. Thanks shashi
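Sean's point that you'd have to write a process to score recommendations and store the result can be made concrete. Assuming a latent-factor model such as MLlib's ALS, whose MatrixFactorizationModel exposes userFeatures and productFeatures as (id, Array[Double]) pairs, a batch job could score every item a user hasn't bought and keep the top N for the serving store (MongoDB in the proposed stack). This local sketch uses plain Maps instead of RDDs, and precomputeTopN is a hypothetical name; scoring every user against every item is also what makes full precomputation expensive. For purchase-only data, MLlib's ALS.trainImplicit, which treats interactions as implicit feedback rather than ratings, may be worth a look.

```scala
// Latent-factor vectors keyed by user or item id (e.g. as produced by a matrix-factorization model).
type Factors = Map[Int, Array[Double]]

def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

// For each user: score all not-yet-purchased items, highest score first (ties broken by item id),
// and keep the top n. The result is what a batch job would write to the serving store.
def precomputeTopN(users: Factors, items: Factors, purchased: Map[Int, Set[Int]], n: Int): Map[Int, Seq[Int]] =
  users.map { case (u, userVec) =>
    val seen = purchased.getOrElse(u, Set.empty[Int])
    val ranked = items.toSeq
      .filterNot { case (item, _) => seen.contains(item) }
      .map { case (item, itemVec) => (item, dot(userVec, itemVec)) }
      .sortBy { case (item, score) => (-score, item) }
      .map(_._1)
    u -> ranked.take(n)
  }
```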
Re: KMeans with large clusters Java Heap Space
Thanks Derrick, when I count the unique terms, the number is very small. So I added this:
val tfidf_features = lines.flatMap(x => x._2.split(" ").filter(_.length > 2)).distinct().count().toInt
val hashingTF = new HashingTF(tfidf_features)
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-with-large-clusters-Java-Heap-Space-tp21432p22153.html
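Sizing HashingTF with the exact distinct-term count keeps the vectors small, but it does not give each term a slot of its own: the hashing trick places a term by hashing it modulo numFeatures, so distinct terms can still share an index, and with as many buckets as terms some sharing is likely. A minimal sketch for measuring that on your own vocabulary, assuming the index is computed as nonNegativeMod(term.##, numFeatures), as in my reading of MLlib 1.x's HashingTF.indexOf (the helper names here are hypothetical):

```scala
// Assumption: mirrors the non-negative modulo used by MLlib 1.x HashingTF.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def indexOf(term: String, numFeatures: Int): Int = nonNegativeMod(term.##, numFeatures)

// Number of distinct terms that land in a bucket already taken by another distinct term.
def collisions(terms: Seq[String], numFeatures: Int): Int = {
  val distinctTerms = terms.distinct
  distinctTerms.size - distinctTerms.map(indexOf(_, numFeatures)).distinct.size
}
```

Running collisions(vocabulary, numFeatures) for a few candidate sizes shows how much headroom above the distinct-term count is needed before collisions become rare.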
Spark MLLib KMeans Top Terms
I'm trying to cluster short text messages using KMeans. After training the KMeans model, I want to get the top terms (5-10). How do I get them using clusterCenters? The full code is here: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-with-large-clusters-Java-Heap-Space-td21432.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-MLLib-KMeans-Top-Terms-tp22154.html
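Because HashingTF keeps only hashed indices, a cluster center can't be turned back into words directly. One approach is to hash the corpus vocabulary yourself to build an index-to-terms lookup, then read off the heaviest dimensions of each center; KMeansModel.clusterCenters is an Array[Vector], so center.toArray gives the weights. A sketch, assuming the same numFeatures is used as for HashingTF and that indices are computed as nonNegativeMod(term.##, numFeatures), as in my reading of MLlib 1.x (buildIndexToTerms and topTerms are hypothetical helpers):

```scala
// Assumption: mirrors the non-negative modulo used by MLlib 1.x HashingTF.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Reverse lookup: hashed index -> every vocabulary term that hashes there (collisions keep all candidates).
def buildIndexToTerms(vocabulary: Seq[String], numFeatures: Int): Map[Int, Seq[String]] =
  vocabulary.distinct.groupBy(t => nonNegativeMod(t.##, numFeatures))

// Top-k heaviest dimensions of one cluster center (e.g. model.clusterCenters(i).toArray), mapped back to terms.
def topTerms(center: Array[Double], indexToTerms: Map[Int, Seq[String]], k: Int): List[(Seq[String], Double)] =
  center.zipWithIndex
    .sortBy { case (weight, idx) => (-weight, idx) }
    .take(k)
    .map { case (weight, idx) => (indexToTerms.getOrElse(idx, Seq.empty[String]), weight) }
    .toList
```

Colliding terms come back together in one Seq, so an index shared by several words can't be attributed to any single one of them; a larger numFeatures reduces that ambiguity.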
Re: Can LBFGS be used on streaming data?
Regarding the first question, can you say more about how you are loading your data? What is the size of the data set? Is that the only error you see, and do you only see it in the streaming version? For the second question, there are a couple of reasons the weights might differ slightly; it depends on exactly how you set up the comparison. When you split it into 5, were those the same 5 chunks of data you used for the streaming case? And were they presented to the optimizer in the same order? A difference in either could produce small differences in the resulting weights, but that doesn't mean it's doing anything wrong. - jeremyfreeman.net @thefreemanlab On Mar 17, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote: Hello Jeremy, Thank you for your reply. When I run this code on my local machine on streaming data, it keeps giving me this error: WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): java.io.FileNotFoundException: /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or directory) And when I execute the same code on static data after randomly splitting it into 5 sets, it gives me slightly different weights (the difference is in the decimals). I am still trying to analyse why this is happening. Any input on why it would happen? Best Regards, Arunkumar On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman freeman.jer...@gmail.com wrote: Hi Arunkumar, That looks like it should work. Logically, it's similar to the implementation used by StreamingLinearRegression and StreamingLogisticRegression; see this class: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala which exposes the kind of operation you're describing (for any linear method). The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. 
Other methods (such as KMeans) require a bit more reengineering. — Jeremy - jeremyfreeman.net @thefreemanlab On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote: Hello, I am new to the Spark Streaming API. I wanted to ask whether I can apply LBFGS (with LeastSquaresGradient) to streaming data. Currently I am using foreachRDD to iterate over the DStream, and I am generating a model from each RDD. Am I doing anything logically wrong here? Thank you. Sample code:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
import org.apache.spark.mllib.regression.LinearRegressionModel

val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
// Packed weight vector: index 0 holds the intercept, the rest are feature weights
var initialWeights = Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
var isFirst = true
var model = new LinearRegressionModel(null, 1.0)
parsedData.foreachRDD { rdd =>
  if (isFirst) {
    val weights = algorithm.optimize(rdd, initialWeights)
    val w = weights.toArray
    val intercept = w.head
    model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
    isFirst = false
  } else {
    // Warm start: repack the previous model's intercept and weights as the new starting point
    val ab = ArrayBuffer[Double]()
    ab.insert(0, model.intercept)
    ab.appendAll(model.weights.toArray)
    print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
    initialWeights = Vectors.dense(ab.toArray)
    print("Initial Weights: " + initialWeights)
    val weights = algorithm.optimize(rdd, initialWeights)
    val w = weights.toArray
    val intercept = w.head
    model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
  }
}
Best Regards, Arunkumar
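The warm-start pattern in the sample code (each batch's optimization starts from the previous model's intercept and weights) can be sketched without Spark. This is an illustration under stated assumptions, not MLlib's implementation: plain batch gradient descent on the least-squares loss stands in for LBFGS, batches are local Seqs rather than RDDs, and Batch, fitBatch and fitStream are hypothetical names. The packed layout (intercept at index 0, feature weights after it) follows the post.

```scala
// Sketch only: plain batch gradient descent stands in for MLlib's LBFGS,
// and each batch is a local Seq of (label, features) instead of an RDD.
// Weight layout follows the post: index 0 = intercept, the rest = feature weights.
type Batch = Seq[(Double, Array[Double])]

// One full-batch gradient step on half the mean squared error.
def gradientStep(batch: Batch, w: Array[Double], stepSize: Double): Array[Double] = {
  val grad = Array.fill(w.length)(0.0)
  for ((label, features) <- batch) {
    val x = 1.0 +: features // constant 1.0 so that w(0) acts as the intercept
    val err = x.zip(w).map { case (a, b) => a * b }.sum - label
    for (j <- w.indices) grad(j) += err * x(j)
  }
  w.indices.map(j => w(j) - stepSize * grad(j) / batch.size).toArray
}

// Fit one batch, starting from the given weights.
def fitBatch(batch: Batch, init: Array[Double], iters: Int = 500, stepSize: Double = 0.1): Array[Double] =
  (1 to iters).foldLeft(init)((w, _) => gradientStep(batch, w, stepSize))

// Warm start: every batch begins from the previous batch's solution,
// which is what the isFirst / else branches in the post do by hand.
def fitStream(batches: Seq[Batch], init: Array[Double]): Array[Double] =
  batches.foldLeft(init)((w, batch) => fitBatch(batch, w))
```

Because each batch only refines the previous solution, whenever the per-batch optimization stops short of full convergence the result depends on which data lands in which batch and in what order, which is one way the small weight differences discussed in this thread can arise.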
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Doug, I did try setting that config parameter to a larger number (several minutes), but still wasn't able to retrieve additional context logs. Let us know if you have any success with it. Thanks, Bharath On Fri, Mar 20, 2015 at 3:21 AM, Doug Balog doug.sparku...@dugos.com wrote: I'm seeing the same problem. I've set logging to DEBUG, and I think some hints are in the “Yarn AM launch context” that is printed out before YARN runs java. My next step is to talk to the admins and get them to set yarn.nodemanager.delete.debug-delay-sec in the config, as recommended in http://spark.apache.org/docs/latest/running-on-yarn.html Then I can see exactly what's in the directory. Doug PS: Sorry for the duplicate message, Bharath and Todd; I used the wrong email address. On Mar 19, 2015, at 1:19 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for clarifying, Todd. This may then be an issue specific to the HDP version we're using. Will continue to debug and post back if there's any resolution. On Thu, Mar 19, 2015 at 3:40 AM, Todd Nist tsind...@gmail.com wrote: Yes, I believe you are correct. For the build you may need to specify the HDP-specific version of Hadoop with -Dhadoop.version=. I went with the default 2.6.0, but Horton may have a vendor-specific version that needs to go here. I know I saw a similar post today where the solution was to use -Dhadoop.version=2.5.0-cdh5.3.2, but that was for a Cloudera installation. I am not sure what the HDP version to put here would be. -Todd On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional Spark YARN options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. 
That apart, I assume that no other host on the cluster should require a deployment of the Spark distribution or any other config change to support a Spark job. Isn't that correct? On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file? spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running a purpose-built 1.3 against HDP 2.2 after following all the instructions. Has anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from which spark-submit is invoked (with spark.yarn.jar set), or does the entire distribution need to be pre-deployed on every host in the YARN cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago; here is a link to a post on Horton's forum. http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to do this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull the 1.2.1 source 2. Apply the following patches a. Address the Jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in spark-defaults.conf, https://github.com/apache/spark/pull/3409 3. 
build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package. Then deploy the resulting artifact, spark-1.2.1-bin-hadoop2.6.tgz, following the instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well, and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a YARN cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster; the aforementioned Spark build was copied to one of the Hadoop client hosts in the cluster to launch the job. spark-submit was run with --master yarn-client, and spark.yarn.jar was set to the assembly jar from the above distribution.
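Todd's question about whether spark-defaults.conf carries the hdp.version entries can be turned into a quick preflight check before running spark-submit. This is a simplified sketch, not a Spark API: parseDefaults treats each non-blank, non-# line as a whitespace-separated key and value, whereas Spark itself (as far as I know) reads the file as a Java properties file, which also accepts other separators. missingHdpOptions is a hypothetical helper that reports which of the two keys quoted in the thread lacks -Dhdp.version=<version>.

```scala
// Simplified reader: one "key value" pair per line, '#' lines treated as comments.
def parseDefaults(text: String): Map[String, String] =
  text.split("\r?\n")
    .map(_.trim)
    .filter(line => line.nonEmpty && !line.startsWith("#"))
    .map { line =>
      val parts = line.split("\\s+", 2)
      (parts(0), if (parts.length > 1) parts(1).trim else "")
    }
    .toMap

// Which of the two HDP-related keys from the thread are missing the expected -Dhdp.version flag.
def missingHdpOptions(conf: Map[String, String], hdpVersion: String): Seq[String] = {
  val expected = s"-Dhdp.version=$hdpVersion"
  Seq("spark.driver.extraJavaOptions", "spark.yarn.am.extraJavaOptions")
    .filterNot(key => conf.get(key).exists(_.contains(expected)))
}
```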
Launching Spark Cluster Application through IDE
I am trying to debug a Spark application on a cluster with a master and several worker nodes. I have successfully set up the master and worker nodes using the Spark standalone cluster manager. I downloaded the Spark folder with binaries and use the following commands, executed from the Spark directory, to set up the master and worker nodes. Command for launching the master: ./sbin/start-master.sh Command for launching a worker node: ./bin/spark-class org.apache.spark.deploy.worker.Worker <master-URL> Command for submitting the application: ./bin/spark-submit --class Application --master <master-URL> ~/app.jar Now I would like to understand the flow of control through the Spark source code on the worker nodes when I submit my application (I just want to use one of the given examples that uses reduce()). I am assuming I should set up Spark in Eclipse, but the Eclipse setup link on the Apache Spark website seems to be broken. I would appreciate some guidance on setting up Spark and Eclipse to enable stepping through the Spark source code on the worker nodes. If not Eclipse, I would be open to using another IDE or approach that lets me step through the Spark source code after launching my application. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Launching-Spark-Cluster-Application-through-IDE-tp22155.html
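One common way to step through the code that runs on the worker side is to start the executor JVMs with a JDWP debug agent and attach the IDE's remote debugger (in Eclipse, a "Remote Java Application" debug configuration) to the worker host and port. The agent flag can be passed through the spark.executor.extraJavaOptions setting, for example with spark-submit --conf. jdwpOption below is only a hypothetical helper for assembling that flag. With a fixed port, only one executor per host can bind it, so a single worker with a single executor is the simplest setup; on JDK 9 and later, address=*:port may be needed to accept non-local connections.

```scala
// Hypothetical helper: builds a standard JDWP agent flag so an IDE can attach a remote debugger.
// suspend = true makes the JVM wait at startup until a debugger connects.
def jdwpOption(port: Int, suspend: Boolean): String = {
  val s = if (suspend) "y" else "n"
  s"-agentlib:jdwp=transport=dt_socket,server=y,suspend=$s,address=$port"
}
```

For example, jdwpOption(5005, suspend = false) yields the value to use in spark.executor.extraJavaOptions=... on the spark-submit command line.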
Re: iPython Notebook + Spark + Accumulo -- best practice?
kk - I'll put something together and get back to you with more :-) DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com www.AnnaiSystems.com On Mar 19, 2015, at 10:59 AM, Irfan Ahmad ir...@cloudphysics.com wrote: Once you set up spark-notebook, it'll handle the submits for interactive work. Non-interactive work is not handled by it; for that, spark-kernel could be used. Give it a shot... it only takes 5 minutes to get it running in local mode. Irfan Ahmad CTO | Co-Founder | CloudPhysics http://www.cloudphysics.com/ Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Thu, Mar 19, 2015 at 9:51 AM, David Holiday dav...@annaisystems.com wrote: hi all - thx for the alacritous replies! So regarding how to get things from notebook to Spark and back, am I correct that spark-submit is the way to go? On Mar 19, 2015, at 1:14 AM, Paolo Platter paolo.plat...@agilelab.it wrote: Yes, I would suggest spark-notebook too. It's very simple to set up and it's growing pretty fast. Paolo Sent from my Windows Phone From: Irfan Ahmad Sent: 19/03/2015 04:05 To: davidh Cc: user@spark.apache.org Subject: Re: iPython Notebook + Spark + Accumulo -- best practice? I forgot to mention that there are also Zeppelin and jove-notebook, but I haven't got any experience with those yet. 
On Wed, Mar 18, 2015 at 8:01 PM, Irfan Ahmad ir...@cloudphysics.com wrote: Hi David, W00t indeed and great questions. On the notebook front, there are two options depending on what you are looking for. You can either go with iPython 3 with spark-kernel as a backend, or you can use spark-notebook. Both have interesting tradeoffs. If you are looking for a single notebook platform for your data scientists that has R and Python as well as a Spark shell, you'll likely want to go with iPython + spark-kernel. Downsides of the spark-kernel project are that data visualization isn't quite there yet, and it's early days for documentation, blogs, etc. The upside is that R and Python work beautifully and the iPython committers are super-helpful. If you are OK with a primarily Spark/Scala experience, then I suggest you go with spark-notebook. Upsides are that the project is a little further along, visualization support is better than in spark-kernel (though not as good as iPython with Python), and the committer is awesome with help. The downside is that you won't get R and Python. FWIW: I'm using both at the moment! Hope that helps. On Wed, Mar 18, 2015 at 5:45 PM, davidh dav...@annaisystems.com wrote: hi all, I've been DDGing, Stack Overflowing, Twittering, RTFMing, and scanning through this archive with only moderate success. In other words -- my way of saying sorry if this is answered somewhere obvious and I missed it :-) I've been tasked with figuring out how to connect Notebook, Spark, and Accumulo together. The end user will do her work via notebook. 
Thus far, I've successfully set up a Vagrant image containing Spark, Accumulo, and Hadoop. I was able to use some of the Accumulo example code to create a table populated with data, and to create a simple program in Scala that, when fired off to Spark via spark-submit, connects to Accumulo and prints the first ten rows of data in the table. So w00t on that - but now I'm left with more questions: 1) I'm still stuck on what's considered 'best practice' in terms of hooking all this together. Let's say Sally, a user, wants to do some analytic work on her data. She pecks the appropriate commands into notebook and fires them off. How does this get wired together on the back end? Do I, from notebook, use spark-submit to send a job to Spark and let Spark worry about hooking into Accumulo, or is it preferable to create some kind of open stream between the two? 2) If I want to extend Spark's API, do I need to first submit an endless job via spark-submit that does something like what this gentleman describes
Re: Issues with SBT and Spark
No, Spark is cross-built for 2.11 too, and those are the deps being pulled in here. This really does, however, sound like a Scala 2.10 vs 2.11 mismatch. Check, for example, that your cluster is using the same build of Spark and that you did not package Spark with your app. On Thu, Mar 19, 2015 at 3:36 PM, Masf masfwo...@gmail.com wrote: Hi, Spark 1.2.1 uses Scala 2.10. Because of this, your program fails with Scala 2.11. Regards On Thu, Mar 19, 2015 at 8:17 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: My current simple.sbt is:
name := "SparkEpiFast"
version := "1.0"
scalaVersion := "2.11.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.11" % "1.2.1" % "provided"
When I run sbt package, it compiles successfully. But when running the application, I get Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; However, changing the Scala version to 2.10.4 and updating the dependency lines accordingly resolves the issue (no exception). Could anyone please point out what I am doing wrong? -- Regards, Miguel Ángel
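One way to keep the artifact suffix and scalaVersion from drifting apart is sbt's %% operator, which appends the Scala binary version to the artifact name, e.g. "org.apache.spark" %% "spark-core" % "1.2.1" % "provided". That alone won't fix a cluster whose Spark was built for a different Scala line, so it is also worth comparing the Scala library version seen on the driver and on the executors. A small sketch of what the suffix works out to, with hypothetical helper names; the binary-version rule used here (major.minor) holds for Scala 2.x:

```scala
// Scala 2.x binary version: the first two components of the full version, e.g. "2.11.4" -> "2.11".
def scalaBinaryVersion(full: String): String = full.split('.').take(2).mkString(".")

// What sbt's %% effectively resolves to, e.g. ("spark-core", "2.11.4") -> "spark-core_2.11".
def crossArtifact(name: String, scalaVersion: String): String =
  s"${name}_${scalaBinaryVersion(scalaVersion)}"

// Version of the Scala standard library on the classpath of the JVM running this code.
def runtimeScalaVersion: String = scala.util.Properties.versionNumberString
```

Logging runtimeScalaVersion from the driver and from inside a task shows quickly whether both sides agree with the _2.10 or _2.11 suffix in the build.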
Re: Issues with SBT and Spark
Hi, Spark 1.2.1 uses Scala 2.10. Because of this, your program fails with Scala 2.11. Regards On Thu, Mar 19, 2015 at 8:17 PM, Vijayasarathy Kannan kvi...@vt.edu wrote: My current simple.sbt is:
name := "SparkEpiFast"
version := "1.0"
scalaVersion := "2.11.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.11" % "1.2.1" % "provided"
When I run sbt package, it compiles successfully. But when running the application, I get Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; However, changing the Scala version to 2.10.4 and updating the dependency lines accordingly resolves the issue (no exception). Could anyone please point out what I am doing wrong? -- Regards, Miguel Ángel
Re: Spark SQL filter DataFrame by date?
Can you add your code snippet? It seems to be missing from the original email. Thanks, Yin On Thu, Mar 19, 2015 at 3:22 PM, kamatsuoka ken...@gmail.com wrote: I'm trying to filter a DataFrame by a date column, with no luck so far. Here's what I'm doing: When I run reqs_day.count() I get zero, apparently because my date parameter gets translated to 16509. Is this a bug, or am I doing it wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-filter-DataFrame-by-date-tp22149.html
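The number 16509 may not be a mistranslation at all. As far as I know, Spark SQL's DateType is represented internally as a count of days since 1970-01-01, and 16509 decodes to 2015-03-15, a few days before this post. If that reading is right, the conversion itself is probably fine, and the missing snippet would be needed to see why nothing matches. A quick way to check the encoding with java.time (fromEpochDays and toEpochDays are hypothetical wrappers):

```scala
import java.time.LocalDate

// Assumption: 16509 is a DateType value in its internal form, i.e. days since 1970-01-01.
def fromEpochDays(days: Long): LocalDate = LocalDate.ofEpochDay(days)
def toEpochDays(date: LocalDate): Long = date.toEpochDay
```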
Re: Apache Spark User List: people's responses not showing in the browser view
Nabble is a third-party site that tries its best to archive mail sent out over the list. Nothing guarantees it will be in sync with the real mailing list. To get the truth on what was sent over this Apache-managed list, you unfortunately need to go to the Apache archives: http://mail-archives.apache.org/mod_mbox/spark-user/ Nick On Thu, Mar 19, 2015 at 5:18 AM Ted Yu yuzhih...@gmail.com wrote: There might be some delay: http://search-hadoop.com/m/JW1q5mjZUy/Spark+people%2527s+responsessubj=Apache+Spark+User+List+people+s+responses+not+showing+in+the+browser+view On Mar 18, 2015, at 4:47 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Ted. Well, so far even there I'm only seeing my post and not, for example, your response. On Wed, Mar 18, 2015 at 7:28 PM, Ted Yu yuzhih...@gmail.com wrote: Was this one of the threads you participated in? http://search-hadoop.com/m/JW1q5w0p8x1 You should be able to find your posts on search-hadoop.com On Wed, Mar 18, 2015 at 3:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Sorry if this is a total noob question, but is there a reason why I'm only seeing folks' responses to my posts in emails but not in the browser view under apache-spark-user-list.1001560.n3.nabble.com? Is this a matter of setting your preferences such that your responses only go to email and never to the browser-based view of the list? I don't seem to see such a preference... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-User-List-people-s-responses-not-showing-in-the-browser-view-tp22135.html
Re: Reading a text file into RDD[Char] instead of RDD[String]
val s = sc.parallelize(Array("foo", "bar", "baz")) val c = s.flatMap(_.toIterator) c.collect() res8: Array[Char] = Array(f, o, o, b, a, r, b, a, z) On Thu, Mar 19, 2015 at 8:46 AM, Michael Lewis lewi...@me.com wrote: Hi, I’m struggling to think of the best way to read a text file into an RDD[Char] rather than an RDD[String]. I can do sc.textFile(….), which gives me the RDD[String]. Can anyone suggest the most efficient way to create the RDD[Char]? I’m sure I’ve missed something simple… Regards, Mike
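Applied to a file, the same idea becomes sc.textFile(path).flatMap(_.toIterator), which yields an RDD[Char]. One caveat: textFile splits its input on line breaks and drops the terminators, so no '\n' characters appear in the result. The sketch below mirrors the flatMap on a local Seq, which has the same per-element semantics as on an RDD and needs no Spark. It also shows one hypothetical way to keep line boundaries if they matter.

```scala
// Local stand-in for an RDD[String]; the flatMap calls are the same ones you
// would apply to the RDD returned by sc.textFile(path).
object CharsFromLines {
  // What textFile would hand you: one String per line, terminators removed.
  val lines: Seq[String] = Seq("foo", "bar", "baz")

  // Each line expands into its characters (same as _.toIterator on Scala 2.10/2.11).
  val chars: Seq[Char] = lines.flatMap(_.iterator)

  // Variant that re-appends a newline after every line to preserve boundaries.
  val charsWithNewlines: Seq[Char] = lines.flatMap(line => line.iterator ++ Iterator('\n'))

  def main(args: Array[String]): Unit = {
    println(chars.mkString)                     // foobarbaz
    println(charsWithNewlines.count(_ == '\n')) // 3
  }
}
```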
Re: LZO configuration can not affect
If I read the screenshot correctly, the Hadoop LZO jar is under /home/hadoop/mylib. Cheers On Mar 19, 2015, at 5:37 AM, jeanlyn92 jeanly...@gmail.com wrote: You should configure it as follows: export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native:$HADOOP_HOME/share/hadoop/common/lib/hadoop-lzo-0.4.15.jar On 03/19/2015 05:25 PM, Ted Yu wrote: How did you generate the Hadoop-lzo jar? Thanks On Mar 17, 2015, at 2:36 AM, 唯我者 878223...@qq.com wrote: Hi everybody, I have configured the LZO environment like this: [screenshots: 9da01...@a75e774d.bbf50755.jpg, 54346...@a75e774d.bbf50755.jpg] But when I execute the following code in spark-shell, I still get this error: scala> val hdfsfile=sc.textFile("/xiaoming/gps_info") scala> hdfsfile.map(_.split(",")) scala> res0.collect java.lang.RuntimeException: Error in configuring object at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:184) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:197) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.collect(RDD.scala:797) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:17) at $iwC$$iwC$$iwC.<init>(<console>:22) at $iwC$$iwC.<init>(<console>:24) at $iwC.<init>(<console>:26) at <init>(<console>:28) at .<init>(<console>:32) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at 
org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at
RE: Why I didn't see the benefits of using KryoSerializer
I read the Spark code a little bit, trying to understand my own question. It looks like the difference is really between org.apache.spark.serializer.JavaSerializer and org.apache.spark.serializer.KryoSerializer, both of which have a method named writeObject. In my test case, each line of my text file is a String of about 140 bytes. Whether I call JavaSerializer.writeObject or KryoSerializer.writeObject on such a 140-byte String, I don't see any difference in the space used in the underlying OutputStream. Does this mean that KryoSerializer really doesn't give us any benefit for the String type? I understand that for primitive types it shouldn't bring any benefit, but what about String? When we talk about lowering memory usage with KryoSerializer in Spark, in what cases can it bring significant benefits? This is my first experience with KryoSerializer, so maybe I am totally wrong about its usage. Thanks Yong From: java8...@hotmail.com To: user@spark.apache.org Subject: Why I didn't see the benefits of using KryoSerializer Date: Tue, 17 Mar 2015 12:01:35 -0400 Hi, I am new to Spark. I tried to understand the memory benefits of using KryoSerializer. I have a single-box standalone test environment with 24 cores and 24G of memory. I installed Hadoop 2.2 plus Spark 1.2.0. I put one text file of about 1.2G in HDFS. Here are the settings in spark-env.sh: export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=4; export SPARK_WORKER_MEMORY=32g; export SPARK_DRIVER_MEMORY=2g; export SPARK_EXECUTOR_MEMORY=4g First test case: val log = sc.textFile("hdfs://namenode:9000/test_1g/"); log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY); log.count(); log.count() The data is about 3M rows. For the first test case, on the Storage tab of the web UI, I can see Size in Memory is 1787M, and Fraction Cached is 70% with 7 cached partitions. This matched what I expected; the first count finished in about 17s, and the second in about 6s.
Second test case, after restarting spark-shell: val log = sc.textFile("hdfs://namenode:9000/test_1g/"); log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER); log.count(); log.count() Now the web UI shows Size in Memory of 1231M, and Fraction Cached of 100% with 10 cached partitions. It looks like caching in the default Java-serialized format reduces memory usage, but at a cost: the first count finished in around 39s and the second in around 9s. So the job runs slower, with less memory usage. So far I understand everything that happened and the tradeoff. The problem comes when I test with KryoSerializer: SPARK_JAVA_OPTS=-Dspark.serializer=org.apache.spark.serializer.KryoSerializer /opt/spark/bin/spark-shell, then val log = sc.textFile("hdfs://namenode:9000/test_1g/"); log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER); log.count(); log.count() First, I confirmed that the new serializer setting was passed in: the Spark Properties section of the Environment tab shows spark.driver.extraJavaOptions -Dspark.serializer=org.apache.spark.serializer.KryoSerializer, which was not there for the first two test cases. But on the Storage tab, Size in Memory is 1234M, with 100% Fraction Cached and 10 cached partitions. The first count took 46s and the second took 23s. I don't get the much smaller memory footprint I expected, and both counts take longer. Did I do anything wrong? Why does MEMORY_ONLY_SER with KryoSerializer use the same amount of memory as the default Java serializer, with a worse duration? Thanks Yong
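Yong's writeObject observation can be reproduced without Spark. For a String, Java serialization stores essentially the characters plus a few bytes of header, so an RDD that holds only Strings leaves little for a more compact serializer to save. The sketch below measures this with java.io.ObjectOutputStream only. It does not exercise Kryo, and the 140-character line is a stand-in for one row of the test file.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Measures how many bytes plain Java serialization produces for one object.
object StringSerializedSize {
  def javaSerializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    // close() flushes any bytes still buffered inside the ObjectOutputStream
    try out.writeObject(obj) finally out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    val line = "x" * 140 // stand-in for one ~140-character ASCII line of the input file
    val size = javaSerializedSize(line)
    println(s"characters: ${line.length}, Java-serialized bytes: $size, overhead: ${size - line.length}")
  }
}
```

As we understand it, the savings usually attributed to Kryo come from object graphs of user classes, where Java serialization also writes class descriptors. For plain text records, the two formats should be close in size, which is consistent with the 1231M vs 1234M figures reported above.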
Re: DataFrame operation on parquet: GC overhead limit exceeded
Hi Yin, thanks a lot for that! Will give it a shot and let you know. On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote: Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears? This setting controls the number of reducers Spark SQL will use, and the default is 200. Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help. Thanks, Yin On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea how to deal with this situation? Thanks a lot On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote: It seems that too many distinct groups are processed in a single task, which triggers the problem. How many files does your dataset have, and how large is each file? It seems your query will be executed in two stages: table scan and map-side aggregation in the first stage, and the final round of reduce-side aggregation in the second stage. Can you take a look at the number of tasks launched in these two stages? Thanks, Yin On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I set the executor memory to 8g but it didn't help On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset.
I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in standalone cluster mode. The code is the following: import org.apache.spark.sql.functions._; val people = sqlContext.parquetFile("/data.parquet"); val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10); res.foreach(println) The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded. My configuration is: spark.serializer org.apache.spark.serializer.KryoSerializer, spark.driver.memory 6g, spark.executor.extraJavaOptions -XX:+UseCompressedOops, spark.shuffle.manager sort. Any idea how I can work around this? Thanks a lot
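Yin's suggestion relies on how the second (reduce) stage divides the work. Each (name, date) group is routed to one of spark.sql.shuffle.partitions tasks by hashing its key, so doubling the setting roughly halves the number of groups, and therefore the aggregation state, that each task must hold. Below is a simplified model of that routing, not Spark's code. The integer keys are hypothetical stand-ins for real group keys, which would spread less evenly.

```scala
// Simplified, Spark-free model of hash-partitioning group keys across reduce tasks.
object ShufflePartitionLoad {
  // Non-negative modulo of the key's hash, in the style of a hash partitioner.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Largest number of distinct keys that any single reduce task receives.
  def maxKeysPerPartition(keys: Iterable[Any], numPartitions: Int): Int =
    keys.groupBy(k => partitionOf(k, numPartitions)).values.map(_.size).max

  def main(args: Array[String]): Unit = {
    val keys = 0 until 100000 // hypothetical distinct group keys
    for (n <- Seq(200, 400, 800))
      println(s"$n partitions -> at most ${maxKeysPerPartition(keys, n)} keys per task")
  }
}
```

In the shell, the setting can be raised before running the query with sqlContext.setConf("spark.sql.shuffle.partitions", "400").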
Problems with spark.akka.frameSize
Hi, I am encountering the following error with a Spark application. Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 11257268 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values. I am doing the following in my code: val groupedEdges = graph.edges.groupBy[VertexId](groupBySrcId).persist(StorageLevel.MEMORY_AND_DISK) while (condition) { val updates = groupedEdges.flatMap { edgesBySrc => doWork(edgesBySrc, a, b) }; updates.collect.foreach(println) } def doWork(edges: (VertexId, Iterable[Edge[(Int, Int, Int)]]), a: Double, b: Double): List[VertexId] = { /* do something with edges and return a list of vertices */ ??? } Note that the attribute of each edge is a tuple with 3 elements. I encountered the above error only when the edge attribute is a tuple; the code doesn't run into it when the edge attribute is a single integer. I tried increasing *spark.akka.frameSize* (to, say, 100) and it worked without running into this issue. Doing a broadcast does not seem appropriate because each task performing doWork() gets a different set of edges. However, the groups of edges remain the same throughout. Is there an efficient way to do what I'm doing, i.e., pass edgesBySrc efficiently to doWork() (or not pass it at all, or pass it just once the first time and have the tasks retain their sets of edges across iterations)? Thanks
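The numbers in the error message add up as follows. spark.akka.frameSize is given in MB, and 10 MB = 10485760 bytes. After the 204800 reserved bytes, the limit is 10280960 bytes, so the 11257268-byte task is 976308 bytes over. The small sketch below reproduces that check. The strict comparison is our reading of the error text, not something taken from Spark's source, and the names are hypothetical.

```scala
// Reproduces the size check described in the error message:
// a task is rejected if its serialized size reaches frameSize (MB) minus 200 KB reserved.
object FrameSizeCheck {
  val ReservedBytes: Long = 200L * 1024 // 204800, as printed in the error

  def maxTaskBytes(frameSizeMb: Int): Long = frameSizeMb.toLong * 1024 * 1024 - ReservedBytes

  def fits(taskBytes: Long, frameSizeMb: Int): Boolean = taskBytes < maxTaskBytes(frameSizeMb)

  def main(args: Array[String]): Unit = {
    val task = 11257268L // "Serialized task 0:0 was 11257268 bytes"
    for (mb <- Seq(10, 11, 100))
      println(s"frameSize=$mb MB -> limit ${maxTaskBytes(mb)} bytes, fits: ${fits(task, mb)}")
  }
}
```

This also shows why raising the setting to 100 worked. Even 11 MB would have been enough for this particular task, although the task size presumably grows with the data.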
Load balancing
I am trying to understand how to load-balance incoming data across multiple Spark Streaming workers. Could somebody help me understand how I can distribute incoming data from various sources so that it goes to multiple Spark Streaming nodes? Is it done by the Spark client with the help of the Spark master, similar to a Hadoop client asking the NameNode for the list of DataNodes?
Re: iPython Notebook + Spark + Accumulo -- best practice?
hi all - thx for the alacritous replies! so regarding how to get things from notebook to spark and back, am I correct that spark-submit is the way to go? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com www.AnnaiSystems.com On Mar 19, 2015, at 1:14 AM, Paolo Platter paolo.plat...@agilelab.it wrote: Yes, I would suggest spark-notebook too. It's very simple to set up and it's growing pretty fast. Paolo Sent from my Windows Phone From: Irfan Ahmad ir...@cloudphysics.com Sent: 19/03/2015 04:05 To: davidh dav...@annaisystems.com Cc: user@spark.apache.org Subject: Re: iPython Notebook + Spark + Accumulo -- best practice? I forgot to mention that there are also Zeppelin and jove-notebook, but I haven't got any experience with those yet. Irfan Ahmad CTO | Co-Founder | CloudPhysics http://www.cloudphysics.com/ Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Wed, Mar 18, 2015 at 8:01 PM, Irfan Ahmad ir...@cloudphysics.com wrote: Hi David, W00t indeed and great questions. On the notebook front, there are two options depending on what you are looking for. You can either go with iPython 3 with Spark-kernel as a backend or you can use spark-notebook. Both have interesting tradeoffs. If you are looking for a single notebook platform for your data scientists that has R, Python as well as a Spark Shell, you'll likely want to go with iPython + Spark-kernel. Downsides with the spark-kernel project are that data visualization isn't quite there yet, and it's early days for documentation and blogs/etc. Upside is that R and Python work beautifully and that the ipython committers are super-helpful.
If you are OK with a primarily spark/scala experience, then I suggest you go with spark-notebook. Upsides are that the project is a little further along, visualization support is better than spark-kernel (though not as good as iPython with Python), and the committer is awesome with help. Downside is that you won't get R and Python. FWIW: I'm using both at the moment! Hope that helps. Irfan Ahmad CTO | Co-Founder | CloudPhysics http://www.cloudphysics.com/ Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Wed, Mar 18, 2015 at 5:45 PM, davidh dav...@annaisystems.com wrote: hi all, I've been DDGing, Stack Overflowing, Twittering, RTFMing, and scanning through this archive with only moderate success. in other words -- my way of saying sorry if this is answered somewhere obvious and I missed it :-) i've been tasked with figuring out how to connect Notebook, Spark, and Accumulo together. The end user will do her work via notebook. thus far, I've successfully set up a Vagrant image containing Spark, Accumulo, and Hadoop. I was able to use some of the Accumulo example code to create a table populated with data, and to create a simple program in scala that, when fired off to Spark via spark-submit, connects to accumulo and prints the first ten rows of data in the table. so w00t on that - but now I'm left with more questions: 1) I'm still stuck on what's considered 'best practice' in terms of hooking all this together. Let's say Sally, a user, wants to do some analytic work on her data. She pecks the appropriate commands into notebook and fires them off. how does this get wired together on the back end? Do I, from notebook, use spark-submit to send a job to spark and let spark worry about hooking into accumulo, or is it preferable to create some kind of open stream between the two?
2) if I want to extend spark's api, do I need to first submit an endless job via spark-submit that does something like what this gentleman describes http://blog.madhukaraphatak.com/extending-spark-api ? is there an alternative (other than refactoring spark's source) that doesn't involve extending the api via a job submission? ultimately what I'm looking for is help locating docs, blogs, etc. that may shed some light on this. t/y in advance! d -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/iPython-Notebook-Spark-Accumulo-best-practice-tp22137.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Column Similarity using DIMSUM
Hi Manish, With 56431 columns, the output can be as large as 56431 x 56431 ~= 3bn entries. When a single row is dense, that can end up overwhelming a machine. You can push that up with more RAM, but note that DIMSUM is meant for tall and skinny matrices: it scales linearly and across the cluster with the number of rows, but still quadratically with the number of columns. I will be updating the documentation to make this clear. Best, Reza On Thu, Mar 19, 2015 at 3:46 AM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi Reza, *Behavior*: · I tried running the job with different thresholds: 0.1, 0.5, 5, 20 and 100. Every time, the job got stuck at mapPartitionsWithIndex at RowMatrix.scala:522 http://del2l379java.sapient.com:8088/proxy/application_1426267549766_0101/stages/stage?id=118attempt=0 with all workers running at 100% CPU. There is hardly any shuffle read/write happening. After some time, “ERROR YarnClientClusterScheduler: Lost executor” messages start showing up (maybe because the nodes are running at 100% CPU). · For thresholds of 200+ (tried up to 1000) it gave an error (the number here was different for different thresholds): Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Oversampling should be greater than 1: 0.
at scala.Predef$.require(Predef.scala:233) at org.apache.spark.mllib.linalg.distributed.RowMatrix.columnSimilaritiesDIMSUM(RowMatrix.scala:511) at org.apache.spark.mllib.linalg.distributed.RowMatrix.columnSimilarities(RowMatrix.scala:492) at EntitySimilarity$.runSimilarity(EntitySimilarity.scala:241) at EntitySimilarity$.main(EntitySimilarity.scala:80) at EntitySimilarity.main(EntitySimilarity.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) · If I get rid of frequently occurring attributes and keep only those attributes which occur in less than 2% of entities, then the job doesn’t get stuck or fail. *Data environment*: · RowMatrix of size 43345 X 56431 · In the matrix there are a couple of rows whose value is the same in up to 50% of the columns (frequently occurring attributes). · I am running this on one of our dev clusters, running CDH 5.3.0 with 5 data nodes (each 4-core and 16GB RAM). My question: do you think this is a hardware size issue and we should test it on larger machines? Regards, Manish *From:* Manish Gupta 8 [mailto:mgupt...@sapient.com] *Sent:* Wednesday, March 18, 2015 11:20 PM *To:* Reza Zadeh *Cc:* user@spark.apache.org *Subject:* RE: Column Similarity using DIMSUM Hi Reza, I have only tried thresholds in the range of 0 to 1. I was not aware that the threshold can be set above 1. Will try and update.
Thank You - Manish *From:* Reza Zadeh [mailto:r...@databricks.com] *Sent:* Wednesday, March 18, 2015 10:55 PM *To:* Manish Gupta 8 *Cc:* user@spark.apache.org *Subject:* Re: Column Similarity using DIMSUM Hi Manish, Did you try calling columnSimilarities(threshold) with different threshold values? Try threshold values of 0.1, 0.5, 1, 20, and higher. Best, Reza On Wed, Mar 18, 2015 at 10:40 AM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, I am running Column Similarity (All Pairs Similarity using DIMSUM) in Spark on a dataset that looks like (Entity, Attribute, Value), after transforming it to a row-oriented dense matrix format (one line per Attribute, one column per Entity, each cell holding a normalized value between 0 and 1). It computes similarities between Entities extremely fast in most cases, but if even a single attribute occurs frequently across the entities (say in 30% of entities), the job falls apart: the whole job gets stuck and worker nodes start running at 100% CPU without making any progress on the job stage. If the dataset is very small (in the range of 1000 Entities X 500 attributes, some frequently occurring), the job finishes but takes too long (sometimes it gives GC errors too). If none of the attributes occurs frequently (all below 2%), then the job runs lightning fast (even for 100 Entities X 1 attributes) and the results are very accurate. I am running Spark 1.2.0-cdh5.3.0 on an 11-node cluster, each node having 4 cores and 16GB
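Two numbers in this thread can be checked by hand. Reza's bound is 56431 x 56431 = 3,184,457,761 possible entries. The "Oversampling should be greater than 1" failure at thresholds of 200 and above is consistent with the oversampling factor being derived as gamma = 10 * ln(n) / threshold. That is how we read RowMatrix.columnSimilarities in Spark 1.2, but treat it as an assumption and check the source for your version. With n = 56431, gamma is about 1.09 at threshold 100 but about 0.55 at 200. The helper names below are hypothetical.

```scala
// Back-of-the-envelope checks for DIMSUM column similarities.
object DimsumNumbers {
  // Upper bound on entries in the n x n column-similarity output.
  def maxOutputEntries(numCols: Long): Long = numCols * numCols

  // Oversampling factor as we read RowMatrix.columnSimilarities(threshold):
  // gamma = 10 * ln(numCols) / threshold; a threshold near 0 means brute force.
  def gamma(numCols: Long, threshold: Double): Double =
    if (threshold < 1e-6) Double.PositiveInfinity
    else 10 * math.log(numCols.toDouble) / threshold

  def main(args: Array[String]): Unit = {
    val n = 56431L
    println(s"max output entries: ${maxOutputEntries(n)}") // 3184457761
    for (t <- Seq(0.1, 1.0, 20.0, 100.0, 200.0, 1000.0)) {
      val g = gamma(n, t)
      println(s"threshold $t -> gamma $g, passes 'gamma > 1' check: ${g > 1.0}")
    }
  }
}
```

Under that reading, any threshold above 10 * ln(56431), about 109, fails the requirement for this matrix.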
Re: Apache Spark User List: people's responses not showing in the browser view
I prefer using search-hadoop.com which provides better search capability. Cheers On Thu, Mar 19, 2015 at 6:48 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Nabble is a third-party site that tries its best to archive mail sent out over the list. Nothing guarantees it will be in sync with the real mailing list. To get the truth about what was sent over this Apache-managed list, you unfortunately need to go to the Apache archives: http://mail-archives.apache.org/mod_mbox/spark-user/ Nick
Writing Spark Streaming Programs
Hello All, I'm using Spark for streaming, but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, I'm familiar with Scala, and I have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is. If a consistent style of programming isn't applied, Scala code can become unreadable, but I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. Also, it could be that the type of programming done in Spark is best implemented in an FP language like Scala, not sure though. The question I would like your good help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard? Regards jk
Re: Apache Spark User List: people's responses not showing in the browser view
Sure, you can use Nabble or search-hadoop or whatever you prefer. My point is just that the source of truth is the Apache archives, and these other sites may or may not be in sync with that truth. On Thu, Mar 19, 2015 at 10:20 AM Ted Yu yuzhih...@gmail.com wrote: I prefer using search-hadoop.com which provides better search capability.
Re: Apache Spark User List: people's responses not showing in the browser view
Yes, that is mostly why these third-party sites have sprung up around the official archives--to provide better search. Did you try the link Ted posted? On Thu, Mar 19, 2015 at 10:49 AM Dmitry Goldenberg dgoldenberg...@gmail.com wrote: It seems that those archives are not necessarily easy to find stuff in. Is there a search engine on top of them, so as to find e.g. your own posts easily?
Re: Apache Spark User List: people's responses not showing in the browser view
It seems that those archives are not necessarily easy to find stuff in. Is there a search engine on top of them, so as to find e.g. your own posts easily? On Thu, Mar 19, 2015 at 10:34 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Sure, you can use Nabble or search-hadoop or whatever you prefer. My point is just that the source of truth is the Apache archives, and these other sites may or may not be in sync with that truth.
Re: Apache Spark User List: people's responses not showing in the browser view
Interesting points. Yes, I just tried http://search-hadoop.com/m/JW1q5mjZUy/Spark+people%2527s+responsessubj=Apache+Spark+User+List+people+s+responses+not+showing+in+the+browser+view and I see responses there now. I believe Ted was right that there's a delay before they show up there (probably due to some indexing latency, etc.). On Thu, Mar 19, 2015 at 10:56 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Yes, that is mostly why these third-party sites have sprung up around the official archives: to provide better search. Did you try the link Ted posted?
Re: Apache Spark User List: people's responses not showing in the browser view
Here is why results on the search site may be delayed, especially for Apache JIRAs: if they crawl too often, Apache would flag the bot and blacklist it. Cheers
Re: Writing Spark Streaming Programs
Try writing this Spark Streaming idiom in Java and you'll choose Scala soon enough: dstream.foreachRDD { rdd => rdd.foreachPartition(partition => ...) } When deciding between Java and Scala for Spark, IMHO Scala has the upper hand. If you're concerned with readability, have a look at the Scala coding style recently open sourced by Databricks: https://github.com/databricks/scala-style-guide (btw, I don't agree with a good part of it, but I recognize that it can keep the most complex Scala constructions out of your code). On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com wrote: Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, am familiar with Scala and have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is. If you use Scala without applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. Also, it could be that the type of programming done in Spark is best implemented in Scala as an FP language, not sure though. The question I would like your good help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in regards to this? Regards jk
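Since Python was one of the options on the table, here is a rough sketch of the same per-partition idiom in Python, for comparison. It assumes a hypothetical sink function `save_partition` and uses `sqlite3` purely as a stand-in for a real database; the point it illustrates is one connection per partition rather than one per record.

```python
import sqlite3


def save_partition(records, db_path="events.db"):
    """Write every record of one partition through a single connection.

    `db_path` and the `events` table are hypothetical; sqlite3 only stands
    in for whatever database the streaming job actually writes to.
    """
    conn = sqlite3.connect(db_path)  # opened once per partition
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS events (value TEXT)")
        conn.executemany(
            "INSERT INTO events (value) VALUES (?)",
            ((str(r),) for r in records),  # records is the partition iterator
        )
        conn.commit()
    finally:
        conn.close()


# In a PySpark Streaming job this would be wired up roughly as:
#   dstream.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))
```

The structure is the same as the Scala one-liner; the Scala version is shorter mostly because of its closure syntax.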
Re: spark there is no space on the disk
Is it possible that `spark.local.dir` is overridden by others? The docs say: NOTE: In Spark 1.0 and later this will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia sparkpeng...@gmail.com wrote: Hi Sean, Thanks very much for your reply. I tried to configure it with the code below: sf = SparkConf().setAppName("test").set("spark.executor.memory", "45g").set("spark.cores.max", "62").set("spark.local.dir", "C:\\tmp") But I still get the error. Do you know how I can configure this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not the driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dir to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi, I was running a logistic regression algorithm on an 8-node Spark cluster; each node has 8 cores and 56 GB RAM (each node is running Windows). And the Spark installation driver has 1.9 TB capacity. The dataset I was training on has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at 
org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at
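Since Sean's diagnosis is that the scratch directory on an executor filled up, one quick check is to measure the free space behind each candidate directory on every worker. A minimal sketch, assuming a comma-separated list like the one `spark.local.dir` accepts; `check_local_dirs` and the 100 GB threshold are illustrative, not Spark settings.

```python
import shutil


def check_local_dirs(spec, min_free_gb=100.0):
    """Report free space for each entry of a comma-separated directory list.

    Returns (path, free_gb, ok) tuples; `ok` is True when the volume that
    holds `path` has at least `min_free_gb` gigabytes free.
    """
    report = []
    for path in (p.strip() for p in spec.split(",")):
        if not path:
            continue  # tolerate stray or trailing commas
        free_gb = shutil.disk_usage(path).free / 1024 ** 3
        report.append((path, round(free_gb, 1), free_gb >= min_free_gb))
    return report


# Run on each worker with that worker's own paths (hypothetical ones here):
#   for path, free_gb, ok in check_local_dirs(r"C:\tmp,D:\spark-scratch"):
#       print(path, free_gb, "OK" if ok else "LOW")
```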
Re: Error when using multiple python files spark-submit
The spark-submit options must come before main.py; otherwise they become options of main.py. So it should be: ../hadoop/spark-install/bin/spark-submit --py-files /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py --master spark://spark-m:7077 main.py On Mon, Mar 16, 2015 at 4:11 AM, poiuytrez guilla...@databerries.com wrote: I have a Spark app which is composed of multiple files. When I launch Spark using: ../hadoop/spark-install/bin/spark-submit main.py --py-files /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py --master spark://spark-m:7077 I am getting an error: 15/03/13 15:54:24 INFO TaskSetManager: Lost task 6.3 in stage 413.0 (TID 5817) on executor spark-w-3.c.databerries.internal: org.apache.spark.api.python.PythonException (Traceback (most recent call last): File "/home/hadoop/spark-install/python/pyspark/worker.py", line 90, in main command = pickleSer._read_with_length(infile) File "/home/hadoop/spark-install/python/pyspark/serializers.py", line 151, in _read_with_length return self.loads(obj) File "/home/hadoop/spark-install/python/pyspark/serializers.py", line 396, in loads return cPickle.loads(obj) ImportError: No module named naive It is weird because I do not serialize anything. naive.py is also available on every machine at the same path. Any insight into what could be going on? The issue does not happen on my laptop. PS: I am using Spark 1.2.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-using-multiple-python-files-spark-submit-tp22080.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
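When the command is generated from a script, it helps to build the argument list so the ordering rule cannot be broken by accident. A small sketch with a hypothetical helper `spark_submit_argv`: everything spark-submit should see goes first, then the application file, then the application's own arguments.

```python
def spark_submit_argv(app, app_args=(), py_files=(), master=None,
                      spark_submit="spark-submit"):
    """Build a spark-submit argument list with every option before `app`.

    spark-submit stops parsing its own options at the application file;
    anything after `app` is handed to the application in sys.argv.
    """
    argv = [spark_submit]
    if master:
        argv += ["--master", master]
    if py_files:
        argv += ["--py-files", ",".join(py_files)]  # comma-separated list
    argv.append(app)       # spark-submit's options end here
    argv.extend(app_args)  # these reach main.py, not spark-submit
    return argv


cmd = spark_submit_argv(
    "main.py",
    py_files=["/home/poiuytrez/naive.py",
              "/home/poiuytrez/processing.py",
              "/home/poiuytrez/settings.py"],
    master="spark://spark-m:7077",
    spark_submit="../hadoop/spark-install/bin/spark-submit",
)
# To launch it: subprocess.call(cmd)
```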
Spark SQL filter DataFrame by date?
I'm trying to filter a DataFrame by a date column, with no luck so far. Here's what I'm doing: When I run reqs_day.count() I get zero, apparently because my date parameter gets translated to 16509. Is this a bug, or am I doing it wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-filter-DataFrame-by-date-tp22149.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
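The number 16509 looks like an internal date encoding rather than garbage: assuming Spark SQL 1.3 stores `DateType` values as days since 1970-01-01 (which is my understanding, but worth checking against your version), 16509 corresponds to 2015-03-15. A small stdlib sketch for translating between the two when reading query plans or logs; the helper names are hypothetical.

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def to_epoch_days(d):
    """Days since 1970-01-01 (the assumed internal form of a DateType value)."""
    return (d - EPOCH).days


def from_epoch_days(n):
    """Inverse of to_epoch_days."""
    return EPOCH + timedelta(days=n)


print(from_epoch_days(16509))  # 2015-03-15
```

If the decoded date is the one you meant to filter on, the literal itself was converted correctly and the zero count more likely comes from the comparison (for example, comparing a date against a string or timestamp column).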
Re: Spark 1.3 createDataframe error with pandas df
On Mon, Mar 16, 2015 at 6:23 AM, kevindahl kevin.d...@gmail.com wrote: kevindahl wrote I'm trying to create a spark data frame from a pandas data frame, but for even the most trivial of datasets I get an error along the lines of this: --- Py4JJavaError Traceback (most recent call last) ipython-input-11-7857f9a55971 in module () 3 BabyDataSet = zip(names,births) 4 df = pd.DataFrame(data = BabyDataSet, columns=['Names', 'Births']) 5 rdf = sqlCtx.createDataFrame(df) C:\spark\python\pyspark\sql\context.pyc in createDataFrame(self, data, schema, samplingRatio) 332 333 if isinstance(schema, (list, tuple)): -- 334 first = data.first() 335 if not isinstance(first, (list, tuple)): 336 raise ValueError(each row in `rdd` should be list or tuple, C:\spark\python\pyspark\rdd.pyc in first(self) 1241 ValueError: RDD is empty 1242 - 1243 rs = self.take(1) 1244 if rs: 1245 return rs[0] C:\spark\python\pyspark\rdd.pyc in take(self, num) 1223 1224 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) - 1225 res = self.context.runJob(self, takeUpToNumLeft, p, True) 1226 1227 items += res C:\spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 841 # SparkContext#runJob. 842 mappedRDD = rdd.mapPartitions(partitionFunc) -- 843 it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 844 return list(mappedRDD._collect_iterator_through_file(it)) 845 C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. 
-- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 7, localhost): java.net.SocketException: Connection reset The python process had crashed, do you have any logging for this? at java.net.SocketInputStream.read(SocketInputStream.java:189) at java.net.SocketInputStream.read(SocketInputStream.java:121) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108) at org.apache.spark.api.python.PythonRDD$$anon$1. init (PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at
Spark Streaming custom receiver for local data
We are building a wrapper that makes it possible to use reactive streams (i.e. Observable, see reactivex.io) as input to Spark Streaming. We therefore tried to create a custom receiver for Spark. However, the Observable lives in the driver program and is generally not serializable. Is it possible to create a receiver that runs next to the driver program and therefore does not need to be serialized? We tried the following, which gives a NotSerializableException:
object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Clock")
    val ssc = new StreamingContext(conf, Seconds(1))
    val data = // A non-serializable stream of data
    val stream = RxcreateStream(ssc, data)
    // Mapping, filtering, etc
    ssc.start()
    ssc.awaitTermination()
  }
}
with the createStream method something like the following:
object RxUtils {
  def createStream[T: ClassTag](scc_ : StreamingContext, observable: Observable[T]): ReceiverInputDStream[T] = {
    new RxInputDStream[T](scc_, observable, StorageLevel.MEMORY_AND_DISK_SER_2)
  }
}
class RxInputDStream[T: ClassTag](ssc_ : StreamingContext, observable: Observable[T], storageLevel: StorageLevel) extends ReceiverInputDStream[T](ssc_) {
  override def getReceiver(): Receiver[T] = {
    new RxReceiver(observable, storageLevel)
  }
}
class RxReceiver[T](observable: Observable[T], storageLevel: StorageLevel) extends Receiver[T](storageLevel) with Logging {
  var subscription: Option[Subscription] = None
  override def onStart(): Unit = {
    // NOTE: 'observable' is a reference to a variable in the driver program
    subscription = Some(
      observable
        .asInstanceOf[Observable[T]]
        .subscribe(x => store(x))
    )
  }
}
The comment marks what causes the NotSerializableException. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-custom-receiver-for-local-data-tp22148.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
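The underlying constraint is that Spark ships the receiver object to an executor, so everything it references has to be serializable. The same effect can be reproduced in plain Python with `pickle` (which PySpark also relies on); the classes below are hypothetical stand-ins, not Spark APIs. Holding a factory instead of the live object keeps the receiver serializable, though that only helps if the source can actually be recreated on the worker, which may not hold for an Observable tied to driver state.

```python
import pickle
import threading


class LiveSource:
    """Stand-in for a driver-side object that cannot be serialized."""

    def __init__(self):
        self._lock = threading.Lock()  # lock objects cannot be pickled


class EagerReceiver:
    """Captures the live object directly, like RxReceiver(observable, ...)."""

    def __init__(self, source):
        self.source = source


class LazyReceiver:
    """Captures only a picklable factory and builds the source on start."""

    def __init__(self, make_source):
        self.make_source = make_source
        self.source = None

    def on_start(self):
        self.source = self.make_source()  # runs wherever the receiver runs


def is_serializable(obj):
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError):
        return False


print(is_serializable(EagerReceiver(LiveSource())))  # False
print(is_serializable(LazyReceiver(LiveSource)))     # True
```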
Re: Spark-submit and multiple files
You could submit additional Python source via --py-files, for example: $ bin/spark-submit --py-files work.py main.py On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez guilla...@databerries.com wrote: Hello guys, I am having a hard time understanding how spark-submit behaves with multiple files. I have created two code snippets. Each code snippet is composed of a main.py and a work.py. The code works if I paste work.py and then main.py into a pyspark shell. However, neither snippet works when using spark-submit, and each generates a different error. Function add_1 definition outside http://www.codeshare.io/4ao8B https://justpaste.it/jzvj Embedded add_1 function definition http://www.codeshare.io/OQJxq https://justpaste.it/jzvn I am looking for a way to make it work. Thank you for your support. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
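`--py-files` accepts `.py`, `.zip` and `.egg` files, so once an application grows past a couple of modules it can be simpler to ship one archive. A minimal stdlib sketch; `bundle_py_files` and `deps.zip` are hypothetical names.

```python
import os
import zipfile


def bundle_py_files(module_paths, zip_path="deps.zip"):
    """Pack Python modules into a flat zip suitable for --py-files.

    Each module is stored at the archive root, so `import work` resolves
    once the zip is on the workers' Python path.
    """
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in module_paths:
            zf.write(path, arcname=os.path.basename(path))
    return zip_path


# Then, for example:
#   bundle_py_files(["work.py"])
#   bin/spark-submit --py-files deps.zip main.py
```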
Issues with SBT and Spark
My current simple.sbt is:
name := "SparkEpiFast"
version := "1.0"
scalaVersion := "2.11.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.11" % "1.2.1" % "provided"
When I run sbt package, it compiles successfully. But when I run the application, I get: Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; However, changing the Scala version to 2.10.4 and updating the dependency lines accordingly resolves the issue (no exception). Could anyone please point out what I am doing wrong?
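A `NoSuchMethodError` on `scala.Predef$` is the usual symptom of code compiled for one Scala binary version (here 2.11) running on a Spark build compiled for another. Prebuilt Spark 1.2.x distributions target Scala 2.10, so assuming the application runs on one of those, that would explain why 2.10.4 works. (In sbt, `%%` appends the project's Scala binary suffix automatically, which keeps artifact names consistent with `scalaVersion`, but it cannot fix a mismatch with the cluster.) A rough Python sketch that flags dependencies whose `_2.xx` suffix differs from the cluster's Scala version; the regex only handles the simple `"group" % "artifact_2.xx"` form used in this build file.

```python
import re

# Matches the artifact in a line such as:
#   libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1"
_ARTIFACT = re.compile(r'%\s*"([\w.-]+?)_(2\.\d+)"')


def scala_binary(version):
    """'2.11.4' -> '2.11'"""
    return ".".join(version.split(".")[:2])


def mismatched_artifacts(sbt_text, cluster_scala):
    """Return (artifact, suffix) pairs built for a different Scala binary version."""
    expected = scala_binary(cluster_scala)
    return [
        (name, suffix)
        for name, suffix in _ARTIFACT.findall(sbt_text)
        if suffix != expected
    ]
```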
Re: spark there is no space on the disk
IIRC you have to set that configuration on the Worker processes (for standalone). The app can't override it (only for a client-mode driver). YARN has a similar configuration, but I don't know the name (shouldn't be hard to find, though). On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu dav...@databricks.com wrote: Is it possible that `spark.local.dir` is overridden by others? The docs say: NOTE: In Spark 1.0 and later this will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)
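The precedence in that docs note can be modelled in a few lines, which makes it easier to see why setting `spark.local.dir` from the application had no effect once the cluster manager exports its own variable. This is a simplified sketch of the documented order (YARN's `LOCAL_DIRS`, then `SPARK_LOCAL_DIRS`, then `spark.local.dir`, then the system temp directory), not Spark's implementation, which checks a few more cases.

```python
import tempfile


def effective_local_dirs(conf, env):
    """Return the scratch directories an executor would use (simplified model).

    `conf` is a dict of Spark properties and `env` a dict of environment
    variables as seen by the executor process, not the driver.
    """
    for var in ("LOCAL_DIRS", "SPARK_LOCAL_DIRS"):  # exported by the cluster manager
        if env.get(var):
            return [d for d in env[var].split(",") if d]
    value = conf.get("spark.local.dir", tempfile.gettempdir())
    return [d for d in value.split(",") if d]
```

In this model, `effective_local_dirs({"spark.local.dir": "C:\\tmp"}, {"SPARK_LOCAL_DIRS": "D:\\a"})` returns `["D:\\a"]`, which is the situation Davies and Marcelo describe.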
Re: spark there is no space on the disk
For YARN, possibly this one ? property nameyarn.nodemanager.local-dirs/name value/hadoop/yarn/local/value /property Cheers On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin van...@cloudera.com wrote: IIRC you have to set that configuration on the Worker processes (for standalone). The app can't override it (only for a client-mode driver). YARN has a similar configuration, but I don't know the name (shouldn't be hard to find, though). On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu dav...@databricks.com wrote: Is it possible that `spark.local.dir` is overriden by others? The docs say: NOTE: In Spark 1.0 and later this will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia sparkpeng...@gmail.com wrote: Hi Sean, Thank very much for your reply. I tried to config it from below code: sf = SparkConf().setAppName(test).set(spark.executor.memory, 45g).set(spark.cores.max, 62),set(spark.local.dir, C:\\tmp) But still get the error. Do you know how I can config this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at 
org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at
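The override rule Davies quoted, together with Marcelo's point that the setting belongs to the Worker (or, on YARN, the NodeManager) rather than the application, can be summarised in a small plain-Scala model. This is a sketch under stated assumptions, not Spark's implementation: `effectiveLocalDirs` and the `ClusterManager` cases are hypothetical names, the model covers only the precedence described in the docs note, and the `/tmp` fallback is the documented default of `spark.local.dir` (pass a different `defaultDir` if a deployment falls back elsewhere).

```scala
object LocalDirPrecedence {
  sealed trait ClusterManager
  case object Standalone extends ClusterManager
  case object Mesos extends ClusterManager
  case object Yarn extends ClusterManager

  // Hypothetical model of the precedence quoted from the Spark docs; not Spark's own code.
  def effectiveLocalDirs(
      manager: ClusterManager,
      conf: Map[String, String],  // the application's SparkConf entries
      env: Map[String, String],   // environment of the Worker process or YARN container
      defaultDir: String = "/tmp" // documented default of spark.local.dir
  ): Seq[String] = {
    // A cluster-provided variable, when present, wins over the application's setting.
    val fromEnv = manager match {
      case Yarn               => env.get("LOCAL_DIRS")       // supplied by the YARN NodeManager
      case Standalone | Mesos => env.get("SPARK_LOCAL_DIRS") // e.g. exported for the Worker
    }
    val raw = fromEnv.orElse(conf.get("spark.local.dir")).getOrElse(defaultDir)
    // spark.local.dir is documented as accepting a comma-separated list;
    // the model splits every source the same way.
    raw.split(",").map(_.trim).filter(_.nonEmpty).toSeq
  }
}
```

Under this model, once the variable is set in the worker's environment, the value in the application's `SparkConf` never takes effect, which would be consistent with the PySpark setting above appearing to do nothing.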
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
I’m seeing the same problem. I’ve set logging to DEBUG, and I think some hints are in the “Yarn AM launch context” that is printed out before Yarn runs java. My next step is to talk to the admins and get them to set yarn.nodemanager.delete.debug-delay-sec in the config, as recommended in http://spark.apache.org/docs/latest/running-on-yarn.html Then I can see exactly what's in the directory. Doug ps Sorry for the duplicate message, Bharath and Todd; I used the wrong email address. On Mar 19, 2015, at 1:19 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for clarifying, Todd. This may then be an issue specific to the HDP version we're using. Will continue to debug and post back if there's any resolution. On Thu, Mar 19, 2015 at 3:40 AM, Todd Nist tsind...@gmail.com wrote: Yes, I believe you are correct. For the build you may need to specify the specific HDP version of Hadoop to use with -Dhadoop.version=. I went with the default 2.6.0, but Horton may have a vendor-specific version that needs to go here. I know I saw a similar post today where the solution was to use -Dhadoop.version=2.5.0-cdh5.3.2, but that was for a Cloudera installation. I am not sure what the HDP version would be to put here. -Todd On Wed, Mar 18, 2015 at 12:49 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional Spark YARN options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. That apart, I assume that no other host on the cluster should require a deployment of the Spark distribution or any other config change to support a Spark job. Isn't that correct? On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file?
spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running a purpose-built 1.3 against HDP 2.2 after following all the instructions. Has anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked (with spark.yarn.jar set), or is there a need to ensure that the entire distribution is pre-deployed on every host in the YARN cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago; here is a link to a post on Horton's forum: http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to do this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull the 1.2.1 source 2. Apply the following patches a. Address the Jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in spark-defaults.conf, https://github.com/apache/spark/pull/3409 3. Build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact, spark-1.2.1-bin-hadoop2.6.tgz, following the instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well, and steps 2a and 2b are not required.
HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a YARN cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster; the aforementioned Spark build was copied to one of the Hadoop client hosts in the cluster to launch the job. spark-submit was run with --master yarn-client, and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the Spark distribution to the HDP-recommended version and following the instructions on this page did not fix the problem either. Any idea what may have caused this error? Thanks, Bharath
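To double-check the two entries Todd asked about, a spark-defaults.conf file can be read as one key and value per line, separated by whitespace, with `#` comment lines. The sketch below is plain Scala; `parseDefaults` and `missingHdpVersion` are hypothetical helpers, and the check only looks for a `-Dhdp.version=` flag, not whether the version string matches the cluster.

```scala
object HdpVersionCheck {
  /** Parses spark-defaults.conf text: "key<whitespace>value" per line; '#' lines are comments. */
  def parseDefaults(text: String): Map[String, String] =
    text.split("\\r?\\n").iterator
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .map { line =>
        // Split on the first run of whitespace only; the value may itself contain spaces.
        val parts = line.split("\\s+", 2)
        parts(0) -> (if (parts.length > 1) parts(1).trim else "")
      }
      .toMap

  /** Of the two keys from the thread, returns those missing or lacking a -Dhdp.version=... flag. */
  def missingHdpVersion(defaults: Map[String, String]): Seq[String] =
    Seq("spark.driver.extraJavaOptions", "spark.yarn.am.extraJavaOptions")
      .filterNot(key => defaults.get(key).exists(_.contains("-Dhdp.version=")))
}
```

An empty result means both properties carry the flag; it says nothing about whether the client actually reads that file, which depends on the SPARK_HOME (or `--properties-file`) used by spark-submit.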
Re: Spark + Kafka
Many thanks all for the good responses, appreciated. On Thu, Mar 19, 2015 at 8:36 AM, James King jakwebin...@gmail.com wrote: Thanks Khanderao. On Wed, Mar 18, 2015 at 7:18 PM, Khanderao Kand Gmail khanderao.k...@gmail.com wrote: I have used various versions of Spark (1.0, 1.2.1) without any issues. Though I have not significantly used Kafka with 1.3.0, preliminary testing revealed no issues. - khanderao On Mar 18, 2015, at 2:38 AM, James King jakwebin...@gmail.com wrote: Hi All, Which build of Spark is best when using Kafka? Regards jk
Re: saveAsTable broken in v1.3 DataFrames?
I meant that table properties and serde properties are used to store the metadata of a Spark SQL data source table. We do not set other fields like the SerDe lib. For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table should not show unrelated stuff like the SerDe lib and InputFormat. I have created https://issues.apache.org/jira/browse/SPARK-6413 to track the improvement of the output of the DESCRIBE statement. On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote: Hi Christian, Your table is stored correctly in Parquet format. For saveAsTable, the table created is *not* a Hive table, but a Spark SQL data source table (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources). We are only using Hive's metastore to store the metadata (to be specific, only table properties and serde properties). When you look at the table properties, there will be a field called spark.sql.sources.provider, and its value will be org.apache.spark.sql.parquet.DefaultSource. You can also look at your files in the file system. They are stored by Parquet. Thanks, Yin On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com wrote: Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a Serialization error because it doesn't know it is Parquet.
val df = sc.parallelize(Array((1, 2), (3, 4))).toDF("education", "income") df.saveAsTable("spark_test_foo") Expected: COLUMNS( education BIGINT, income BIGINT ) SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat Actual: COLUMNS( col array<string> COMMENT from deserializer ) SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat --- Manually changing the schema and storage restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receipt. I'm tracing execution through the source code... but before I get any deeper, can anyone reproduce this behavior? Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
JAVA_HOME problem with upgrade to 1.3.0
I’m trying to upgrade a Spark project, written in Scala, from Spark 1.2.1 to 1.3.0, so I changed my `build.sbt` like so: -libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided" +libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided" then make an `assembly` jar, and submit it: HADOOP_CONF_DIR=/etc/hadoop/conf \ spark-submit \ --driver-class-path=/etc/hbase/conf \ --conf spark.hadoop.validateOutputSpecs=false \ --conf spark.yarn.jar=hdfs:/apps/local/spark-assembly-1.3.0-hadoop2.4.0.jar \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ --deploy-mode=cluster \ --master=yarn \ --class=TestObject \ --num-executors=54 \ target/scala-2.11/myapp-assembly-1.2.jar The job fails to submit, with the following exception in the terminal: 15/03/19 10:30:07 INFO yarn.Client: client token: N/A diagnostics: Application application_1420225286501_4699 failed 2 times due to AM Container for appattempt_1420225286501_4699_02 exited with exitCode: 127 due to: Exception from container-launch: org.apache.hadoop.util.Shell$ExitCodeException: at org.apache.hadoop.util.Shell.runCommand(Shell.java:464) at org.apache.hadoop.util.Shell.run(Shell.java:379) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Finally, I go and check
the YARN app master’s web interface (since the job is shown, I know it at least made it that far), and the only logs it shows are these: Log Type: stderr Log Length: 61 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory Log Type: stdout Log Length: 0 I’m not sure how to interpret that: is '{{JAVA_HOME}}' a literal (including the brackets) that’s somehow making it into a script? Is this coming from the worker nodes or the driver? Is there anything I can do to experiment or troubleshoot? -Ken CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the intended recipient(s) and may contain confidential and privileged information. Any unauthorized review, use, disclosure or distribution of any kind is strictly prohibited. If you are not the intended recipient, please contact the sender via reply e-mail and destroy all copies of the original message. Thank you.
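On the literal `{{JAVA_HOME}}`: my understanding, which should be checked against the cluster, is that `{{VAR}}` is YARN's cross-platform placeholder syntax (introduced around Hadoop 2.4), which the NodeManager is expected to replace with the container's value when it writes the launch script. Seeing the braces reach bash unchanged would then suggest the NodeManager did not expand them, for example because it runs an older Hadoop than the one the assembly was built for (`hadoop2.4.0` here). The plain-Scala sketch below only models that substitution; `expandPlaceholders` is a hypothetical helper, not a YARN API.

```scala
import scala.util.matching.Regex

object YarnPlaceholders {
  // Matches {{NAME}} where NAME looks like an environment variable name.
  private val Placeholder: Regex = """\{\{([A-Za-z_][A-Za-z0-9_]*)\}\}""".r

  /** Replaces each {{NAME}} with env(NAME). Unknown names are left untouched,
    * which is the text bash would see if nothing expanded the placeholder. */
  def expandPlaceholders(command: String, env: Map[String, String]): String =
    Placeholder.replaceAllIn(command, m =>
      // quoteReplacement keeps '$' or '\' in values from being read as group references
      Regex.quoteReplacement(env.getOrElse(m.group(1), m.matched)))
}
```

With `JAVA_HOME` known, the command becomes an ordinary path; without expansion it stays `{{JAVA_HOME}}/bin/java`, which matches the "No such file or directory" in the stderr above.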
saveAsTable broken in v1.3 DataFrames?
Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a Serialization error because it doesn't know it is Parquet. val df = sc.parallelize(Array((1, 2), (3, 4))).toDF("education", "income") df.saveAsTable("spark_test_foo") Expected: COLUMNS( education BIGINT, income BIGINT ) SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat Actual: COLUMNS( col array<string> COMMENT from deserializer ) SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat --- Manually changing the schema and storage restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receipt. I'm tracing execution through the source code... but before I get any deeper, can anyone reproduce this behavior? Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
Thanks for the assistance. I found the error; it was something I had done (PEBCAK). I had placed a copy of elasticsearch-hadoop 2.1.0.BETA3 in the project/lib directory, causing it to be picked up as an unmanaged dependency and brought in first, even though build.sbt specified the correct version, 2.1.0.BUILD-SNAPSHOT. There was no reason for it to be there at all, and it's something I don't usually do. Thanks again for pointing out that it was a version mismatch issue. -Todd On Wed, Mar 18, 2015 at 9:59 PM, Cheng, Hao hao.ch...@intel.com wrote: Todd, can you try running the code in the Spark shell (bin/spark-shell)? You may need to write some fake code to call the function in MappingUtils.scala. In the meantime, can you also check the jar dependency tree of your project, or the downloaded dependency jar files, just in case multiple versions of Spark have been introduced? From: Todd Nist [mailto:tsind...@gmail.com] Sent: Thursday, March 19, 2015 9:04 AM To: Cheng, Hao Cc: user@spark.apache.org Subject: Re: [SQL] Elasticsearch-hadoop, exception creating temporary table Thanks for the quick response. The Spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download.
Here is the startup: radtech$ ./sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out Spark assembly has been built with Hive, including Datanucleus jars on classpath Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080 15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT] 15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist 15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist 15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist) 15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started 15/03/18 20:31:41 INFO Remoting: Starting remoting 15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkmas...@radtech.io:7077] 15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkmas...@radtech.io:7077] 15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077. 15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077 15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080. 15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080 15/03/18 20:31:41 INFO Master: I have been elected leader!
New state: ALIVE My build.sbt for the Spark job is as follows: import AssemblyKeys._ // activating assembly plugin assemblySettings name := "elasticsearch-spark" version := "0.0.1" val SCALA_VERSION = "2.10.4" val SPARK_VERSION = "1.2.1" val defaultSettings = Defaults.coreDefaultSettings ++ Seq( organization := "io.radtec", scalaVersion := SCALA_VERSION, resolvers := Seq( //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo", Resolver.typesafeRepo("releases")), scalacOptions ++= Seq( "-unchecked", "-deprecation", "-Xlint", "-Ywarn-dead-code", "-language:_", "-target:jvm-1.7", "-encoding", "UTF-8" ), parallelExecution in Test := false, testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"), publishArtifact in (Test, packageBin) := true, unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)), unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)), EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource, credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"), publishTo := Some("Artifactory Realm" at "http://artifactory.ods:8082/artifactory/ivy-repo-local") ) // custom Hadoop client, configured as provided, since it shouldn't go to assembly jar val hadoopDeps = Seq ( "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided" ) // ElasticSearch Hadoop support val esHadoopDeps = Seq ( ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"). exclude("org.apache.spark", "spark-core_2.10"). exclude("org.apache.spark", "spark-streaming_2.10").
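Cheng Hao's suggestion to look for multiple versions of the same dependency is what turned up the stray 2.1.0.BETA3 jar. A quick, naive form of that check can be run over jar file names, for example a listing of `lib/` or the classpath printed in a startup log. The sketch assumes the common `artifact-version.jar` naming convention, where the version starts at the first `-` followed by a digit; `conflicts` is a hypothetical helper, and the build tool's own dependency report remains the more reliable source.

```scala
object JarVersionConflicts {
  // Naive split of "artifact-version.jar": the version starts at the first "-<digit>".
  private val JarName = """^(.+?)-(\d.*)\.jar$""".r

  /** Groups jar file names by artifact and returns artifacts seen with more than one version. */
  def conflicts(jarFileNames: Seq[String]): Map[String, Set[String]] =
    jarFileNames
      .collect { case JarName(artifact, version) => artifact -> version } // skips non-matching names
      .groupBy(_._1)
      .map { case (artifact, pairs) => artifact -> pairs.map(_._2).toSet }
      .filter { case (_, versions) => versions.size > 1 }
}
```

Note that `spark-core_2.10-1.2.1.jar` parses as artifact `spark-core_2.10`, because the Scala suffix is joined with `_` rather than `-`.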
Re: saveAsTable broken in v1.3 DataFrames?
Hi Christian, Your table is stored correctly in Parquet format. For saveAsTable, the table created is *not* a Hive table, but a Spark SQL data source table (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources). We are only using Hive's metastore to store the metadata (to be specific, only table properties and serde properties). When you look at the table properties, there will be a field called spark.sql.sources.provider, and its value will be org.apache.spark.sql.parquet.DefaultSource. You can also look at your files in the file system. They are stored by Parquet. Thanks, Yin On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com wrote: Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a Serialization error because it doesn't know it is Parquet. val df = sc.parallelize(Array((1, 2), (3, 4))).toDF("education", "income") df.saveAsTable("spark_test_foo") Expected: COLUMNS( education BIGINT, income BIGINT ) SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat Actual: COLUMNS( col array<string> COMMENT from deserializer ) SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat --- Manually changing the schema and storage restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receipt. I'm tracing execution through the source code... but before I get any deeper, can anyone reproduce this behavior?
Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd
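Yin's explanation reduces to a simple rule for telling the two kinds of table apart from their metastore properties, which can be read from DESCRIBE EXTENDED/FORMATTED output. The plain-Scala sketch below encodes that rule; `classify`, `SparkDataSourceTable` and `HiveTable` are hypothetical names, parsing the DESCRIBE output is not shown, and treating a table without the provider property as an ordinary Hive table is an assumption.

```scala
object DataSourceTableCheck {
  sealed trait TableKind
  // Written through Spark SQL's data source API; the SerDe/InputFormat fields
  // Hive reports for it are placeholders, not the real storage format.
  final case class SparkDataSourceTable(provider: String) extends TableKind
  // No provider property: assumed to be an ordinary Hive table.
  case object HiveTable extends TableKind

  def classify(tableProperties: Map[String, String]): TableKind =
    tableProperties.get("spark.sql.sources.provider") match {
      case Some(provider) => SparkDataSourceTable(provider)
      case None           => HiveTable
    }
}
```

For the table in this thread, the provider would be `org.apache.spark.sql.parquet.DefaultSource`, so Hive's own SerDe and InputFormat fields should not be relied on when reading it.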
Re: JAVA_HOME problem with upgrade to 1.3.0
JAVA_HOME, an environment variable, should be defined on the node where appattempt_1420225286501_4699_02 ran. Cheers On Thu, Mar 19, 2015 at 8:59 AM, Williams, Ken ken.willi...@windlogics.com wrote: I’m trying to upgrade a Spark project, written in Scala, from Spark 1.2.1 to 1.3.0, so I changed my `build.sbt` like so: -libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided" +libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided" then make an `assembly` jar, and submit it: HADOOP_CONF_DIR=/etc/hadoop/conf \ spark-submit \ --driver-class-path=/etc/hbase/conf \ --conf spark.hadoop.validateOutputSpecs=false \ --conf spark.yarn.jar=hdfs:/apps/local/spark-assembly-1.3.0-hadoop2.4.0.jar \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ --deploy-mode=cluster \ --master=yarn \ --class=TestObject \ --num-executors=54 \ target/scala-2.11/myapp-assembly-1.2.jar The job fails to submit, with the following exception in the terminal: 15/03/19 10:30:07 INFO yarn.Client: client token: N/A diagnostics: Application application_1420225286501_4699 failed 2 times due to AM Container for appattempt_1420225286501_4699_02 exited with exitCode: 127 due to: Exception from container-launch: org.apache.hadoop.util.Shell$ExitCodeException: at org.apache.hadoop.util.Shell.runCommand(Shell.java:464) at org.apache.hadoop.util.Shell.run(Shell.java:379) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Finally, I go and check the YARN app master’s web interface (since the job is shown, I know it at least made it that far), and the only logs it shows are these: Log Type: stderr Log Length: 61 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory Log Type: stdout Log Length: 0 I’m not sure how to interpret that: is '{{JAVA_HOME}}' a literal (including the brackets) that’s somehow making it into a script? Is this coming from the worker nodes or the driver? Is there anything I can do to experiment or troubleshoot? -Ken
Re: Writing Spark Streaming Programs
Many thanks Gerard, this is very helpful. Cheers! On Thu, Mar 19, 2015 at 4:02 PM, Gerard Maas gerard.m...@gmail.com wrote: Try writing this Spark Streaming idiom in Java and you'll choose Scala soon enough: dstream.foreachRDD { rdd => rdd.foreachPartition { partition => ... } } When deciding between Java and Scala for Spark, IMHO Scala has the upper hand. If you're concerned with readability, have a look at the Scala coding style recently open sourced by Databricks: https://github.com/databricks/scala-style-guide (BTW, I don't agree with a good part of it, but I recognize that it can keep the most complex Scala constructions out of your code.) On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com wrote: Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, I'm familiar with Scala, and I have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is. If you use Scala without applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. Also, it could be that the type of programming done in Spark is best implemented in Scala as an FP language, not sure though. The question I would like your help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard? Regards jk
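Gerard's idiom is mostly about shape: the function passed to `foreachPartition` runs once per partition on an executor, so an expensive resource such as a database or Kafka connection can be opened once per partition rather than once per record. The plain-Scala sketch below imitates that shape without Spark; `FakeConnection` and `writePartitions` are hypothetical stand-ins, and in a real job the body would sit inside `dstream.foreachRDD { rdd => rdd.foreachPartition { partition => ... } }`.

```scala
object PerPartitionWrites {
  /** Hypothetical stand-in for a database connection; not a real driver API. */
  final class FakeConnection {
    var written: Vector[String] = Vector.empty
    var closed = false
    def write(record: String): Unit = written :+= record
    def close(): Unit = closed = true
  }

  /** Each Iterator plays the role of one partition: one connection is opened per
    * partition, every record is written through it, and it is always closed. */
  def writePartitions(partitions: Seq[Iterator[String]], open: () => FakeConnection): Seq[FakeConnection] =
    partitions.map { partition =>
      val conn = open()
      try partition.foreach(conn.write)
      finally conn.close()
      conn
    }
}
```

The `try`/`finally` mirrors what a real per-partition writer should do so that a failing record does not leak the connection on the executor.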
Re: Writing Spark Streaming Programs
Hello James, I've been working with Spark Streaming for the last 6 months, and I'm coding in Java 7. Even though I haven't encountered any blocking issues with that combination, I'd definitely pick Scala if the decision was up to me. I agree with Gerard and Charles on this one. If you can, go with Scala for Spark Streaming applications. Cheers, Emre Sevinç http://www.bigindustries.be/ On Thu, Mar 19, 2015 at 4:09 PM, James King jakwebin...@gmail.com wrote: Many thanks Gerard, this is very helpful. Cheers! On Thu, Mar 19, 2015 at 4:02 PM, Gerard Maas gerard.m...@gmail.com wrote: Try writing this Spark Streaming idiom in Java and you'll choose Scala soon enough: dstream.foreachRDD { rdd => rdd.foreachPartition { partition => ... } } When deciding between Java and Scala for Spark, IMHO Scala has the upper hand. If you're concerned with readability, have a look at the Scala coding style recently open sourced by Databricks: https://github.com/databricks/scala-style-guide (BTW, I don't agree with a good part of it, but I recognize that it can keep the most complex Scala constructions out of your code.) On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com wrote: Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, I'm familiar with Scala, and I have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is. If you use Scala without applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. Also, it could be that the type of programming done in Spark is best implemented in Scala as an FP language, not sure though.
The question I would like your help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard? Regards jk -- Emre Sevinc
Re: Writing Spark Streaming Programs
Scala is the language used to write Spark, so there's never a situation in which features introduced in a newer version of Spark cannot be taken advantage of if you write your code in Scala. (This is mostly true of Java, but it may be a little more legwork if a Java-friendly adapter isn't available alongside new features.) Scala is also OO; it's a hybrid functional/OO language. Although much of my organization's codebase is written in Java and we've recently transitioned to Java 8, I still write all of my Spark code using Scala. (I also squeeze in Scala where I can in other parts of the organization.) Additionally, I use both Python and R for local data analysis, though I haven't used Python with Spark in production. On Thu, Mar 19, 2015 at 10:51 AM James King jakwebin...@gmail.com wrote: Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, I'm familiar with Scala, and I have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is. If you use Scala without applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. Also, it could be that the type of programming done in Spark is best implemented in Scala as an FP language, not sure though. The question I would like your help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard? Regards jk
Re: Writing Spark Streaming Programs
I second what has been said already. We just built a streaming app in Java, and I would definitely choose Scala this time. Regards, Jeff 2015-03-19 16:34 GMT+01:00 Emre Sevinc emre.sev...@gmail.com: Hello James, I've been working with Spark Streaming for the last 6 months, and I'm coding in Java 7. Even though I haven't encountered any blocking issues with that combination, I'd definitely pick Scala if the decision was up to me. I agree with Gerard and Charles on this one. If you can, go with Scala for Spark Streaming applications. Cheers, Emre Sevinç http://www.bigindustries.be/ On Thu, Mar 19, 2015 at 4:09 PM, James King jakwebin...@gmail.com wrote: Many thanks Gerard, this is very helpful. Cheers! On Thu, Mar 19, 2015 at 4:02 PM, Gerard Maas gerard.m...@gmail.com wrote: Try writing this Spark Streaming idiom in Java and you'll choose Scala soon enough: dstream.foreachRDD { rdd => rdd.foreachPartition { partition => ... } } When deciding between Java and Scala for Spark, IMHO Scala has the upper hand. If you're concerned with readability, have a look at the Scala coding style recently open sourced by Databricks: https://github.com/databricks/scala-style-guide (BTW, I don't agree with a good part of it, but I recognize that it can keep the most complex Scala constructions out of your code.) On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com wrote: Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, I'm familiar with Scala, and I have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is.
If you use Scala without applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. Also, it could be that the type of programming done in Spark is best implemented in Scala as an FP language, not sure though. The question I would like your help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard? Regards jk -- Emre Sevinc
Re: DataFrame operation on parquet: GC overhead limit exceeded
Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears? This setting controls the number of reducers Spark SQL will use, and the default is 200. Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help. Thanks, Yin On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, Thanks for your feedback. I have 1700 Parquet files, sized 100MB each. The number of tasks launched is equal to the number of Parquet files. Do you have any idea how to deal with this situation? Thanks a lot On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote: It seems too many distinct groups are processed in a task, which triggers the problem. How many files does your dataset have, and how large is each file? It seems your query will be executed in two stages: table scan and map-side aggregation in the first stage, and the final round of reduce-side aggregation in the second stage. Can you take a look at the numbers of tasks launched in these two stages? Thanks, Yin On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I set the executor memory to 8g but it didn't help On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a Parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in standalone cluster mode.
The code is the following:
import org.apache.spark.sql.functions._ // provides sum()
val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
res.foreach(println) // res is an Array[Row]; print each row rather than the array reference
The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded
My configuration is:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort
Any idea how I can work around this? Thanks a lot
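Yin's suggestion rests on how the shuffle assigns groups to reduce tasks: each distinct (name, date) key is sent to one of spark.sql.shuffle.partitions reducers, so raising that number spreads the same set of groups over more tasks. The pure-Scala sketch below models only that assignment step, assuming it behaves like hash partitioning (key hash code modulo the partition count); `ShufflePartitionSketch` and the generated keys are hypothetical, and this is not Spark's actual shuffle code. In Spark 1.3 the setting itself can be changed with `sqlContext.setConf("spark.sql.shuffle.partitions", "400")` before running the query.

```scala
// Hypothetical, Spark-free model of spreading distinct group keys over reducers.
object ShufflePartitionSketch {

  // Map a key to a reducer index in [0, numPartitions): the hash code modulo
  // the partition count, shifted so negative hash codes stay in range.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of distinct group keys each reducer would have to aggregate.
  def groupsPerPartition(keys: Seq[Any], numPartitions: Int): Map[Int, Int] =
    keys.distinct
      .groupBy(k => partitionFor(k, numPartitions))
      .map { case (p, ks) => p -> ks.size }

  // The busiest reducer determines the peak aggregation state a single task holds.
  def maxGroupsPerPartition(keys: Seq[Any], numPartitions: Int): Int =
    groupsPerPartition(keys, numPartitions).values.max

  def main(args: Array[String]): Unit = {
    // Hypothetical (name, date) keys: 1,000 names x 100 dates = 100,000 groups.
    val keys: Seq[Any] = for (n <- 1 to 1000; d <- 1 to 100) yield (s"name$n", s"date$d")
    for (parts <- Seq(200, 400))
      println(s"$parts partitions -> busiest reducer holds ${maxGroupsPerPartition(keys, parts)} groups")
  }
}
```

Because 400 is a multiple of 200, each of the 200 partitions is exactly the union of two of the 400 partitions, so the busiest reducer can only get lighter when going from 200 to 400; with evenly hashed keys it should hold roughly half as many groups.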
Re: saveAsTable broken in v1.3 DataFrames?
Hi Yin, Thanks for the clarification. My first reaction is that if this is the intended behavior, it is a wasted opportunity. Why create a managed table in Hive that cannot be read from inside Hive? I think I understand now that you are essentially piggybacking on Hive's metastore to persist table info between/across sessions, but I imagine others might expect more (as I did). We find ourselves wanting to do work in Spark and persist the results where other users (e.g. analysts using Tableau connected to Hive/Impala) can explore it. I imagine this is very common. I can, of course, save it as Parquet and create an external table in Hive (which I will do now), but saveAsTable seems much less useful to me now. Any other opinions? Cheers, C On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote: I meant that table properties and serde properties are used to store the metadata of a Spark SQL data source table. We do not set other fields like SerDe lib. For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table should not show unrelated fields like SerDe lib and InputFormat. I have created https://issues.apache.org/jira/browse/SPARK-6413 to track the improvement of the output of the DESCRIBE statement. On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote: Hi Christian, Your table is stored correctly in Parquet format. For saveAsTable, the table created is not a Hive table, but a Spark SQL data source table (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources). We are only using Hive's metastore to store the metadata (to be specific, only table properties and serde properties). When you look at the table properties, there will be a field called spark.sql.sources.provider and its value will be org.apache.spark.sql.parquet.DefaultSource. You can also look at your files in the file system. They are stored as Parquet.
Thanks, Yin On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com wrote: Hi all, DataFrame.saveAsTable creates a managed table in Hive (v0.13 on CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong* schema _and_ storage format in the Hive metastore, so that the table cannot be read from inside Hive. Spark itself can read the table, but Hive throws a serialization error because it doesn't know the data is Parquet.
val df = sc.parallelize(Array((1, 2), (3, 4))).toDF("education", "income")
df.saveAsTable("spark_test_foo")
Expected:
COLUMNS( education BIGINT, income BIGINT )
SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
Actual:
COLUMNS( col array<string> COMMENT from deserializer )
SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
--- Manually changing the schema and storage format restores access in Hive and doesn't affect Spark. Note also that Hive's table property spark.sql.sources.schema is correct. At first glance, it looks like the schema data is serialized when sent to Hive but not deserialized properly on receipt. I'm tracing execution through the source code... but before I get any deeper, can anyone reproduce this behavior? Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
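Christian's workaround (save the data as Parquet, then declare an external Hive table over it) needs a Hive DDL statement whose columns match the Parquet schema Spark wrote. A minimal sketch, assuming the DataFrame has already been written with `df.saveAsParquetFile(path)` (the Spark 1.3 API); `ExternalParquetDdl`, the table name, and the path are hypothetical. The helper only builds the statement string: the Hive column types must be supplied by hand (for the Scala `Int` columns in the saveAsTable example, Hive `INT`), and it does not escape quotes in the location.

```scala
// Hypothetical helper: builds Hive DDL exposing Spark-written Parquet files
// as an external table. It does not inspect the files or validate types.
object ExternalParquetDdl {

  def build(table: String, columns: Seq[(String, String)], location: String): String = {
    require(columns.nonEmpty, "at least one column is required")
    // Backtick-quote column names so reserved words remain valid identifiers.
    val cols = columns.map { case (name, hiveType) => s"`$name` $hiveType" }.mkString(", ")
    s"CREATE EXTERNAL TABLE IF NOT EXISTS $table ($cols) STORED AS PARQUET LOCATION '$location'"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical table name and HDFS path, for illustration only.
    val ddl = build(
      "spark_test_foo_ext",
      Seq("education" -> "INT", "income" -> "INT"),
      "/user/hive/external/spark_test_foo")
    println(ddl)
  }
}
```

The printed statement can be run in the Hive shell (Hive 0.13 supports STORED AS PARQUET); submitting it through HiveContext.sql should presumably work as well, since that passes Hive DDL through to Hive, though this is untested here.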
Re: JAVA_HOME problem with upgrade to 1.3.0
From: Ted Yu yuzhih...@gmail.com Date: Thursday, March 19, 2015 at 11:05 AM JAVA_HOME, an environment variable, should be defined on the node where appattempt_1420225286501_4699_02 ran. Has this behavior changed in 1.3.0 since 1.2.1, though? Using 1.2.1 and making no other changes, the job completes fine. I do have JAVA_HOME set in the Hadoop config files on all the nodes of the cluster:
% grep JAVA_HOME /etc/hadoop/conf/*.sh
/etc/hadoop/conf/hadoop-env.sh:export JAVA_HOME=/usr/jdk64/jdk1.6.0_31
/etc/hadoop/conf/yarn-env.sh:export JAVA_HOME=/usr/jdk64/jdk1.6.0_31
-Ken