Re: Kafka Version Update 0.8.2 status?
Yes, did you see the PR for SPARK-2808? https://github.com/apache/spark/pull/3631/files It requires more than just changing the version. On Tue, Feb 10, 2015 at 3:11 PM, Ted Yu yuzhih...@gmail.com wrote: Compiling Spark master branch against Kafka 0.8.2, I got: [WARNING] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala:62: no valid targets for annotation on value ssc_ - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param) [WARNING] @transient ssc_ : StreamingContext, [WARNING] ^ [ERROR] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala:273: type mismatch; found : Map[kafka.common.TopicAndPartition,kafka.common.OffsetMetadataAndError] required: scala.collection.immutable.Map[kafka.common.TopicAndPartition,kafka.common.OffsetAndMetadata] [ERROR] val req = OffsetCommitRequest(groupId, metadata) [ERROR]^ [ERROR] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala:278: value requestInfo is not a member of kafka.api. OffsetCommitResponse [ERROR] val respMap = resp.requestInfo [ERROR] ^ FYI On Tue, Feb 10, 2015 at 3:23 AM, Sean Owen so...@cloudera.com wrote: I believe Kafka 0.8.2 is imminent for master / 1.4.0 -- see SPARK-2808. Anecdotally I have used Spark 1.2 with Kafka 0.8.2 without problem already. On Feb 10, 2015 10:53 AM, critikaled isasmani@gmail.com wrote: When can we expect the latest kafka and scala 2.11 support in spark streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Version-Update-0-8-2-status-tp21573.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: hadoopConfiguration for StreamingContext
Thanks, Akhil. I had high hopes for #2, but tried all and no luck. I was looking at the source and found something interesting. The Stack Trace (below) directs me to FileInputDStream.scala (line 141). This is version 1.1.1, btw. Line 141 has: private def fs: FileSystem = { if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration()) fs_ } So it looks to me like it doesn't make any attempt to use a configured HadoopConf. Here is the StackTrace: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66) at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) at org.apache.hadoop.fs.s3native.$Proxy5.initialize(Unknown Source) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187) at org.apache.spark.streaming.dstream.FileInputDStream.org $apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:141) at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107) at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75) ... On Tue, Feb 10, 2015 at 10:28 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Try the following: 1. Set the access key and secret key in the sparkContext: ssc.sparkContext.hadoopConfiguration.set(AWS_ACCESS_KEY_ID,yourAccessKey) ssc.sparkContext.hadoopConfiguration.set(AWS_SECRET_ACCESS_KEY,yourSecretKey) 2. Set the access key and secret key in the environment before starting your application: export AWS_ACCESS_KEY_ID=your access export AWS_SECRET_ACCESS_KEY=your secret 3. Set the access key and secret key inside the hadoop configurations val hadoopConf=ssc.sparkContext.hadoopConfiguration; hadoopConf.set(fs.s3.impl,org.apache.hadoop.fs.s3native.NativeS3FileSystem) hadoopConf.set(fs.s3.awsAccessKeyId,yourAccessKey) hadoopConf.set(fs.s3.awsSecretAccessKey,yourSecretKey) 4. You can also try: val stream = ssc.textFileStream(s3n://yourAccessKey:yourSecretKey@ yourBucket/path/) Thanks Best Regards On Tue, Feb 10, 2015 at 8:27 PM, Marc Limotte mslimo...@gmail.com wrote: I see that StreamingContext has a hadoopConfiguration() method, which can be used like this sample I found: sc.hadoopConfiguration().set(fs.s3.awsAccessKeyId, XX); sc.hadoopConfiguration().set(fs.s3.awsSecretAccessKey, XX); But StreamingContext doesn't have the same thing. I want to use a StreamingContext with s3n: text file input, but can't find a way to set the AWS credentials. 
I also tried (with no success): - adding the properties to conf/spark-defaults.conf - $HADOOP_HOME/conf/hdfs-site.xml - ENV variables - embedding them as user:password in s3n://user:password@... (w/ url encoding) - setting the conf as above on a new SparkContext and passing that to the StreamingContext constructor: StreamingContext(sparkContext: SparkContext, batchDuration: Duration) Can someone point me in the right direction for setting AWS creds (Hadoop conf options) for a StreamingContext? thanks, Marc Limotte Climate Corporation
Re: Kafka Version Update 0.8.2 status?
Compiling Spark master branch against Kafka 0.8.2, I got: [WARNING] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala:62: no valid targets for annotation on value ssc_ - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param) [WARNING] @transient ssc_ : StreamingContext, [WARNING] ^ [ERROR] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala:273: type mismatch; found : Map[kafka.common.TopicAndPartition,kafka.common.OffsetMetadataAndError] required: scala.collection.immutable.Map[kafka.common.TopicAndPartition,kafka.common.OffsetAndMetadata] [ERROR] val req = OffsetCommitRequest(groupId, metadata) [ERROR]^ [ERROR] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala:278: value requestInfo is not a member of kafka.api. OffsetCommitResponse [ERROR] val respMap = resp.requestInfo [ERROR] ^ FYI On Tue, Feb 10, 2015 at 3:23 AM, Sean Owen so...@cloudera.com wrote: I believe Kafka 0.8.2 is imminent for master / 1.4.0 -- see SPARK-2808. Anecdotally I have used Spark 1.2 with Kafka 0.8.2 without problem already. On Feb 10, 2015 10:53 AM, critikaled isasmani@gmail.com wrote: When can we expect the latest kafka and scala 2.11 support in spark streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Version-Update-0-8-2-status-tp21573.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka Version Update 0.8.2 status?
That PR hasn't been updated since the new kafka streaming stuff (including KafkaCluster) got merged to master, it will require more changes than what's in there currently. On Tue, Feb 10, 2015 at 9:25 AM, Sean Owen so...@cloudera.com wrote: Yes, did you see the PR for SPARK-2808? https://github.com/apache/spark/pull/3631/files It requires more than just changing the version. On Tue, Feb 10, 2015 at 3:11 PM, Ted Yu yuzhih...@gmail.com wrote: Compiling Spark master branch against Kafka 0.8.2, I got: [WARNING] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala:62: no valid targets for annotation on value ssc_ - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param) [WARNING] @transient ssc_ : StreamingContext, [WARNING] ^ [ERROR] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala:273: type mismatch; found : Map[kafka.common.TopicAndPartition,kafka.common.OffsetMetadataAndError] required: scala.collection.immutable.Map[kafka.common.TopicAndPartition,kafka.common.OffsetAndMetadata] [ERROR] val req = OffsetCommitRequest(groupId, metadata) [ERROR]^ [ERROR] /home/hbase/spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala:278: value requestInfo is not a member of kafka.api. OffsetCommitResponse [ERROR] val respMap = resp.requestInfo [ERROR] ^ FYI On Tue, Feb 10, 2015 at 3:23 AM, Sean Owen so...@cloudera.com wrote: I believe Kafka 0.8.2 is imminent for master / 1.4.0 -- see SPARK-2808. Anecdotally I have used Spark 1.2 with Kafka 0.8.2 without problem already. On Feb 10, 2015 10:53 AM, critikaled isasmani@gmail.com wrote: When can we expect the latest kafka and scala 2.11 support in spark streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Version-Update-0-8-2-status-tp21573.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
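For context, the two compile errors above track API changes in Kafka 0.8.2: OffsetCommitRequest now expects OffsetAndMetadata values rather than OffsetMetadataAndError, and OffsetCommitResponse no longer exposes requestInfo. A rough sketch of the direction KafkaCluster.scala has to move in -- the commitStatus field name is recalled from the 0.8.2 API rather than taken from this thread, so treat it as an assumption:

    import kafka.api.OffsetCommitRequest
    import kafka.common.{OffsetAndMetadata, TopicAndPartition}

    // 0.8.2-style commit: values are OffsetAndMetadata, not OffsetMetadataAndError.
    def buildCommitRequest(groupId: String,
                           offsets: Map[TopicAndPartition, Long]): OffsetCommitRequest = {
      val metadata = offsets.map { case (tp, offset) =>
        tp -> OffsetAndMetadata(offset)  // assumes the single-arg apply exists in 0.8.2
      }
      OffsetCommitRequest(groupId, metadata)
    }

    // The per-partition error codes that used to live in resp.requestInfo are,
    // as far as I recall, exposed as resp.commitStatus in 0.8.2.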
Re: hadoopConfiguration for StreamingContext
Try the following: 1. Set the access key and secret key in the sparkContext: ssc.sparkContext.hadoopConfiguration.set(AWS_ACCESS_KEY_ID,yourAccessKey) ssc.sparkContext.hadoopConfiguration.set(AWS_SECRET_ACCESS_KEY,yourSecretKey) 2. Set the access key and secret key in the environment before starting your application: export AWS_ACCESS_KEY_ID=your access export AWS_SECRET_ACCESS_KEY=your secret 3. Set the access key and secret key inside the hadoop configurations val hadoopConf=ssc.sparkContext.hadoopConfiguration; hadoopConf.set(fs.s3.impl,org.apache.hadoop.fs.s3native.NativeS3FileSystem) hadoopConf.set(fs.s3.awsAccessKeyId,yourAccessKey) hadoopConf.set(fs.s3.awsSecretAccessKey,yourSecretKey) 4. You can also try: val stream = ssc.textFileStream(s3n://yourAccessKey:yourSecretKey@ yourBucket/path/) Thanks Best Regards On Tue, Feb 10, 2015 at 8:27 PM, Marc Limotte mslimo...@gmail.com wrote: I see that StreamingContext has a hadoopConfiguration() method, which can be used like this sample I found: sc.hadoopConfiguration().set(fs.s3.awsAccessKeyId, XX); sc.hadoopConfiguration().set(fs.s3.awsSecretAccessKey, XX); But StreamingContext doesn't have the same thing. I want to use a StreamingContext with s3n: text file input, but can't find a way to set the AWS credentials. I also tried (with no success): - adding the properties to conf/spark-defaults.conf - $HADOOP_HOME/conf/hdfs-site.xml - ENV variables - Embedded as user:password in s3n://user:password@... (w/ url encoding) - Setting the conf as above on a new SparkContext and passing that the StreamingContext constructor: StreamingContext(sparkContext: SparkContext, batchDuration: Duration) Can someone point me in the right direction for setting AWS creds (hadoop conf options) for streamingcontext? thanks, Marc Limotte Climate Corporation
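The snippets above lost their string quoting in the archive; written out as compilable Scala (the access/secret key values and bucket path are placeholders), options 1/3 and 4 look roughly like this:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("s3n-streaming")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Options 1 and 3: put the credentials on the Hadoop configuration carried by the context.
    val hadoopConf = ssc.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConf.set("fs.s3.awsAccessKeyId", "yourAccessKey")
    hadoopConf.set("fs.s3.awsSecretAccessKey", "yourSecretKey")
    // The exception earlier in this thread names the fs.s3n.* variants, so set those too.
    hadoopConf.set("fs.s3n.awsAccessKeyId", "yourAccessKey")
    hadoopConf.set("fs.s3n.awsSecretAccessKey", "yourSecretKey")

    // Option 4: embed the credentials directly in the s3n URL (URL-encode them if needed).
    val stream = ssc.textFileStream("s3n://yourAccessKey:yourSecretKey@yourBucket/path/")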
Bug in ElasticSearch and Spark SQL: Using SQL to query out data from JSON documents is totally wrong!
I'm using ElasticSearch with elasticsearch-spark-BUILD-SNAPSHOT and Spark/SparkSQL 1.2.0, on Costin Leau's advice. I want to query ElasticSearch for a bunch of JSON documents from within SparkSQL, and then use a SQL query to simply query for a column, which is actually a JSON key -- normal things that SparkSQL does using the SQLContext.jsonFile(filePath) facility. The difference is that I am using the ElasticSearch connector. The big problem: when I do something like SELECT jsonKeyA from tempTable; I actually get the WRONG KEY out of the JSON documents! I discovered that if I have JSON keys physically in the order D, C, B, A in the JSON documents, the elasticsearch connector discovers those keys BUT then sorts them alphabetically as A,B,C,D - so when I SELECT A from tempTable, I actually get column D (because the physical JSONs had key D in the first position). This only happens when reading from ElasticSearch with SparkSQL. It gets much worse: when a key is missing from one of the documents and that key should be NULL, the whole application actually crashes and gives me a java.lang.IndexOutOfBoundsException -- the schema that is inferred is totally screwed up. In the above example with physical JSONs containing keys in the order D,C,B,A, if one of the JSON documents is missing the key/column I am querying for, I get that java.lang.IndexOutOfBoundsException. I am using the BUILD-SNAPSHOT because otherwise I couldn't build the elasticsearch-spark project, as Costin advised. Any clues here? Any fixes?
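For comparison, a minimal baseline that uses only Spark SQL's own JSON inference (no Elasticsearch involved) on documents whose keys physically appear in the order D, C, B, A; here SELECT A returns A's values because fields are matched by name, which is the behavior the connector is reportedly not reproducing:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext sc
    // Keys deliberately appear in the order D, C, B, A inside each document.
    val docs = sc.parallelize(Seq(
      """{"D": "d1", "C": "c1", "B": "b1", "A": "a1"}""",
      """{"D": "d2", "C": "c2", "B": "b2"}"""          // second doc is missing A on purpose
    ))
    val json = sqlContext.jsonRDD(docs)                 // Spark 1.2 API
    json.registerTempTable("baseline")
    json.printSchema()
    sqlContext.sql("SELECT A FROM baseline").collect().foreach(println)  // [a1], [null]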
org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
Hi, I'm new to Spark. I have built small spark on yarn cluster, which contains 1 master(20GB RAM, 8 core), 3 worker(4GB RAM, 4 core). When trying to run a command sc.parallelize(1 to 1000).count() through $SPARK_HOME/bin/spark-shell, sometimes the command can submit a job successfully, sometimes it is failure with following exception. I can definitely make sure the three workers are registered to master after checking out spark webui. There are spark memory-related parameters to be configured in spark-env.sh file, for instance, SPARK_EXECUTOR_MEMORY=2G, SPARK_DRIVER_MEMORY=1G, SPARK_WORKER_MEMORY=4G. Would anyone help to give me hint how to resolve this issue? I have not give any hint after google search. *# bin/spark-shellSpark assembly has been built with Hive, including Datanucleus jars on classpath15/02/11 12:21:39 INFO SecurityManager: Changing view acls to: root,15/02/11 12:21:39 INFO SecurityManager: Changing modify acls to: root,15/02/11 12:21:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )15/02/11 12:21:39 INFO HttpServer: Starting HTTP Server15/02/11 12:21:39 INFO Utils: Successfully started service 'HTTP class server' on port 28968.Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.1.0 /_/Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.6.0_24)Type in expressions to have them evaluated.Type :help for more information.15/02/11 12:21:43 INFO SecurityManager: Changing view acls to: root,15/02/11 12:21:43 INFO SecurityManager: Changing modify acls to: root,15/02/11 12:21:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )15/02/11 12:21:44 INFO Slf4jLogger: Slf4jLogger started15/02/11 12:21:44 INFO Remoting: Starting remoting15/02/11 12:21:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@xpan-biqa1:6862]15/02/11 12:21:44 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@xpan-biqa1:6862]15/02/11 12:21:44 INFO Utils: Successfully started service 'sparkDriver' on port 6862.15/02/11 12:21:44 INFO SparkEnv: Registering MapOutputTracker15/02/11 12:21:44 INFO SparkEnv: Registering BlockManagerMaster15/02/11 12:21:44 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150211122144-ed2615/02/11 12:21:44 INFO Utils: Successfully started service 'Connection manager for block manager' on port 40502.15/02/11 12:21:44 INFO ConnectionManager: Bound socket to port 40502 with id = ConnectionManagerId(xpan-biqa1,40502)15/02/11 12:21:44 INFO MemoryStore: MemoryStore started with capacity 265.0 MB15/02/11 12:21:44 INFO BlockManagerMaster: Trying to register BlockManager15/02/11 12:21:44 INFO BlockManagerMasterActor: Registering block manager xpan-biqa1:40502 with 265.0 MB RAM15/02/11 12:21:44 INFO BlockManagerMaster: Registered BlockManager15/02/11 12:21:44 INFO HttpFileServer: HTTP File server directory is /tmp/spark-0a80ce6b-6a05-4163-a97d-07753f627ec815/02/11 12:21:44 INFO HttpServer: Starting HTTP Server15/02/11 12:21:44 INFO Utils: Successfully started service 'HTTP file server' on port 25939.15/02/11 12:21:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.15/02/11 12:21:44 INFO SparkUI: Started SparkUI at http://xpan-biqa1:4040 http://xpan-biqa1:404015/02/11 12:21:45 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable15/02/11 12:21:46 INFO EventLoggingListener: Logging events to hdfs://xpan-biqa1:7020/spark/spark-shell-142362850543115/02/11 12:21:46 INFO AppClient$ClientActor: Connecting to master spark://xpan-biqa1:7077...15/02/11 12:21:46 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.015/02/11 12:21:46 INFO SparkILoop: Created spark context..Spark context available as sc.scala 15/02/11 12:22:06 INFO AppClient$ClientActor: Connecting to master spark://xpan-biqa1:7077...scala sc.parallelize(1 to 1000).count()15/02/11 12:22:24 INFO SparkContext: Starting job: count at console:1315/02/11 12:22:24 INFO DAGScheduler: Got job 0 (count at console:13) with 2 output partitions (allowLocal=false)15/02/11 12:22:24 INFO DAGScheduler: Final stage: Stage 0(count at console:13)15/02/11 12:22:24 INFO DAGScheduler: Parents of final stage: List()15/02/11 12:22:24 INFO DAGScheduler: Missing parents: List()15/02/11 12:22:24 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at console:13), which has no missing parents15/02/11 12:22:24 INFO MemoryStore: ensureFreeSpace(1088) called with curMem=0, maxMem=27784249315/02/11 12:22:24 INFO
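One thing worth ruling out first: the log shows the shell repeatedly "Connecting to master spark://xpan-biqa1:7077", i.e. it is talking to a standalone master even though the cluster is described as Spark on YARN. Launching the shell with an explicit master makes the intent unambiguous; both forms below are sketches, with the host and port taken from the log:

    # Standalone master (what the log shows the shell attempting):
    $SPARK_HOME/bin/spark-shell --master spark://xpan-biqa1:7077

    # Or, if the jobs are really meant to run on YARN:
    $SPARK_HOME/bin/spark-shell --master yarn-client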
Re: [spark sql] add file doesn't work
[Additional info] I was using the master branch of 9 Feb 2015, the latest commit in git info is: commit 0793ee1b4dea1f4b0df749e8ad7c1ab70b512faf Author: Sandy Ryza sa...@cloudera.com Date: Mon Feb 9 10:12:12 2015 + SPARK-2149. [MLLIB] Univariate kernel density estimation Author: Sandy Ryza sa...@cloudera.com Closes #1093 from sryza/sandy-spark-2149 and squashes the following com 5f06b33 [Sandy Ryza] More review comments 0f73060 [Sandy Ryza] Respond to Sean's review comments 0dfa005 [Sandy Ryza] SPARK-2149. Univariate kernel density estimation best regards, zhenhua From: wangzhenhua (G)mailto:wangzhen...@huawei.com Date: 2015-02-10 20:39 To: usermailto:user@spark.apache.org Subject: [spark sql] add file doesn't work Hi all, I'm testing the spark sql module, and I found a problem with one of the test cases. I think the main problem is that the add file command in spark sql (hive?) doesn't work. since conducting an additional test by directly giving the path to the file offers the right answer. The tests are as follows: 1. Original test case: set hive.map.aggr.hash.percentmemory = 0.3; set hive.mapred.local.mem = 384; add file ../../data/scripts/dumpdata_script.py; select count(distinct subq.key) from (FROM src MAP src.key USING 'python dumpdata_script.py' AS key WHERE src.key = 10) subq; returned result: 0 2. Additional test: replace the last sentence as below (adding a path to the file): select count(distinct subq.key) from (FROM src MAP src.key USING 'python ../../data/scripts/dumpdata_script.py' AS key WHERE src.key = 10) subq; returned result: 122 best regards, zhenhua
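If the SQL-level add file is what is failing, one hedged workaround (a sketch only -- this thread does not confirm that Hive's script operator resolves files shipped this way) is to distribute the script through the Scala API and locate the shipped copy on the executors:

    import org.apache.spark.SparkFiles

    // Ship the script to every executor's working directory.
    sc.addFile("../../data/scripts/dumpdata_script.py")

    // On an executor, the local copy can be located with:
    val localPath = SparkFiles.get("dumpdata_script.py")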
Re: SparkSQL + Tableau Connector
Hi Silvio, So the Initial SQL is executing now, I did not have the * added that and it worked fine. FWIW the * is not needed for the parquet files: create temporary table test using org.apache.spark.sql.json options (path '/data/out/*') ; cache table test; select count(1) from test; Unfortunately while the table is created and cached, i can see the statements being executed in the log file of spark, it is not associated with any schema at least that is being picked up by the Tableau Connector. So unless there is someway to associate it with a given schema I think I'm at a dead end on this one. Anything I may be missing here? Thanks for the help, it is much appreciated. I will give Arush suggestion a try tomorrow. -Todd On Tue, Feb 10, 2015 at 7:24 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Todd, I just tried it in bin/spark-sql shell. I created a folder *json *and just put 2 copies of the same people.json file This is what I ran: spark-sql create temporary table people using org.apache.spark.sql.json options (path 'examples/src/main/resources/json/*') ; Time taken: 0.34 seconds spark-sql select * from people; NULLMichael 30 Andy 19 Justin NULLMichael 30 Andy 19 Justin Time taken: 0.576 seconds From: Todd Nist Date: Tuesday, February 10, 2015 at 6:49 PM To: Silvio Fiorito Cc: user@spark.apache.org Subject: Re: SparkSQL + Tableau Connector Hi Silvio, Ah, I like that, there is a section in Tableau for Initial SQL to be executed upon connecting this would fit well there. I guess I will need to issue a collect(), coalesce(1,true).saveAsTextFile(...) or use repartition(1), as the file currently is being broken into multiple parts. While this works in the spark-shell: val test = sqlContext.jsonFile(/data/out/“) // returs all parts back as one It seems to fail in just spark-sql: create temporary table test using org.apache.spark.sql.json options (path '/data/out/') cache table test with: [Simba][SparkODBC] (35) Error from Spark: error code: '0' error message: 'org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: create temporary table test using org.apache.spark.sql.json options (path '/data/out/') cache table test'. Initial SQL Error. Check that the syntax is correct and that you have access privileges to the requested database. Thanks again for the suggestion and I will give work with it a bit more tomorrow. -Todd On Tue, Feb 10, 2015 at 5:48 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Todd, What you could do is run some SparkSQL commands immediately after the Thrift server starts up. Or does Tableau have some init SQL commands you could run? You can actually load data using SQL, such as: create temporary table people using org.apache.spark.sql.json options (path 'examples/src/main/resources/people.json’) cache table people create temporary table users using org.apache.spark.sql.parquet options (path 'examples/src/main/resources/users.parquet’) cache table users From: Todd Nist Date: Tuesday, February 10, 2015 at 3:03 PM To: user@spark.apache.org Subject: SparkSQL + Tableau Connector Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. 
import org.apache.spark.sql.SQLContext import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csvFile("/data/test.csv") test.toJSON().saveAsTextFile("/data/out") test.saveAsParquetFile("/data/out") When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query SchemaRDDs saved to Parquet or JSON files? 2. Do I need to do something to expose these via Hive / the metastore other than creating a table in Hive? 3. Does the thriftserver need to be configured to expose these in some fashion (related to question 2)? TIA for the assistance. -Todd
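Since Tableau only lists what the Thrift server finds in the Hive metastore, one option (a sketch, not something confirmed in this thread) is to register the data as a metastore table from a HiveContext instead of a session-scoped temporary table; a Thrift server pointed at the same metastore should then expose it:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)            // assumes an existing SparkContext sc
    val test = hiveContext.jsonFile("/data/out/")    // same JSON output as above
    test.saveAsTable("test")                         // persisted in the metastore, visible to the Thrift server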
Why does spark write huge file into temporary local disk even without on-disk persist or checkpoint?
I'm running a small job on a cluster with 15G of mem and 8G of disk per machine. The job always get into a deadlock where the last error message is: java.io.IOException: No space left on device at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86) at org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221) at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) By the time it happens the shuffle write size is 0.0B and input size is 3.4MB. I wonder what operation could quickly eat up the entire 5G free disk space. In addition, The storage level of the entire job is confined to MEMORY_ONLY_SERIALIZED and checkpointing is completely disabled.
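The stack trace itself answers part of the question: SortShuffleWriter/ExternalSorter is writing shuffle output through DiskBlockObjectWriter, and shuffle files always go to local disk regardless of the RDD storage level, so MEMORY_ONLY_SER and disabled checkpointing do not prevent it. A hedged mitigation is to point spark.local.dir at a volume with more free space (the path below is a placeholder, and note that in 1.0+ the cluster manager's SPARK_LOCAL_DIRS/LOCAL_DIRS can override it):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")
      // Placeholder path: any mount with enough room for shuffle spill files.
      .set("spark.local.dir", "/mnt/bigdisk/spark-tmp")
    val sc = new SparkContext(conf)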
Something about the cluster upgrade
Hi, I need to upgrade my Spark cluster from version 1.1.0 to 1.2.1. Is there a convenient way to do this, something like ./start-dfs.sh -upgrade in Hadoop? Best Wishes THX -- qiaou Sent with Sparrow (http://www.sparrowmailapp.com/?sig)
RE: hadoopConfiguration for StreamingContext
It looks like this is related to the underlying Hadoop configuration. Try to deploy the Hadoop configuration with your job with --files and --driver-class-path, or to the default /etc/hadoop/conf core-site.xml. If that is not an option (depending on how your Hadoop cluster is setup), then hard code the value vie -Dkey=value to see if it works. The downside is your credentials are exposed in plaintext in the java commands. or by defining it in spark-defaults.conf property spark.executor.extraJavaOptions e.g.s3n spark.executor.extraJavaOptions -Dfs.s3n.awsAccessKeyId=X -Dfs.s3n.awsSecretAccessKey= s3spark.executor.extraJavaOptions -Dfs.s3.awsAccessKeyId=X -Dfs.s3.awsSecretAccessKey= Hope this works. Or embed them in the s3n path. Not good security practice though. From: mslimo...@gmail.com Date: Tue, 10 Feb 2015 10:57:47 -0500 Subject: Re: hadoopConfiguration for StreamingContext To: ak...@sigmoidanalytics.com CC: u...@spark.incubator.apache.org Thanks, Akhil. I had high hopes for #2, but tried all and no luck. I was looking at the source and found something interesting. The Stack Trace (below) directs me to FileInputDStream.scala (line 141). This is version 1.1.1, btw. Line 141 has: private def fs: FileSystem = { if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration()) fs_ } So it looks to me like it doesn't make any attempt to use a configured HadoopConf. Here is the StackTrace: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66) at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) at org.apache.hadoop.fs.s3native.$Proxy5.initialize(Unknown Source) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187) at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:141) at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107) at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75) ... On Tue, Feb 10, 2015 at 10:28 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Try the following: 1. Set the access key and secret key in the sparkContext: ssc.sparkContext.hadoopConfiguration.set(AWS_ACCESS_KEY_ID,yourAccessKey) ssc.sparkContext.hadoopConfiguration.set(AWS_SECRET_ACCESS_KEY,yourSecretKey) 2. 
Set the access key and secret key in the environment before startingyour application: export AWS_ACCESS_KEY_ID=your access export AWS_SECRET_ACCESS_KEY=your secret 3. Set the access key and secret key inside the hadoop configurations val hadoopConf=ssc.sparkContext.hadoopConfiguration;hadoopConf.set(fs.s3.impl,org.apache.hadoop.fs.s3native.NativeS3FileSystem)hadoopConf.set(fs.s3.awsAccessKeyId,yourAccessKey)hadoopConf.set(fs.s3.awsSecretAccessKey,yourSecretKey) 4. You can also try: val stream = ssc.textFileStream(s3n://yourAccessKey:yourSecretKey@yourBucket/path/)ThanksBest Regards On Tue, Feb 10, 2015 at 8:27 PM, Marc Limotte mslimo...@gmail.com wrote: I see that StreamingContext has a hadoopConfiguration() method, which can be used like this sample I found: sc.hadoopConfiguration().set(fs.s3.awsAccessKeyId, XX); sc.hadoopConfiguration().set(fs.s3.awsSecretAccessKey, XX); But StreamingContext doesn't have the same thing. I want to use a StreamingContext with s3n: text file input, but can't find a way to set the AWS credentials. I also tried (with no success): adding the properties to conf/spark-defaults.conf$HADOOP_HOME/conf/hdfs-site.xmlENV variablesEmbedded as user:password in s3n://user:password@... (w/ url encoding)Setting the conf as above on a new SparkContext and passing that the StreamingContext constructor: StreamingContext(sparkContext: SparkContext, batchDuration: Duration)Can someone point me in
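One more detail that makes the core-site.xml route attractive here: FileInputDStream in 1.1.1 builds a bare new Configuration() (quoted earlier in the thread), which ignores anything set on the SparkContext's hadoopConfiguration but does load core-site.xml from the classpath. So shipping a Hadoop config containing the exact keys the exception names should be picked up; a sketch, with placeholder values:

    <!-- core-site.xml fragment; property names come from the exception in this thread -->
    <property>
      <name>fs.s3n.awsAccessKeyId</name>
      <value>YOUR_ACCESS_KEY</value>
    </property>
    <property>
      <name>fs.s3n.awsSecretAccessKey</name>
      <value>YOUR_SECRET_KEY</value>
    </property>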
Re: Can we execute create table and load data commands against Hive inside HiveContext?
org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory was introduced in Hive 0.14 and Spark SQL only supports Hive 0.12 and 0.13.1. Can you change the setting of hive.security.authorization.manager to someone accepted by 0.12 or 0.13.1? On Thu, Feb 5, 2015 at 11:40 PM, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi, I am playing with the following example code: public class SparkTest { public static void main(String[] args){ String appName= This is a test application; String master=spark://lix1.bh.com:7077; SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf); JavaHiveContext sqlCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc); //sqlCtx.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING)); //sqlCtx.sql(LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src); // Queries are expressed in HiveQL. ListRow rows = sqlCtx.sql(FROM src SELECT key, value).collect(); //ListRow rows = sqlCtx.sql(show tables).collect(); System.out.print(I got + rows.size() + rows \r\n); sc.close(); }} With the create table and load data commands commented out, the query command can be executed successfully, but I come to ClassNotFoundExceptions if these two commands are executed inside HiveContext, even with different error messages, The create table command will cause the following: Exception in thread main org.apache.spark.sql.execution.QueryExecutionException: FAILED: Hive Internal Error: java.lang.ClassNotFoundException(org.apache.hadoop.hive.ql.hooks.ATSHook) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:309) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute( NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult( NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute( NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute( SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd( SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.api.java.JavaSchemaRDD.init( JavaSchemaRDD.scala:42) at org.apache.spark.sql.hive.api.java.JavaHiveContext.sql( JavaHiveContext.scala:37) at com.blackhorse.SparkTest.main(SparkTest.java:24) [delete Spark temp dirs] DEBUG org.apache.spark.util.Utils - Shutdown hook called [delete Spark local dirs] DEBUG org.apache.spark.storage.DiskBlockManager - Shutdown hook called The load data command will cause the following: Exception in thread main org.apache.spark.sql.execution.QueryExecutionException: FAILED: RuntimeException org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:309) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at 
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.api.java.JavaSchemaRDD.init(JavaSchemaRDD.scala:42) at org.apache.spark.sql.hive.api.java.JavaHiveContext.sql(JavaHiveContext.scala:37) at com.blackhorse.SparkTest.main(SparkTest.java:25) [delete Spark local dirs] DEBUG org.apache.spark.storage.DiskBlockManager - Shutdown hook called [delete Spark temp dirs] DEBUG org.apache.spark.util.Utils - Shutdown hook called
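Following that suggestion, a hedged hive-site.xml fragment; DefaultHiveAuthorizationProvider is the classic pre-0.14 authorization manager, but verify the class is present in the Hive 0.12/0.13.1 jars you build against:

    <property>
      <name>hive.security.authorization.manager</name>
      <value>org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider</value>
    </property>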
Re: Resource allocation in yarn-cluster mode
Hi Zsolt, spark.executor.memory, spark.executor.cores, and spark.executor.instances are only honored when launching through spark-submit. Marcelo is working on a Spark launcher (SPARK-4924) that will enable using these programmatically. That's correct that the error comes up when yarn.scheduler.maximum-allocation-mb is exceeded. The reason it doesn't just use a smaller amount of memory is because it could be surprising to the user to find out they're silently getting less memory than they requested. Also, I don't think YARN exposes this up front so Spark has no way to check. -Sandy On Tue, Feb 10, 2015 at 8:38 AM, Zsolt Tóth toth.zsolt@gmail.com wrote: One more question: Is there reason why Spark throws an error when requesting too much memory instead of capping it to the maximum value (as YARN would do by default)? Thanks! 2015-02-10 17:32 GMT+01:00 Zsolt Tóth toth.zsolt@gmail.com: Hi, I'm using Spark in yarn-cluster mode and submit the jobs programmatically from the client in Java. I ran into a few issues when tried to set the resource allocation properties. 1. It looks like setting spark.executor.memory, spark.executor.cores and spark.executor.instances have no effect because ClientArguments checks only for the command line arguments (--num-executors, --executors cores, etc.). Is it possible to use the properties in yarn-cluster mode instead of the command line arguments? 2. My nodes have 5GB memory but when I set --executor-memory to 4g (overhead 384m), I get the exception that the required executor memory is above the max threshold of this cluster. It looks like this threshold is the value of the yarn.scheduler.maximum-allocation-mb property. Is that correct? Thanks, Zsolt
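In other words, until SPARK-4924 lands, the sizing has to travel on the spark-submit command line for yarn-cluster mode. A sketch with placeholder class, jar and sizes (note the flag is --executor-cores; keep executor memory plus overhead under yarn.scheduler.maximum-allocation-mb, which is why 4g failed on these 5 GB nodes):

    spark-submit \
      --master yarn-cluster \
      --num-executors 3 \
      --executor-cores 2 \
      --executor-memory 3g \
      --class com.example.MyJob \
      myjob.jar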
Re: How to broadcast a variable read from a file in yarn-cluster mode?
Is the SparkContext you're using the same one that the StreamingContext wraps? If not, I don't think using two is supported. -Sandy On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote: I'm still getting an error. Here's my code, which works successfully when tested using spark-shell: val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect val badIpSet = badIPs.toSet val badIPsBC = sc.broadcast(badIpSet) The job looks OK from my end: 15/02/07 18:59:58 INFO Client: Application report from ASM: application identifier: application_1423081782629_3861 appId: 3861 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service: }* appDiagnostics: appMasterHost: phd40010008.na.com appQueue: root.default appMasterRpcPort: 0 appStartTime: 1423353581140 * yarnAppState: RUNNING* distributedFinalState: UNDEFINED But the streaming process never actually begins. The full log is below, scroll to the end for the repeated warning WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. I'll note that I have a different Spark Streaming app called dqd working successfully for a different job that uses only a StreamingContext and not an additional SparkContext. But this app (called sbStreamingTv) uses both a SparkContext and a StreamingContext for grabbing a lookup file in HDFS for IP filtering. * The references to line #198 from the log below refers to the val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect line shown above, and it looks like Spark doesn't get beyond that point in the code.* Also, this job (sbStreamingTv) does work successfully using yarn-client, even with both a SparkContext and StreamingContext. It looks to me that in yarn-cluster mode it's grabbing resources for the StreamingContext but not for the SparkContext. Any ideas? Jon 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity 1177.8 MB. 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with id = ConnectionManagerId(phd40010008.na.com,30129) 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager phd40010008.na.com:30129 with 1177.8 MB RAM 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at http://10.229.16.108:35183 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at http://phd40010008.na.com:25869 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@7f38095d 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors. 
15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 2432 memory 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:20 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jg) 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started 15/02/10 12:06:20 INFO Remoting: Starting remoting 15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@phd40010008.na.com:43340] 15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@phd40010008.na.com:43340] 15/02/10 12:06:20 INFO SparkEnv:
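For reference, the single-context arrangement being suggested looks roughly like this: create one SparkContext, wrap that same instance in the StreamingContext, and do the lookup-file read and broadcast through it (paths and batch interval are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc  = new SparkContext(new SparkConf().setAppName("sbStreamingTv"))
    val ssc = new StreamingContext(sc, Seconds(60))  // wraps the SAME SparkContext

    // Read and broadcast the lookup file through the context the streaming job already uses.
    val badIPs   = sc.textFile("/user/sb/badfullIPs.csv").collect()
    val badIPsBC = sc.broadcast(badIPs.toSet)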
Re: Exception when trying to use EShadoop connector and writing rdd to ES
First off, I'd recommend using the latest es-hadoop beta (2.1.0.Beta3) or even better, the dev build [1]. Second, using the native Java/Scala API [2] since the configuration and performance are both easier. Third, when you are using JSON input, tell es-hadoop/spark that. the connector can work with both objects (the default) or raw json. It so just happens, the es-hadoop connector describes the above here [3] :). Hope this helps, [1] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/install.html#download-dev [2] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html#spark-native [3] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-json On 2/10/15 6:58 PM, shahid ashraf wrote: thanks costin i m grouping data together based on id in json and rdd contains rdd = (1,{'SOURCES': [{n no. of key/valu}],}),(2,{'SOURCES': [{n no. of key/valu}],}),(3,{'SOURCES': [{n no. of key/valu}],}),(4,{'SOURCES': [{n no. of key/valu}],}) rdd.saveAsNewAPIHadoopFile( path='-', outputFormatClass=org.elasticsearch.hadoop.mr.EsOutputFormat, keyClass=org.apache.hadoop.io.NullWritable, valueClass=org.elasticsearch.hadoop.mr.LinkedMapWritable, conf={ es.nodes : localhost, es.port : 9200, es.resource : shahid/hcp_id }) spark-1.1.0-bin-hadoop1 java version 1.7.0_71 elasticsearch-1.4.2 elasticsearch-hadoop-2.1.0.Beta2.jar On Tue, Feb 10, 2015 at 10:05 PM, Costin Leau costin.l...@gmail.com mailto:costin.l...@gmail.com wrote: Sorry but there's too little information in this email to make any type of assesment. Can you please describe what you are trying to do, what version of Elastic and es-spark are you suing and potentially post a snippet of code? What does your RDD contain? On 2/10/15 6:05 PM, shahid wrote: INFO scheduler.TaskSetManager: Starting task 2.1 in stage 2.0 (TID 9, ip-10-80-98-118.ec2.internal, PROCESS_LOCAL, 1025 bytes) 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 6) on executor ip-10-80-15-145.ec2.internal: org.apache.spark.__SparkException (Data of type java.util.ArrayList cannot be used) [duplicate 1] 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Starting task 1.1 in stage 2.0 (TID 10, ip-10-80-15-145.ec2.internal, PROCESS_LOCAL, 1025 bytes) -- View this message in context: http://apache-spark-user-list.__1001560.n3.nabble.com/__Exception-when-trying-to-use-__EShadoop-connector-and-__writing-rdd-to-ES-tp21579.html http://apache-spark-user-list.1001560.n3.nabble.com/Exception-when-trying-to-use-EShadoop-connector-and-writing-rdd-to-ES-tp21579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Costin -- with Regards Shahid Ashraf -- Costin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
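To make [2] and [3] concrete -- using the native API and telling the connector the documents are already JSON -- a hedged Scala sketch (index and connection settings are taken from the thread; saveJsonToEs is the es-hadoop entry point for pre-serialized JSON strings, so double-check it against the Beta3 docs):

    import org.apache.spark.rdd.RDD
    import org.elasticsearch.spark._  // brings saveJsonToEs into scope

    // Each element is a raw JSON document, matching the "raw json" case described above.
    val jsonDocs: RDD[String] = sc.parallelize(Seq(
      """{"id": 1, "SOURCES": [{"k": "v"}]}""",
      """{"id": 2, "SOURCES": [{"k": "v"}]}"""
    ))

    jsonDocs.saveJsonToEs("shahid/hcp_id",
      Map("es.nodes" -> "localhost", "es.port" -> "9200"))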
Re: Exception when trying to use EShadoop connector and writing rdd to ES
hi costin i upgraded the es hadoop connector , and at this point i can't use scala, but still getting same error On Tue, Feb 10, 2015 at 10:34 PM, Costin Leau costin.l...@gmail.com wrote: Hi shahid, I've sent the reply to the group - for some reason I replied to your address instead of the mailing list. Let's continue the discussion there. Cheers, On 2/10/15 6:58 PM, shahid ashraf wrote: thanks costin i m grouping data together based on id in json and rdd contains rdd = (1,{'SOURCES': [{n no. of key/valu}],}),(2,{'SOURCES': [{n no. of key/valu}],}),(3,{'SOURCES': [{n no. of key/valu}],}),(4,{'SOURCES': [{n no. of key/valu}],}) rdd.saveAsNewAPIHadoopFile( path='-', outputFormatClass=org.elasticsearch.hadoop.mr. EsOutputFormat, keyClass=org.apache.hadoop.io.NullWritable, valueClass=org.elasticsearch.hadoop.mr.LinkedMapWritable, conf={ es.nodes : localhost, es.port : 9200, es.resource : shahid/hcp_id }) spark-1.1.0-bin-hadoop1 java version 1.7.0_71 elasticsearch-1.4.2 elasticsearch-hadoop-2.1.0.Beta2.jar On Tue, Feb 10, 2015 at 10:05 PM, Costin Leau costin.l...@gmail.com mailto:costin.l...@gmail.com wrote: Sorry but there's too little information in this email to make any type of assesment. Can you please describe what you are trying to do, what version of Elastic and es-spark are you suing and potentially post a snippet of code? What does your RDD contain? On 2/10/15 6:05 PM, shahid wrote: INFO scheduler.TaskSetManager: Starting task 2.1 in stage 2.0 (TID 9, ip-10-80-98-118.ec2.internal, PROCESS_LOCAL, 1025 bytes) 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 6) on executor ip-10-80-15-145.ec2.internal: org.apache.spark.__SparkException (Data of type java.util.ArrayList cannot be used) [duplicate 1] 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Starting task 1.1 in stage 2.0 (TID 10, ip-10-80-15-145.ec2.internal, PROCESS_LOCAL, 1025 bytes) -- View this message in context: http://apache-spark-user-list.__1001560.n3.nabble.com/__ Exception-when-trying-to-use-__EShadoop-connector-and-__ writing-rdd-to-ES-tp21579.html http://apache-spark-user-list.1001560.n3.nabble.com/ Exception-when-trying-to-use-EShadoop-connector-and- writing-rdd-to-ES-tp21579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Costin -- with Regards Shahid Ashraf -- Costin -- with Regards Shahid Ashraf
Re: Resource allocation in yarn-cluster mode
One more question: Is there reason why Spark throws an error when requesting too much memory instead of capping it to the maximum value (as YARN would do by default)? Thanks! 2015-02-10 17:32 GMT+01:00 Zsolt Tóth toth.zsolt@gmail.com: Hi, I'm using Spark in yarn-cluster mode and submit the jobs programmatically from the client in Java. I ran into a few issues when tried to set the resource allocation properties. 1. It looks like setting spark.executor.memory, spark.executor.cores and spark.executor.instances have no effect because ClientArguments checks only for the command line arguments (--num-executors, --executors cores, etc.). Is it possible to use the properties in yarn-cluster mode instead of the command line arguments? 2. My nodes have 5GB memory but when I set --executor-memory to 4g (overhead 384m), I get the exception that the required executor memory is above the max threshold of this cluster. It looks like this threshold is the value of the yarn.scheduler.maximum-allocation-mb property. Is that correct? Thanks, Zsolt
does updateStateByKey return Seq() ordered?
I was looking at the updateStateByKey documentation. It passes in a Seq of values that share the same key. I would like to know if there is any ordering to these values. My feeling is that there is no ordering, but maybe it does preserve RDD ordering. Example: RDD[(a,2), (a,3), (a,1)] Can the values be unordered, like Seq((a,3), (a,1), (a,2))? -Adrian
Re: How to broadcast a variable read from a file in yarn-cluster mode?
I'm still getting an error. Here's my code, which works successfully when tested using spark-shell: val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect val badIpSet = badIPs.toSet val badIPsBC = sc.broadcast(badIpSet) The job looks OK from my end: 15/02/07 18:59:58 INFO Client: Application report from ASM: application identifier: application_1423081782629_3861 appId: 3861 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service: }* appDiagnostics: appMasterHost: phd40010008.na.com appQueue: root.default appMasterRpcPort: 0 appStartTime: 1423353581140 * yarnAppState: RUNNING* distributedFinalState: UNDEFINED But the streaming process never actually begins. The full log is below, scroll to the end for the repeated warning WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. I'll note that I have a different Spark Streaming app called dqd working successfully for a different job that uses only a StreamingContext and not an additional SparkContext. But this app (called sbStreamingTv) uses both a SparkContext and a StreamingContext for grabbing a lookup file in HDFS for IP filtering. * The references to line #198 from the log below refers to the val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect line shown above, and it looks like Spark doesn't get beyond that point in the code.* Also, this job (sbStreamingTv) does work successfully using yarn-client, even with both a SparkContext and StreamingContext. It looks to me that in yarn-cluster mode it's grabbing resources for the StreamingContext but not for the SparkContext. Any ideas? Jon 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity 1177.8 MB. 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with id = ConnectionManagerId(phd40010008.na.com,30129) 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager phd40010008.na.com:30129 with 1177.8 MB RAM 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at http://10.229.16.108:35183 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at http://phd40010008.na.com:25869 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@7f38095d 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors. 
15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 2432 memory 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:20 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jg) 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started 15/02/10 12:06:20 INFO Remoting: Starting remoting 15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@phd40010008.na.com:43340] 15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@phd40010008.na.com:43340] 15/02/10 12:06:20 INFO SparkEnv: Registering MapOutputTracker 15/02/10 12:06:20 INFO SparkEnv: Registering BlockManagerMaster 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/1/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-f6e1 15/02/10 12:06:20 INFO
Re: OutofMemoryError: Java heap space
Since the stacktrace shows kryo is being used, maybe, you could also try increasing spark.kryoserializer.buffer.max.mb. Hope this help. Kelvin On Tue, Feb 10, 2015 at 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at
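For reference, a minimal sketch of raising spark.kryoserializer.buffer.max.mb as suggested above (the value shown is only an illustration, not a recommendation from this thread; in Spark 1.x the setting is expressed in megabytes):

import org.apache.spark.SparkConf

// Sketch: enable Kryo and raise the maximum serialization buffer.
// 512 is an illustrative value only.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max.mb", "512")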
Resource allocation in yarn-cluster mode
Hi, I'm using Spark in yarn-cluster mode and submit the jobs programmatically from the client in Java. I ran into a few issues when I tried to set the resource allocation properties.
1. It looks like setting spark.executor.memory, spark.executor.cores and spark.executor.instances has no effect, because ClientArguments checks only for the command line arguments (--num-executors, --executor-cores, etc.). Is it possible to use the properties in yarn-cluster mode instead of the command line arguments?
2. My nodes have 5GB of memory, but when I set --executor-memory to 4g (plus the 384m overhead), I get an exception saying that the required executor memory is above the max threshold of this cluster. It looks like this threshold is the value of the yarn.scheduler.maximum-allocation-mb property. Is that correct?
Thanks, Zsolt
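For reference, a sketch of submitting with the allocation given as command line flags, which is what ClientArguments reads in this mode; the class name, jar and sizes below are placeholders, not values from the thread:

# Placeholder class and jar; keep executor memory plus the 384m overhead under
# yarn.scheduler.maximum-allocation-mb.
spark-submit \
  --master yarn-cluster \
  --class com.example.MyJob \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 3g \
  my-job.jar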
Re: Spark SQL - Column name including a colon in a SELECT clause
Can you try using backticks to quote the field name? Like `f:price`. On Tue, Feb 10, 2015 at 5:47 AM, presence2001 neil.andra...@thefilter.com wrote: Hi list, I have some data with a field name of f:price (it's actually part of a JSON structure loaded from ElasticSearch via elasticsearch-hadoop connector, but I don't think that's significant here). I'm struggling to figure out how to express that in a Spark SQL SELECT statement without generating an error (and haven't been able to find any similar examples in the documentation). val productsRdd = sqlContext.sql(SELECT Locales.Invariant.Metadata.item.f:price FROM products LIMIT 10) gives me the following error... java.lang.RuntimeException: [1.41] failure: ``UNION'' expected but `:' found Changing the column name is one option, but I have other systems depending on this right now so it's not a trivial exercise. :( I'm using Spark 1.2. Thanks in advance for any advice / help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Column-name-including-a-colon-in-a-SELECT-clause-tp21576.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
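For illustration, this is what the suggestion looks like applied to the query from the thread, assuming backtick quoting is accepted for that nested field name:

val productsRdd = sqlContext.sql(
  "SELECT Locales.Invariant.Metadata.item.`f:price` FROM products LIMIT 10")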
[POWERED BY] Radius Intelligence
Also long overdue given our usage of Spark ... Radius Intelligence: URL: radius.com Description: Spark, MLlib. Using Scala, Spark and MLlib for the Radius marketing and sales intelligence platform, including data aggregation, data processing, data clustering, data analysis and predictive modeling of all US businesses.
Re: Executor Lost with StorageLevel.MEMORY_AND_DISK_SER
Unfortunately no. I just removed the persist statements to get the job to run, but now it sometimes fails with Job aborted due to stage failure: Task 162 in stage 2.1 failed 4 times, most recent failure: Lost task 162.3 in stage 2.1 (TID 1105, xxx.compute.internal): java.io.FileNotFoundException: /tmp/spark-local-20150210030009-b4f1/3f/shuffle_4_655_49 (No space left on device) Even though there’s plenty of disk space left. On 10.02.2015, at 00:09, Muttineni, Vinay vmuttin...@ebay.com wrote: Hi Marius, Did you find a solution to this problem? I get the same error. Thanks, Vinay -Original Message- From: Marius Soutier [mailto:mps@gmail.com] Sent: Monday, February 09, 2015 2:19 AM To: user Subject: Executor Lost with StorageLevel.MEMORY_AND_DISK_SER Hi there, I'm trying to improve performance on a job that has GC troubles and takes longer to compute simply because it has to recompute failed tasks. After deferring object creation as much as possible, I'm now trying to improve memory usage with StorageLevel.MEMORY_AND_DISK_SER and a custom KryoRegistrator that registers all used classes. This works fine both in unit tests and on a local cluster (i.e. master and worker on my dev machine). On the production cluster this fails without any error message except: Job aborted due to stage failure: Task 10 in stage 2.0 failed 4 times, most recent failure: Lost task 10.3 in stage 2.0 (TID 20, xxx.compute.internal): ExecutorLostFailure (executor lost) Driver stacktrace: Is there any way to understand what's going on? The logs don't show anything. I'm using Spark 1.1.1. Thanks - Marius - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
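One thing worth checking for the "No space left on device" failure (a sketch, and only an assumption since it is not confirmed in the thread): the shuffle files in that path are written under spark.local.dir, so pointing it at a larger volume can help, provided the cluster manager is not already overriding it via SPARK_LOCAL_DIRS/LOCAL_DIRS.

import org.apache.spark.SparkConf

// "/mnt/bigdisk/spark" is a placeholder path on a volume with enough free space.
val conf = new SparkConf().set("spark.local.dir", "/mnt/bigdisk/spark")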
Re: ImportError: No module named pyspark, when running pi.py
Agree. PySpark would call spark-submit. Check out the command line there. --- Original Message --- From: Mohit Singh mohit1...@gmail.com Sent: February 9, 2015 11:26 PM To: Ashish Kumar ashish.ku...@innovaccer.com Cc: user@spark.apache.org Subject: Re: ImportError: No module named pyspark, when running pi.py I think you have to run that using $SPARK_HOME/bin/pyspark /path/to/pi.py instead of normal python pi.py On Mon, Feb 9, 2015 at 11:22 PM, Ashish Kumar ashish.ku...@innovaccer.com wrote: *Command:* sudo python ./examples/src/main/python/pi.py *Error:* Traceback (most recent call last): File ./examples/src/main/python/pi.py, line 22, in module from pyspark import SparkContext ImportError: No module named pyspark -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
Re: OutofMemoryError: Java heap space
You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at 
com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at
Re: Shuffle write increases in spark 1.2
Hello, as the original message never got accepted to the mailing list, I quote it here completely:

Kevin Jung wrote
Hi all, The size of shuffle write shown in the Spark web UI is very different when I execute the same Spark job on the same input data (100GB) in both Spark 1.1 and Spark 1.2. At the same sortBy stage, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2. I set the spark.shuffle.manager option to hash because its default value changed, but Spark 1.2 still writes larger files than Spark 1.1. Can anyone tell me why this happens? Thanks Kevin

I'm experiencing the same thing with my job, and this is what I tested:
* Spark 1.2.0 with Sort-based Shuffle
* Spark 1.2.0 with Hash-based Shuffle
* Spark 1.2.1 with Sort-based Shuffle
All three combinations show the same behaviour, which contrasts with Spark 1.1.0. In Spark 1.1.0 my job runs for about an hour; in Spark 1.2.x it runs for almost four hours. Configuration is identical otherwise - I only added org.apache.spark.scheduler.CompressedMapStatus to the Kryo registrator for Spark 1.2.0 to cope with https://issues.apache.org/jira/browse/SPARK-5102. As a consequence (I think, but the causality might be different) I see lots and lots of disk spills. I cannot provide a small test case, but maybe the log entries for a single worker thread can help someone investigate this. (See below.) I will also open up an issue, if nobody stops me by providing an answer ;) Any help will be greatly appreciated, because otherwise I'm stuck with Spark 1.1.0, as quadrupling the runtime is not an option. Sincerely, Chris

2015-02-09T14:06:06.328+01:00 INFO org.apache.spark.executor.Executor Running task 9.0 in stage 18.0 (TID 300) Executor task launch worker-18
2015-02-09T14:06:06.351+01:00 INFO org.apache.spark.CacheManager Partition rdd_35_9 not found, computing it Executor task launch worker-18
2015-02-09T14:06:06.351+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 10 non-empty blocks out of 10 blocks Executor task launch worker-18
2015-02-09T14:06:06.351+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 0 ms Executor task launch worker-18
2015-02-09T14:06:07.396+01:00 INFO org.apache.spark.storage.MemoryStore ensureFreeSpace(2582904) called with curMem=300174944, maxMe... Executor task launch worker-18
2015-02-09T14:06:07.397+01:00 INFO org.apache.spark.storage.MemoryStore Block rdd_35_9 stored as bytes in memory (estimated size 2.5... Executor task launch worker-18
2015-02-09T14:06:07.398+01:00 INFO org.apache.spark.storage.BlockManagerMaster Updated info of block rdd_35_9 Executor task launch worker-18
2015-02-09T14:06:07.399+01:00 INFO org.apache.spark.CacheManager Partition rdd_38_9 not found, computing it Executor task launch worker-18
2015-02-09T14:06:07.399+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 10 non-empty blocks out of 10 blocks Executor task launch worker-18
2015-02-09T14:06:07.400+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 0 ms Executor task launch worker-18
2015-02-09T14:06:07.567+01:00 INFO org.apache.spark.storage.MemoryStore ensureFreeSpace(944848) called with curMem=302757848, maxMem... Executor task launch worker-18
2015-02-09T14:06:07.568+01:00 INFO org.apache.spark.storage.MemoryStore Block rdd_38_9 stored as values in memory (estimated size 92... Executor task launch worker-18
2015-02-09T14:06:07.569+01:00 INFO org.apache.spark.storage.BlockManagerMaster Updated info of block rdd_38_9 Executor task launch worker-18
2015-02-09T14:06:07.573+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 34 non-empty blocks out of 50 blocks Executor task launch worker-18
2015-02-09T14:06:07.573+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 1 ms Executor task launch worker-18
2015-02-09T14:06:38.931+01:00 INFO org.apache.spark.CacheManager Partition rdd_41_9 not found, computing it Executor task launch worker-18
2015-02-09T14:06:38.931+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 3 non-empty blocks out of 10 blocks Executor task launch worker-18
2015-02-09T14:06:38.931+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 0 ms Executor task launch worker-18
2015-02-09T14:06:38.945+01:00 INFO org.apache.spark.storage.MemoryStore ensureFreeSpace(0) called with curMem=307529127, maxMem=9261... Executor task launch worker-18
2015-02-09T14:06:38.945+01:00 INFO org.apache.spark.storage.MemoryStore Block rdd_41_9 stored as bytes in memory (estimated size 0.0... Executor task launch worker-18
2015-02-09T14:06:38.946+01:00 INFO org.apache.spark.storage.BlockManagerMaster Updated info of block rdd_41_9 Executor task launch worker-18
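For anyone reproducing the registrator change mentioned above, a minimal sketch (the class and package names are placeholders; since CompressedMapStatus is private to Spark, it is registered by name here, which is one way to do it, not necessarily how Chris did it):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Placeholder registrator illustrating the SPARK-5102 workaround described above.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // CompressedMapStatus is private[spark], so register it via its class name.
    kryo.register(Class.forName("org.apache.spark.scheduler.CompressedMapStatus"))
  }
}

// Wire it up with: conf.set("spark.kryo.registrator", "com.example.MyRegistrator")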
Re: Shuffle write increases in spark 1.2
Hello, as the original message from Kevin Jung never got accepted to the mailing list, I quote it here completely:

Kevin Jung wrote
Hi all, The size of shuffle write shown in the Spark web UI is very different when I execute the same Spark job on the same input data (100GB) in both Spark 1.1 and Spark 1.2. At the same sortBy stage, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2. I set the spark.shuffle.manager option to hash because its default value changed, but Spark 1.2 still writes larger files than Spark 1.1. Can anyone tell me why this happens? Thanks Kevin

I'm experiencing the same thing with my job, and this is what I tested:
* Spark 1.2.0 with Sort-based Shuffle
* Spark 1.2.0 with Hash-based Shuffle
* Spark 1.2.1 with Sort-based Shuffle
All three combinations show the same behaviour, which contrasts with Spark 1.1.0. In Spark 1.1.0 my job runs for about an hour; in Spark 1.2.x it runs for almost four hours. Configuration is identical otherwise - I only added org.apache.spark.scheduler.CompressedMapStatus to the Kryo registrator for Spark 1.2.0 to cope with https://issues.apache.org/jira/browse/SPARK-5102. As a consequence (I think, but the causality might be different) I see lots and lots of disk spills. I cannot provide a small test case, but maybe the log entries for a single worker thread can help someone investigate this. (See below.) I also opened an issue on this, see https://issues.apache.org/jira/browse/SPARK-5715 Any help will be greatly appreciated, because otherwise I'm stuck with Spark 1.1.0, as quadrupling the runtime is not an option. Sincerely, Chris

2015-02-09T14:06:06.328+01:00 INFO org.apache.spark.executor.Executor Running task 9.0 in stage 18.0 (TID 300) Executor task launch worker-18
2015-02-09T14:06:06.351+01:00 INFO org.apache.spark.CacheManager Partition rdd_35_9 not found, computing it Executor task launch worker-18
2015-02-09T14:06:06.351+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 10 non-empty blocks out of 10 blocks Executor task launch worker-18
2015-02-09T14:06:06.351+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 0 ms Executor task launch worker-18
2015-02-09T14:06:07.396+01:00 INFO org.apache.spark.storage.MemoryStore ensureFreeSpace(2582904) called with curMem=300174944, maxMe... Executor task launch worker-18
2015-02-09T14:06:07.397+01:00 INFO org.apache.spark.storage.MemoryStore Block rdd_35_9 stored as bytes in memory (estimated size 2.5... Executor task launch worker-18
2015-02-09T14:06:07.398+01:00 INFO org.apache.spark.storage.BlockManagerMaster Updated info of block rdd_35_9 Executor task launch worker-18
2015-02-09T14:06:07.399+01:00 INFO org.apache.spark.CacheManager Partition rdd_38_9 not found, computing it Executor task launch worker-18
2015-02-09T14:06:07.399+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 10 non-empty blocks out of 10 blocks Executor task launch worker-18
2015-02-09T14:06:07.400+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 0 ms Executor task launch worker-18
2015-02-09T14:06:07.567+01:00 INFO org.apache.spark.storage.MemoryStore ensureFreeSpace(944848) called with curMem=302757848, maxMem... Executor task launch worker-18
2015-02-09T14:06:07.568+01:00 INFO org.apache.spark.storage.MemoryStore Block rdd_38_9 stored as values in memory (estimated size 92... Executor task launch worker-18
2015-02-09T14:06:07.569+01:00 INFO org.apache.spark.storage.BlockManagerMaster Updated info of block rdd_38_9 Executor task launch worker-18
2015-02-09T14:06:07.573+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 34 non-empty blocks out of 50 blocks Executor task launch worker-18
2015-02-09T14:06:07.573+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 1 ms Executor task launch worker-18
2015-02-09T14:06:38.931+01:00 INFO org.apache.spark.CacheManager Partition rdd_41_9 not found, computing it Executor task launch worker-18
2015-02-09T14:06:38.931+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Getting 3 non-empty blocks out of 10 blocks Executor task launch worker-18
2015-02-09T14:06:38.931+01:00 INFO org.apache.spark.storage.ShuffleBlockFetcherIterator Started 0 remote fetches in 0 ms Executor task launch worker-18
2015-02-09T14:06:38.945+01:00 INFO org.apache.spark.storage.MemoryStore ensureFreeSpace(0) called with curMem=307529127, maxMem=9261... Executor task launch worker-18
2015-02-09T14:06:38.945+01:00 INFO org.apache.spark.storage.MemoryStore Block rdd_41_9 stored as bytes in memory (estimated size 0.0... Executor task launch worker-18
2015-02-09T14:06:38.946+01:00 INFO org.apache.spark.storage.BlockManagerMaster Updated info of block rdd_41_9
Kafka Version Update 0.8.2 status?
When can we expect the latest kafka and scala 2.11 support in spark streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Version-Update-0-8-2-status-tp21573.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to efficiently utilize all cores?
Hi, I have a cluster setup with three slaves, 4 cores each (12 cores in total). When I try to run multiple applications, using 4 cores each, only the first application runs (with 2, 1, 1 cores used on the corresponding slaves). Every other application goes into the WAITING state. Following the solution provided here http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-Core-Allocation-td7188.html I set the parameter spark.deploy.spreadOut to false. But the problem is not solved. Any suggestion in this regard is welcome. Thanks in advance Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-utilize-all-cores-tp21569.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Custom streaming receiver slow on YARN
Not quite sure, but one assumption would be that you don't have sufficient memory to hold that much data, so the process gets busy collecting garbage, and that could be the reason it works when you set MEMORY_AND_DISK_SER_2. Thanks Best Regards On Mon, Feb 9, 2015 at 8:38 PM, Jong Wook Kim jongw...@nyu.edu wrote: replying to my own thread; I realized that this only happens when the replication level is 1. Regardless of whether I set memory_only or disk or deserialized, I had to make the replication level = 2 to make the streaming work properly on YARN. I still don't get why, because intuitively less replication should imply faster computation, and when testing on a Cloudera VM everything worked fine on YARN. If I am missing something important, please let me know. I am going to settle on the '..._2' variants for now. Jong Wook -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-streaming-receiver-slow-on-YARN-tp21544p21553.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
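For illustration, a sketch of how the replicated storage level is requested when the receiver is created; MyReceiver stands in for the custom receiver from the thread and is not real code from it:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Placeholder receiver: a custom Receiver takes its storage level in the constructor.
class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
  def onStart(): Unit = { /* start a thread that reads the source and calls store(...) */ }
  def onStop(): Unit = { }
}

// ssc is assumed to be an existing StreamingContext.
val stream = ssc.receiverStream(new MyReceiver(StorageLevel.MEMORY_AND_DISK_SER_2))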
map distribuited matrix (rowMatrix)
I have a RowMatrix x and I would like to apply a function to each element of x. I was thinking of something like x map(u => exp(-u*u)). How can I do something like that? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/map-distribuited-matrix-rowMatrix-tp21571.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
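A minimal sketch of one possible approach (not from the thread): map over the matrix's underlying RDD[Vector] and rebuild a RowMatrix, since RowMatrix itself has no elementwise map:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// x is assumed to be an existing RowMatrix; this applies exp(-u*u) to every element.
val mapped = new RowMatrix(
  x.rows.map(v => Vectors.dense(v.toArray.map(u => math.exp(-u * u)))))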
Re: OutofMemoryError: Java heap space
Hi Akhil, Excuse me, I am trying a random-walk algorithm over a not that large graph(~1GB raw dataset, including ~5million vertices and ~60million edges) on a cluster which has 20 machines. And, the property of each vertex in graph is a hash map, of which size will increase dramatically during pregel supersteps. so, it seems to suffer from high GC? Best, Yifan LI On 10 Feb 2015, at 10:26, Akhil Das ak...@sigmoidanalytics.com wrote: You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR 
SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at
Re: How to efficiently utilize all cores?
You can look at http://spark.apache.org/docs/1.2.0/job-scheduling.html I would go with Mesos http://spark.apache.org/docs/1.2.0/running-on-mesos.html Thanks Best Regards On Tue, Feb 10, 2015 at 2:59 PM, matha.harika matha.har...@gmail.com wrote: Hi, I have a cluster setup with three slaves, 4 cores each (12 cores in total). When I try to run multiple applications, using 4 cores each, only the first application runs (with 2, 1, 1 cores used on the corresponding slaves). Every other application goes into the WAITING state. Following the solution provided here http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-Core-Allocation-td7188.html I set the parameter spark.deploy.spreadOut to false. But the problem is not solved. Any suggestion in this regard is welcome. Thanks in advance Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-utilize-all-cores-tp21569.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
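As a side note (an assumption on my part, not something confirmed in the thread): on a standalone cluster an application grabs every available core unless spark.cores.max is set, and a worker also needs enough free memory to start an executor for the next application, so capping both per application is another way to let several applications run at once:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only: cap each application at 4 cores and 2g per executor.
val conf = new SparkConf()
  .setAppName("app-1")
  .set("spark.cores.max", "4")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)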
Re: Spark nature of file split
Have you been able to confirm this behaviour since posting? Have you tried this out on multiple workers and viewed their memory consumption? I'm new to Spark and don't have a cluster to play with at present, and I want to do similar loading from NFS files. My understanding is that a call to SparkContext.textFile("filename.csv", 5) in this example will use 5 partitions, and this would mean that 5 workers could read the same CSV file simultaneously, but they would each read a different offset of the file (i.e. they don't all read the entire file, just 1/5th of it). dbakumar wrote I am new to Spark and understanding RDDs. I have a 30GB file (CSV, NFS mounted) and 1 master node and 3 worker nodes. Does each Spark worker load the 30GB file, OR does Spark allocate partitions automatically so that each worker loads only its allocated partition into memory? I am also wondering how best to group the data once loaded because, in my case, I will want the RDD partitioned by a business key, which will require reshuffling AFAIK. See my question: http://stackoverflow.com/questions/28415258/apache-spark-loading-csv-files-from-nfs-and-partitioning-the-data -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-nature-of-file-split-tp21445p21574.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
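To make that concrete, a small sketch (the path and key column are placeholder assumptions, not details from the thread): read the NFS-mounted CSV with a minimum number of partitions, then repartition by a business key, which does trigger a shuffle:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

// Placeholder path; ask for at least 5 partitions when reading.
val lines = sc.textFile("file:///mnt/nfs/data/filename.csv", 5)
// Assume the business key is the first CSV column, then hash-partition by it.
val byKey = lines
  .map { line =>
    val cols = line.split(',')
    (cols(0), line)
  }
  .partitionBy(new HashPartitioner(12))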
Re: OutofMemoryError: Java heap space
Yes, I have read it, and am trying to find some way to do that… Thanks :) Best, Yifan LI On 10 Feb 2015, at 12:06, Akhil Das ak...@sigmoidanalytics.com wrote: Did you have a chance to look at this doc http://spark.apache.org/docs/1.2.0/tuning.html http://spark.apache.org/docs/1.2.0/tuning.html Thanks Best Regards On Tue, Feb 10, 2015 at 4:13 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi Akhil, Excuse me, I am trying a random-walk algorithm over a not that large graph(~1GB raw dataset, including ~5million vertices and ~60million edges) on a cluster which has 20 machines. And, the property of each vertex in graph is a hash map, of which size will increase dramatically during pregel supersteps. so, it seems to suffer from high GC? Best, Yifan LI On 10 Feb 2015, at 10:26, Akhil Das ak...@sigmoidanalytics.com mailto:ak...@sigmoidanalytics.com wrote: You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at 
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
Re: OutofMemoryError: Java heap space
Did you have a chance to look at this doc http://spark.apache.org/docs/1.2.0/tuning.html Thanks Best Regards On Tue, Feb 10, 2015 at 4:13 PM, Yifan LI iamyifa...@gmail.com wrote: Hi Akhil, Excuse me, I am trying a random-walk algorithm over a not that large graph(~1GB raw dataset, including ~5million vertices and ~60million edges) on a cluster which has 20 machines. And, the property of each vertex in graph is a hash map, of which size will increase dramatically during pregel supersteps. so, it seems to suffer from high GC? Best, Yifan LI On 10 Feb 2015, at 10:26, Akhil Das ak...@sigmoidanalytics.com wrote: You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks Best Regards On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com wrote: Hi, I just found the following errors during computation(graphx), anyone has ideas on this? thanks so much! (I think the memory is sufficient, spark.executor.memory 30GB ) 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at 
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at
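To make the tuning guide's suggestions concrete, a sketch of the kind of knobs it covers; the values here are illustrative assumptions only, not recommendations from this thread:

import org.apache.spark.SparkConf

// Illustrative Spark 1.x settings: Kryo serialization, a smaller cache fraction to
// leave more heap for computation, and GC logging to see where the time goes.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.storage.memoryFraction", "0.4")
  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")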
Re: Kafka Version Update 0.8.2 status?
I believe Kafka 0.8.2 is imminent for master / 1.4.0 -- see SPARK-2808. Anecdotally I have used Spark 1.2 with Kafka 0.8.2 without problem already. On Feb 10, 2015 10:53 AM, critikaled isasmani@gmail.com wrote: When can we expect the latest kafka and scala 2.11 support in spark streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Version-Update-0-8-2-status-tp21573.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL - Column name including a colon in a SELECT clause
Hi list, I have some data with a field name of f:price (it's actually part of a JSON structure loaded from ElasticSearch via elasticsearch-hadoop connector, but I don't think that's significant here). I'm struggling to figure out how to express that in a Spark SQL SELECT statement without generating an error (and haven't been able to find any similar examples in the documentation). val productsRdd = sqlContext.sql(SELECT Locales.Invariant.Metadata.item.f:price FROM products LIMIT 10) gives me the following error... java.lang.RuntimeException: [1.41] failure: ``UNION'' expected but `:' found Changing the column name is one option, but I have other systems depending on this right now so it's not a trivial exercise. :( I'm using Spark 1.2. Thanks in advance for any advice / help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Column-name-including-a-colon-in-a-SELECT-clause-tp21576.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark streaming job throwing ClassNotFound exception when recovering from checkpointing
I am getting the following error when I kill the Spark driver and restart the job:

15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)

The Spark version is 1.2.0. The streaming job executes every 10 seconds with the following steps:
1. Consume JSON from a Kafka topic called journeys and convert it to case classes
2. Filter the resulting journeys stream based on a time attribute being set
3. FlatMap the journeys into separate time buckets of 15, 60 and 1440 minutes, e.g. (HOUR1234569000, ActiveState(HOUR, 1234569000, hyperLogLog(journey id), 360))
4. ReduceByKey, adding the HyperLogLogs
5. UpdateStateByKey to add to the previous state's HyperLogLog
6. Output the results to Cassandra
I have made a sample app below to mimic the problem and put all the classes into one file; it is also attached to this email. To get around the issue for the moment, I have removed the Bucket class and stopped passing a bucket array to the ActiveJourney class, and instead I hard-code all the time buckets I need in the ActiveJourney class; this approach works and recovers from checkpointing, but is not extensible. Can the Spark gurus explain why I get that ClassNotFound exception? If you need any more information, please let me know.
Much thanks, Conor

package com.example.spark.streaming.reporting.live.jobs

import java.util.Date

import scala.Array.canBuildFrom
import scala.collection.mutable.MutableList

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jvalue2extractable
import org.json4s.string2JsonInput

import com.example.spark.streaming.utils.MilliSecondUtils
import com.example.spark.streaming.utils.constants.ColumnFamilies
import com.example.spark.streaming.utils.constants.Constants
import com.example.spark.streaming.utils.constants.Milliseconds
import com.example.spark.streaming.utils.constants.SparkConfig

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming.toDStreamFunctions
import com.datastax.spark.connector.toNamedColumnRef

import com.twitter.algebird.HLL
import com.twitter.algebird.HyperLogLogMonoid

// Json parsing classes
case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
case class JourneyDetails(_id: String)
case class JourneyCommand($set: Option[JourneySet])
case class JourneySet(awayAt: Date)

case class Bucket(val bucketType: String, val roundDown: (Long) => Long, val columnFamily: String, val size: Long, val maxIntervals: Int)
case class ActiveState(var bucketType: String, var time: Long, var hyperLogLog: HLL, var ttl: Int)

object SampleJob {

  private final val Name = this.getClass().getSimpleName()

  def main(args: Array[String]) {
    if (args.length < 8) {
      System.err.println(s"Usage: $Name enviroment zkQuorum group topics numThreads hdfsUri cassandra intervalSeconds")
      System.exit(1)
    }
    System.out.print(args)

    val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args

    val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()

    def functionToCreateContext(): StreamingContext = {
      // how many buckets
      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
      val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay, ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)

      val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))

      val sparkConf = new SparkConf()
        .setAppName(Name)
        .set(SparkConfig.SparkMesosCoarse, Constants.True)
        .set(SparkConfig.SparkCleanerTtl, "300")
Re: Re: Exception when trying to use EShadoop connector and writing rdd to ES
What's the signature of your RDD? It looks to be a List, which can't be mapped automatically to a document - you are probably thinking of a tuple, or better yet a PairRDD. Convert your RDD of Lists to a PairRDD and use that instead. This is a guess - a gist with a simple test/code would make it easier to diagnose what's going on. On 2/10/15 7:24 PM, shahid ashraf wrote: hi costin i upgraded the es hadoop connector, and at this point i can't use scala, but still getting the same error On Tue, Feb 10, 2015 at 10:34 PM, Costin Leau costin.l...@gmail.com wrote: Hi shahid, I've sent the reply to the group - for some reason I replied to your address instead of the mailing list. Let's continue the discussion there. Cheers, On 2/10/15 6:58 PM, shahid ashraf wrote: thanks costin i m grouping data together based on id in json and the rdd contains
rdd = (1,{'SOURCES': [{n no. of key/value}],}),(2,{'SOURCES': [{n no. of key/value}],}),(3,{'SOURCES': [{n no. of key/value}],}),(4,{'SOURCES': [{n no. of key/value}],})
rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "shahid/hcp_id"
    })
spark-1.1.0-bin-hadoop1 java version 1.7.0_71 elasticsearch-1.4.2 elasticsearch-hadoop-2.1.0.Beta2.jar On Tue, Feb 10, 2015 at 10:05 PM, Costin Leau costin.l...@gmail.com wrote: Sorry, but there's too little information in this email to make any type of assessment. Can you please describe what you are trying to do, what version of Elastic and es-spark you are using, and potentially post a snippet of code? What does your RDD contain? On 2/10/15 6:05 PM, shahid wrote: INFO scheduler.TaskSetManager: Starting task 2.1 in stage 2.0 (TID 9, ip-10-80-98-118.ec2.internal, PROCESS_LOCAL, 1025 bytes) 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 6) on executor ip-10-80-15-145.ec2.internal: org.apache.spark.SparkException (Data of type java.util.ArrayList cannot be used) [duplicate 1] 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Starting task 1.1 in stage 2.0 (TID 10, ip-10-80-15-145.ec2.internal, PROCESS_LOCAL, 1025 bytes) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-when-trying-to-use-EShadoop-connector-and-writing-rdd-to-ES-tp21579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Costin -- with Regards Shahid Ashraf -- Costin -- with Regards Shahid Ashraf -- Costin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
FYI: Prof John Canny is giving a talk on Machine Learning at the limit in SF Big Analytics Meetup
Just in case you are in San Francisco, we are having a meetup by Prof John Canny http://www.meetup.com/SF-Big-Analytics/events/220427049/ Chester
Re: Similar code in Java
Please take a look at: examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java which was checked in yesterday. On Sat, Feb 7, 2015 at 10:53 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Ted, I've seen the code; I am using JavaKafkaWordCount.java, but I would like to reproduce in Java what I've done in Scala. Is it possible to do in Java the same thing that the Scala code does? Principally this code below, or something like it:
val KafkaDStreams = (1 to numStreams) map { _ =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, storageLevel = StorageLevel.MEMORY_ONLY).map(_._2)
}
On Feb 7, 2015, at 19:32, Ted Yu yuzhih...@gmail.com wrote: Can you take a look at: ./examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java ./external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java Cheers On Sat, Feb 7, 2015 at 9:45 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, How could I do in Java the Scala code below?
val KafkaDStreams = (1 to numStreams) map { _ =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, storageLevel = StorageLevel.MEMORY_ONLY).map(_._2)
}
val unifiedStream = ssc.union(KafkaDStreams)
val sparkProcessingParallelism = 1
unifiedStream.repartition(sparkProcessingParallelism)
Thanks Guys Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: How to broadcast a variable read from a file in yarn-cluster mode?
They're separate in my code, how can I combine them? Here's what I have: val sparkConf = new SparkConf() val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs)) val sc = new SparkContext() On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Is the SparkContext you're using the same one that the StreamingContext wraps? If not, I don't think using two is supported. -Sandy On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote: I'm still getting an error. Here's my code, which works successfully when tested using spark-shell: val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect val badIpSet = badIPs.toSet val badIPsBC = sc.broadcast(badIpSet) The job looks OK from my end: 15/02/07 18:59:58 INFO Client: Application report from ASM: application identifier: application_1423081782629_3861 appId: 3861 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service: }* appDiagnostics: appMasterHost: phd40010008.na.com appQueue: root.default appMasterRpcPort: 0 appStartTime: 1423353581140 * yarnAppState: RUNNING* distributedFinalState: UNDEFINED But the streaming process never actually begins. The full log is below, scroll to the end for the repeated warning WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. I'll note that I have a different Spark Streaming app called dqd working successfully for a different job that uses only a StreamingContext and not an additional SparkContext. But this app (called sbStreamingTv) uses both a SparkContext and a StreamingContext for grabbing a lookup file in HDFS for IP filtering. * The references to line #198 from the log below refers to the val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect line shown above, and it looks like Spark doesn't get beyond that point in the code.* Also, this job (sbStreamingTv) does work successfully using yarn-client, even with both a SparkContext and StreamingContext. It looks to me that in yarn-cluster mode it's grabbing resources for the StreamingContext but not for the SparkContext. Any ideas? Jon 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity 1177.8 MB. 
15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with id = ConnectionManagerId(phd40010008.na.com,30129) 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager phd40010008.na.com:30129 with 1177.8 MB RAM 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at http://10.229.16.108:35183 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at http://phd40010008.na.com:25869 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@7f38095d 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors. 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 2432 memory 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:20 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jg) 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started 15/02/10 12:06:20 INFO
pyspark: Java null pointer exception when accessing broadcast variables
I'm trying to use a broadcasted dictionary inside a map function and am consistently getting Java null pointer exceptions. This is inside an IPython session connected to a standalone spark cluster. I seem to recall being able to do this before but at the moment I am at a loss as to what to try next. Is there a limit to the size of broadcast variables? This one is rather large (a few Gb dict). Thanks! Rok -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Java-null-pointer-exception-when-accessing-broadcast-variables-tp21580.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to broadcast a variable read from a file in yarn-cluster mode?
You should be able to replace that second line with val sc = ssc.sparkContext On Tue, Feb 10, 2015 at 10:04 AM, Jon Gregg jonrgr...@gmail.com wrote: They're separate in my code, how can I combine them? Here's what I have: val sparkConf = new SparkConf() val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs)) val sc = new SparkContext() On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Is the SparkContext you're using the same one that the StreamingContext wraps? If not, I don't think using two is supported. -Sandy On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote: I'm still getting an error. Here's my code, which works successfully when tested using spark-shell: val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect val badIpSet = badIPs.toSet val badIPsBC = sc.broadcast(badIpSet) The job looks OK from my end: 15/02/07 18:59:58 INFO Client: Application report from ASM: application identifier: application_1423081782629_3861 appId: 3861 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service: }* appDiagnostics: appMasterHost: phd40010008.na.com appQueue: root.default appMasterRpcPort: 0 appStartTime: 1423353581140 * yarnAppState: RUNNING* distributedFinalState: UNDEFINED But the streaming process never actually begins. The full log is below, scroll to the end for the repeated warning WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. I'll note that I have a different Spark Streaming app called dqd working successfully for a different job that uses only a StreamingContext and not an additional SparkContext. But this app (called sbStreamingTv) uses both a SparkContext and a StreamingContext for grabbing a lookup file in HDFS for IP filtering. * The references to line #198 from the log below refers to the val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect line shown above, and it looks like Spark doesn't get beyond that point in the code.* Also, this job (sbStreamingTv) does work successfully using yarn-client, even with both a SparkContext and StreamingContext. It looks to me that in yarn-cluster mode it's grabbing resources for the StreamingContext but not for the SparkContext. Any ideas? Jon 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity 1177.8 MB. 
15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with id = ConnectionManagerId(phd40010008.na.com,30129) 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager phd40010008.na.com:30129 with 1177.8 MB RAM 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at http://10.229.16.108:35183 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at http://phd40010008.na.com:25869 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@7f38095d 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors. 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 2432 memory 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:20 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg 15/02/10 12:06:20 INFO SecurityManager: SecurityManager:
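A minimal sketch of what Sandy suggests: build only the StreamingContext and pull the SparkContext out of it, so the broadcast and the streaming job share one context (bucketSecs and the HDFS path are taken from the messages above, not verified here):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs))
val sc = ssc.sparkContext                              // reuse the wrapped SparkContext instead of new SparkContext()

val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect()
val badIPsBC = sc.broadcast(badIPs.toSet)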
Re: How to broadcast a variable read from a file in yarn-cluster mode?
OK that worked and getting close here ... the job ran successfully for a bit and I got output for the first couple buckets before getting a java.lang.Exception: Could not compute split, block input-0-1423593163000 not found error. So I bumped up the memory at the command line from 2 gb to 5 gb, ran it again ... this time I got around 8 successful outputs before erroring. Bumped up the memory from 5 gb to 10 gb ... got around 15 successful outputs before erroring. I'm not persisting or caching anything except for the broadcast IP table and another broadcast small user agents list used for the same type of filtering, and both files are tiny. The Hadoop cluster is nearly empty right now and has more than enough available memory to handle this job. I am connecting to Kafka as well and so there's a lot of data coming through as my index is trying to catch up to the current date, but yarn-client mode has several times in the past few weeks been able to catch up to the current date and run successfully for days without issue. My guess is memory isn't being cleared after each bucket? Relevant portion of the log below. 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 750 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117, Stage 114, Stage 115, Stage 116) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on phd40010023.na.com:1 
in memory (size: 19.2 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on phd40010023.na.com:1 in memory (size: 43.7 MB, free: 11.1 GB) 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 766 from persistence list 15/02/10 13:34:54 INFO BlockManager: Removing RDD 766 15/02/10 13:34:54 INFO MappedRDD:
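Not a definitive diagnosis, but "Could not compute split, block input-... not found" while a receiver is catching up on a large Kafka backlog is often a sign that input blocks are being dropped or cleaned before the batches that need them run. A sketch of one knob to try, assuming the receiver-based Kafka stream is being used (the rate value below is arbitrary):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "10000")    // throttle records/sec per receiver while backlogged
// also avoid aggressive metadata cleaning (e.g. a small spark.cleaner.ttl) while the job is behind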
Exception when trying to use EShadoop connector and writing rdd to ES
INFO scheduler.TaskSetManager: Starting task 2.1 in stage 2.0 (TID 9, ip-10-80-98-118.ec2.internal, PROCESS_LOCAL, 1025 bytes) 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 6) on executor ip-10-80-15-145.ec2.internal: org.apache.spark.SparkException (Data of type java.util.ArrayList cannot be used) [duplicate 1] 15/02/10 15:54:08 INFO scheduler.TaskSetManager: Starting task 1.1 in stage 2.0 (TID 10, ip-10-80-15-145.ec2.internal, PROCESS_LOCAL, 1025 bytes) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-when-trying-to-use-EShadoop-connector-and-writing-rdd-to-ES-tp21579.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Why can't Spark find the classes in this Jar?
I am new to spark. I am trying to compile and run a spark application that requires classes from an (external) jar file on my local machine. If I open the jar (on ~/Desktop) I can see the missing class in the local jar but when I run spark I get NoClassDefFoundError: edu/stanford/nlp/ie/AbstractSequenceClassifier I add the jar to the spark context like this String[] jars = {/home/pathto/Desktop/stanford-corenlp-3.5.0.jar}; SparkConf conf = new SparkConf().setAppName(Simple Application).setJars(jars); Then I try to run a submit script like this /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \ --class SimpleApp \ --master local[4] \ target/simple-project-1.0.jar \ --jars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar and hit the NoClassDefFoundError. I get that this means that the worker threads can't find the class from the jar. But I am not sure what I am doing wrong. I have tried different syntaxes for the last line (below) but none works. --addJars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar How can I fix this error? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-can-t-Spark-find-the-classes-in-this-Jar-tp21584.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
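For what it's worth, a sketch of the usual spark-submit layout: options such as --jars must come before the application JAR, and --jars takes a plain comma-separated list of paths (the paths below are the ones from the message):

/home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
  --class SimpleApp \
  --master local[4] \
  --jars /home/abe/Desktop/stanford-corenlp-3.5.0.jar \
  target/simple-project-1.0.jar

Anything listed after the application JAR is passed to the application's main() as arguments, which is why the trailing --jars / --addJars forms above were silently ignored.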
Open file limit settings for Spark on Yarn job
Hi, I'm running Spark on Yarn from an edge node, and the tasks run on the Data Nodes. My job fails with the Too many open files error once it gets to groupByKey(). Alternatively, I can make it fail immediately if I repartition the data when I create the RDD. Where do I need to make sure that ulimit -n is high enough? On the edge node it is small, 1024, but on the data nodes, the yarn user has a high limit, 32k. But is the yarn user the relevant user? And is the 1024 limit for myself on the edge node a problem, or is that limit not relevant? Arun
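In case it helps: the executors run inside YARN containers on the Data Nodes, so the limit that matters is the one inherited by whichever user the NodeManager launches containers as (often yarn, or your own user when the LinuxContainerExecutor is enabled); the 1024 on the edge node only affects the driver process. A sketch of raising it on the worker nodes, assuming limits are managed through /etc/security/limits.conf and the NodeManagers are restarted afterwards:

# /etc/security/limits.conf on each Data Node
yarn   soft   nofile   32768
yarn   hard   nofile   32768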
Re: Spark on very small files, appropriate use case?
Spark is an framework to do things in parallel very easy, it definitely will help your cases. def read_file(path): lines = open(path).readlines() # bzip2 return lines filesRDD = sc.parallelize(path_to_files, N) lines = filesRDD.flatMap(read_file) Then you could do other transforms on lines. On Tue, Feb 10, 2015 at 12:32 PM, soupacabana eiersalat...@gmail.com wrote: Hi all, I have the following use case: One job consists of reading from 500-2000 small bzipped logs that are on an nfs. (Small means, that the zipped logs are between 0-100KB, average file size is 20KB.) We read the log lines, do some transformations, and write them to one output file. When we do it in pure Python (running the Python script on one core): -the time for 500 bzipped log files (6.5MB altogether) is about 5 seconds. -the time for 2000 bzipped log files (25MB altogether) is about 20 seconds. Because there will be many such jobs, I was thinking of trying Spark for that purpose. My preliminary findings and my questions: *Even only counting the number of log lines with Spark is about 10 times slower than the entire transformation done by the Python script. *sc.textfile(list_of_filenames) appear to not perform well on small files, why? *sc.wholeTextFiles(path_to_files) performs better than sc.textfile, but does not support bzipped files. However, also wholeTextFiles does not nearly provide the speed of the Python script. *The initialization of a Spark Context takes about 4 seconds. Sending a Spark job to a cluster takes even longer. Is there a way to decrease this initialization phase? The JVM take about 4 seconds to start up, but a task take only 0.1 second to start. *Is my use case actually an appropriate use case for Spark? Many thanks for your help and comments! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-very-small-files-appropriate-use-case-tp21583.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
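Davies' snippet above has lost its indentation in the archive and glosses over the bzip2 decoding (a plain open().readlines() will not decompress the files). A roughly equivalent sketch in Scala, assuming commons-compress is available on the classpath and that N is a tuning choice for the number of partitions:

import scala.io.Source
import java.io.FileInputStream
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream

def readBz2(path: String): Seq[String] = {
  // decompress one small .bz2 file entirely inside the task
  val in = new BZip2CompressorInputStream(new FileInputStream(path))
  try Source.fromInputStream(in).getLines().toList finally in.close()
}

val filesRDD = sc.parallelize(pathToFiles, N)   // pathToFiles: Seq[String] of the small log files
val lines = filesRDD.flatMap(readBz2)           // then apply the same transformations as the Python script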
spark sql registerFunction with 1.2.1
Hi, I am trying a very simple registerFunction and it is giving me errors. I have a parquet file which I register as temp table. Then I define a UDF. def toSeconds(timestamp: Long): Long = timestamp/10 sqlContext.registerFunction(toSeconds, toSeconds _) val result = sqlContext.sql(select toSeconds(timestamp) from blah); I get the following error. java.lang.RuntimeException: [1.18] failure: ``)'' expected but `timestamp' found select toSeconds(timestamp) from blah My end goal is as follows: We have log file with timestamps in microseconds and I would like to group by entries with second level precision, so eventually I want to run the query select toSeconds(timestamp) as t, count(x) from table group by t,x
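The parse failure most likely comes from timestamp colliding with a keyword in the plain SQL parser rather than from the UDF itself. A sketch of one common workaround, assuming Spark 1.2.x and that going through a HiveContext (whose HiveQL parser accepts backtick-quoted identifiers) is acceptable; the parquet path and the /1000000 microseconds-to-seconds conversion are assumptions:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.parquetFile("/path/to/logs.parquet").registerTempTable("blah")   // hypothetical path
hiveContext.registerFunction("toSeconds", (t: Long) => t / 1000000L)         // microseconds -> seconds (assumed)
val result = hiveContext.sql(
  "SELECT toSeconds(`timestamp`) AS t, x, COUNT(x) FROM blah GROUP BY toSeconds(`timestamp`), x")

Renaming the column to something non-reserved when the data is produced would avoid the backticks entirely.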
Re: Spark on very small files, appropriate use case?
I had a similar use case before. I found: 1. textFile() produced one partition per file. It can result in many partitions. I found that calling coalecse() without shuffle helped. 2. If you used persist(), count() will do I/O and put the result into cache. Transformation later did computation out of the memory cache which could be much faster. And, in general, small files hurt I/O performance. On Tue, Feb 10, 2015 at 12:52 PM, Davies Liu dav...@databricks.com wrote: Spark is an framework to do things in parallel very easy, it definitely will help your cases. def read_file(path): lines = open(path).readlines() # bzip2 return lines filesRDD = sc.parallelize(path_to_files, N) lines = filesRDD.flatMap(read_file) Then you could do other transforms on lines. On Tue, Feb 10, 2015 at 12:32 PM, soupacabana eiersalat...@gmail.com wrote: Hi all, I have the following use case: One job consists of reading from 500-2000 small bzipped logs that are on an nfs. (Small means, that the zipped logs are between 0-100KB, average file size is 20KB.) We read the log lines, do some transformations, and write them to one output file. When we do it in pure Python (running the Python script on one core): -the time for 500 bzipped log files (6.5MB altogether) is about 5 seconds. -the time for 2000 bzipped log files (25MB altogether) is about 20 seconds. Because there will be many such jobs, I was thinking of trying Spark for that purpose. My preliminary findings and my questions: *Even only counting the number of log lines with Spark is about 10 times slower than the entire transformation done by the Python script. *sc.textfile(list_of_filenames) appear to not perform well on small files, why? *sc.wholeTextFiles(path_to_files) performs better than sc.textfile, but does not support bzipped files. However, also wholeTextFiles does not nearly provide the speed of the Python script. *The initialization of a Spark Context takes about 4 seconds. Sending a Spark job to a cluster takes even longer. Is there a way to decrease this initialization phase? The JVM take about 4 seconds to start up, but a task take only 0.1 second to start. *Is my use case actually an appropriate use case for Spark? Many thanks for your help and comments! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-very-small-files-appropriate-use-case-tp21583.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
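A tiny sketch of points 1 and 2 above (fileNames and the target partition count of 16 are placeholders to be tuned for the cluster):

val lines = sc.textFile(fileNames.mkString(","))     // one partition per small file
val compacted = lines.coalesce(16, shuffle = false)  // merge the many tiny partitions without a shuffle
compacted.cache()                                     // point 2: cache, then count() to materialize
compacted.count()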
SparkSQL + Tableau Connector
Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver, and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell, which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. import org.apache.spark.sql.SQLContext import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csvFile("/data/test.csv") test.toJSON().saveAsTextFile("/data/out") test.saveAsParquetFile("/data/out") When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd
Spark on very small files, appropriate use case?
Hi all, I have the following use case: One job consists of reading from 500-2000 small bzipped logs that are on an nfs. (Small means, that the zipped logs are between 0-100KB, average file size is 20KB.) We read the log lines, do some transformations, and write them to one output file. When we do it in pure Python (running the Python script on one core): -the time for 500 bzipped log files (6.5MB altogether) is about 5 seconds. -the time for 2000 bzipped log files (25MB altogether) is about 20 seconds. Because there will be many such jobs, I was thinking of trying Spark for that purpose. My preliminary findings and my questions: *Even only counting the number of log lines with Spark is about 10 times slower than the entire transformation done by the Python script. *sc.textfile(list_of_filenames) appear to not perform well on small files, why? *sc.wholeTextFiles(path_to_files) performs better than sc.textfile, but does not support bzipped files. However, also wholeTextFiles does not nearly provide the speed of the Python script. *The initialization of a Spark Context takes about 4 seconds. Sending a Spark job to a cluster takes even longer. Is there a way to decrease this initialization phase? *Is my use case actually an appropriate use case for Spark? Many thanks for your help and comments! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-very-small-files-appropriate-use-case-tp21583.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can spark job server be used to visualize streaming data?
Hi Su, Out of the box, no. But, I know people integrate it with Spark Streaming to do real-time visualization. It will take some work though. Kelvin On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data...not just historical data? Thank you! -Suh
spark python exception
sometimes I'm getting this exception: Traceback (most recent call last): File /opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/daemon.py, line 162, in manager code = worker(sock) File /opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/daemon.py, line 64, in worker outfile.flush() IOError: [Errno 32] Broken pipe Is it a spark bug?
Re: hadoopConfiguration for StreamingContext
Looks like the latest version 1.2.1 actually does use the configured hadoop conf. I tested it out and that does resolve my problem. thanks, marc On Tue, Feb 10, 2015 at 10:57 AM, Marc Limotte mslimo...@gmail.com wrote: Thanks, Akhil. I had high hopes for #2, but tried all and no luck. I was looking at the source and found something interesting. The Stack Trace (below) directs me to FileInputDStream.scala (line 141). This is version 1.1.1, btw. Line 141 has: private def fs: FileSystem = { if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration()) fs_ } So it looks to me like it doesn't make any attempt to use a configured HadoopConf. Here is the StackTrace: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66) at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) at org.apache.hadoop.fs.s3native.$Proxy5.initialize(Unknown Source) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187) at org.apache.spark.streaming.dstream.FileInputDStream.org $apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:141) at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107) at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75) ... On Tue, Feb 10, 2015 at 10:28 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Try the following: 1. Set the access key and secret key in the sparkContext: ssc.sparkContext.hadoopConfiguration.set(AWS_ACCESS_KEY_ID,yourAccessKey) ssc.sparkContext.hadoopConfiguration.set(AWS_SECRET_ACCESS_KEY,yourSecretKey) 2. Set the access key and secret key in the environment before starting your application: export AWS_ACCESS_KEY_ID=your access export AWS_SECRET_ACCESS_KEY=your secret 3. Set the access key and secret key inside the hadoop configurations val hadoopConf=ssc.sparkContext.hadoopConfiguration; hadoopConf.set(fs.s3.impl,org.apache.hadoop.fs.s3native.NativeS3FileSystem) hadoopConf.set(fs.s3.awsAccessKeyId,yourAccessKey) hadoopConf.set(fs.s3.awsSecretAccessKey,yourSecretKey) 4. 
You can also try: val stream = ssc.textFileStream(s3n://yourAccessKey:yourSecretKey@ yourBucket/path/) Thanks Best Regards On Tue, Feb 10, 2015 at 8:27 PM, Marc Limotte mslimo...@gmail.com wrote: I see that StreamingContext has a hadoopConfiguration() method, which can be used like this sample I found: sc.hadoopConfiguration().set(fs.s3.awsAccessKeyId, XX); sc.hadoopConfiguration().set(fs.s3.awsSecretAccessKey, XX); But StreamingContext doesn't have the same thing. I want to use a StreamingContext with s3n: text file input, but can't find a way to set the AWS credentials. I also tried (with no success): - adding the properties to conf/spark-defaults.conf - $HADOOP_HOME/conf/hdfs-site.xml - ENV variables - Embedded as user:password in s3n://user:password@... (w/ url encoding) - Setting the conf as above on a new SparkContext and passing that the StreamingContext constructor: StreamingContext(sparkContext: SparkContext, batchDuration: Duration) Can someone point me in the right direction for setting AWS creds (hadoop conf options) for streamingcontext? thanks, Marc Limotte Climate Corporation
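For anyone landing on this thread later, a minimal sketch of the kind of configuration that 1.2.1's FileInputDStream should now honour (the property names come from the exception above; the bucket, keys and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("s3nStream"), Seconds(30))  // interval is arbitrary
val hadoopConf = ssc.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")
val stream = ssc.textFileStream("s3n://yourBucket/path/")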
Spark streaming job throwing ClassNotFound exception when recovering from checkpointing
I am getting the following error when I kill the spark driver and restart the job: 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988) at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) Spark version is 1.2.0 The streaming job is executing every 10 seconds with the following steps: 1. Consuming JSON from a kafka topic called journeys and converting to case classes 2. Filters resulting journeys stream based on a time attribute being set 3. FlatMaps journeys into separate time buckets 15, 60 and 1440 minutes e.g. (HOUR1234569000, ActiveState(HOUR, 1234569000, hyperLogLog(journey id), 360) ) 4. ReduceByKey adding hyperloglogs 5. UpdateStateByKey to add to previous states hyperloglog 6. Then output results to Cassandra I have made a sample app below to mimic the problem and put all classes into one file. To get around the issue for the moment, I have removed the Bucket class and stopped passing in a bucket array to the ActiveJourney class. And instead I hard code all the time buckets I need in the ActiveJourney class; this approach works and recovers from checkpointing but is not extensible. Can the Spark gurus explain why I get that ClassNotFound exception? Need any more information, please let me know. 
Much thanks, Conor package com.example.spark.streaming.reporting.live.jobs import java.util.Date import scala.Array.canBuildFrom import scala.collection.mutable.MutableList import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka.KafkaUtils import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse import org.json4s.jvalue2extractable import org.json4s.string2JsonInput import com.example.spark.streaming.utils.MilliSecondUtils import com.example.spark.streaming.utils.constants.ColumnFamilies import com.example.spark.streaming.utils.constants.Constants import com.example.spark.streaming.utils.constants.Milliseconds import com.example.spark.streaming.utils.constants.SparkConfig import com.datastax.spark.connector.SomeColumns import com.datastax.spark.connector.streaming.toDStreamFunctions import com.datastax.spark.connector.toNamedColumnRef import com.twitter.algebird.HLL import com.twitter.algebird.HyperLogLogMonoid // Json parsing classes case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails]) case class JourneyDetails(_id: String) case class JourneyCommand($set: Option[JourneySet]) case class JourneySet(awayAt: Date) case class Bucket(val bucketType: String, val roundDown: (Long) = Long, val columnFamily: String, val size: Long, val maxIntervals: Int) case class ActiveState(var bucketType: String, var time: Long, var hyperLogLog: HLL, var ttl: Int) object SampleJob { private final val Name = this.getClass().getSimpleName() def main(args: Array[String]) { if (args.length 8) { System.err.println(sUsage: $Name enviroment zkQuorum group topics numThreads hdfsUri cassandra intervalSeconds) System.exit(1) } System.out.print(args) val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args val checkpointDirectory = hdfsUri + /reporting/ + Name + getClass(). getPackage().getImplementationVersion() def functionToCreateContext(): StreamingContext = { // how many buckets val fifteen = Bucket(QUARTER_HOUR, MilliSecondUtils. roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds. FifteenMinutes, 90) val hour = Bucket(HOUR, MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360) val day = Bucket(DAY, MilliSecondUtils.roundDownToNearestDay, ColumnFamilies.Visits_1440, Milliseconds.Day, 8640) val activeJourneys = new ActiveJourney(Array(fifteen,hour,day)) val sparkConf = new SparkConf() .setAppName(Name) .set(SparkConfig.SparkMesosCoarse, Constants.True) .set(SparkConfig.SparkCleanerTtl, 300) .set(SparkConfig.SparkDriverMemory, 128m)
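The post is truncated before the recovery wiring, but for completeness, the usual pattern around a factory like functionToCreateContext above looks roughly like this (a sketch; it assumes the factory itself calls ssc.checkpoint(checkpointDirectory) before returning):

val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()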
Error while querying hive table from spark shell
Hi , I am getting the following error when I am trying query a hive table from spark shell. I have placed my hive-site.xml in the spark/conf directory. Please suggest how to resolve this error. scala sqlContext.sql(select count(*) from offers_new).collect().foreach(println) 15/02/11 01:48:01 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead 15/02/11 01:48:01 INFO parse.ParseDriver: Parsing command: select count(*) from offers_new 15/02/11 01:48:01 INFO parse.ParseDriver: Parse Completed 15/02/11 01:48:01 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/11 01:48:01 INFO metastore.ObjectStore: ObjectStore, initialize called org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table offers_new at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:984) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$1.org $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422) at
Re: Beginner in Spark
You may also go through these posts https://docs.sigmoidanalytics.com/index.php/Spark_Installation Thanks Best Regards On Fri, Feb 6, 2015 at 9:39 PM, King sami kgsam...@gmail.com wrote: Hi, I'm new in Spark, I'd like to install Spark with Scala. The aim is to build a data processing system foor door events. the first step is install spark, scala, hdfs and other required tools. the second is build the algorithm programm in Scala which can treat a file of my data logs (events). Could you please help me to install the required tools: Spark, Scala, HDF and tell me how can I execute my programm treating the entry file. Best regards,
Re: org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
See this answer by Josh http://stackoverflow.com/questions/26692658/cant-connect-from-application-to-the-standalone-cluster You may also find this post useful http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3c7a889b1c-aa14-4cf2-8375-37f9cf827...@gmail.com%3E Thanks Best Regards On Wed, Feb 11, 2015 at 10:11 AM, lakewood pxy0...@gmail.com wrote: Hi, I'm new to Spark. I have built small spark on yarn cluster, which contains 1 master(20GB RAM, 8 core), 3 worker(4GB RAM, 4 core). When trying to run a command sc.parallelize(1 to 1000).count() through $SPARK_HOME/bin/spark-shell, sometimes the command can submit a job successfully, sometimes it is failure with following exception. I can definitely make sure the three workers are registered to master after checking out spark webui. There are spark memory-related parameters to be configured in spark-env.sh file, for instance, SPARK_EXECUTOR_MEMORY=2G, SPARK_DRIVER_MEMORY=1G, SPARK_WORKER_MEMORY=4G. Would anyone help to give me hint how to resolve this issue? I have not give any hint after google search. *# bin/spark-shellSpark assembly has been built with Hive, including Datanucleus jars on classpath15/02/11 12:21:39 INFO SecurityManager: Changing view acls to: root,15/02/11 12:21:39 INFO SecurityManager: Changing modify acls to: root,15/02/11 12:21:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )15/02/11 12:21:39 INFO HttpServer: Starting HTTP Server15/02/11 12:21:39 INFO Utils: Successfully started service 'HTTP class server' on port 28968.Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.1.0 /_/Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.6.0_24)Type in expressions to have them evaluated.Type :help for more information.15/02/11 12:21:43 INFO SecurityManager: Changing view acls to: root,15/02/11 12:21:43 INFO SecurityManager: Changing modify acls to: root,15/02/11 12:21:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )15/02/11 12:21:44 INFO Slf4jLogger: Slf4jLogger started15/02/11 12:21:44 INFO Remoting: Starting remoting15/02/11 12:21:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@xpan-biqa1:6862]15/02/11 12:21:44 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@xpan-biqa1:6862]15/02/11 12:21:44 INFO Utils: Successfully started service 'sparkDriver' on port 6862.15/02/11 12:21:44 INFO SparkEnv: Registering MapOutputTracker15/02/11 12:21:44 INFO SparkEnv: Registering BlockManagerMaster15/02/11 12:21:44 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150211122144-ed2615/02/11 12:21:44 INFO Utils: Successfully started service 'Connection manager for block manager' on port 40502.15/02/11 12:21:44 INFO ConnectionManager: Bound socket to port 40502 with id = ConnectionManagerId(xpan-biqa1,40502)15/02/11 12:21:44 INFO MemoryStore: MemoryStore started with capacity 265.0 MB15/02/11 12:21:44 INFO BlockManagerMaster: Trying to register BlockManager15/02/11 12:21:44 INFO BlockManagerMasterActor: Registering block manager xpan-biqa1:40502 with 265.0 MB RAM15/02/11 12:21:44 INFO BlockManagerMaster: Registered BlockManager15/02/11 12:21:44 INFO HttpFileServer: HTTP File server directory is /tmp/spark-0a80ce6b-6a05-4163-a97d-07753f627ec815/02/11 12:21:44 INFO 
HttpServer: Starting HTTP Server15/02/11 12:21:44 INFO Utils: Successfully started service 'HTTP file server' on port 25939.15/02/11 12:21:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.15/02/11 12:21:44 INFO SparkUI: Started SparkUI at http://xpan-biqa1:4040 http://xpan-biqa1:404015/02/11 12:21:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable15/02/11 12:21:46 INFO EventLoggingListener: Logging events to hdfs://xpan-biqa1:7020/spark/spark-shell-142362850543115/02/11 12:21:46 INFO AppClient$ClientActor: Connecting to master spark://xpan-biqa1:7077...15/02/11 12:21:46 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.015/02/11 12:21:46 INFO SparkILoop: Created spark context..Spark context available as sc.scala 15/02/11 12:22:06 INFO AppClient$ClientActor: Connecting to master spark://xpan-biqa1:7077...scala sc.parallelize(1 to 1000).count()15/02/11 12:22:24 INFO SparkContext: Starting job: count at console:1315/02/11 12:22:24 INFO DAGScheduler: Got job 0 (count at console:13) with 2 output partitions (allowLocal=false)15/02/11 12:22:24 INFO
Naive Bayes model fails after a few predictions
Hi, I have built a Sentiment Analyzer using the Naive Bayes model, the model works fine by learning from a list of 200 movie reviews and correctly predicting with an accuracy of close to 77% to 80%. After a while of predicting I get the following stacktrace... By the way...I have only one SparkContext and I can reproduce this everytime. 15/02/11 11:01:31 INFO NBSentimentAnalyser2$: predictSentiment -- [reviewID:ceb43d14-9052-4178-927c-53998898befe review text len:3899] 15/02/11 11:01:31 ERROR Executor: Exception in task 1.0 in stage 1844.0 (TID 7273) java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:110) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174) at 
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000) ... 23 more 15/02/11 11:01:31 ERROR Executor: Exception in task 0.0 in stage 1844.0 (TID 7272) java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:110) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
Re: Datastore HDFS vs Cassandra
Hi Mike, I developed a solution with Cassandra and Spark, using DSE. The main difficulty is with Cassandra: you need to understand its data model and its query patterns very well. Cassandra has better performance than HDFS, and it offers DR and stronger availability. HDFS is a filesystem; Cassandra is a DBMS. Cassandra supports full CRUD without ACID. HDFS is more flexible than Cassandra. In my opinion, if you really have time series, go with Cassandra, paying attention to your reporting data access patterns. Paolo Sent from my Windows Phone From: Mike Trienis mailto:mike.trie...@orcsol.com Sent: 11/02/2015 05:59 To: user@spark.apache.org mailto:user@spark.apache.org Subject: Datastore HDFS vs Cassandra Hi, I am considering implementing Apache Spark on top of a Cassandra database after listening to a related talk and reading through the slides from DataStax. It seems to fit well with our time-series data and reporting requirements. http://www.slideshare.net/patrickmcfadin/apache-cassandra-apache-spark-for-time-series-data Does anyone have any experience using Apache Spark and Cassandra, including limitations (and or) technical difficulties? How does Cassandra compare with HDFS, and what use cases would make HDFS more suitable? Thanks, Mike. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Datastore-HDFS-vs-Cassandra-tp21590.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Beginner in Spark
Refer this blog http://blog.prabeeshk.com/blog/2014/10/31/install-apache-spark-on-ubuntu-14-dot-04/ for step by step installation of Spark on Ubuntu On 7 February 2015 at 03:12, Matei Zaharia matei.zaha...@gmail.com wrote: You don't need HDFS or virtual machines to run Spark. You can just download it, unzip it and run it on your laptop. See http://spark.apache.org/docs/latest/index.html. Matei On Feb 6, 2015, at 2:58 PM, David Fallside falls...@us.ibm.com wrote: King, consider trying the Spark Kernel ( https://github.com/ibm-et/spark-kernel) which will install Spark etc and provide you with a Spark/Scala Notebook in which you can develop your algorithm. The Vagrant installation described in https://github.com/ibm-et/spark-kernel/wiki/Vagrant-Development-Environment will have you quickly up and running on a single machine without having to manage the details of the system installations. There is a Docker version, https://github.com/ibm-et/spark-kernel/wiki/Using-the-Docker-Container-for-the-Spark-Kernel, if you prefer Docker. Regards, David King sami kgsam...@gmail.com wrote on 02/06/2015 08:09:39 AM: From: King sami kgsam...@gmail.com To: user@spark.apache.org Date: 02/06/2015 08:11 AM Subject: Beginner in Spark Hi, I'm new in Spark, I'd like to install Spark with Scala. The aim is to build a data processing system foor door events. the first step is install spark, scala, hdfs and other required tools. the second is build the algorithm programm in Scala which can treat a file of my data logs (events). Could you please help me to install the required tools: Spark, Scala, HDF and tell me how can I execute my programm treating the entry file. Best regards,
Re: Which version to use for shuffle service if I'm going to run multiple versions of Spark
I think we made the binary protocol compatible across all versions, so you should be fine with using any one of them. 1.2.1 is probably the best since it is the most recent stable release. On Tue, Feb 10, 2015 at 8:43 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I need to use branch-1.2 and sometimes master builds of Spark for my project. However the officially supported Spark version by our Hadoop admin is only 1.2.0. So, my question is which version/build of spark-yarn-shuffle.jar should I use that works for all four versions? (1.2.0, 1.2.1, 1.2.2, 1.3.0) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: spark, reading from s3
It's to do with the timezone/clock skew, actually: you can either use NTP to maintain an accurate system clock, or adjust your system time to match the AWS one. You can check the S3 server time with: telnet s3.amazonaws.com 80 GET / HTTP/1.0 (the Date header in the response shows the S3 server's clock). Thanks Best Regards On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote: I'm getting this warning when using s3 input: 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in response to RequestTimeTooSkewed error. Local machine and S3 server disagree on the time by approximately 0 seconds. Retrying connection. After that there are tons of 403/forbidden errors and then job fails. It's sporadic, so sometimes I get this error and sometimes not, what could be the issue? I think it could be related to network connectivity?
Re: SparkSQL + Tableau Connector
I am a little confused here: why do you want to create the tables in Hive? You want to create the tables in spark-sql, right? If you are not able to find the same tables through Tableau then thrift is connecting to a different metastore than your spark-shell. One way to specify a metastore to thrift is to provide the path to hive-site.xml while starting thrift using --files hive-site.xml. Similarly you can specify the same metastore to your spark-submit or spark-shell using the same option. On Wed, Feb 11, 2015 at 5:23 AM, Todd Nist tsind...@gmail.com wrote: Arush, As for #2 do you mean something like this from the docs:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
Or did you have something else in mind? -Todd On Tue, Feb 10, 2015 at 6:35 PM, Todd Nist tsind...@gmail.com wrote: Arush, Thank you will take a look at that approach in the morning. I sort of figured the answer to #1 was NO and that I would need to do 2 and 3 thanks for clarifying it for me. -Todd On Tue, Feb 10, 2015 at 5:24 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? NO 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? Create a table in spark sql to expose via spark sql 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2 you would need to configure thrift to read from the metastore you expect it to read from - by default it reads from the metastore_db directory present in the directory used to launch the thrift server. On 11 Feb 2015 01:35, Todd Nist tsind...@gmail.com wrote: Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file.
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv._
val sqlContext = new SQLContext(sc)
val test = sqlContext.csvFile("/data/test.csv")
test.toJSON().saveAsTextFile("/data/out")
test.saveAsParquetFile("/data/out")
When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd -- Sigmoid Analytics http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Error while querying hive table from spark shell
Seems that the HDFS path for the table dosnt contains any file/data. Does the metastore contain the right path for HDFS data. You can find the HDFS path in TBLS in your metastore. On Wed, Feb 11, 2015 at 12:20 PM, kundan kumar iitr.kun...@gmail.com wrote: Hi , I am getting the following error when I am trying query a hive table from spark shell. I have placed my hive-site.xml in the spark/conf directory. Please suggest how to resolve this error. scala sqlContext.sql(select count(*) from offers_new).collect().foreach(println) 15/02/11 01:48:01 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead 15/02/11 01:48:01 INFO parse.ParseDriver: Parsing command: select count(*) from offers_new 15/02/11 01:48:01 INFO parse.ParseDriver: Parse Completed 15/02/11 01:48:01 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/11 01:48:01 INFO metastore.ObjectStore: ObjectStore, initialize called org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table offers_new at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:984) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$1.org $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413) at
Re: SparkSQL + Tableau Connector
BTW what tableau connector are you using? On Wed, Feb 11, 2015 at 12:55 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: I am a little confused here, why do you want to create the tables in hive. You want to create the tables in spark-sql, right? If you are not able to find the same tables through tableau then thrift is connecting to a diffrent metastore than your spark-shell. One way to specify a metstore to thrift is to provide the path to hive-site.xml while starting thrift using --files hive-site.xml. similarly you can specify the same metastore to your spark-submit or sharp-shell using the same option. On Wed, Feb 11, 2015 at 5:23 AM, Todd Nist tsind...@gmail.com wrote: Arush, As for #2 do you mean something like this from the docs: // sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING))sqlContext.sql(LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src) // Queries are expressed in HiveQLsqlContext.sql(FROM src SELECT key, value).collect().foreach(println) Or did you have something else in mind? -Todd On Tue, Feb 10, 2015 at 6:35 PM, Todd Nist tsind...@gmail.com wrote: Arush, Thank you will take a look at that approach in the morning. I sort of figured the answer to #1 was NO and that I would need to do 2 and 3 thanks for clarifying it for me. -Todd On Tue, Feb 10, 2015 at 5:24 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? NO 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? Create a table in spark sql to expose via spark sql 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2 you would need to configure thrift to read from the metastore you expect it read from - by default it reads from metastore_db directory present in the directory used to launch the thrift server. On 11 Feb 2015 01:35, Todd Nist tsind...@gmail.com wrote: Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. import *org.apache.sql.SQLContext *import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csfFile(/data/test.csv)test.toJSON().saveAsTextFile(/data/out) test.saveAsParquetFile(/data/out) When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Which version to use for shuffle service if I'm going to run multiple versions of Spark
Hi, I need to use branch-1.2 and sometimes master builds of Spark for my project. However the officially supported Spark version by our Hadoop admin is only 1.2.0. So, my question is which version/build of spark-yarn-shuffle.jar should I use that works for all four versions? (1.2.0, 1.2.1, 1.2.2, 1.3.0) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: can we insert and update with spark sql
You should look at https://github.com/amplab/spark-indexedrdd On Tue, Feb 10, 2015 at 2:27 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Michael, I want to cache a RDD and define get() and set() operators on it. Basically like memcached. Is it possible to build a memcached like distributed cache using Spark SQL ? If not what do you suggest we should use for such operations... Thanks. Deb On Fri, Jul 18, 2014 at 1:00 PM, Michael Armbrust mich...@databricks.com wrote: You can do insert into. As with other SQL on HDFS systems there is no updating of data. On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Is this what you are looking for? https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html According to the doc, it says Operator that acts as a sink for queries on RDDs and can be used to store the output inside a directory of Parquet files. This operator is similar to Hive's INSERT INTO TABLE operation in the sense that one can choose to either overwrite or append to a directory. Note that consecutive insertions to the same table must have compatible (source) schemas. Thanks Best Regards On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote: Hi As for spark 1.0, can we insert and update a table with SPARK SQL, and how? Thanks Best Regard
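A rough sketch of the get()/put() style that spark-indexedrdd provides, written from memory of its README, so the package name and method signatures should be treated as assumptions and checked against the repo:

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

// build an IndexedRDD from an RDD of (Long, value) pairs
val pairs = sc.parallelize((1L to 1000000L).map(id => (id, 0)))
val indexed = IndexedRDD(pairs).cache()

// memcached-style point operations
indexed.get(42L)                           // => Some(0), served via the per-partition index
val updated = indexed.put(42L, 7).cache()  // functional update, returns a new IndexedRDD
updated.get(42L)                           // => Some(7)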
Re: Will Spark serialize an entire Object or just the method referred in an object?
Hi Yitong, It's not as simple as that. In your very simple example, the only things referenced by the closure are (i) the input arguments and (ii) a Scala object. So there are no external references to serialize in that case, just the closure instance itself - see, there is still something being serialized, you just don't see it. What happens to the output of a map will depend on what other transformations or actions you perform on the RDD it returns. So saying only its output will be serialized is a question that cannot be answered by just looking at that code. I really suggest that if you're curious, you study the disassembled bytecode, which is really not hard to understand. There are also plenty of previous messages on this list that have covered this topic. On Mon, Feb 9, 2015 at 7:56 PM, Yitong Zhou timyit...@gmail.com wrote: Hi Marcelo, Thanks for the explanation! So you mean in this way, actually only the output of the map closure would need to be serialized so that it could be passed further for other operations (maybe reduce or else)? And we don't have to worry about Utils.funcX because for each closure instance we would load a new instance containing the func1 and func2 from jars that are already cached into local nodes? Thanks, Yitong 2015-02-09 14:35 GMT-08:00 Marcelo Vanzin van...@cloudera.com: `func1` and `func2` never get serialized. They must exist on the other end in the form of a class loaded by the JVM. What gets serialized is an instance of a particular closure (the argument to your map function). That's a separate class. The instance of that class that is serialized contains references to all other instances it needs to execute its apply method (or run or whatever is the correct method name). In this case, nothing is needed, since all it does is pass its argument in a call to a static method (Util.func1). Hope that helps, these things can be really confusing. You can play with javap -c to disassemble the class files to understand better how it all happens under the hood. On Mon, Feb 9, 2015 at 1:56 PM, Yitong Zhou timyit...@gmail.com wrote: If we define an Utils object: object Utils { def func1 = {..} def func2 = {..} } And then in a RDD we refer to one of the function: rdd.map{r = Utils.func1(r)} Will Utils.func2 also get serialized or not? Thanks, Yitong -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Will-Spark-serialize-an-entire-Object-or-just-the-method-referred-in-an-object-tp21566.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
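To make the distinction concrete, a small sketch as it would look in a compiled job (the REPL wraps definitions, which can change what gets captured); sc is assumed to be an existing SparkContext:

object Utils {
  def func1(x: Int): Int = x + 1
  def func2(x: Int): Int = x * 2   // never serialized; it is loaded from the jar on each executor
}

class Helper(val offset: Int) extends Serializable {
  def shift(x: Int): Int = x + offset
}

val rdd = sc.parallelize(1 to 10)

// only the tiny closure instance is shipped; Utils is resolved remotely from the class files
rdd.map(x => Utils.func1(x)).collect()

// this closure captures h, so the whole Helper instance is serialized along with it
val h = new Helper(5)
rdd.map(x => h.shift(x)).collect()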
Re: can we insert and update with spark sql
Hi Michael, I want to cache a RDD and define get() and set() operators on it. Basically like memcached. Is it possible to build a memcached like distributed cache using Spark SQL ? If not what do you suggest we should use for such operations... Thanks. Deb On Fri, Jul 18, 2014 at 1:00 PM, Michael Armbrust mich...@databricks.com wrote: You can do insert into. As with other SQL on HDFS systems there is no updating of data. On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Is this what you are looking for? https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html According to the doc, it says Operator that acts as a sink for queries on RDDs and can be used to store the output inside a directory of Parquet files. This operator is similar to Hive's INSERT INTO TABLE operation in the sense that one can choose to either overwrite or append to a directory. Note that consecutive insertions to the same table must have compatible (source) schemas. Thanks Best Regards On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote: Hi As for spark 1.0, can we insert and update a table with SPARK SQL, and how? Thanks Best Regard
Re: Beginner in Spark
2015-02-06 17:28 GMT+00:00 King sami kgsam...@gmail.com: The purpose is to build a data processing system for door events. An event will describe a door unlocking with a badge system. This event will differentiate unlocking by somebody from the inside and by somebody from the outside. *Producing the events*: You will need a simulator capable of producing events at random intervals. Simulating 200 doors seems like a good number, but adapt it as you see fit to get relevant results. Make sure different doors have different patterns to make the analysis interesting. *Processing the events:* After having accumulated a certain amount of events (for example: a day), you will calculate statistics. To do this, you will use Spark for your batch processing. You will extract: • most used door, least used door, door with most exits, door with most entrances • most and least busy moments (when people entered and exited a lot, or not at all) • least busy moment of the day *Hints:* • Spark is required: http://spark.apache.org • Coding in Scala is required. • Using HDFS for file storage is a plus. 2015-02-06 17:00 GMT+00:00 Nagesh sarvepalli sarvepalli.nag...@gmail.com : Hi, Here is the sequence I suggest. Feel free to ask if you need further help. 1) You need to decide if you want to go with a particular distribution of Hadoop (Cloudera / Hortonworks / MapR) or want to go with the Apache version. Downloading Hadoop from Apache and integrating it with the various projects is laborious (compared to distributions). Also, you need to take care of maintenance, including version compatibility of the various projects. Cloudera Manager is the best when it comes to cluster installation and maintenance but it is memory intensive. Cloud offerings (ex: from Microsoft) are even simpler and more hassle-free when it comes to installation and maintenance. 2) Depending on the server resources and the data size, you need to decide on the HDFS cluster size (number of nodes). Ensure you have the right JDK version installed if you are installing Hadoop on your own. 3) Once Hadoop is installed, you need to download Scala from scala-lang.org, and then 4) Download and install Spark from http://spark.apache.org/downloads.html Hope this helps to kick-start. Thanks Regards Nagesh Cloudera Certified Hadoop Developer On Fri, Feb 6, 2015 at 4:09 PM, King sami kgsam...@gmail.com wrote: Hi, I'm new to Spark and I'd like to install Spark with Scala. The aim is to build a data processing system for door events. The first step is to install Spark, Scala, HDFS and the other required tools. The second is to build the algorithm program in Scala which can process a file of my data logs (events). Could you please help me install the required tools (Spark, Scala, HDFS) and tell me how I can execute my program against the input file. Best regards,
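As a starting point for the batch-processing step, a sketch of the most/least used door statistics over one day of simulated events; the DoorEvent fields, line format and input path are all assumptions:

case class DoorEvent(doorId: Int, entering: Boolean, timestamp: Long)

// one simulated event per line: "doorId,entering,timestamp" (hypothetical format)
val events = sc.textFile("/data/events/2015-02-06")      // hypothetical path, HDFS or local
  .map(_.split(","))
  .map(f => DoorEvent(f(0).toInt, f(1).toBoolean, f(2).toLong))

// usage per door, then the extremes
val usageByDoor = events.map(e => (e.doorId, 1L)).reduceByKey(_ + _)
val mostUsed  = usageByDoor.reduce((a, b) => if (a._2 >= b._2) a else b)
val leastUsed = usageByDoor.reduce((a, b) => if (a._2 <= b._2) a else b)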
Re: pyspark: Java null pointer exception when accessing broadcast variables
I get this in the driver log: java.lang.NullPointerException at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) and on one of the executor's stderr: 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly (crashed) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py, line 57, in main split_index = read_int(infile) File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py, line 511, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) Caused by: java.lang.NullPointerException at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229) ... 
4 more 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly (crashed) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py, line 57, in main split_index = read_int(infile) File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py, line 511, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) Caused by: java.lang.NullPointerException at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590) at
Re: ZeroMQ and pyspark.streaming
No, zeromq api is not supported in python as of now. On 5 Feb 2015 21:27, Sasha Kacanski skacan...@gmail.com wrote: Does pyspark supports zeroMQ? I see that java does it, but I am not sure for Python? regards -- Aleksandar Kacanski
Re: SparkSQL + Tableau Connector
1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? NO 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? Create a table in spark sql to expose via spark sql 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2 you would need to configure thrift to read from the metastore you expect it read from - by default it reads from metastore_db directory present in the directory used to launch the thrift server. On 11 Feb 2015 01:35, Todd Nist tsind...@gmail.com wrote: Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. import *org.apache.sql.SQLContext *import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csfFile(/data/test.csv)test.toJSON().saveAsTextFile(/data/out) test.saveAsParquetFile(/data/out) When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd
Re: spark sql registerFunction with 1.2.1
The simple SQL parser doesn't yet support UDFs. Try using a HiveContext. On Tue, Feb 10, 2015 at 1:44 PM, Mohnish Kodnani mohnish.kodn...@gmail.com wrote: Hi, I am trying a very simple registerFunction and it is giving me errors. I have a parquet file which I register as a temp table. Then I define a UDF.
def toSeconds(timestamp: Long): Long = timestamp/10
sqlContext.registerFunction("toSeconds", toSeconds _)
val result = sqlContext.sql("select toSeconds(timestamp) from blah")
I get the following error. java.lang.RuntimeException: [1.18] failure: ``)'' expected but `timestamp' found select toSeconds(timestamp) from blah My end goal is as follows: We have a log file with timestamps in microseconds and I would like to group entries with second-level precision, so eventually I want to run the query select toSeconds(timestamp) as t, count(x) from table group by t,x
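A minimal sketch of the HiveContext route for the query above; the parquet path and the microseconds-to-seconds divisor are assumptions, and the column is backquoted because timestamp is a type keyword in HiveQL:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// in 1.2.x UDFs are registered on the context directly
hiveContext.registerFunction("toSeconds", (micros: Long) => micros / 1000000L)

// hypothetical path; register the parquet data so HiveQL can see it
hiveContext.parquetFile("/data/logs.parquet").registerTempTable("blah")

hiveContext.sql("SELECT toSeconds(`timestamp`) FROM blah").collect()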
Re: SparkSQL + Tableau Connector
Hi Todd, What you could do is run some SparkSQL commands immediately after the Thrift server starts up. Or does Tableau have some init SQL commands you could run? You can actually load data using SQL, such as:
create temporary table people using org.apache.spark.sql.json options (path 'examples/src/main/resources/people.json')
cache table people
create temporary table users using org.apache.spark.sql.parquet options (path 'examples/src/main/resources/users.parquet')
cache table users
From: Todd Nist Date: Tuesday, February 10, 2015 at 3:03 PM To: user@spark.apache.org Subject: SparkSQL + Tableau Connector Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file.
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv._
val sqlContext = new SQLContext(sc)
val test = sqlContext.csvFile("/data/test.csv")
test.toJSON().saveAsTextFile("/data/out")
test.saveAsParquetFile("/data/out")
When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd
Spark Summit East - March 18-19 - NYC
The inaugural Spark Summit East, an event to bring the Apache Spark community together, will be in New York City on March 18, 2015. We are excited about the growth of Spark and to bring the event to the east coast. At Spark Summit East you can look forward to hearing from Matei Zaharia, Databricks CEO Ion Stoica, and representatives from Palantir, Goldman Sachs, Baidu, Salesforce, Cloudera, Box, and many others. (See the full agenda at http://spark-summit.org/east/2015) All of these companies are utilizing Spark. Come see what their experience has been and get a chance to talk with some of the creators and committers. If you are new to Spark or looking to improve your knowledge of the technology, there will be three levels of Spark training: Intro to Spark, Advanced Spark Training, and Data Science with Spark. Space is limited, but we want to make sure those active in the community are aware of this new event in NYC. Use promo code DevList15 for 15% off your registration fee when registering before March 1, 2015. Register at http://spark-summit.org/east/2015/register Looking forward to seeing you there! Best, Scott The Spark Summit Organizers
hadoopConfiguration for StreamingContext
I see that SparkContext has a hadoopConfiguration() method, which can be used like in this sample I found:
sc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "XX");
sc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "XX");
But StreamingContext doesn't have the same thing. I want to use a StreamingContext with s3n: text file input, but can't find a way to set the AWS credentials. I also tried (with no success): - adding the properties to conf/spark-defaults.conf - $HADOOP_HOME/conf/hdfs-site.xml - ENV variables - embedding them as user:password in s3n://user:password@... (w/ url encoding) - setting the conf as above on a new SparkContext and passing that to the StreamingContext constructor: StreamingContext(sparkContext: SparkContext, batchDuration: Duration) Can someone point me in the right direction for setting AWS creds (hadoop conf options) for a StreamingContext? thanks, Marc Limotte Climate Corporation
Re: SparkSQL + Tableau Connector
Todd, I just tried it in bin/spark-sql shell. I created a folder json and just put 2 copies of the same people.json file This is what I ran: spark-sql create temporary table people using org.apache.spark.sql.json options (path 'examples/src/main/resources/json/*') ; Time taken: 0.34 seconds spark-sql select * from people; NULLMichael 30 Andy 19 Justin NULLMichael 30 Andy 19 Justin Time taken: 0.576 seconds From: Todd Nist Date: Tuesday, February 10, 2015 at 6:49 PM To: Silvio Fiorito Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: SparkSQL + Tableau Connector Hi Silvio, Ah, I like that, there is a section in Tableau for Initial SQL to be executed upon connecting this would fit well there. I guess I will need to issue a collect(), coalesce(1,true).saveAsTextFile(...) or use repartition(1), as the file currently is being broken into multiple parts. While this works in the spark-shell: val test = sqlContext.jsonFile(/data/out/“) // returs all parts back as one It seems to fail in just spark-sql: create temporary table test using org.apache.spark.sql.json options (path '/data/out/') cache table test with: [Simba][SparkODBC] (35) Error from Spark: error code: '0' error message: 'org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: create temporary table test using org.apache.spark.sql.json options (path '/data/out/') cache table test'. Initial SQL Error. Check that the syntax is correct and that you have access privileges to the requested database. Thanks again for the suggestion and I will give work with it a bit more tomorrow. -Todd On Tue, Feb 10, 2015 at 5:48 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: Hi Todd, What you could do is run some SparkSQL commands immediately after the Thrift server starts up. Or does Tableau have some init SQL commands you could run? You can actually load data using SQL, such as: create temporary table people using org.apache.spark.sql.json options (path 'examples/src/main/resources/people.json’) cache table people create temporary table users using org.apache.spark.sql.parquet options (path 'examples/src/main/resources/users.parquet’) cache table users From: Todd Nist Date: Tuesday, February 10, 2015 at 3:03 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: SparkSQL + Tableau Connector Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. import org.apache.sql.SQLContext import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csfFile(/data/test.csv)test.toJSON().saveAsTextFile(/data/out) test.saveAsParquetFile(/data/out) When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd
Re: pyspark: Java null pointer exception when accessing broadcast variables
It's brave to broadcast 8G pickled data, it will take more than 15G in memory for each Python worker, how much memory do you have in executor and driver? Do you see any other exceptions in driver and executors? Something related to serialization in JVM. On Tue, Feb 10, 2015 at 2:16 PM, Rok Roskar rokros...@gmail.com wrote: I get this in the driver log: I think this should happen on executor, or you called first() or take() on the RDD? java.lang.NullPointerException at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) and on one of the executor's stderr: 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly (crashed) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py, line 57, in main split_index = read_int(infile) File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py, line 511, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) Caused by: java.lang.NullPointerException at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229) ... 
4 more 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly (crashed) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py, line 57, in main split_index = read_int(infile) File /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py, line 511, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242) at
Re: SparkSQL + Tableau Connector
Arush, As for #2 do you mean something like this from the docs: // sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING))sqlContext.sql(LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src) // Queries are expressed in HiveQLsqlContext.sql(FROM src SELECT key, value).collect().foreach(println) Or did you have something else in mind? -Todd On Tue, Feb 10, 2015 at 6:35 PM, Todd Nist tsind...@gmail.com wrote: Arush, Thank you will take a look at that approach in the morning. I sort of figured the answer to #1 was NO and that I would need to do 2 and 3 thanks for clarifying it for me. -Todd On Tue, Feb 10, 2015 at 5:24 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? NO 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? Create a table in spark sql to expose via spark sql 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2 you would need to configure thrift to read from the metastore you expect it read from - by default it reads from metastore_db directory present in the directory used to launch the thrift server. On 11 Feb 2015 01:35, Todd Nist tsind...@gmail.com wrote: Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. import *org.apache.sql.SQLContext *import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csfFile(/data/test.csv)test.toJSON().saveAsTextFile(/data/out) test.saveAsParquetFile(/data/out) When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd
Spark installation
Hi, I'm new to Spark. I want to install it on my local machine (Ubuntu 12.04). Could you please help me install Spark step by step on my machine and run some Scala programs? Thanks
Re: can we insert and update with spark sql
Thanks...this is what I was looking for... It will be great if Ankur can give brief details about it...Basically how does it contrast with memcached for example... On Tue, Feb 10, 2015 at 2:32 PM, Michael Armbrust mich...@databricks.com wrote: You should look at https://github.com/amplab/spark-indexedrdd On Tue, Feb 10, 2015 at 2:27 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Michael, I want to cache a RDD and define get() and set() operators on it. Basically like memcached. Is it possible to build a memcached like distributed cache using Spark SQL ? If not what do you suggest we should use for such operations... Thanks. Deb On Fri, Jul 18, 2014 at 1:00 PM, Michael Armbrust mich...@databricks.com wrote: You can do insert into. As with other SQL on HDFS systems there is no updating of data. On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Is this what you are looking for? https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html According to the doc, it says Operator that acts as a sink for queries on RDDs and can be used to store the output inside a directory of Parquet files. This operator is similar to Hive's INSERT INTO TABLE operation in the sense that one can choose to either overwrite or append to a directory. Note that consecutive insertions to the same table must have compatible (source) schemas. Thanks Best Regards On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote: Hi As for spark 1.0, can we insert and update a table with SPARK SQL, and how? Thanks Best Regard
Re: SparkSQL + Tableau Connector
Arush, Thank you will take a look at that approach in the morning. I sort of figured the answer to #1 was NO and that I would need to do 2 and 3 thanks for clarifying it for me. -Todd On Tue, Feb 10, 2015 at 5:24 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? NO 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? Create a table in spark sql to expose via spark sql 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2 you would need to configure thrift to read from the metastore you expect it read from - by default it reads from metastore_db directory present in the directory used to launch the thrift server. On 11 Feb 2015 01:35, Todd Nist tsind...@gmail.com wrote: Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. import *org.apache.sql.SQLContext *import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csfFile(/data/test.csv)test.toJSON().saveAsTextFile(/data/out) test.saveAsParquetFile(/data/out) When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd
Re: can we insert and update with spark sql
Also I wanted to run get() and set() from mapPartitions (from spark workers and not master)... To be able to do that I think I have to create a separate spark context for the cache... But I am not sure how SparkContext from job1 can access SparkContext from job2 ! On Tue, Feb 10, 2015 at 3:25 PM, Debasish Das debasish.da...@gmail.com wrote: Thanks...this is what I was looking for... It will be great if Ankur can give brief details about it...Basically how does it contrast with memcached for example... On Tue, Feb 10, 2015 at 2:32 PM, Michael Armbrust mich...@databricks.com wrote: You should look at https://github.com/amplab/spark-indexedrdd On Tue, Feb 10, 2015 at 2:27 PM, Debasish Das debasish.da...@gmail.com wrote: Hi Michael, I want to cache a RDD and define get() and set() operators on it. Basically like memcached. Is it possible to build a memcached like distributed cache using Spark SQL ? If not what do you suggest we should use for such operations... Thanks. Deb On Fri, Jul 18, 2014 at 1:00 PM, Michael Armbrust mich...@databricks.com wrote: You can do insert into. As with other SQL on HDFS systems there is no updating of data. On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Is this what you are looking for? https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html According to the doc, it says Operator that acts as a sink for queries on RDDs and can be used to store the output inside a directory of Parquet files. This operator is similar to Hive's INSERT INTO TABLE operation in the sense that one can choose to either overwrite or append to a directory. Note that consecutive insertions to the same table must have compatible (source) schemas. Thanks Best Regards On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote: Hi As for spark 1.0, can we insert and update a table with SPARK SQL, and how? Thanks Best Regard
Re: Spark installation
For a local machine, I don't think there is anything to install. Just unzip it, run $SPARK_DIR/bin/spark-shell, and that will open up a REPL. On Tue, Feb 10, 2015 at 3:25 PM, King sami kgsam...@gmail.com wrote: Hi, I'm new to Spark. I want to install it on my local machine (Ubuntu 12.04). Could you please help me install Spark step by step on my machine and run some Scala programs? Thanks -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
Re: SparkSQL + Tableau Connector
Hi Silvio, Ah, I like that, there is a section in Tableau for Initial SQL to be executed upon connecting this would fit well there. I guess I will need to issue a collect(), coalesce(1,true).saveAsTextFile(...) or use repartition(1), as the file currently is being broken into multiple parts. While this works in the spark-shell: val test = sqlContext.jsonFile(/data/out/“) // returs all parts back as one It seems to fail in just spark-sql: create temporary table test using org.apache.spark.sql.json options (path '/data/out/') cache table test with: [Simba][SparkODBC] (35) Error from Spark: error code: '0' error message: 'org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: create temporary table test using org.apache.spark.sql.json options (path '/data/out/') cache table test'. Initial SQL Error. Check that the syntax is correct and that you have access privileges to the requested database. Thanks again for the suggestion and I will give work with it a bit more tomorrow. -Todd On Tue, Feb 10, 2015 at 5:48 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Todd, What you could do is run some SparkSQL commands immediately after the Thrift server starts up. Or does Tableau have some init SQL commands you could run? You can actually load data using SQL, such as: create temporary table people using org.apache.spark.sql.json options (path 'examples/src/main/resources/people.json’) cache table people create temporary table users using org.apache.spark.sql.parquet options (path 'examples/src/main/resources/users.parquet’) cache table users From: Todd Nist Date: Tuesday, February 10, 2015 at 3:03 PM To: user@spark.apache.org Subject: SparkSQL + Tableau Connector Hi, I'm trying to understand how and what the Tableau connector to SparkSQL is able to access. My understanding is it needs to connect to the thriftserver and I am not sure how or if it exposes parquet, json, schemaRDDs, or does it only expose schemas defined in the metastore / hive. For example, I do the following from the spark-shell which generates a schemaRDD from a csv file and saves it as a JSON file as well as a parquet file. import *org.apache.sql.SQLContext *import com.databricks.spark.csv._ val sqlContext = new SQLContext(sc) val test = sqlContext.csfFile(/data/test.csv)test.toJSON().saveAsTextFile(/data/out) test.saveAsParquetFile(/data/out) When I connect from Tableau, the only thing I see is the default schema and nothing in the tables section. So my questions are: 1. Can the connector fetch or query schemaRDD's saved to Parquet or JSON files? 2. Do I need to do something to expose these via hive / metastore other than creating a table in hive? 3. Does the thriftserver need to be configured to expose these in some fashion, sort of related to question 2. TIA for the assistance. -Todd
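The coalesce idea mentioned above, in spark-shell form against the test schemaRDD from earlier in the thread; the output path is made up, and funnelling everything through one partition trades parallelism for getting a single part file:

// collapse to one partition before saving so the directory contains a single part file
test.toJSON().coalesce(1).saveAsTextFile("/data/out-single")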
Re: Open file limit settings for Spark on Yarn job
Hi Arun, The limit for the YARN user on the cluster nodes should be all that matters. What version of Spark are you using? If you can turn on sort-based shuffle it should solve this problem. -Sandy On Tue, Feb 10, 2015 at 1:16 PM, Arun Luthra arun.lut...@gmail.com wrote: Hi, I'm running Spark on YARN from an edge node, and the tasks run on the Data Nodes. My job fails with a "Too many open files" error once it gets to groupByKey(). Alternatively I can make it fail immediately if I repartition the data when I create the RDD. Where do I need to make sure that ulimit -n is high enough? On the edge node it is small, 1024, but on the data nodes, the yarn user has a high limit, 32k. But is the yarn user the relevant user? And, is the 1024 limit for myself on the edge node a problem or is that limit not relevant? Arun
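In configuration form, a sketch of the two settings usually involved here; sort-based shuffle is already the default from Spark 1.2.0 onward, so setting it explicitly mainly matters on 1.1.x:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort")            // one shuffle output file per map task
  .set("spark.shuffle.consolidateFiles", "true")   // only relevant if staying on hash-based shuffle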
Re: Writable serialization from InputFormat losing fields
I am able to get around the problem by doing a map and getting the Event out of the EventWritable before I do my collect. I think I'll do that for now. On Tue, Feb 10, 2015 at 6:04 PM, Corey Nolet cjno...@gmail.com wrote: I am using an input format to load data from Accumulo [1] in to a Spark RDD. It looks like something is happening in the serialization of my output writable between the time it is emitted from the InputFormat and the time it reaches its destination on the driver. What's happening is that the resulting Event object [2] inside the EventWritable [3] appears to have lost its Tuples [4] [1] https://github.com/calrissian/accumulo-recipes/blob/master/store/event-store/src/main/java/org/calrissian/accumulorecipes/eventstore/hadoop/EventInputFormat.java [2] https://github.com/calrissian/mango/blob/master/mango-core/src/main/java/org/calrissian/mango/domain/event/Event.java [3] https://github.com/calrissian/accumulo-recipes/blob/master/commons/src/main/java/org/calrissian/accumulorecipes/commons/hadoop/EventWritable.java [4] https://github.com/calrissian/mango/blob/master/mango-core/src/main/java/org/calrissian/mango/domain/Tuple.java I'm at a loss. I've tested using the SerializableWritable and serializing an EventWritable to an ObjectOutputStream in a unit test and it serialized fine without loss of data. I also verified that the Event object itself serializes and deserializes fine with an ObjectOutputStream. I'm trying to follow breakpoints through the code to figure out where exactly this may be happening but the objects all seem to be bytes already when passed into the JavaSerializerInstance (if I'm properly following what's going on, that is). Any ideas on what this may be? I'm using Spark 1.2.0 and Scala 2.10 but the business objects I'm using are from Java 1.7.
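The workaround in code form, as a rough sketch; it assumes rdd is the (Key, EventWritable) pair RDD produced by EventInputFormat, and that EventWritable exposes the wrapped Event through a get() method, both of which are assumptions about the code linked above:

// pull the Event out of its Writable wrapper on the executors, so only plain
// (java-serializable) Event objects travel back to the driver
val events = rdd.map { case (key, eventWritable) => eventWritable.get() }   // get() is an assumed accessor name
events.collect()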
spark, reading from s3
I'm getting this warning when using s3 input: 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in response to RequestTimeTooSkewed error. Local machine and S3 server disagree on the time by approximately 0 seconds. Retrying connection. After that there are tons of 403/forbidden errors and then job fails. It's sporadic, so sometimes I get this error and sometimes not, what could be the issue? I think it could be related to network connectivity?