Re: Create RDD from output of unix command
You may want to look into using the pipe command:

http://blog.madhukaraphatak.com/pipe-in-spark/
http://spark.apache.org/docs/0.6.0/api/core/spark/rdd/PipedRDD.html
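For illustration, a minimal sketch of RDD.pipe (the shell command is just an example and can be any executable available on the worker nodes; sc is an existing SparkContext):

    val input = sc.parallelize(Seq("a", "b", "c"))
    // Each element is written to the command's stdin, one per line; every line
    // the command prints to stdout becomes an element of the new RDD.
    val piped = input.pipe("tr 'a-z' 'A-Z'")
    piped.collect().foreach(println)   // A, B, C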
Re: Passing Broadcast variable as parameter
Hi.

You can use a broadcast variable to make data available to all the nodes in your cluster, and it can live longer than just the current distributed task. For example, if you need to access a large structure in multiple sub-tasks, instead of sending that structure again and again with each sub-task you can send it only once and access the data inside the operation (map, flatMap etc.) via broadcastVariableName.value

See: https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Note however that you should treat the broadcast variable as a read-only structure, as it is not synced between workers after it is broadcast. To broadcast, your data must be serializable.

If the data you are trying to broadcast is a distributed RDD (and thus presumably large), perhaps what you need is some form of join operation (or cogroup)?

Regards,
Gylfi.
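P.S. A minimal sketch (the lookup table is a made-up example; sc is an existing SparkContext):

    // Ship the lookup table to every executor once, instead of with every task.
    val lookup = Map(1 -> "one", 2 -> "two", 3 -> "three")
    val bcLookup = sc.broadcast(lookup)
    val named = sc.parallelize(Seq(1, 2, 3))
      .map(i => bcLookup.value.getOrElse(i, "unknown"))   // read it via .value
    named.collect().foreach(println)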
Re: write a HashMap to HDFS in Spark
Hi.

Assuming you have the data in an RDD, you can save your RDD (regardless of structure) with nameRDD.saveAsObjectFile(path), where path can be hdfs:///myfolderonHDFS or on the local file system. Alternatively you can also use .saveAsTextFile().

Regards,
Gylfi.
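P.S. A minimal sketch of the round trip (paths are placeholders; sc is an existing SparkContext):

    val maps = sc.parallelize(Seq(Map("a" -> 1), Map("b" -> 2)))
    maps.saveAsObjectFile("hdfs:///user/me/mymaps")   // Java-serialized binary files
    // Read it back later, supplying the element type you saved:
    val restored = sc.objectFile[Map[String, Int]]("hdfs:///user/me/mymaps")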
Re: Spark same execution time on 1 node and 5 nodes
Hi.

If I just look at the two pics, I see that there is only one sub-task that takes all the time: the flatMapToPair at Coef... line 52. I also see that there are only two partitions that make up the input, and thus probably only two workers active. Try repartitioning the data into more parts before line 52, by calling rddname.repartition(10) for example, and see if it runs faster.

Regards,
Gylfi.
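P.S. A minimal sketch (inputRdd is a placeholder for whatever feeds the flatMapToPair):

    val repartitioned = inputRdd.repartition(10)   // shuffle into 10 roughly equal parts
    println(repartitioned.partitions.length)       // 10
    // ...then run the expensive flatMapToPair on repartitioned instead of inputRdd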
PicklingError: Could not pickle object as excessively deep recursion required.
Hi,

On Windows, in local mode, using PySpark, I got an error about excessively deep recursion. I'm using a module for lemmatizing/stemming, which uses a DLL and some binary files (the module is a Python wrapper around C code). Spark version 1.4.0. Any idea what is going on?

---
PicklingError                             Traceback (most recent call last)
<ipython-input-10-f699414a7f1a> in <module>()
      1 df1 = df.map(lambda p: lemmatizer.lemmatize('working'))
----> 2 df1.take(1)

C:\spark/python\pyspark\rdd.pyc in take(self, num)
   1263
   1264             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1265             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1266
   1267             items += res

C:\spark/python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    878         # SparkContext#runJob.
    879         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 880         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
    881                                           allowLocal)
    882         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))

C:\spark/python\pyspark\rdd.pyc in _jrdd(self)
   2349         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2350                    self._jrdd_deserializer)
-> 2351         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2352         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2353                                              bytearray(pickled_cmd),

C:\spark/python\pyspark\rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2269     # the serialized command will be compressed by broadcast
   2270     ser = CloudPickleSerializer()
-> 2271     pickled_command = ser.dumps(command)
   2272     if len(pickled_command) > (1 << 20):  # 1M
   2273         # The broadcast will have same life cycle as created PythonRDD

C:\spark/python\pyspark\serializers.pyc in dumps(self, obj)
    425
    426     def dumps(self, obj):
--> 427         return cloudpickle.dumps(obj, 2)
    428
    429

C:\spark/python\pyspark\cloudpickle.pyc in dumps(obj, protocol)
    620
    621     cp = CloudPickler(file, protocol)
--> 622     cp.dump(obj)
    623
    624     return file.getvalue()

C:\spark/python\pyspark\cloudpickle.pyc in dump(self, obj)
    109             if 'recursion' in e.args[0]:
    110                 msg = "Could not pickle object as excessively deep recursion required."
--> 111                 raise pickle.PicklingError(msg)
    112
    113     def save_memoryview(self, obj):

PicklingError: Could not pickle object as excessively deep recursion required.
Re: Spark APIs memory usage?
Even if I remove the numpy calls (no matrices loaded), the same exception is coming. Can anyone tell what createDataFrame does internally? Are there any alternatives for it?

On Fri, Jul 17, 2015 at 6:43 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

I suspect it's the numpy filling up memory.

Thanks
Best Regards

On Fri, Jul 17, 2015 at 5:46 PM, Harit Vishwakarma harit.vishwaka...@gmail.com wrote:

1. load 3 matrices of size ~ 1 X 1 using numpy.
2. rdd2 = rdd1.values().flatMap( fun ) # rdd1 has roughly 10^7 tuples
3. df = sqlCtx.createDataFrame(rdd2)
4. df.save() # in parquet format

It throws an exception in the createDataFrame() call. I don't know what exactly it is creating: everything in memory? Or can I make it persist simultaneously while getting created?

Thanks

On Fri, Jul 17, 2015 at 5:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Can you paste the code? How much memory does your system have and how big is your dataset? Did you try df.persist(StorageLevel.MEMORY_AND_DISK)?

Thanks
Best Regards

On Fri, Jul 17, 2015 at 5:14 PM, Harit Vishwakarma harit.vishwaka...@gmail.com wrote:

Thanks. The code is running on a single machine, and it still doesn't answer my question.

On Fri, Jul 17, 2015 at 4:52 PM, ayan guha guha.a...@gmail.com wrote:

You can bump up the number of partitions while creating the rdd you are using for the df.

On 17 Jul 2015 21:03, Harit Vishwakarma harit.vishwaka...@gmail.com wrote:

Hi,

I used the createDataFrame API of SqlContext in Python and am getting an OutOfMemoryException. I am wondering if it is creating the whole dataFrame in memory? I did not find any documentation describing memory usage of the Spark APIs. The documentation given is nice, but a little more detail (especially on memory usage / data distribution etc.) would really help.

--
Regards
Harit Vishwakarma
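A hedged sketch of that partitioning advice in Scala (the thread is PySpark, but the idea is the same; rdd1 and fun are placeholders from the thread, and the partition count and output path are assumptions):

    import org.apache.spark.storage.StorageLevel

    // More, smaller partitions so no single task has to materialize too much at once.
    val rdd2 = rdd1.values.flatMap(fun).repartition(200)
    val df = sqlContext.createDataFrame(rdd2)   // rdd2 must hold case-class instances (or Rows plus a schema)
    df.persist(StorageLevel.MEMORY_AND_DISK)    // allow spilling to disk rather than failing with OOM
    df.write.parquet("out.parquet")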
Re: Flatten list
Hi.

To be honest I don't really understand your problem declaration :( but let's just talk about how .flatMap works.

Unlike .map(), which only allows a one-to-one transformation, .flatMap() allows 0, 1 or many outputs per item processed, but the output must take the form of a sequence of the same type, like a List for example. All the sequences will then be merged (i.e. flattened) in the end into a single RDD of that type. Note however that an array does not inherit from Seq, and thus you must transform it to a sequence or something that inherits from AbstractSeq, like a List. See
http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.List
vs.
http://www.scala-lang.org/api/current/index.html#scala.Array

For example, let's assume you have an RDD[(Array[Int])] and you want all the Int values flattened into a single RDD[(Int)]. The code would be something like so:

val intArraysRDD : RDD[(Array[Int])] = ...some code to get the arrays...
val flattenedIntRDD : RDD[(Int)] = intArraysRDD.flatMap( array => {
  var ret : List[Int] = Nil
  for ( i <- array ) { ret = i :: ret }
  ret
})

This is an intentionally explicit version. A simpler version would be something like this:

val flattenedIntRDD : RDD[(Int)] = intArraysRDD.flatMap( array => array.toList )

However, to understand exactly your problem you need to explain better what the RDD you want to create should look like.

Regards,
Gylfi.
Re: Spark and SQL Server
I think there is a mistake in your jdbc() call; the signature is:

jdbc(self, url, table, mode, properties)

and you had used properties as the third parameter.

On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T matthew.t.yo...@intel.com wrote:

Hello,

I am testing Spark interoperation with SQL Server via JDBC with Microsoft's 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a couple of issues writing back.

In Scala 2.10 I can write back to the database except for a couple of types:

1. When I read a DataFrame from a table that contains a datetime column, it comes in as a java.sql.Timestamp object in the DataFrame. This is alright for Spark purposes, but when I go to write this back to the database with df.write.jdbc(...) it errors out because it is trying to write the TimeStamp type to SQL Server, which is not a date/time storing type in TSQL. I think it should be writing a datetime, but I'm not sure how to tell Spark this.

2. A related misunderstanding happens when I try to write a java.lang.Boolean to the database; it errors out because Spark is trying to specify the width of the bit type, which is illegal in SQL Server (error msg: "Cannot specify a column width on data type bit").

Do I need to edit the Spark source to fix this behavior, or is there a configuration option somewhere that I am not aware of?

When I attempt to write back to SQL Server in an IPython notebook, py4j seems unable to convert a Python dict into a Java hashmap, which is necessary for parameter passing. I've documented details of this problem with code examples here: http://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session

Any advice would be appreciated. Thank you for your time,

-- Matthew Young
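For reference, a minimal sketch of the equivalent call from Scala (URL, table name and credentials are placeholders), where the connection properties are passed as the last argument and the save mode is set separately:

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    val props = new Properties()
    props.setProperty("user", "myuser")          // placeholder credentials
    props.setProperty("password", "mypassword")
    df.write.mode(SaveMode.Append)
      .jdbc("jdbc:sqlserver://myhost:1433;databaseName=mydb", "mytable", props)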
Re: Using reference for RDD is safe?
Hi.

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

See the section "RDD Operations" in https://spark.apache.org/docs/1.2.0/programming-guide.html

Thus, neither your myrdd2 nor myrdd will exist until you call the count. What is stored is just how to create myrdd and myrdd2, so yes, this is safe. When you run myrdd2.count, both RDDs are created, myrdd2 is counted and the count printed out. After the operation both RDDs are destroyed again. If you run the myrdd2.count again, both myrdd and myrdd2 are created again. If your transformation is expensive, you may want to keep the data around, and for that you must use .persist() or .cache() etc.

Regards,
Gylfi.
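P.S. A minimal sketch (the path and the transform are placeholders; sc is an existing SparkContext):

    val myrdd = sc.textFile("hdfs:///data/input")
    val myrdd2 = myrdd.map(line => line.toUpperCase).cache()  // mark for caching
    println(myrdd2.count())  // first action: computes both RDDs, caches myrdd2
    println(myrdd2.count())  // second action: served from the cache, no recompute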
Re: No. of Task vs No. of Executors
You could even try changing the block size of the input data on HDFS (can be done on a per file basis) and that would get all workers going right from the get-go in Spark.
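Alternatively, if re-writing the files with a different block size is inconvenient, you can ask Spark for more input partitions when reading; a minimal sketch (path and partition count are assumptions; sc is an existing SparkContext):

    // The second argument is a minimum hint for the number of input splits.
    val lines = sc.textFile("hdfs:///data/big-input.txt", 40)
    println(lines.partitions.length)   // at least 40 for a splittable text file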
Re: K Nearest Neighbours
Hi.

What I would do in your case would be something like this. Let's call the two datasets qs and ds, where qs is an array of vectors and ds is an RDD[(dsID: Long, Vector)]. Do the following:

1) Create a k-NN class that can keep track of the k nearest neighbors found so far. It must have a qsID, some structure for the k nearest neighbors, Seq[(dsID: Long, distance: Double)], and a function .add( nn : (Long, Vector) ) that will do the distance calculation and update the k-NN when appropriate.

2) Collect the qs and key it as well, so each query has an ID, i.e. qs = Array[(qsID: Long, Vector)].

Now what you want to do is not create all the pairwise distances, but just the k-NNs. To do this we will actually create a few k-NNs for each query vector, one per partition, and then merge them later.

3) Do a ds.mapPartitions(), and inside the function create a k-NN for each query, scan the ds points of the partition, and output an iterator over the set of k-NNs created:

val k = 100
val qs : Array[(Long, Vector)] = ...   // the collected, keyed query set
val ds : RDD[(Long, Vector)] = ...     // the distributed dataset
val knnResults = ds.mapPartitions( itr => {
  val knns = qs.map( qp => (qp._1, new KNNClass(k, qp)) )
  itr.foreach( dp => knns.foreach { case (_, knn) => knn.add(dp) } )
  knns.iterator
})

Now you have one k-NN per partition for each query point, but this we can simply fix by doing a reduceByKey and merging all the k-NNs of each qsID into a single k-NN:

val knnResultFinal = knnResults.reduceByKey( (a, b) => KNNClass.merge(a, b) )

where you have a static function that merges two k-NNs, i.e. we simply concatenate them, sort on distance, take the k top values and return them as a new k-NN instance. If you want to control how many k-NNs are created, you can always repartition ds.

How does that sound? Does this make any sense? :)

Regards,
Gylfi.
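P.S. For completeness, a minimal sketch of what such a KNNClass might look like (this class is an assumption for illustration, not part of any Spark API):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Tracks the k nearest neighbors of one query point.
class KNNClass(val k: Int, val query: (Long, Vector)) extends Serializable {
  var neighbors: List[(Long, Double)] = Nil   // (dsID, distance), ascending by distance
  def add(dp: (Long, Vector)): Unit = {
    val dist = Vectors.sqdist(query._2, dp._2)   // squared Euclidean distance
    neighbors = ((dp._1, dist) :: neighbors).sortBy(_._2).take(k)
  }
}

object KNNClass {
  // Merge two partial k-NNs for the same query: concatenate, sort, keep the top k.
  def merge(a: KNNClass, b: KNNClass): KNNClass = {
    val merged = new KNNClass(a.k, a.query)
    merged.neighbors = (a.neighbors ++ b.neighbors).sortBy(_._2).take(a.k)
    merged
  }
}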
Using Dataframe write with newHadoopApi
Hi, I am trying to use a DataFrame and save it to Elasticsearch using the newHadoopApi (because I am using Python). Can anyone tell me whether this is even possible? -- Best Regards, Ayan Guha
Re: DataFrame more efficient than RDD?
Here is a related thread: http://search-hadoop.com/m/q3RTtPmjSJ1Dod92

On Jul 15, 2015, at 7:41 AM, k0ala k0ala.k0...@gmail.com wrote:

Hi,

I have been working a bit with RDD, and am now taking a look at DataFrames. The schema definition using case classes looks very attractive:
https://spark.apache.org/docs/1.4.0/sql-programming-guide.html#inferring-the-schema-using-reflection

Is a DataFrame more efficient (space-wise) than an RDD for the same case class? And in general, when should DataFrames be preferred over RDDs, and vice versa?
BigQuery connector for pyspark via Hadoop Input Format example
I have a large dataset stored in a BigQuery table and I would like to load it into a pyspark RDD for ETL data processing. I realized that BigQuery supports the Hadoop Input / Output format
https://cloud.google.com/hadoop/writing-with-bigquery-connector
and pyspark should be able to use this interface to create an RDD with the method newAPIHadoopRDD.
http://spark.apache.org/docs/latest/api/python/pyspark.html
Unfortunately, the documentation on both ends seems scarce and goes beyond my knowledge of Hadoop/Spark/BigQuery. Is there anybody who has figured out how to do this? Thanks!
Re: How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames
I am facing the same issue; I tried this but got a compilation error for the $ in the explode function, so I had to modify it to the below to make it work:

df.select(explode(new Column("entities.user_mentions")).as("mention"))

On Wed, Jun 24, 2015 at 2:48 PM, Michael Armbrust mich...@databricks.com wrote:

Starting in Spark 1.4 there is also an explode that you can use directly from the select clause (much like in HiveQL):

import org.apache.spark.sql.functions._
df.select(explode($"entities.user_mentions").as("mention"))

Unlike standard HiveQL, you can also include other attributes in the select or even $"*".

On Wed, Jun 24, 2015 at 8:34 AM, Yin Huai yh...@databricks.com wrote:

The function accepted by explode is f: Row => TraversableOnce[A]. It seems user_mentions is an array of structs. So, can you change your pattern matching to the following?

case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map(elem => ...)

On Wed, Jun 24, 2015 at 5:27 AM, Gustavo Arjones garjo...@socialmetrix.com wrote:

Hi All,

I am using the new Apache Spark version 1.4.0 Data-frames API to extract information from Twitter's Status JSON, mostly focused on the Entities Object https://dev.twitter.com/overview/api/entities - the relevant part to this question is shown below:

{
  ...
  "entities": {
    "hashtags": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "screen_name": "linobocchini",
        "name": "Lino Bocchini",
        "id": 187356243,
        "id_str": "187356243",
        "indices": [ 3, 16 ]
      },
      {
        "screen_name": "jeanwyllys_real",
        "name": "Jean Wyllys",
        "id": 23176,
        "id_str": "23176",
        "indices": [ 79, 95 ]
      }
    ],
    "symbols": []
  },
  ...
}

There are several examples on how to extract information from primitive types such as string, integer, etc - but I couldn't find anything on how to process those kinds of complex structures.

I tried the code below but it still doesn't work; it throws an exception.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val tweets = sqlContext.read.json("tweets.json")

// this function is just to filter empty entities.user_mentions[] nodes
// since some tweets don't contain any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
    case Row(arr: Array[Row]) => arr.map { elem =>
      UserMention(
        elem.getAs[Long]("id"),
        elem.getAs[String]("is_str"),
        elem.getAs[Array[Long]]("indices"),
        elem.getAs[String]("name"),
        elem.getAs[String]("screen_name"))
    }
  }

mentions.first

Exception when I try to call mentions.first:

scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [23176,23176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
    at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)

What is wrong here? I understand it is related to the types but I couldn't figure it out yet.

As additional context, the structure mapped automatically is:

scala> mentions.printSchema
root
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)

NOTE 1: I know it is possible to solve this using HiveQL, but I would like to use Data-frames since there is so much momentum around it.

SELECT explode(entities.user_mentions) as mentions FROM tweets

NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty) is an ugly hack and I'm missing something here, but it was the only way I came up with to avoid an NPE.

I've posted the same question on SO: http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames

Thanks all!
- gustavo
Spark-hive parquet schema evolution
Hi all, I'm aware of the support for schema evolution via DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution with Hive metastore tables. So, say I create a table via SparkSQL CLI, how would I deal with Parquet schema evolution? Thanks, J
Re: No. of Task vs No. of Executors
This is likely due to data skew. If you are using key-value pairs, one key has a lot more records than the other keys. Do you have any groupBy operations?

David

On Tue, Jul 14, 2015 at 9:43 AM, shahid sha...@trialx.com wrote:

Hi,

I have a 10 node cluster. I loaded the data onto HDFS, so the number of partitions I get is 9. I am running a Spark application; it gets stuck on one of the tasks, and looking at the UI it seems the application is not using all nodes to do calculations. Attached is the screenshot of tasks; it seems tasks are put on each node more than once. Looking at the tasks, 8 tasks complete within 7-8 minutes and one task takes around 30 minutes, causing the delay in results.

http://apache-spark-user-list.1001560.n3.nabble.com/file/n23824/Screen_Shot_2015-07-13_at_9.png
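If you want to confirm the skew, a minimal sketch that counts records per key and prints the heaviest keys (pairRdd stands in for your real key-value RDD):

    val pairRdd = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
    val keyCounts = pairRdd.mapValues(_ => 1L).reduceByKey(_ + _)
    // The 10 keys with the most records; one huge count confirms the skew.
    keyCounts.top(10)(Ordering.by(_._2)).foreach(println)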
Re: spark-shell with Yarn failed
It might be a network issue. The error states that it failed to bind the server IP address.

Chester

Sent from my iPhone

On Jul 18, 2015, at 11:46 AM, Amjad ALSHABANI ashshab...@gmail.com wrote:
[quoted text trimmed; the full message appears below]
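One thing worth checking (an assumption, not a confirmed fix): if the machine has several network interfaces, or its hostname resolves to the wrong address, you can pin the address Spark binds to by setting SPARK_LOCAL_IP in conf/spark-env.sh, e.g.:

    # the address is a placeholder for the host's reachable IP
    export SPARK_LOCAL_IP=10.98.105.11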
RE: Feature Generation On Spark
Try this (replace ... with the appropriate values for your environment):

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector

val sc = new SparkContext(...)
// wholeTextFiles gives one (path, content) pair per file, i.e. one document per record
val documents = sc.wholeTextFiles(...)
val tokenized = documents.map { case (path, document) => (path, document.split("\\s+")) }
val numFeatures = 10
val hashingTF = new HashingTF(numFeatures)
// hash each document's tokens into a fixed-length term-frequency vector
val featurized = tokenized.map { case (path, words) => (path, hashingTF.transform(words)) }

Mohammed

From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com]
Sent: Friday, July 17, 2015 12:33 AM
To: Mohammed Guller
Subject: Re: Feature Generation On Spark

Thanks, I did look at the example. I am using Spark 1.2 and the modules mentioned there are not in 1.2, I guess; the import is failing.

Rishi

From: Mohammed Guller moham...@glassbeam.com
Sent: Friday, July 10, 2015 2:31 AM
To: rishikesh thakur; ayan guha; Michal Čizmazia
Cc: user
Subject: RE: Feature Generation On Spark

Take a look at the examples here: https://spark.apache.org/docs/latest/ml-guide.html

Mohammed

From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com]
Sent: Saturday, July 4, 2015 10:49 PM
To: ayan guha; Michal Čizmazia
Cc: user
Subject: RE: Feature Generation On Spark

I have one document per file and each file is to be converted to a feature vector, pretty much like standard feature construction for document classification.

Thanks
Rishi

Date: Sun, 5 Jul 2015 01:44:04 +1000
Subject: Re: Feature Generation On Spark
From: guha.a...@gmail.com
To: mici...@gmail.com
CC: rishikeshtha...@hotmail.com; user@spark.apache.org

Do you have one document per file or multiple documents in the file?

On 4 Jul 2015 23:38, Michal Čizmazia mici...@gmail.com wrote:

Spark Context has a method wholeTextFiles. Is that what you need?

On 4 July 2015 at 07:04, rishikesh rishikeshtha...@hotmail.com wrote:

Hi,

I am new to Spark and am working on document classification. Before model fitting I need to do feature generation. Each document is to be converted to a feature vector. However, I am not sure how to do that. While testing locally I have a static list of tokens, and when I parse a file I do a lookup and increment counters. In the case of Spark I can create an RDD which loads all the documents, however I am not sure if one file goes to one executor or multiple. If the file is split then the feature vectors need to be merged. But I am not able to figure out how to do that.

Thanks
Rishi
Re: spark-shell with Yarn failed
Does anybody have any idea about the error I'm having? I am really clueless... and would appreciate any idea :)

Thanks in advance

Amjad

On Jul 17, 2015 5:37 PM, Amjad ALSHABANI ashshab...@gmail.com wrote:

Hello,

First of all, I'm a newbie in Spark. I'm trying to start the spark-shell with a YARN cluster by running:

$ spark-shell --master yarn-client

Sometimes it goes well, but most of the time I get an error:

Container exited with a non-zero exit code 10
Failing this attempt. Failing the application.
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1437145851944
     final status: FAILED
     tracking URL: http://My-HadoopServer:50080/cluster/app/application_143708028_0030
     user: hadoop
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:115)

Searching in the YARN logs I got this:

$ yarn logs -applicationId application_143708028_0030
2015-07-17 17:11:03,961 - INFO [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3@74] - Starting remoting
2015-07-17 17:11:04,200 - ERROR [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1@65] - failed to bind to My-HadoopServer/10.98.105.11:0, shutting down Netty transport
2015-07-17 17:11:04,210 - WARN [main:Logging$class@71] - Service 'sparkYarnAM' could not bind on port 0. Attempting port 1.
...
2015-07-17 17:11:05,123 - ERROR [main:Logging$class@96] - Uncaught exception: java.net.BindException: Failed to bind to: My-HadoopServer/HadoopServerIP:0: Service 'sparkYarnAM' failed after 16 retries!
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
    ...

I'm using Spark 1.3 and Hadoop 2.6, and in spark-env.sh it points to my Hadoop configuration:

export HADOOP_CONF_DIR=/usr/hdp/2.2.4.4-16/hadoop/conf

Is this problem coming from the Spark configuration or the YARN configuration (or the Spark-with-YARN configs)?

Any ideas?

Amjad
Spark 1.4 application throws java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration
Hi,

I have built a Spark application with IDEA. When I run SparkPI, IDEA throws an exception like this:

Exception in thread "main" java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:108)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:99)
    at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:78)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:62)
    at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:61)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:74)
    at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:190)
    at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:141)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:440)
    at org.learn.SparkPI$.main(SparkPI.scala:27)
    at org.learn.SparkPI.main(SparkPI.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: javax.servlet.FilterRegistration
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

And the application SparkPI looks like this:

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

object SparkPI {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Pi")
    conf.setMaster("local")
    val spark = new SparkContext(conf)
    //spark.addJar("D:\\BigdataResearch\\SparkLeaning\\out\\artifacts\\sparkleaning_jar\\sparkleaning.jar")
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 10 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

And the build.sbt like this:

name := "SparkLearning"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.hive" % "hive-jdbc" % "0.13.1",
  "org.apache.hadoop" % "hadoop-common" % "2.2.0" excludeAll ExclusionRule(organization = "javax.servlet"),
  "org.apache.hadoop" % "hadoop-client" % "2.2.0" excludeAll ExclusionRule(organization = "javax.servlet"),
  "org.scalatest" %% "scalatest" % "2.2.0",
  "org.apache.spark" %% "spark-core" % "1.4.0",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-hive" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.spark" %% "spark-streaming" % "1.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
  "org.eclipse.jetty" % "jetty-servlet" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-http" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-server" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-util" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-security" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-plus" % "8.1.14.v20131031",
  "org.apache.kafka" %% "kafka" % "0.8.2.1",
  "net.sf.json-lib" % "json-lib" % "2.4" from "http://gradle.artifactoryonline.com/gradle/libs/net/sf/json-lib/json-lib/2.4/json-lib-2.4-jdk15.jar",
  "com.databricks" %% "spark-csv" % "1.0.3"
)

Please give me some suggestions!
How to restart Twitter spark stream
Hi,

I have a Twitter Spark stream initialized in the following way:

val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
val config = getTwitterConfigurationBuilder.build()
val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

stream.stop()
ssc.stop(false)

Afterwards, I try to initialize a new Twitter stream like I did previously. However, I get this exception:

Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
    at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
    at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
    at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this issue?

Thanks,
Zoran
[General Question] [Hadoop + Spark at scale] Spark Rack Awareness ?
I wanted to ask a general question about Hadoop/YARN and Apache Spark integration. I know that Hadoop on a physical cluster has rack awareness, i.e. it attempts to minimise network traffic by placing replicated blocks within a rack. I wondered whether, when Spark is configured to use YARN as its cluster manager, it is able to use this feature to also minimise network traffic to a degree. Sorry if this question is not quite accurate, but I think you can generally see what I mean?