Re: Create RDD from output of unix command

2015-07-18 Thread Gylfi
You may want to look into using the pipe command .. 
http://blog.madhukaraphatak.com/pipe-in-spark/
http://spark.apache.org/docs/0.6.0/api/core/spark/rdd/PipedRDD.html
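As a quick sketch of how that looks (the command and path below are just examples, and sc is assumed to be an existing SparkContext): each partition's elements are written to the external command's stdin, one per line, and the command's stdout comes back as a new RDD[String].

val lines = sc.textFile("hdfs:///some/input")        // any RDD of strings
val piped = lines.pipe(Seq("grep", "-i", "spark"))   // run the unix command once per partition
piped.take(10).foreach(println)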




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-RDD-from-output-of-unix-command-tp23723p23895.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Passing Broadcast variable as parameter

2015-07-18 Thread Gylfi
Hi.

You can use a broadcast variable to make data available to all the nodes in
your cluster in a way that lives longer than just the current distributed task.

For example, if you need to access a large structure in multiple sub-tasks,
instead of sending that structure again and again with each sub-task you can
send it only once and access the data inside the operation (map, flatMap,
etc.) by calling .value on the broadcast variable.

See :
https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Note however that you should treat the broadcast variable as a read-only
structure, as it is not synced between workers after it is broadcast.

To broadcast, your data must be serializable.
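As a rough illustration (the toy Map and the names below are made up, and sc is assumed to be an existing SparkContext, e.g. in spark-shell), the pattern looks like this:

val lookup = Map("a" -> 1, "b" -> 2)    // stands in for a large read-only structure
val lookupBC = sc.broadcast(lookup)     // shipped to each executor once

val resolved = sc.parallelize(Seq("a", "b", "c"))
  .map(key => lookupBC.value.getOrElse(key, -1))   // access it via .value inside the closure
resolved.collect()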

If the data you are trying to broadcast is a distributed RDD (and thus
presumably large), perhaps what you need is some form of join operation (or
cogroup)?

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-Broadcast-variable-as-parameter-tp23760p23898.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: write a HashMap to HDFS in Spark

2015-07-18 Thread Gylfi
Hi. 

Assuming you have the data in an RDD, you can save your RDD (regardless of
structure) with nameRDD.saveAsObjectFile(path), where path can be a location
such as hdfs:///myfolderonHDFS or a path on the local file system.

Alternatively you can also use .saveAsTextFile()
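A small sketch of the idea (the map contents and paths are placeholders, and sc is assumed to be an existing SparkContext): put the HashMap's entries into an RDD, save it, and read it back later with objectFile.

val myMap = Map("alice" -> 3L, "bob" -> 7L)              // the HashMap to save
val mapRDD = sc.parallelize(myMap.toSeq)                 // RDD[(String, Long)]
mapRDD.saveAsObjectFile("hdfs:///user/me/mymap.obj")     // written as Java-serialized objects

// later, read it back and rebuild the map
val restored = sc.objectFile[(String, Long)]("hdfs:///user/me/mymap.obj").collectAsMap()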

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/write-a-HashMap-to-HDFS-in-Spark-tp23813p23897.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark same execution time on 1 node and 5 nodes

2015-07-18 Thread Gylfi
Hi. 

If I just look at the two pics, I see that there is only one sub-task that
takes all the time:
the flatMapToPair at Coef...  line 52.
I also see that there are only two partitions making up the input, and
thus probably only two workers are active.

Try repartitioning the data into more parts before line 52 by calling
rddname.repartition(10), for example, and see if it runs faster.
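A minimal sketch of that idea (inputRDD and the partition count are placeholders for whatever feeds the flatMapToPair at line 52):

val inputRDD = sc.textFile("hdfs:///some/input")   // example source; check its layout first
println(inputRDD.partitions.length)                // how many partitions (and thus tasks) you get
val spread = inputRDD.repartition(10)              // shuffle the data into 10 partitions
// then run the expensive flatMap / flatMapToPair on `spread` instead of on inputRDD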

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-same-execution-time-on-1-node-and-5-nodes-tp23866p23893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PicklingError: Could not pickle object as excessively deep recursion required.

2015-07-18 Thread Andrej Burja
Hi,

On Windows, in local mode, using PySpark, I got an error about excessively
deep recursion.
I'm using a module for lemmatizing/stemming, which uses a DLL and
some binary files (the module is a Python wrapper around C code).
Spark version 1.4.0.
Any idea what is going on?

---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<ipython-input-10-f699414a7f1a> in <module>()
      1 df1 = df.map(lambda p: lemmatizer.lemmatize('working'))
----> 2 df1.take(1)

C:\spark/python\pyspark\rdd.pyc in take(self, num)
   1263
   1264         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1265         res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1266
   1267         items += res

C:\spark/python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    878         # SparkContext#runJob.
    879         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 880         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
    881                                           allowLocal)
    882         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))

C:\spark/python\pyspark\rdd.pyc in _jrdd(self)
   2349         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2350                    self._jrdd_deserializer)
-> 2351         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2352         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2353                                              bytearray(pickled_cmd),

C:\spark/python\pyspark\rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2269     # the serialized command will be compressed by broadcast
   2270     ser = CloudPickleSerializer()
-> 2271     pickled_command = ser.dumps(command)
   2272     if len(pickled_command) > (1 << 20):  # 1M
   2273         # The broadcast will have same life cycle as created PythonRDD

C:\spark/python\pyspark\serializers.pyc in dumps(self, obj)
    425
    426     def dumps(self, obj):
--> 427         return cloudpickle.dumps(obj, 2)
    428
    429

C:\spark/python\pyspark\cloudpickle.pyc in dumps(obj, protocol)
    620
    621     cp = CloudPickler(file,protocol)
--> 622     cp.dump(obj)
    623
    624     return file.getvalue()

C:\spark/python\pyspark\cloudpickle.pyc in dump(self, obj)
    109             if 'recursion' in e.args[0]:
    110                 msg = "Could not pickle object as excessively deep recursion required."
--> 111                 raise pickle.PicklingError(msg)
    112
    113     def save_memoryview(self, obj):

PicklingError: Could not pickle object as excessively deep recursion required.


Re: Spark APIs memory usage?

2015-07-18 Thread Harit Vishwakarma
Even if I remove the numpy calls (no matrices loaded), the same exception
comes up.
Can anyone tell what createDataFrame does internally? Are there any
alternatives to it?

On Fri, Jul 17, 2015 at 6:43 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I suspect its the numpy filling up Memory.

 Thanks
 Best Regards

 On Fri, Jul 17, 2015 at 5:46 PM, Harit Vishwakarma 
 harit.vishwaka...@gmail.com wrote:

 1. load 3 matrices of size ~ 1 X 1 using numpy.
 2. rdd2 = rdd1.values().flatMap( fun )  # rdd1 has roughly 10^7 tuples
 3. df = sqlCtx.createDataFrame(rdd2)
 4. df.save() # in parquet format

 It throws the exception in the createDataFrame() call. I don't know what
 exactly it is creating. Everything in memory? Or can I make it persist
 while it is being created?

 Thanks


 On Fri, Jul 17, 2015 at 5:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you paste the code? How much memory does your system have and how
 big is your dataset? Did you try df.persist(StorageLevel.MEMORY_AND_DISK)?

 Thanks
 Best Regards

 On Fri, Jul 17, 2015 at 5:14 PM, Harit Vishwakarma 
 harit.vishwaka...@gmail.com wrote:

 Thanks,
 Code is running on a single machine.
 And it still doesn't answer my question.

 On Fri, Jul 17, 2015 at 4:52 PM, ayan guha guha.a...@gmail.com wrote:

 You can bump up number of partitions while creating the rdd you are
 using for df
 On 17 Jul 2015 21:03, Harit Vishwakarma harit.vishwaka...@gmail.com
 wrote:

 Hi,

 I used createDataFrame API of SqlContext in python. and getting
 OutOfMemoryException. I am wondering if it is creating whole dataFrame in
 memory?
 I did not find any documentation describing memory usage of Spark
 APIs.
 Documentation given is nice but little more details (specially on
 memory usage/ data distribution etc.) will really help.

 --
 Regards
 Harit Vishwakarma




 --
 Regards
 Harit Vishwakarma





 --
 Regards
 Harit Vishwakarma





-- 
Regards
Harit Vishwakarma


Re: Flatten list

2015-07-18 Thread Gylfi
Hi. 

To be honest I don't really understand your problem statement :(  but let's
just talk about how .flatMap works.
Unlike .map(), which only allows a one-to-one transformation, .flatMap()
allows 0, 1 or many outputs per item processed, but the output must take the
form of a sequence of the same type, like a List for example.
All the sequences will then be merged (i.e. flattened) in the end into a
single RDD of that type.
Note however that an Array does not inherit from Seq, and thus you must
transform it into a Seq or something that inherits from AbstractSeq, like
a List.
See 
http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.List
 
vs. 
http://www.scala-lang.org/api/current/index.html#scala.Array

For example, let's assume you have an RDD[(Array[Int])] and you want all the
Int values flattened into a single RDD[(Int)]. The code would be something
like so:

val intArraysRDD : RDD[(Array[Int])] = ...some code to get array...
val flattenedIntRDD : RDD[(Int)] = intArraysRDD.flatMap( array => {
  var ret : List[(Int)] = Nil
  for ( i <- array ) {
    ret = i :: ret
  }
  ret
})

This is an intentionally explicit version.
A simpler version would be something like this:

val flattenedIntRDD : RDD[(Int)] = intArraysRDD.flatMap( array => array.toList)

However, to understand your problem exactly, you need to explain better what
the RDD you want to create should look like.
 
Regards, 
   Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Flatten-list-tp23887p23892.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and SQL Server

2015-07-18 Thread Davies Liu
I think you have a mistake in the call to jdbc(); it should be:

jdbc(self, url, table, mode, properties)

You had used properties as the third parameter.

On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T
matthew.t.yo...@intel.com wrote:
 Hello,

 I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 
 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a 
 couple of issues writing back. In Scala 2.10 I can write back to the database 
 except for a couple of types.


 1.  When I read a DataFrame from a table that contains a datetime column 
 it comes in as a java.sql.Timestamp object in the DataFrame. This is alright 
 for Spark purposes, but when I go to write this back to the database with 
 df.write.jdbc(…) it errors out because it is trying to write the TimeStamp 
 type to SQL Server, which is not a date/time storing type in TSQL. I think it 
 should be writing a datetime, but I’m not sure how to tell Spark this.



 2.  A related misunderstanding happens when I try to write a 
 java.lang.boolean to the database; it errors out because Spark is trying to 
 specify the width of the bit type, which is illegal in SQL Server (error msg: 
 Cannot specify a column width on data type bit). Do I need to edit Spark 
 source to fix this behavior, or is there a configuration option somewhere 
 that I am not aware of?


 When I attempt to write back to SQL Server in an IPython notebook, py4j seems 
 unable to convert a Python dict into a Java hashmap, which is necessary for 
 parameter passing. I've documented details of this problem with code examples here:
 http://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session
 Any advice would be appreciated.

 Thank you for your time,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using reference for RDD is safe?

2015-07-18 Thread Gylfi
Hi. 

All transformations in Spark are lazy, in that they do not compute their
results right away. Instead, they just remember the transformations applied
to some base dataset (e.g. a file). The transformations are only computed
when an action requires a result to be returned to the driver program. This
design enables Spark to run more efficiently – for example, we can realize
that a dataset created through map will be used in a reduce and return only
the result of the reduce to the driver, rather than the larger mapped
dataset.
See section RDD Operations in
https://spark.apache.org/docs/1.2.0/programming-guide.html

Thus, neither your myrdd2 nor your myrdd will exist until you call the count.
What is stored is just how to create myrdd and myrdd2, so yes, this is
safe.

When you run myrdd2.count, both RDDs are created, myrdd2 is counted and
the count is printed out.
After the operation both RDDs are discarded again.
If you run myrdd2.count again, both myrdd and myrdd2 are created again.

If your transformation is expensive, you may want to keep the data around,
and for that you must use .persist() or .cache() etc.
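A small sketch of that behaviour (the path and transformations are placeholders, and sc is assumed to be an existing SparkContext):

val myrdd  = sc.textFile("hdfs:///some/input")   // nothing is read yet
val myrdd2 = myrdd.map(_.toUpperCase)            // still just a recipe

println(myrdd2.count())   // both RDDs are materialized here, then discarded
println(myrdd2.count())   // ...and recomputed from scratch here

val cached = myrdd2.cache()   // keep the result around after the first action
println(cached.count())       // computed once
println(cached.count())       // served from the cache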

Regards,
   Gylfi. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-reference-for-RDD-is-safe-tp23843p23894.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No. of Task vs No. of Executors

2015-07-18 Thread Gylfi
You could even try changing the HDFS block size of the input data (this can
be done on a per-file basis); that would get all the workers going right from
the get-go in Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-tp23824p23896.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: K Nearest Neighbours

2015-07-18 Thread Gylfi
Hi. 

What I would do in your case would be something like this.

Let's call the two datasets qs and ds, where qs is an array of vectors and
ds is an RDD[(dsID: Long, Vector)].

Do the following:
1) Create a k-NN class that can keep track of the k nearest neighbours found
so far. It must have a qsID, some structure for the k nearest neighbours, e.g.
Seq[(dsID: Long, distance: Double)], and a function .add( nn : (Long, Vector) )
that will do the distance calculation and update the k-NN when appropriate.
2) Collect the qs and key it as well, so each qs entry has an ID, i.e. qs =
Array[(qsID: Long, Vector)]

Now what you want to do is not create all the pairwise distances, but just the
k-NNs. To do this we will actually create several k-NNs for each query vector,
one per partition, and then merge them later.

3) Do a ds.mapPartitions() and inside the function create a k-NN for each
qs, scan the ds points of the partition, and output an iterator over the
set of k-NNs created.
val k = 100
val qs: Array[(Long, Vector)] = ...   // the collected, keyed query points
val ds: RDD[(Long, Vector)] = ...     // the distributed data points
val knnResults = ds.mapPartitions( itr => {
  val knns = qs.map( qp => (qp._1, new KNNClass(k, qp)) )
  itr.foreach( dp => {
    knns.foreach { case (_, knn) => knn.add( dp ) }
  } )
  knns.iterator
})

Now you have one k-NN per partition for each query point, but this we can
simply fix by doing a reduceByKey and merging all the k-NNs for each qsID into
a single k-NN.

val knnResultFinal = knnResults.reduceByKey( (a, b) => KNNClass.merge( a, b ) )

where KNNClass.merge is a static function that merges the two k-NNs, i.e. we
simply concatenate them, sort on distance, take the top k values and
return them as a new k-NN.
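Since the KNNClass itself is only described in words above, here is a minimal sketch of what it might look like (the field names, the use of Double distances and the squared Euclidean distance are assumptions on my part):

import org.apache.spark.mllib.linalg.Vector

// Keeps the k nearest neighbours seen so far for one query point.
class KNNClass(val k: Int, val qp: (Long, Vector)) extends Serializable {
  val qsID: Long = qp._1
  // (dsID, distance) pairs, kept sorted by distance, at most k of them
  var neighbours: Seq[(Long, Double)] = Seq.empty

  private def sqDist(a: Vector, b: Vector): Double = {
    var sum = 0.0
    var i = 0
    while (i < a.size) { val d = a(i) - b(i); sum += d * d; i += 1 }
    sum
  }

  // Consider one ds point and keep it if it is among the k closest seen so far.
  def add(nn: (Long, Vector)): Unit = {
    val d = sqDist(qp._2, nn._2)
    neighbours = ((nn._1, d) +: neighbours).sortBy(_._2).take(k)
  }
}

object KNNClass {
  // Merge two partial k-NN lists for the same query point.
  def merge(a: KNNClass, b: KNNClass): KNNClass = {
    a.neighbours = (a.neighbours ++ b.neighbours).sortBy(_._2).take(a.k)
    a
  }
}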

If you want to control how many k-NNs are created per query point, you can
always repartition ds.

How does that sound? Does this make any sense?   :)

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/K-Nearest-Neighbours-tp23759p23899.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Using Dataframe write with newHdoopApi

2015-07-18 Thread ayan guha
Hi

I am trying to use a DataFrame and save it to Elasticsearch using the new
Hadoop API (because I am using Python). Can anyone guide me on whether this
is even possible?

-- 
Best Regards,
Ayan Guha


Re: DataFrame more efficient than RDD?

2015-07-18 Thread Ted Yu
Here is a related thread:
http://search-hadoop.com/m/q3RTtPmjSJ1Dod92
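For reference, the reflection-based schema inference the quoted question points to looks roughly like this (a sketch only, assuming a Spark 1.4 spark-shell session where sc and sqlContext already exist; the Person class and values are made up):

import sqlContext.implicits._

case class Person(name: String, age: Int)

val peopleRDD = sc.parallelize(Seq(Person("Alice", 29), Person("Bob", 31)))
val peopleDF  = peopleRDD.toDF()   // schema is inferred from the case class fields
peopleDF.printSchema()
peopleDF.filter($"age" > 30).show()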


 On Jul 15, 2015, at 7:41 AM, k0ala k0ala.k0...@gmail.com wrote:
 
 Hi,
 
 I have been working a bit with RDD, and am now taking a look at DataFrames.
 The schema definition using case classes looks very attractive;
 
 https://spark.apache.org/docs/1.4.0/sql-programming-guide.html#inferring-the-schema-using-reflection
 
 Is a DataFrame more efficient (space-wise) than an RDD for the same case
 class?
 
 And in general, when should DataFrames be preferred over RDDs, and vice
 versa?
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-more-efficient-than-RDD-tp23857.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


BigQuery connector for pyspark via Hadoop Input Format example

2015-07-18 Thread lfiaschi
I have a large dataset stored in a BigQuery table and I would like to load
it into a pyspark RDD for ETL data processing.

I realized that BigQuery supports the Hadoop Input / Output format

https://cloud.google.com/hadoop/writing-with-bigquery-connector

and pyspark should be able to use this interface in order to create an RDD
by using the method newAPIHadoopRDD.

http://spark.apache.org/docs/latest/api/python/pyspark.html

Unfortunately, the documentation on both ends seems scarce and goes beyond
my knowledge of Hadoop/Spark/BigQuery. Is there anybody who has figured out
how to do this?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/BigQuery-connector-for-pyspark-via-Hadoop-Input-Format-example-tp23900.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

2015-07-18 Thread Naveen Madhire
I am facing the same issue; I tried this but I get a compilation error for
the $ in the explode function.

So I had to modify it to the below to make it work:

df.select(explode(new Column("entities.user_mentions")).as("mention"))




On Wed, Jun 24, 2015 at 2:48 PM, Michael Armbrust mich...@databricks.com
wrote:

 Starting in Spark 1.4 there is also an explode that you can use directly
 from the select clause (much like in HiveQL):

 import org.apache.spark.sql.functions._
 df.select(explode($"entities.user_mentions").as("mention"))

 Unlike standard HiveQL, you can also include other attributes in the
 select, or even $"*".


 On Wed, Jun 24, 2015 at 8:34 AM, Yin Huai yh...@databricks.com wrote:

 The function accepted by explode is f: Row => TraversableOnce[A]. It seems
 user_mentions is an array of structs. So, can you change your
 pattern matching to the following?

 case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map(elem => ...)
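 For what it's worth, applying that suggestion to the code further down the
 thread would look roughly like this (a sketch only; it assumes the tweets,
 isEmpty and UserMention definitions from the original message are in scope,
 and that the array column comes back as a Seq[Row] at runtime):

 val mentions = tweets.select("entities.user_mentions")
   .filter(!isEmpty($"user_mentions"))
   .explode($"user_mentions") {
     case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map { elem =>
       UserMention(
         elem.getAs[Long]("id"),
         elem.getAs[String]("id_str"),
         elem.getAs[Seq[Long]]("indices").toArray,
         elem.getAs[String]("name"),
         elem.getAs[String]("screen_name"))
     }
   }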

 On Wed, Jun 24, 2015 at 5:27 AM, Gustavo Arjones 
 garjo...@socialmetrix.com wrote:

 Hi All,

 I am using the new Apache Spark version 1.4.0 DataFrames API to
 extract information from Twitter's Status JSON, mostly focused on the
 Entities Object (https://dev.twitter.com/overview/api/entities); the relevant
 part to this question is shown below:

 {
   ...
   ...
   "entities": {
     "hashtags": [],
     "trends": [],
     "urls": [],
     "user_mentions": [
       {
         "screen_name": "linobocchini",
         "name": "Lino Bocchini",
         "id": 187356243,
         "id_str": "187356243",
         "indices": [ 3, 16 ]
       },
       {
         "screen_name": "jeanwyllys_real",
         "name": "Jean Wyllys",
         "id": 23176,
         "id_str": "23176",
         "indices": [ 79, 95 ]
       }
     ],
     "symbols": []
   },
   ...
   ...
 }

 There are several examples of how to extract information from primitive
 types such as string, integer, etc., but I couldn't find anything on how to
 process those kinds of complex structures.

 I tried the code below but it still doesn't work; it throws an
 Exception:

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 val tweets = sqlContext.read.json("tweets.json")

 // this function is just to filter empty entities.user_mentions[] nodes
 // (some tweets don't contain any mentions)
 import org.apache.spark.sql.functions.udf
 val isEmpty = udf((value: List[Any]) => value.isEmpty)

 import org.apache.spark.sql._
 import sqlContext.implicits._
 case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)

 val mentions = tweets.select("entities.user_mentions").
   filter(!isEmpty($"user_mentions")).
   explode($"user_mentions") {
     case Row(arr: Array[Row]) => arr.map { elem =>
       UserMention(
         elem.getAs[Long]("id"),
         elem.getAs[String]("is_str"),
         elem.getAs[Array[Long]]("indices"),
         elem.getAs[String]("name"),
         elem.getAs[String]("screen_name"))
     }
   }

 mentions.first

 Exception when I try to call mentions.first:

 scala> mentions.first
 15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
 scala.MatchError: [List([187356243,187356243,List(3, 16),Lino 
 Bocchini,linobocchini], [23176,23176,List(79, 95),Jean 
 Wyllys,jeanwyllys_real])] (of class 
 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
 at 
 $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:34)
 at 
 $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:34)
 at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
 at 
 org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)

 What is wrong here? I understand it is related to the types, but I
 couldn't figure it out yet.

 As additional context, the structure mapped automatically is:

 scala> mentions.printSchema
 root
  |-- user_mentions: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- id: long (nullable = true)
  |    |    |-- id_str: string (nullable = true)
  |    |    |-- indices: array (nullable = true)
  |    |    |    |-- element: long (containsNull = true)
  |    |    |-- name: string (nullable = true)
  |    |    |-- screen_name: string (nullable = true)

 NOTE 1: I know it is possible to solve this using HiveQL, but I would
 like to use DataFrames since there is so much momentum around it.

 SELECT explode(entities.user_mentions) as mentions
 FROM tweets

 NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty)
 is an ugly hack and I'm probably missing something here, but it was
 the only way I came up with to avoid an NPE.

 I’ve posted the same question on SO:
 http://stackoverflow.com/questions/31016156/how-to-extract-complex-json-structures-using-apache-spark-1-4-0-data-frames

 Thanks all!
 - gustavo






Spark-hive parquet schema evolution

2015-07-18 Thread Jerrick Hoang
Hi all,

I'm aware of the support for schema evolution via DataFrame API. Just
wondering what would be the best way to go about dealing with schema
evolution with Hive metastore tables. So, say I create a table via SparkSQL
CLI, how would I deal with Parquet schema evolution?

Thanks,
J


Re: No. of Task vs No. of Executors

2015-07-18 Thread David Mitchell
This is likely due to data skew.  If you are using key-value pairs, one key
has a lot more records than the other keys.  Do you have any groupBy
operations?

David


On Tue, Jul 14, 2015 at 9:43 AM, shahid sha...@trialx.com wrote:

 hi

 I have a 10 node cluster. I loaded the data onto HDFS, so the number of
 partitions I get is 9. I am running a Spark application, and it gets stuck on
 one of the tasks. Looking at the UI it seems the application is not using all
 nodes to do the calculations. Attached is a screenshot of the tasks; it seems
 tasks are put on each node more than once. Looking at the tasks, 8 tasks
 complete in under 7-8 minutes and one task takes around 30 minutes, causing
 the delay in the results.
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n23824/Screen_Shot_2015-07-13_at_9.png
 



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-tp23824.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
### Confidential e-mail, for recipient's (or recipients') eyes only, not
for distribution. ###


Re: spark-shell with Yarn failed

2015-07-18 Thread Chester @work
It might be a network issue. The error states that it failed to bind to the
server IP address.

Chester
Sent from my iPhone

 On Jul 18, 2015, at 11:46 AM, Amjad ALSHABANI ashshab...@gmail.com wrote:
 
 Does anybody have any idea about the error I m having.. I am really 
 clueless... And appreciate any idea :)
 
 Thanks in advance
 
 Amjad
 
 On Jul 17, 2015 5:37 PM, Amjad ALSHABANI ashshab...@gmail.com wrote:
 Hello,
 
 First of all I m a newbie in Spark ,
 
 I m trying to start the spark-shell with yarn cluster by running:
 
 $ spark-shell --master yarn-client
 
 Sometimes it goes well, but most of the time I got an error:
 
 Container exited with a non-zero exit code 10
 Failing this attempt. Failing the application.
  ApplicationMaster host: N/A
  ApplicationMaster RPC port: -1
  queue: default
  start time: 1437145851944
  final status: FAILED
  tracking URL: 
 http://My-HadoopServer:50080/cluster/app/application_143708028_0030
  user: hadoop
 org.apache.spark.SparkException: Yarn application has already ended! It 
 might have been killed or unable to launch application master.
 at 
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:115)
 
 
 
 
 searching in the yarn logs I got this log
 
 $ yarn logs -applicationId application_143708028_0030
 2015-07-17 17:11:03,961 - INFO  
 [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3@74]
  - Starting remoting
 2015-07-17 17:11:04,200 - ERROR 
 [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1@65]
  - failed to bind to My-HadoopServer/10.98.105.11:0, shutting down Netty 
 transport
 2015-07-17 17:11:04,210 - WARN  [main:Logging$class@71] - Service 
 'sparkYarnAM' could not bind on port 0. Attempting port 1.
 ...
 ...
 ...
 2015-07-17 17:11:05,123 - ERROR [main:Logging$class@96] - Uncaught exception:
 java.net.BindException: Failed to bind to: My-HadoopServer/HadoopServerIP:0: 
 Service 'sparkYarnAM' failed after 16 retries!
 at 
 org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
 at 
 akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
 at 
 akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
 at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
 ...
 
 
 
 
 
 I m using Spark 1.3, Hadoop 2.6 ,
 
  and in spark-env.sh it points to my hadoop configuration:
 
 export HADOOP_CONF_DIR=/usr/hdp/2.2.4.4-16/hadoop/conf
 
 
 Is this probleme coming from spark configuration or yarn configuration (or 
 spark with yarn confs)
 
 Any Ideas??
 
 
 
 Amjad


RE: Feature Generation On Spark

2015-07-18 Thread Mohammed Guller
Try this (replace ... with the appropriate values for your environment):

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector

val sc = new SparkContext(...)
val documents = sc.wholeTextFiles(...)
val tokenized = documents.map{ case (path, document) => (path, document.split("\\s+")) }
val numFeatures = 10
val hashingTF = new HashingTF(numFeatures)
val featurized = tokenized.map{ case (path, words) => (path, hashingTF.transform(words)) }


Mohammed

From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com]
Sent: Friday, July 17, 2015 12:33 AM
To: Mohammed Guller
Subject: Re: Feature Generation On Spark


Thanks I did look at the example. I am using Spark 1.2. The modules mentioned 
there are not in 1.2 I guess. The import is failing


Rishi


From: Mohammed Guller moham...@glassbeam.com
Sent: Friday, July 10, 2015 2:31 AM
To: rishikesh thakur; ayan guha; Michal Čizmazia
Cc: user
Subject: RE: Feature Generation On Spark


Take a look at the examples here:

https://spark.apache.org/docs/latest/ml-guide.html



Mohammed



From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com]
Sent: Saturday, July 4, 2015 10:49 PM
To: ayan guha; Michal Čizmazia
Cc: user
Subject: RE: Feature Generation On Spark



I have one document per file and each file is to be converted to a feature 
vector. Pretty much like standard feature construction for document 
classification.



Thanks

Rishi



Date: Sun, 5 Jul 2015 01:44:04 +1000
Subject: Re: Feature Generation On Spark
From: guha.a...@gmail.com
To: mici...@gmail.com
CC: rishikeshtha...@hotmail.com; user@spark.apache.org

Do you have one document per file or multiple document in the file?

On 4 Jul 2015 23:38, Michal Čizmazia mici...@gmail.com wrote:

Spark Context has a method wholeTextFiles. Is that what you need?

On 4 July 2015 at 07:04, rishikesh rishikeshtha...@hotmail.com wrote:
 Hi

 I am new to Spark and am working on document classification. Before model
 fitting I need to do feature generation. Each document is to be converted to
 a feature vector. However I am not sure how to do that. While testing
 locally I have a static list of tokens and when I parse a file I do a lookup
 and increment counters.

  In the case of Spark I can create an RDD which loads all the documents;
  however, I am not sure whether one file goes to one executor or to multiple.
  If a file is split, then the feature vectors need to be merged, but I am not
  able to figure out how to do that.

 Thanks
 Rishi



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Feature-Generation-On-Spark-tp23617.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


Re: spark-shell with Yarn failed

2015-07-18 Thread Amjad ALSHABANI
Does anybody have any idea about the error I m having.. I am really
clueless... And appreciate any idea :)

Thanks in advance

Amjad
On Jul 17, 2015 5:37 PM, Amjad ALSHABANI ashshab...@gmail.com wrote:

 Hello,

 First of all I m a newbie in Spark ,

 I m trying to start the spark-shell with yarn cluster by running:

 $ spark-shell --master yarn-client

 Sometimes it goes well, but most of the time I got an error:

 Container exited with a non-zero exit code 10
 Failing this attempt. Failing the application.
  ApplicationMaster host: N/A
  ApplicationMaster RPC port: -1
  queue: default
  start time: 1437145851944
  final status: FAILED
  tracking URL:
 http://My-HadoopServer:50080/cluster/app/application_143708028_0030
  user: hadoop
 org.apache.spark.SparkException: Yarn application has already ended! It
 might have been killed or unable to launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:115)
 
 
 

 searching in the yarn logs I got this log

 $ yarn logs -applicationId application_143708028_0030
 2015-07-17 17:11:03,961 - INFO
 [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3@74]
 - Starting remoting
 2015-07-17 17:11:04,200 - ERROR
 [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1@65]
 - failed to bind to My-HadoopServer/10.98.105.11:0, shutting down Netty
 transport
 2015-07-17 17:11:04,210 - WARN  [main:Logging$class@71] - Service
 'sparkYarnAM' could not bind on port 0. Attempting port 1.
 ...
 ...
 ...
 2015-07-17 17:11:05,123 - ERROR [main:Logging$class@96] - Uncaught
 exception:
 java.net.BindException: Failed to bind to:
 My-HadoopServer/HadoopServerIP:0: Service 'sparkYarnAM' failed after 16
 retries!
 at
 org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
 at
 akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
 at
 akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
 at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
 ...
 
 



 I m using Spark 1.3, Hadoop 2.6 ,

  and in spark-env.sh it points to my hadoop configuration:

 export HADOOP_CONF_DIR=/usr/hdp/2.2.4.4-16/hadoop/conf


 Is this probleme coming from spark configuration or yarn configuration (or
 spark with yarn confs)

 Any Ideas??



 Amjad




Spark1.4 application throw java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration

2015-07-18 Thread Wwh 吴
Hi,
I have built a Spark application with IDEA. When running SparkPI, IDEA throws
an exception like this:
Exception in thread "main" java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:108)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:99)
    at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:78)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:62)
    at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:61)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:74)
    at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:190)
    at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:141)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:440)
    at org.learn.SparkPI$.main(SparkPI.scala:27)
    at org.learn.SparkPI.main(SparkPI.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: javax.servlet.FilterRegistration
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
And the application SparkPI is like this:

object SparkPI {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Pi")
    conf.setMaster("local")
    val spark = new SparkContext(conf)

    //spark.addJar("D:\\BigdataResearch\\SparkLeaning\\out\\artifacts\\sparkleaning_jar\\sparkleaning.jar")
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 10 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

And the build.sbt is like this:

name := "SparkLearning"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.hive" % "hive-jdbc" % "0.13.1",
  "org.apache.hadoop" % "hadoop-common" % "2.2.0" excludeAll ExclusionRule(organization = "javax.servlet"),
  "org.apache.hadoop" % "hadoop-client" % "2.2.0" excludeAll ExclusionRule(organization = "javax.servlet"),
  "org.scalatest" %% "scalatest" % "2.2.0",
  "org.apache.spark" %% "spark-core" % "1.4.0",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-hive" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.spark" %% "spark-streaming" % "1.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
  "org.eclipse.jetty" % "jetty-servlet" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-http" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-server" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-util" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-security" % "8.1.14.v20131031",
  "org.eclipse.jetty" % "jetty-plus" % "8.1.14.v20131031",
  "org.apache.kafka" %% "kafka" % "0.8.2.1",
  "net.sf.json-lib" % "json-lib" % "2.4" from "http://gradle.artifactoryonline.com/gradle/libs/net/sf/json-lib/json-lib/2.4/json-lib-2.4-jdk15.jar",
  "com.databricks" %% "spark-csv" % "1.0.3"
)

Please give me some suggestion!
  

How to restart Twitter spark stream

2015-07-18 Thread Zoran Jeremic
Hi,

I have a twitter spark stream initialized in the following way:

  val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
  val config = getTwitterConfigurationBuilder.build()
  val auth: Option[twitter4j.auth.Authorization] =
    Some(new twitter4j.auth.OAuthAuthorization(config))
  val stream = TwitterUtils.createStream(ssc, auth, filters)


This works fine when I initially start it. However, at some point I need to
update the filters since users might add new hashtags they want to follow. I
tried to stop the running stream and the Spark streaming context without
stopping the Spark context, e.g.:

stream.stop()
ssc.stop(false)


Afterwards, I try to initialize a new Twitter stream like I did
previously. However, I get this exception:

Exception in thread Firestorm JMX Monitor
 java.lang.IllegalStateException: Adding new inputs, transformations, and
 output operations after stopping a context is not supported
 at
 org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
 at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
 at
 org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
 at
 org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
 at
 org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
 at
 org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
 at
 org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
 at java.util.TimerThread.mainLoop(Timer.java:555)
 at java.util.TimerThread.run(Timer.java:505)
  INFO[2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing
 thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater
 has been closed
 ERROR[2015-07-18 22:24:32,503]
 [sparkDriver-akka.actor.default-dispatcher-3]
 streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error
 stopping receiver
 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)




Anybody can explain how to solve this issue?

Thanks,
Zoran


[General Question] [Hadoop + Spark at scale] Spark Rack Awareness ?

2015-07-18 Thread Mike Frampton
I wanted to ask a general question about Hadoop/YARN and Apache Spark
integration. I know that
Hadoop on a physical cluster has rack awareness, i.e. it attempts to minimise
network traffic
by saving replicated blocks within a rack.

I wondered whether, when Spark is configured to use YARN as a cluster manager,
it is able to
use this feature to also minimise network traffic to a degree.

Sorry if this question is not quite accurate, but I think you can generally see
what I mean?