Parquet Hive table becomes very slow on 1.3?

2015-03-31 Thread Zheng, Xudong
Hi all,

We are using Parquet Hive tables, and we are upgrading to Spark 1.3. But we
find that even a simple COUNT(*) query is much slower (about 100x) than on
Spark 1.2.

I found that most of the time is spent on the driver fetching HDFS block
locations. A large number of log entries like the following are printed:

15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
  fileLength=77153436
  underConstruction=false
  
blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
getBlockSize()=77153436; corrupt=false; offset=0;
locs=[10.152.116.172:50010, 10.152.116.169:50010,
10.153.125.184:50010]}]
  
lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
getBlockSize()=77153436; corrupt=false; offset=0;
locs=[10.152.116.169:50010, 10.153.125.184:50010,
10.152.116.172:50010]}
  isLastBlockComplete=true}
15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010


Comparing the printed logs with Spark 1.2: although the number of
getBlockLocations calls is similar, each such call only took 20~30 ms there
(versus 2000~3000 ms now), and it did not print the detailed LocatedBlocks
info.

Another finding: if I read the Parquet file via Scala code from spark-shell as
below, it looks fine; the computation returns the result as quickly as before.

sqlContext.parquetFile("data/myparquettable")


Any idea about it? Thank you!


-- 
郑旭东
Zheng, Xudong


Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Akhil Das
You can add an internal-IP-to-public-hostname mapping in your /etc/hosts
file; if your forwarding is set up properly, it won't be a problem after
that.



Thanks
Best Regards

On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote:

 Hi,

 For security reasons, we added a server between my AWS Spark cluster and my
 local machine, so I can't connect to the cluster directly. To see the SparkUI
 and its workers' stdout and stderr, I used dynamic forwarding and configured a
 SOCKS proxy. Now I can see the SparkUI using the internal EC2 IP; however,
 when I click on the application UI (4040) or a worker's UI (8081), it still
 automatically uses the public DNS instead of the internal EC2 IP, which the
 browser can't reach.

 Is there a way that I could configure this? I saw that one could configure
 the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could
 help. Does anyone experience the same issue?

 Thanks a lot!
 Anny




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
I have updated my Spark source code to 1.3.1.

The checkpoint works well.

BUT the shuffle data still cannot be deleted automatically… the disk usage is
still 30TB…

I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
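
For reference, the kind of setup being tested looks roughly like this (a sketch
only; the checkpoint directory and interval are placeholders):

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")       // placeholder path
val als = new org.apache.spark.ml.recommendation.ALS()
  .setImplicitPrefs(true)
  .setCheckpointInterval(5)  // checkpoint the factor RDDs every 5 iterations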

Do you know how to solve my problem?

Sendong Li



 On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com wrote:
 
 setCheckpointInterval was added in the current master and branch-1.3. Please 
 help check whether it works. It will be included in the 1.3.1 and 1.4.0 
 release. -Xiangrui
 
 On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com 
 mailto:lisend...@163.com wrote:
 hi, xiangrui:
 I found that the ALS in Spark 1.3.0 forgets to do checkpoint() in explicit ALS:
 the code is :
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
  
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
 PastedGraphic-2.tiff
 
 the checkpoint is very important in my situation, because my task will
 produce 1TB of shuffle data in each iteration; if the shuffle data is not
 deleted in each iteration (using checkpoint()), the task will produce 30TB of
 data…
 
 
 So I changed the ALS code and re-compiled it myself, but it seems the
 checkpoint does not take effect, and the task still occupies 30TB of disk… (I
 only added two lines to ALS.scala):
 
 PastedGraphic-3.tiff
 
 
 
 and the driver's log seems strange; why is the log printed all together...
 PastedGraphic-1.tiff
 
 thank you very much!
 
 
 On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com mailto:lisend...@163.com wrote:
 
 Thank you very much for your opinion:)
 
 In our case, maybe it's dangerous to treat un-observed items as negative
 interactions (although we could give them a small confidence, I think they are
 still not credible...)
 
 I will do more experiments and give you feedback:)
 
 Thank you;)
 
 
 On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com mailto:so...@cloudera.com wrote:
 
 I believe that's right, and is what I was getting at. yes the implicit
 formulation ends up implicitly including every possible interaction in
 its loss function, even unobserved ones. That could be the difference.
 
 This is mostly an academic question though. In practice, you have
 click-like data and should be using the implicit version for sure.
 
 However you can give negative implicit feedback to the model. You
 could consider no-click as a mild, observed, negative interaction.
 That is: supply a small negative value for these cases. Unobserved
 pairs are not part of the data set. I'd be careful about assuming the
 lack of an action carries signal.
 
 On Thu, Feb 26, 2015 at 3:07 PM, 163 lisend...@163.com 
 mailto:lisend...@163.com wrote:
 oh my god, I think I understood...
 In my case, there are three kinds of user-item pairs:
 
 Display and click pair(positive pair)
 Display but no-click pair(negative pair)
 No-display pair(unobserved pair)
 
 Explicit ALS only considers the first and the second kinds.
 But implicit ALS considers all three kinds of pairs (and treats the third
 kind like the second, because their preference values are all zero and
 their confidences are all 1).
 
 So the results are different, right?
 
 Could you please give me some advice on which ALS I should use?
 If I use the implicit ALS, how do I distinguish the second and the third
 kinds of pairs? :)
 
 My opinion is in my case, I should use explicit ALS ...
 
 Thank you so much
 
 On Feb 26, 2015, at 22:41, Xiangrui Meng m...@databricks.com
 mailto:m...@databricks.com wrote:
 
 Lisen, did you use all m-by-n pairs during training? Implicit model
 penalizes unobserved ratings, while explicit model doesn't. -Xiangrui
 
 On Feb 26, 2015 6:26 AM, Sean Owen so...@cloudera.com 
 mailto:so...@cloudera.com wrote:
 
 +user
 
 On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen so...@cloudera.com 
 mailto:so...@cloudera.com wrote:
 
 I think I may have it backwards, and that you are correct to keep the 0
 elements in train() in order to try to reproduce the same result.
 
 The second formulation is called 'weighted regularization' and is used
 for both implicit and explicit feedback, as far as I can see in the code.
 
 Hm, I'm actually not clear why these would produce different results.
 Different code paths are used to be sure, but I'm not yet sure why they
 would give different results.
 
 In general you wouldn't use train() for data like this though, and would
 never set alpha=0.
 
 On Thu, Feb 26, 2015 at 2:15 PM, lisendong lisend...@163.com 
 mailto:lisend...@163.com wrote:
 
 I want to confirm the loss function you use (sorry, I'm not so familiar
 with Scala code, so I did not understand the MLlib source code).
 
 According to the papers :
 
 
 in your implicit feedback ALS, the loss function is (ICDM 2008):
 
 in the explicit feedback ALS, the loss function is (Netflix 2008):
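 
 (The formula images did not come through in the archive; as I read those two
 papers, the objectives are roughly the following.)
 
 Implicit feedback (Hu, Koren & Volinsky, ICDM 2008):
 \min_{x,y} \sum_{u,i} c_{ui}\,(p_{ui} - x_u^\top y_i)^2 + \lambda\Big(\sum_u \|x_u\|^2 + \sum_i \|y_i\|^2\Big), \quad p_{ui} = 1[r_{ui} > 0], \; c_{ui} = 1 + \alpha\, r_{ui}
 
 Explicit feedback with weighted-\lambda regularization (Zhou et al., 2008):
 \min_{u,m} \sum_{(i,j) \in I} (r_{ij} - u_i^\top m_j)^2 + \lambda\Big(\sum_i n_{u_i}\|u_i\|^2 + \sum_j n_{m_j}\|m_j\|^2\Big)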
 
 note that besides the difference of confidence 

Re: JettyUtils.createServletHandler Method not Found?

2015-03-31 Thread kmader
Yes, this private is checked at compile time and my class is in a subpackage
of org.apache.spark.ui, so the visibility is not the issue, or at least not
as far as I can tell.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JettyUtils-createServletHandler-Method-not-Found-tp22262p22313.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Error in Delete Table

2015-03-31 Thread Ted Yu
Which Spark and Hive release are you using ?

Thanks



 On Mar 27, 2015, at 2:45 AM, Masf masfwo...@gmail.com wrote:
 
 Hi.
 
 In HiveContext, when I run the statement DROP TABLE IF EXISTS TestTable and
 TestTable doesn't exist, Spark returns an error:
 
 
 
 ERROR Hive: NoSuchObjectException(message:default.TestTable table not found)
   at 
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29338)
   at 
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29306)
   at 
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result.read(ThriftHiveMetastore.java:29237)
   at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
   at 
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1036)
   at 
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1022)
   at 
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
   at com.sun.proxy.$Proxy22.getTable(Unknown Source)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000)
   at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:942)
   at 
 org.apache.hadoop.hive.ql.exec.DDLTask.dropTableOrPartitions(DDLTask.java:3887)
   at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:310)
   at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
   at 
 org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
   at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1554)
   at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1321)
   at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1139)
   at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962)
   at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
   at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
   at 
 org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
   at 
 org.apache.spark.sql.hive.execution.DropTable.sideEffectResult$lzycompute(commands.scala:58)
   at 
 org.apache.spark.sql.hive.execution.DropTable.sideEffectResult(commands.scala:56)
   at 
 org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
   at 
 org.apache.spark.sql.hive.execution.DropTable.execute(commands.scala:51)
   at 
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
   at 
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
   at 
 org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
   at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
   at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
   at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
   at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at GeoMain$.HiveExecution(GeoMain.scala:96)
   at GeoMain$.main(GeoMain.scala:17)
   at GeoMain.main(GeoMain.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
 
 Thanks!!
 -- 
 
 
 Regards.
 Miguel Ángel




Re: log4j.properties in jar

2015-03-31 Thread Emre Sevinc
Hello Udit,

Yes, what you ask is possible. If you follow the Spark documentation and
tutorial about how to build stand-alone applications, you can see that it
is possible to build a stand-alone, über-JAR file that includes everything.

For example, if you want to suppress some messages by modifying log4j in
unit tests, you can do the following:
http://stackoverflow.com/questions/27248997/how-to-suppress-spark-logging-in-unit-tests/2736#2736
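
For example, a minimal log4j.properties placed under src/main/resources (so
that it ends up inside the assembled JAR); the levels and logger names below
are just an illustration:

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.spark=WARN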

Hope this helps.

--
Emre Sevinç
http://www.bigindustries.be/


On Mon, Mar 30, 2015 at 10:24 PM, Udit Mehta ume...@groupon.com wrote:

 Hi,


 Is it possible to put the log4j.properties in the application jar such
 that the driver and the executors use this log4j file. Do I need to specify
 anything while submitting my app so that this file is used?

 Thanks,
 Udit




-- 
Emre Sevinc


Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Shivaram Venkataraman
My guess is that the `createDataFrame` call is failing here. Can you check
whether the schema being passed to it includes the column name and type for the
newly zipped `features` column?

Joseph probably knows this better, but AFAIK the DenseVector here will need
to be marked as a VectorUDT while creating a DataFrame column
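
Roughly, that means the schema handed to createDataFrame should carry the
zipped column with a VectorUDT, e.g. (a sketch only; the column name is just an
example, and this assumes VectorUDT is accessible from your code, as in your
transformSchema):

import org.apache.spark.sql.types.{StructField, StructType}

val newSchema = StructType(dataSet.schema.fields :+
  StructField("cnnFeature", new VectorUDT, nullable = false))
dataSet.sqlContext.createDataFrame(newRowData, newSchema)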

Thanks
Shivaram

On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Following your suggestion, I end up with the following implementation :







 override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
   val map = this.paramMap ++ paramMap

   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
     Caffe.set_mode(Caffe.CPU)
     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
     val inputBlobs: FloatBlobVector = net.input_blobs()
     val N: Int = 1
     val K: Int = inputBlobs.get(0).channels()
     val H: Int = inputBlobs.get(0).height()
     val W: Int = inputBlobs.get(0).width()
     inputBlobs.get(0).Reshape(N, K, H, W)
     val dataBlob = new FloatPointer(N*K*W*H)
     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

     val feat = rows.map { case Row(a: Iterable[Float]) =>
       dataBlob.put(a.toArray, 0, a.size)
       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
       val resultDim = resultBlobs.get(0).channels()
       logInfo(s"Output dimension $resultDim")
       val resultBlobData = resultBlobs.get(0).cpu_data()
       val output = new Array[Float](resultDim)
       resultBlobData.get(output)
       Vectors.dense(output.map(_.toDouble))
     }
     // net.deallocate()
     feat
   }

   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
     val oldSeq = old.toSeq
     Row.fromSeq(oldSeq :+ feat)
   }
   dataSet.sqlContext.createDataFrame(newRowData, schema)
 }


 The idea is to mapPartitions of the underlying RDD of the DataFrame and
 create a new DataFrame by zipping the results. It seems to work but when I
 try to save the RDD I got the following error :

 org.apache.spark.mllib.linalg.DenseVector cannot be cast to
 org.apache.spark.sql.Row


 On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman 
 shiva...@eecs.berkeley.edu wrote:

 One workaround could be to convert a DataFrame into a RDD inside the
 transform function and then use mapPartitions/broadcast to work with the
 JNI calls and then convert back to RDD.

 Thanks
 Shivaram

 On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Dear all,

 I'm still struggling to make a pre-trained Caffe model transformer for
 DataFrames work. The main problem is that creating a Caffe model inside the
 UDF is very slow and consumes a lot of memory.

 Some of you suggested broadcasting the model. The problem with
 broadcasting is that I use a JNI interface to Caffe C++ with javacpp-presets,
 and it is not serializable.

 Another possible approach is to use a UDF that can handle a whole
 partition instead of just a row, in order to minimize the Caffe model
 instantiation.

 Are there any ideas to solve one of these two issues?



 Best,

 Jao

 On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com
 wrote:

 I see.  I think your best bet is to create the cnnModel on the master
 and then serialize it to send to the workers.  If it's big (1M or so), then
 you can broadcast it and use the broadcast variable in the UDF.  There is
 not a great way to do something equivalent to mapPartitions with UDFs right
 now.

 On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Here is my current implementation with current master version of spark




 class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

   override def transformSchema(...) { ... }

   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
     transformSchema(dataSet.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
     val deepCNNFeature = udf((v: Vector) => {
       val cnnModel = new CaffeModel
       cnnModel.transform(v)
     } : Vector)

     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
   }
 }

 where CaffeModel is a java api to Caffe C++ model.

 The problem here is that for every row it will create a new instance
 of CaffeModel which is inefficient since creating a new model
 means loading a large model file. And it will transform only a single
 row at a time. Or a Caffe network can process a batch of rows efficiently.
 In other words, is it possible to create an UDF that can operatats on a
 partition in order to minimize the creation of a CaffeModel and
 to take 

Re: Error in Delete Table

2015-03-31 Thread Masf
Hi Ted.

Spark 1.2.0 and Hive 0.13.1

Regards.
Miguel Angel.


On Tue, Mar 31, 2015 at 10:37 AM, Ted Yu yuzhih...@gmail.com wrote:

 Which Spark and Hive release are you using ?

 Thanks



  On Mar 27, 2015, at 2:45 AM, Masf masfwo...@gmail.com wrote:
 
  Hi.
 
  In HiveContext, when I put this statement DROP TABLE IF EXISTS
 TestTable
  If TestTable doesn't exist, spark returns an error:
 
 
 
  ERROR Hive: NoSuchObjectException(message:default.TestTable table not
 found)
at
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29338)
at
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29306)
at
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result.read(ThriftHiveMetastore.java:29237)
at
 org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1036)
at
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1022)
at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90)
at com.sun.proxy.$Proxy22.getTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:942)
at
 org.apache.hadoop.hive.ql.exec.DDLTask.dropTableOrPartitions(DDLTask.java:3887)
at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:310)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
at
 org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1554)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1321)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1139)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
at
 org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
at
 org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at
 org.apache.spark.sql.hive.execution.DropTable.sideEffectResult$lzycompute(commands.scala:58)
at
 org.apache.spark.sql.hive.execution.DropTable.sideEffectResult(commands.scala:56)
at
 org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
at
 org.apache.spark.sql.hive.execution.DropTable.execute(commands.scala:51)
at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at
 org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98)
at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at GeoMain$.HiveExecution(GeoMain.scala:96)
at GeoMain$.main(GeoMain.scala:17)
at GeoMain.main(GeoMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
 
  Thanks!!
  --
 
 
  Regards.
  Miguel Ángel




-- 


Saludos.
Miguel Ángel


Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
Thank you, @GuoQiang
I will try to add runGC() to the ALS.scala, and if it works for deleting the 
shuffle data, I will tell you :-)



 On Mar 31, 2015, at 4:47, GuoQiang Li wi...@qq.com wrote:
 
 You can try to enforce garbage collection:
 
 /** Run GC and make sure it actually has run */
 def runGC() {
   val weakRef = new WeakReference(new Object())
   val startTime = System.currentTimeMillis
   System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
   // Wait until a weak reference object has been GCed
   System.runFinalization()
   while (weakRef.get != null) {
     System.gc()
     System.runFinalization()
     Thread.sleep(200)
     if (System.currentTimeMillis - startTime > 10000) {
       throw new Exception("automatically cleanup error")
     }
   }
 }
 
 
 ------------------ Original message ------------------
 From: lisendong lisend...@163.com mailto:lisend...@163.com
 Date: Tue, Mar 31, 2015, 3:47 PM
 To: Xiangrui Meng men...@gmail.com mailto:men...@gmail.com
 Cc: Xiangrui Meng m...@databricks.com mailto:m...@databricks.com; user user@spark.apache.org mailto:user@spark.apache.org; Sean Owen so...@cloudera.com mailto:so...@cloudera.com; GuoQiang Li wi...@qq.com mailto:wi...@qq.com
 Subject: Re: different result from implicit ALS with explicit ALS
 
 I have updated my Spark source code to 1.3.1.

 The checkpoint works well.

 BUT the shuffle data still cannot be deleted automatically… the disk usage
 is still 30TB…
 
 I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
 
 Do you know how to solve my problem?
 
 Sendong Li
 
 
 
 On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com
 mailto:men...@gmail.com wrote:
 
 setCheckpointInterval was added in the current master and branch-1.3. Please 
 help check whether it works. It will be included in the 1.3.1 and 1.4.0 
 release. -Xiangrui
 
 On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com 
 mailto:lisend...@163.com wrote:
 hi, xiangrui:
 I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
 the code is :
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
  
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
 PastedGraphic-2.tiff
 
 the checkpoint is very important in my situation, because my task will
 produce 1TB of shuffle data in each iteration; if the shuffle data is not
 deleted in each iteration (using checkpoint()), the task will produce 30TB of
 data…
 
 
 So I changed the ALS code and re-compiled it myself, but it seems the
 checkpoint does not take effect, and the task still occupies 30TB of disk… (I
 only added two lines to ALS.scala):
 
 PastedGraphic-3.tiff
 
 
 
 and the driver's log seems strange; why is the log printed all together...
 PastedGraphic-1.tiff
 
 thank you very much!
 
 
 On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com
 mailto:lisend...@163.com wrote:
 
 Thank you very much for your opinion:)
 
 In our case, maybe it 's dangerous to treat un-observed item as negative 
 interaction(although we could give them small confidence, I think they are 
 still incredible...)
 
 I will do more experiments and give you feedback:)
 
 Thank you;)
 
 
 On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com
 mailto:so...@cloudera.com wrote:
 
 I believe that's right, and is what I was getting at. yes the implicit
 formulation ends up implicitly including every possible interaction in
 its loss function, even unobserved ones. That could be the difference.
 
 This is mostly an academic question though. In practice, you have
 click-like data and should be using the implicit version for sure.
 
 However you can give negative implicit feedback to the model. You
 could consider no-click as a mild, observed, negative interaction.
 That is: supply a small negative value for these cases. Unobserved
 pairs are not part of the data set. I'd be careful about assuming the
 lack of an action carries signal.
 
 On Thu, Feb 26, 2015 at 3:07 PM, 163 lisend...@163.com 
 mailto:lisend...@163.com wrote:
 oh my god, I think I understood...
 In my case, there are three kinds of user-item pairs:
 
 Display and click pair(positive pair)
 Display but no-click pair(negative pair)
 No-display pair(unobserved pair)
 
 Explicit ALS only consider the first and the second kinds
 But implicit ALS consider all the three kinds of pair(and consider the 
 third
 kind as the second pair, because their preference value are all zero and
 confidence are all 1)
 
 So the result are different. right?
 
 Could you please give me some advice, which ALS should I use?
 If I use the implicit ALS, how to distinguish the second and the third 
 kind
 of pair:)
 
 My opinion is in my case, I should use explicit ALS ...
 
 Thank you so much
 
 On Feb 26, 2015, at 22:41, Xiangrui Meng 

workers no route to host

2015-03-31 Thread ZhuGe
Hi, I set up a standalone cluster of 5 machines (tmaster, tslave1,2,3,4) with
spark-1.3.0-cdh5.4.0-snapshot. When I execute sbin/start-all.sh, the master is
OK, but I can't see the web UI. Moreover, the worker logs look something like
this:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
/data/PlatformDep/cdh5/dist/bin/compute-classpath.sh: line 164: hadoop: command not found
Spark Command: java -cp :/data/PlatformDep/cdh5/dist/sbin/../conf:/data/PlatformDep/cdh5/dist/lib/spark-assembly-1.3.0-cdh5.4.0-SNAPSHOT-hadoop2.6.0-cdh5.4.0-SNAPSHOT.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-rdbms-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-api-jdo-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-core-3.2.2.jar: -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.128.16:7071 --webui-port 8081
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/31 06:47:22 INFO Worker: Registered signal handlers for [TERM, HUP, INT]
15/03/31 06:47:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/31 06:47:23 INFO SecurityManager: Changing view acls to: dcadmin
15/03/31 06:47:23 INFO SecurityManager: Changing modify acls to: dcadmin
15/03/31 06:47:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dcadmin); users with modify permissions: Set(dcadmin)
15/03/31 06:47:23 INFO Slf4jLogger: Slf4jLogger started
15/03/31 06:47:23 INFO Remoting: Starting remoting
15/03/31 06:47:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@tslave2:60815]
15/03/31 06:47:24 INFO Utils: Successfully started service 'sparkWorker' on port 60815.
15/03/31 06:47:24 INFO Worker: Starting Spark worker tslave2:60815 with 2 cores, 3.0 GB RAM
15/03/31 06:47:24 INFO Worker: Running Spark version 1.3.0
15/03/31 06:47:24 INFO Worker: Spark home: /data/PlatformDep/cdh5/dist
15/03/31 06:47:24 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/31 06:47:24 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/03/31 06:47:24 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/03/31 06:47:24 INFO WorkerWebUI: Started WorkerWebUI at http://tslave2:8081
15/03/31 06:47:24 INFO Worker: Connecting to master akka.tcp://sparkMaster@192.168.128.16:7071/user/Master...
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host]
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host]
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host]
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] -> [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]


The worker machines can ping the master machine successfully. The hosts file is like this:

192.168.128.16 tmaster tmaster
192.168.128.17 tslave1 tslave1
192.168.128.18 tslave2 tslave2
192.168.128.19 tslave3 tslave3
192.168.128.20 tslave4 tslave4
Hope someone could help. Thanks   

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Jaonary Rabarisoa
Following your suggestion, I end up with the following implementation :







override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N*K*W*H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    // net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}


The idea is to mapPartitions over the underlying RDD of the DataFrame and
create a new DataFrame by zipping the results. It seems to work, but when I
try to save the RDD I get the following error:

org.apache.spark.mllib.linalg.DenseVector cannot be cast to
org.apache.spark.sql.Row


On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 One workaround could be to convert a DataFrame into a RDD inside the
 transform function and then use mapPartitions/broadcast to work with the
 JNI calls and then convert back to RDD.

 Thanks
 Shivaram

 On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Dear all,

 I'm still struggling to make a pre-trained caffe model transformer for
 dataframe works. The main problem is that creating a caffe model inside the
 UDF is very slow and consumes memories.

 Some of you suggest to broadcast the model. The problem with broadcasting
 is that I use a JNI interface to caffe C++ with javacpp-preset  and it is
 not serializable.

 Another possible approach is to use a UDF that can handle a whole
 partitions instead of just a row in order to minimize the caffe model
 instantiation.

 Is there any ideas to solve one of these two issues ?



 Best,

 Jao

 On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com
 wrote:

 I see.  I think your best bet is to create the cnnModel on the master
 and then serialize it to send to the workers.  If it's big (1M or so), then
 you can broadcast it and use the broadcast variable in the UDF.  There is
 not a great way to do something equivalent to mapPartitions with UDFs right
 now.

 On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Here is my current implementation with current master version of spark




 class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

   override def transformSchema(...) { ... }

   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
     transformSchema(dataSet.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
     val deepCNNFeature = udf((v: Vector) => {
       val cnnModel = new CaffeModel
       cnnModel.transform(v)
     } : Vector)

     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
   }
 }

 where CaffeModel is a Java API to the Caffe C++ model.

 The problem here is that for every row it will create a new instance of
 CaffeModel, which is inefficient since creating a new model
 means loading a large model file. And it will transform only a single
 row at a time, whereas a Caffe network can process a batch of rows efficiently.
 In other words, is it possible to create a UDF that operates on a
 partition, in order to minimize the creation of a CaffeModel and
 to take advantage of the Caffe network's batch processing?



 On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com
 wrote:

 I see, thanks for clarifying!

 I'd recommend following existing implementations in spark.ml
 transformers.  You'll need to define a UDF which operates on a single Row
 to compute the value for the new column.  You can then use the DataFrame
 DSL to create the new column; the DSL provides a nice syntax for what 
 would
 otherwise be a SQL statement like 

Re: Some questions after playing a little with the new ml.Pipeline.

2015-03-31 Thread Jaonary Rabarisoa
In my transformSchema I do specify that the output column type is a VectorUDT :






override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
  val map = this.paramMap ++ paramMap
  checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false))
  addOutputColumn(schema, map(outputCol), new VectorUDT)
}

The output of printSchema is as follows:

|-- cnnFeature: vecto (nullable = false)



On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 My guess is that the `createDataFrame` call is failing here.  Can you
 check if the schema being passed to it includes the column name and type
 for the newly being zipped `features` ?

 Joseph probably knows this better, but AFAIK the DenseVector here will
 need to be marked as a VectorUDT while creating a DataFrame column

 Thanks
 Shivaram

 On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Following your suggestion, I end up with the following implementation :







 override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
   val schema = transformSchema(dataSet.schema, paramMap, logging = true)
   val map = this.paramMap ++ paramMap

   val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
     Caffe.set_mode(Caffe.CPU)
     val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
     val inputBlobs: FloatBlobVector = net.input_blobs()
     val N: Int = 1
     val K: Int = inputBlobs.get(0).channels()
     val H: Int = inputBlobs.get(0).height()
     val W: Int = inputBlobs.get(0).width()
     inputBlobs.get(0).Reshape(N, K, H, W)
     val dataBlob = new FloatPointer(N*K*W*H)
     val inputCPUData = inputBlobs.get(0).mutable_cpu_data()

     val feat = rows.map { case Row(a: Iterable[Float]) =>
       dataBlob.put(a.toArray, 0, a.size)
       caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
       val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
       val resultDim = resultBlobs.get(0).channels()
       logInfo(s"Output dimension $resultDim")
       val resultBlobData = resultBlobs.get(0).cpu_data()
       val output = new Array[Float](resultDim)
       resultBlobData.get(output)
       Vectors.dense(output.map(_.toDouble))
     }
     // net.deallocate()
     feat
   }

   val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
     val oldSeq = old.toSeq
     Row.fromSeq(oldSeq :+ feat)
   }
   dataSet.sqlContext.createDataFrame(newRowData, schema)
 }


 The idea is to mapPartitions of the underlying RDD of the DataFrame and
 create a new DataFrame by zipping the results. It seems to work but when I
 try to save the RDD I got the following error :

 org.apache.spark.mllib.linalg.DenseVector cannot be cast to
 org.apache.spark.sql.Row


 On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman 
 shiva...@eecs.berkeley.edu wrote:

 One workaround could be to convert a DataFrame into a RDD inside the
 transform function and then use mapPartitions/broadcast to work with the
 JNI calls and then convert back to RDD.

 Thanks
 Shivaram

 On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Dear all,

 I'm still struggling to make a pre-trained caffe model transformer for
 dataframe works. The main problem is that creating a caffe model inside the
 UDF is very slow and consumes memories.

 Some of you suggest to broadcast the model. The problem with
 broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset
  and it is not serializable.

 Another possible approach is to use a UDF that can handle a whole
 partitions instead of just a row in order to minimize the caffe model
 instantiation.

 Is there any ideas to solve one of these two issues ?



 Best,

 Jao

 On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com
 wrote:

 I see.  I think your best bet is to create the cnnModel on the master
 and then serialize it to send to the workers.  If it's big (1M or so), 
 then
 you can broadcast it and use the broadcast variable in the UDF.  There is
 not a great way to do something equivalent to mapPartitions with UDFs 
 right
 now.

 On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Here is my current implementation with current master version of
 spark




 class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

   override def transformSchema(...) { ... }

   override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
     transformSchema(dataSet.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
     val deepCNNFeature = udf((v: Vector) => {
       val cnnModel = new CaffeModel
       cnnModel.transform(v)
     } : Vector)

     dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
   }
 }

Unable to save dataframe with UDT created with sqlContext.createDataFrame

2015-03-31 Thread Jaonary Rabarisoa
Hi all,

A DataFrame with a user-defined type (here mllib.Vector) created with
sqlContext.createDataFrame can't be saved to a Parquet file and raises a
ClassCastException:
org.apache.spark.mllib.linalg.DenseVector cannot be cast to
org.apache.spark.sql.Row

Here is an example of code to reproduce this error :






















object TestDataFrame {
  def main(args: Array[String]): Unit = {
    //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    val conf = new SparkConf().setAppName("RankingEval").setMaster("local[8]")
      .set("spark.executor.memory", "6g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))
    val dataDF = data.toDF
    dataDF.save("test1.parquet")

    val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)
    dataDF2.save("test2.parquet")
  }
}


Is this related to https://issues.apache.org/jira/browse/SPARK-5532
and how can it be solved ?


Cheers,


Jao


Re: spark there is no space on the disk

2015-03-31 Thread Peng Xia
Yes, we have just modified the configuration, and everything works fine.
Thanks very much for the help.

On Thu, Mar 19, 2015 at 5:24 PM, Ted Yu yuzhih...@gmail.com wrote:

 For YARN, possibly this one ?

 property
   nameyarn.nodemanager.local-dirs/name
   value/hadoop/yarn/local/value
 /property

 Cheers

 On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 IIRC you have to set that configuration on the Worker processes (for
 standalone). The app can't override it (only for a client-mode
 driver). YARN has a similar configuration, but I don't know the name
 (shouldn't be hard to find, though).

 On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu dav...@databricks.com
 wrote:
  Is it possible that `spark.local.dir` is overriden by others? The docs
 say:
 
  NOTE: In Spark 1.0 and later this will be overriden by
  SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)
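  
  On a standalone cluster that usually means something like this in
  conf/spark-env.sh on each worker (the paths here are only placeholders):
  
  export SPARK_LOCAL_DIRS=/data1/spark-local,/data2/spark-local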
 
  On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia sparkpeng...@gmail.com
 wrote:
  Hi Sean,
 
  Thank very much for your reply.
  I tried to config it from below code:
 
   sf = SparkConf().setAppName("test").set("spark.executor.memory",
   "45g").set("spark.cores.max", "62").set("spark.local.dir", "C:\\tmp")
 
  But still get the error.
  Do you know how I can config this?
 
 
  Thanks,
  Best,
  Peng
 
 
  On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote:
 
  It means pretty much what it says. You ran out of space on an executor
  (not driver), because the dir used for serialization temp files is
  full (not all volumes). Set spark.local.dirs to something more
  appropriate and larger.
 
  On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com
 wrote:
   Hi
  
  
   I was running a logistic regression algorithm on a 8 nodes spark
   cluster,
   each node has 8 cores and 56 GB Ram (each node is running a windows
   system).
   And the spark installation driver has 1.9 TB capacity. The dataset
 I was
   training on are has around 40 million records with around 6600
 features.
   But
   I always get this error during the training process:
  
   Py4JJavaError: An error occurred while calling
   o70.trainLogisticRegressionModelWithLBFGS.
   : org.apache.spark.SparkException: Job aborted due to stage failure:
   Task
   2709 in stage 3.0 failed 4 times, most recent failure: Lost task
 2709.3
   in
   stage 3.0 (TID 2766,
   workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
   java.io.IOException: There is not enough space on the disk
   at java.io.FileOutputStream.writeBytes(Native Method)
   at java.io.FileOutputStream.write(FileOutputStream.java:345)
   at
   java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
   at
  
  
 org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
   at
  
  
 org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
   at
  
 org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
   at
  
  
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
   at
  
  
 java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
   at
  
  
 java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
   at
   java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
   at
  
  
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at
  
  
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
   at
  
  
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
   at
   org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
   at
   org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
   at
  
  
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
   at
  
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
   at
   org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
   at
   org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
   at
   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
   at
   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at
  
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
  
   Driver stacktrace:
   at
  
   

Re: Can't run spark-submit with an application jar on a Mesos cluster

2015-03-31 Thread hbogert
Well, those are only the logs of the slaves at the Mesos level. I'm not sure
from your reply whether you can SSH into a specific slave or not; if you can,
you should look at the actual output of the application (Spark in this case)
on a slave, e.g. in
 
/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/4/runs/e3cf195d-525b-4148-aa38-1789d378a948/std{err,out}

The actual UUIDs and run number (in this example '4') in the path can differ
from slave node to slave node.

Look into those stderr and stdout files and you'll probably have your answer
as to why it is failing.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-run-spark-submit-with-an-application-jar-on-a-Mesos-cluster-tp22277p22319.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-31 Thread Nicolas Phung
Hello,

@Akhil Das I'm trying to use the experimental API
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
I'm reusing the same code snippet to initialize my topicSet.
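
For reference, a minimal sketch of that setup, following the linked example
(ssc is the StreamingContext; the broker address is a placeholder, and the
topic matches the one created below):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topicsSet = Set("toto")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)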

@Cody Koeninger I don't see any previous error messages (see the full log
at the end). To create the topic, I'm doing the following :

kafka-topics --create --zookeeper localhost:2181 --replication-factor
1 --partitions 10 --topic toto

kafka-topics --create --zookeeper localhost:2181 --replication-factor
1 --partitions 1 --topic toto-single

I'm launching my Spark Streaming in local mode.

@Ted Yu There's no "Couldn't connect to leader for topic" log; here's the
full version:

spark-submit --conf config.resource=application-integration.conf --class
nextgen.Main assembly-0.1-SNAPSHOT.jar

15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung
15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung
15/03/31 10:47:12 INFO SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(nphung); users with modify permissions: Set(nphung)
15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started
15/03/31 10:47:13 INFO Remoting: Starting remoting
15/03/31 10:47:13 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@int.local:44180]
15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@int.local:44180]
15/03/31 10:47:13 INFO Utils: Successfully started service
'sparkDriver' on port 44180.
15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker
15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster
15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20150331104713-2238
15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53
15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server
15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file
server' on port 50204.
15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI'
on port 4040.
15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040
15/03/31 10:47:16 INFO SparkContext: Added JAR
file:/home/nphung/assembly-0.1-SNAPSHOT.jar at
http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with
timestamp 1427791636151
15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver
15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630
15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager
15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block
manager localhost:40630 with 265.1 MB RAM, BlockManagerId(driver,
localhost, 40630)
15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager
15/03/31 10:47:17 INFO EventLoggingListener: Logging events to
hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195
15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties
15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden to
15/03/31 10:47:17 INFO VerifiableProperties: Property
zookeeper.connect is overridden to
15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and
validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Initialized and validated
org.apache.spark.streaming.dstream.MappedDStream@7fd8c559
15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
15/03/31 10:47:17 INFO MappedDStream: Storage level =
StorageLevel(false, false, false, false, 1)
15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval 

Broadcasting a parquet file using spark and python

2015-03-31 Thread jitesh129
How can we implement a BroadcastHashJoin for spark with python?

My SparkSQL inner joins are taking a lot of time since it is performing
ShuffledHashJoin.

Tables on which join is performed are stored as parquet files.

Please help.

Thanks and regards,
Jitesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: refer to dictionary

2015-03-31 Thread Ted Yu
You can use a broadcast variable.

See also this thread:
http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variablesubj=How+Broadcast+variable+scale+
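
The rough shape of it, shown here in Scala (the question is PySpark, but the
idea is the same; rdd1 and dict1 follow the names in the question):

val dict1 = Map("a" -> 1, "b" -> 2, "c" -> 3)
val bDict = sc.broadcast(dict1)   // shipped once per executor instead of inside every task closure
val replaced = rdd1.map(line => line.map(item => bDict.value(item)))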



 On Mar 31, 2015, at 4:43 AM, Peng Xia sparkpeng...@gmail.com wrote:
 
 Hi,
 
 I have an RDD (rdd1) where each line is split into an array [a, b, c], etc.
 I also have a local dictionary (dict1) that stores the key-value pairs {a: 1,
 b: 2, c: 3}.
 I want to replace the keys in the RDD with their corresponding values in the
 dict:
 rdd1.map(lambda line: [dict1[item] for item in line])
 
 But this task is not distributed; I believe the reason is that dict1 is a
 local instance.
 Can anyone provide suggestions on how to parallelize this?
 
 
 Thanks,
 Best,
 Peng
 




Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-03-31 Thread Sean Owen
I had always understood the formulation to be the first option you
describe. Lambda is scaled by the number of items the user has rated /
interacted with. I think the goal is to avoid fitting the tastes of
prolific users disproportionately just because they have many ratings
to fit. This is what's described in the ALS-WR paper we link to on the
Spark web site, in equation 5
(http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)
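
In the paper's notation, that objective is (as I recall it):

f(U, M) = \sum_{(i,j) \in I} (r_{ij} - u_i^\top m_j)^2 + \lambda\Big(\sum_i n_{u_i} \|u_i\|^2 + \sum_j n_{m_j} \|m_j\|^2\Big)

where n_{u_i} and n_{m_j} are the numbers of ratings by user i and of item j,
respectively.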

I think this also gets you the scale-invariance? For every additional
rating from user i to product j, you add one new term to the
squared-error sum, (r_ij - u_i . m_j)^2, but also you'd increase the
regularization term by lambda * (|u_i|^2 + |m_j|^2). They are at least
both increasing about linearly as ratings increase. If the
regularization term is multiplied by the total number of users and
products in the model, then it's fixed.

I might misunderstand you and/or be speaking about something slightly
different when it comes to invariance. But FWIW I had always
understood the regularization to be multiplied by the number of
explicit ratings.
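
For reference, the weighted-lambda regularization term in that equation (with
n_{u_i} and n_{m_j} being the number of ratings of user i and of item j) is, up
to notation:

    lambda * ( \sum_i n_{u_i} \|u_i\|^2 + \sum_j n_{m_j} \|m_j\|^2 )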

On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng men...@gmail.com wrote:
 Okay, I didn't realize that I changed the behavior of lambda in 1.3.
 to make it scale-invariant, but it is worth discussing whether this
 is a good change. In 1.2, we multiply lambda by the number ratings in
 each sub-problem. This makes it scale-invariant for explicit
 feedback. However, in implicit feedback model, a user's sub-problem
 contains all item factors. Then the question is whether we should
 multiply lambda by the number of explicit ratings from this user or by
 the total number of items. We used the former in 1.2 but changed to
 the latter in 1.3. So you should try a smaller lambda to get a similar
 result in 1.3.

 Sean and Shuo, which approach do you prefer? Do you know any existing
 work discussing this?

 Best,
 Xiangrui


 On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng men...@gmail.com wrote:
 This sounds like a bug ... Did you try a different lambda? It would be
 great if you can share your dataset or re-produce this issue on the
 public dataset. Thanks! -Xiangrui

 On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote:
 After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly
 smaller factors (and hence scores). For example, the first few product's
 factor values in 1.2.0 are (0.04821, -0.00674,  -0.0325). In 1.3.0, the
 first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This
 difference of several orders of magnitude is consistent throughout both user
 and product. The recommendations from 1.2.0 are subjectively much better
 than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less
 memory.

 My first thought is that there is too much regularization in the 1.3.0
 results, but I'm using the same lambda parameter value. This is a snippet of
 my scala code:
 .
 val rank = 75
 val numIterations = 15
 val alpha = 10
 val lambda = 0.01
 val model = ALS.trainImplicit(train_data, rank, numIterations,
 lambda=lambda, alpha=alpha)
 .

 The code and input data are identical across both versions. Did anything
 change between the two versions I'm not aware of? I'd appreciate any help!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread java8964
You can use the HiveContext instead of SQLContext, which should support all the 
HiveQL, including lateral view explode.
SQLContext is not supporting that yet.
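A rough sketch of that switch (assuming a Spark build with Hive support on the
classpath, sc being the existing SparkContext, and the metric data being
registered as a temp table against the HiveContext rather than the plain
SQLContext):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // register the data as "metric" against hiveContext first, then:
    hiveContext.sql(
      """SELECT path, `timestamp`, name, value, pe.value
         FROM metric
         LATERAL VIEW explode(pathElements) a AS pe""")
      .collect()
      .foreach(println)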
BTW, nice coding format in the email.
Yong

Date: Tue, 31 Mar 2015 18:18:19 -0400
Subject: Re: SparkSql - java.util.NoSuchElementException: key not found: node 
when access JSON Array
From: tsind...@gmail.com
To: user@spark.apache.org

So in looking at this a bit more, I gather the root cause is the fact that the 
nested fields are represented as rows within rows, is that correct?  If I don't 
know the size of the json array (it varies), using x.getAs[Row](0).getString(0) 
is not really a valid solution.  
Is the solution to apply a lateral view + explode to this? 
I have attempted to change to a lateral view, but looks like my syntax is off:

sqlContext.sql(
  """SELECT path, `timestamp`, name, value, pe.value FROM metric
     lateral view explode(pathElements) a AS pe""")
  .collect.foreach(println(_))
Which results in:
15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread main java.lang.RuntimeException: [1.68] failure: 
``UNION'' expected but identifier view found

SELECT path,`timestamp`, name, value, pe.value FROM metric lateral view 
explode(pathElements) a AS pe
   ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is this the right approach?  Is this syntax available in 1.2.1:
SELECT
  v1.name, v2.city, v2.state 
FROM people
  LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 
 as name, address
  LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2
 as city, state;
-Todd
On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist tsind...@gmail.com wrote:
I am accessing ElasticSearch via the 

Re: Can't run spark-submit with an application jar on a Mesos cluster

2015-03-31 Thread seglo
Thanks hbogert.  There it is plain as day; it can't find my spark binaries. 
I thought it was enough to set SPARK_EXECUTOR_URI in my spark-env.sh since
this is all that's necessary to run spark-shell.sh against a mesos master,
but I also had to set spark.executor.uri in my spark-defaults.conf (or in my
app itself).  Thanks again for your help to troubleshoot this problem.
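
For anyone hitting the same thing, a sketch of the two places that property can
live (the HDFS URL below is only an example, not the real location used here):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("MyMesosApp")   // illustrative name
      .set("spark.executor.uri",
           "hdfs://namenode:8020/dist/spark-1.3.0-bin-hadoop2.4.tgz")
    // or the equivalent line in spark-defaults.conf:
    //   spark.executor.uri  hdfs://namenode:8020/dist/spark-1.3.0-bin-hadoop2.4.tgz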

jclouds@development-5159-d3d:/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/1/runs/latest$
cat stderr
I0329 20:34:26.107267 10026 exec.cpp:132] Version: 0.21.1
I0329 20:34:26.109591 10031 exec.cpp:206] Executor registered on slave
20150322-040336-606645514-5050-2744-S1
sh: 1: /home/jclouds/spark-1.3.0-bin-hadoop2.4/bin/spark-class: not found
jclouds@development-5159-d3d:/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/1/runs/latest$
cat stdout
Registered executor on 10.217.7.180
Starting task 1
Forked command at 10036
sh -c ' /home/jclouds/spark-1.3.0-bin-hadoop2.4/bin/spark-class
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
akka.tcp://sparkDriver@development-5159-d9.c.learning-spark.internal:54746/user/CoarseGrainedScheduler
--executor-id 20150322-040336-606645514-5050-2744-S1 --hostname 10.217.7.180
--cores 10 --app-id 20150322-040336-606645514-5050-2744-0037'
Command exited with status 127 (pid: 10036)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-run-spark-submit-with-an-application-jar-on-a-Mesos-cluster-tp22277p22331.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL saveAsParquet failed after a few waves

2015-03-31 Thread Yijie Shen
Hi,

I am using spark-1.3 prebuilt release with hadoop2.4 support and Hadoop 2.4.0.

I wrote a spark application(LoadApp) to generate data in each task and load the 
data into HDFS as parquet Files (use “saveAsParquet()” in spark sql)

When few waves (1 or 2) are used in a job, LoadApp could finish after a few 
failures and retries.
But when more waves (3 or more) are involved in a job, the job terminates
abnormally.

All the failures I faced are:
“java.io.IOException: The file being written is in an invalid state. Probably 
caused by an error thrown previously. Current state: COLUMN

and the stacktraces  are:

java.io.IOException: The file being written is in an invalid state. Probably 
caused by an error thrown previously. Current state: COLUMN
at 
parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137)
at 
parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129)
at 
parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173)
at 
parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
at 
parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:634)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:648)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:648)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


I have no idea what happened, since jobs may fail or succeed without any apparent reason.

Thanks.


Yijie Shen

Re: Actor not found

2015-03-31 Thread Shixiong Zhu
Thanks for the log. It's really helpful. I created a JIRA to explain why it
will happen: https://issues.apache.org/jira/browse/SPARK-6640

However, will this error always happens in your environment?

Best Regards,
Shixiong Zhu

2015-03-31 22:36 GMT+08:00 sparkdi shopaddr1...@dubna.us:

 This is the whole output from the shell:

 ~/spark-1.3.0-bin-hadoop2.4$ sudo bin/spark-shell
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/30 19:00:40 INFO SecurityManager: Changing view acls to: root
 15/03/30 19:00:40 INFO SecurityManager: Changing modify acls to: root
 15/03/30 19:00:40 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view pe
 rmissions: Set(root); users with modify permissions: Set(root)
 15/03/30 19:00:40 INFO HttpServer: Starting HTTP Server
 15/03/30 19:00:40 INFO Server: jetty-8.y.z-SNAPSHOT
 15/03/30 19:00:40 INFO AbstractConnector: Started
 SocketConnector@0.0.0.0:47797
 15/03/30 19:00:40 INFO Utils: Successfully started service 'HTTP class
 server' on port 47797.
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.0
   /_/

 Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
 Type in expressions to have them evaluated.
 Type :help for more information.
 15/03/30 19:00:42 INFO SparkContext: Running Spark version 1.3.0
 15/03/30 19:00:42 INFO SecurityManager: Changing view acls to: root
 15/03/30 19:00:42 INFO SecurityManager: Changing modify acls to: root
 15/03/30 19:00:42 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view pe
 rmissions: Set(root); users with modify permissions: Set(root)
 15/03/30 19:00:42 INFO Slf4jLogger: Slf4jLogger started
 15/03/30 19:00:42 INFO Remoting: Starting remoting
 15/03/30 19:00:43 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@vm:52574]
 15/03/30 19:00:43 INFO Utils: Successfully started service 'sparkDriver' on
 port 52574.
 15/03/30 19:00:43 INFO SparkEnv: Registering MapOutputTracker
 15/03/30 19:00:43 INFO SparkEnv: Registering BlockManagerMaster
 15/03/30 19:00:43 INFO DiskBlockManager: Created local directory at
 /tmp/spark-f71a8d86-6e49-4dfe-bb98-8e8581015acc/bl
 ockmgr-57532f5a-38db-4ba3-86d8-edef84f592e5
 15/03/30 19:00:43 INFO MemoryStore: MemoryStore started with capacity 265.4
 MB
 15/03/30 19:00:43 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-95e0a143-0de3-4c96-861c-968c9fae2746/h
 ttpd-cb029cd6-4943-479d-9b56-e7397489d9ea
 15/03/30 19:00:43 INFO HttpServer: Starting HTTP Server
 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
 15/03/30 19:00:43 INFO AbstractConnector: Started
 SocketConnector@0.0.0.0:48500
 15/03/30 19:00:43 INFO Utils: Successfully started service 'HTTP file
 server' on port 48500.
 15/03/30 19:00:43 INFO SparkEnv: Registering OutputCommitCoordinator
 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
 15/03/30 19:00:43 INFO AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040
 15/03/30 19:00:43 INFO Utils: Successfully started service 'SparkUI' on
 port
 4040.
 15/03/30 19:00:43 INFO SparkUI: Started SparkUI at http://vm:4040
 15/03/30 19:00:43 INFO Executor: Starting executor ID driver on host
 localhost
 15/03/30 19:00:43 INFO Executor: Using REPL class URI:
 http://10.11.204.80:47797
 15/03/30 19:00:43 INFO AkkaUtils: Connecting to HeartbeatReceiver:
 akka.tcp://sparkDriver@vm:5
 2574/user/HeartbeatReceiver
 15/03/30 19:00:43 ERROR OneForOneStrategy: Actor not found for:
 ActorSelection[Anchor(akka://sparkDriver/deadLetters),
 Path(/)]
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
 at akka.actor.ActorCell.create(ActorCell.scala:596)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: 

Anatomy of RDD : Deep dive into RDD data structure

2015-03-31 Thread madhu phatak
Hi,
 Recently I gave a talk on the RDD data structure which gives an in-depth
understanding of Spark internals. You can watch it on YouTube
https://www.youtube.com/watch?v=WVdyuVwWcBc. The slides are on SlideShare
http://www.slideshare.net/datamantra/anatomy-of-rdd and the code is on GitHub
https://github.com/phatak-dev/anatomy-of-rdd.



Regards,
Madhukara Phatak
http://datamantra.io/


Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
In my experiment, if I do not call gc() explicitly, the shuffle files will not 
be cleaned until the whole job finishes… I don’t know why; maybe the RDDs could 
not be GCed implicitly.
In my situation, a full GC in the driver takes about 10 seconds, so I start a 
thread in the driver to do GC like this (do GC every 120 seconds):

while (true) {
  System.gc();
  Thread.sleep(120 * 1000);
}
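
(A sketch of the same loop on a daemon thread, so the sleep/GC cycle does not
occupy the main driver thread; the 120-second interval is the one used above.)

    val gcThread = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          System.gc()
          Thread.sleep(120 * 1000)
        }
      }
    })
    gcThread.setDaemon(true)
    gcThread.start()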


it works well now.
Do you have more elegant ways to clean the shuffle files?

Best Regards,
Sendong Li



 On Apr 1, 2015, at 5:09 AM, Xiangrui Meng men...@gmail.com wrote:
 
 Hey Guoqiang and Sendong,
 
 Could you comment on the overhead of calling gc() explicitly? The shuffle 
 files should get cleaned in a few seconds after checkpointing, but it is 
 certainly possible to accumulate TBs of files in a few seconds. In this 
 case, calling gc() may work the same as waiting for a few seconds after each 
 checkpoint. Is it correct?
 
 Best,
 Xiangrui
 
 On Tue, Mar 31, 2015 at 8:58 AM, lisendong lisend...@163.com 
 mailto:lisend...@163.com wrote:
 guoqiang ’s method works very well …
 
 it only takes 1TB disk now.
 
 thank you very much!
 
 
 
  On Mar 31, 2015, at 4:47 PM, GuoQiang Li wi...@qq.com mailto:wi...@qq.com wrote:
 
 You can try to enforce garbage collection:
 
 /** Run GC and make sure it actually has run */
 def runGC() {
   val weakRef = new WeakReference(new Object())
   val startTime = System.currentTimeMillis
   System.gc() // Make a best effort to run the garbage collection. It 
 *usually* runs GC.
   // Wait until a weak reference object has been GCed
   System.runFinalization()
   while (weakRef.get != null) {
 System.gc()
 System.runFinalization()
 Thread.sleep(200)
  if (System.currentTimeMillis - startTime > 1) {
    throw new Exception("automatically cleanup error")
 }
   }
 }
 
 
  -- Original Message --
  From: lisendong lisend...@163.com mailto:lisend...@163.com; 
  Sent: Tuesday, March 31, 2015, 3:47 PM
  To: Xiangrui Meng men...@gmail.com mailto:men...@gmail.com; 
  Cc: Xiangrui Meng m...@databricks.com mailto:m...@databricks.com; 
  user user@spark.apache.org mailto:user@spark.apache.org; Sean 
  Owen so...@cloudera.com mailto:so...@cloudera.com; GuoQiang 
  Li wi...@qq.com mailto:wi...@qq.com; 
  Subject: Re: different result from implicit ALS with explicit ALS
 
 I have update my spark source code to 1.3.1.
 
 the checkpoint works well. 
 
 BUT the shuffle data still could not be delete automatically…the disk usage 
 is still 30TB…
 
 I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
 
 Do you know how to solve my problem?
 
 Sendong Li
 
 
 
  On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com 
  mailto:men...@gmail.com wrote:
 
 setCheckpointInterval was added in the current master and branch-1.3. 
 Please help check whether it works. It will be included in the 1.3.1 and 
 1.4.0 release. -Xiangrui
 
 On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com 
 mailto:lisend...@163.com wrote:
 hi, xiangrui:
 I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
 the code is :
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
  
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
 PastedGraphic-2.tiff
 
 the checkpoint is very important in my situation, because my task will 
 produce 1TB shuffle data in each iteration, it the shuffle data is not 
 deleted in each iteration(using checkpoint()), the task will produce 30TB 
 data…
 
 
 So I change the ALS code, and re-compile by myself, but it seems the 
 checkpoint does not take effects, and the task still occupy 30TB disk… ( I 
 only add two lines to the ALS.scala) :
 
 PastedGraphic-3.tiff
 
 
 
 and the driver’s log seems strange, why the log is printed together...
 PastedGraphic-1.tiff
 
 thank you very much!
 
 
  On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com mailto:lisend...@163.com wrote:
 
 Thank you very much for your opinion:)
 
 In our case, maybe it 's dangerous to treat un-observed item as negative 
 interaction(although we could give them small confidence, I think they are 
 still incredible...)
 
 I will do more experiments and give you feedback:)
 
 Thank you;)
 
 
  On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com 
  mailto:so...@cloudera.com wrote:
 
 I believe that's right, and is what I was getting at. yes the implicit
 formulation ends up implicitly including every possible interaction in
 its loss function, even unobserved ones. That could be the difference.
 
 This is mostly an academic question though. In practice, you have
 click-like data and should be using the implicit version for sure.
 
 However you can give negative implicit feedback to the model. You
 could consider no-click as a mild, observed, negative interaction.
 That is: supply a small negative value for these cases. Unobserved
 pairs are not part of the data set. 

Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-03-31 Thread Xiangrui Meng
I created a JIRA for this:
https://issues.apache.org/jira/browse/SPARK-6637. Since we don't have
a clear answer about how the scaling should be handled. Maybe the best
solution for now is to switch back to the 1.2 scaling. -Xiangrui

On Tue, Mar 31, 2015 at 2:50 PM, Sean Owen so...@cloudera.com wrote:
 Ah yeah I take your point. The squared error term is over the whole
 user-item matrix, technically, in the implicit case. I suppose I am
 used to assuming that the 0 terms in this matrix are weighted so much
 less (because alpha is usually large-ish) that they're almost not
 there, but they are. So I had just used the explicit formulation.

 I suppose the result is kind of scale invariant, but not exactly. I
 had not prioritized this property since I had generally built models
 on the full data set and not a sample, and had assumed that lambda
 would need to be retuned over time as the input grew anyway.

 So, basically I don't know anything more than you do, sorry!

 On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng men...@gmail.com wrote:
 Hey Sean,

 That is true for explicit model, but not for implicit. The ALS-WR
 paper doesn't cover the implicit model. In implicit formulation, a
 sub-problem (for v_j) is:

 min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2

 This is a sum for all i but not just the users who rate item j. In
 this case, if we set X=m_j, the number of observed ratings for item j,
 it is not really scale invariant. We have #users user vectors in the
 least squares problem but only penalize lambda * #ratings. I was
 suggesting using lambda * m directly for implicit model to match the
 number of vectors in the least squares problem. Well, this is my
 theory. I don't find any public work about it.
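
 Restating the two conventions in the notation above (a summary of this
 thread, not of any published formulation): for item j's sub-problem, 1.2
 effectively uses X = m_j (the observed ratings on item j) while 1.3 uses
 X = the total number of user vectors in the sub-problem, so matching a
 lambda tuned on 1.2 requires roughly

     lambda_1.3 ~= lambda_1.2 * m_j / (#users)

 which is why a much smaller lambda is needed in 1.3.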

 Best,
 Xiangrui

 On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen so...@cloudera.com wrote:
 I had always understood the formulation to be the first option you
 describe. Lambda is scaled by the number of items the user has rated /
 interacted with. I think the goal is to avoid fitting the tastes of
 prolific users disproportionately just because they have many ratings
 to fit. This is what's described in the ALS-WR paper we link to on the
 Spark web site, in equation 5
 (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)

 I think this also gets you the scale-invariance? For every additional
 rating from user i to product j, you add one new term to the
 squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the
 regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at least
 both increasing about linearly as ratings increase. If the
 regularization term is multiplied by the total number of users and
 products in the model, then it's fixed.

 I might misunderstand you and/or be speaking about something slightly
 different when it comes to invariance. But FWIW I had always
 understood the regularization to be multiplied by the number of
 explicit ratings.

 On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng men...@gmail.com wrote:
 Okay, I didn't realize that I changed the behavior of lambda in 1.3.
 to make it scale-invariant, but it is worth discussing whether this
 is a good change. In 1.2, we multiply lambda by the number ratings in
 each sub-problem. This makes it scale-invariant for explicit
 feedback. However, in implicit feedback model, a user's sub-problem
 contains all item factors. Then the question is whether we should
 multiply lambda by the number of explicit ratings from this user or by
 the total number of items. We used the former in 1.2 but changed to
 the latter in 1.3. So you should try a smaller lambda to get a similar
 result in 1.3.

 Sean and Shuo, which approach do you prefer? Do you know any existing
 work discussing this?

 Best,
 Xiangrui


 On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng men...@gmail.com wrote:
 This sounds like a bug ... Did you try a different lambda? It would be
 great if you can share your dataset or re-produce this issue on the
 public dataset. Thanks! -Xiangrui

 On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote:
 After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly
 smaller factors (and hence scores). For example, the first few product's
 factor values in 1.2.0 are (0.04821, -0.00674,  -0.0325). In 1.3.0, the
 first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This
 difference of several orders of magnitude is consistent throughout both 
 user
 and product. The recommendations from 1.2.0 are subjectively much better
 than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses 
 less
 memory.

 My first thought is that there is too much regularization in the 1.3.0
 results, but I'm using the same lambda parameter value. This is a 
 snippet of
 my scala code:
 .
 val rank = 75
 val numIterations = 15
 val alpha = 10
 val lambda = 0.01
 val model = ALS.trainImplicit(train_data, rank, numIterations,
 

Re: Broadcasting a parquet file using spark and python

2015-03-31 Thread Jitesh chandra Mishra
Hi Michael,

Thanks for your response. I am running 1.2.1.

Is there any workaround to achieve the same with 1.2.1?

Thanks,
Jitesh

On Wed, Apr 1, 2015 at 12:25 AM, Michael Armbrust mich...@databricks.com
wrote:

 In Spark 1.3 I would expect this to happen automatically when the parquet
  table is small (< 10mb, configurable with 
 spark.sql.autoBroadcastJoinThreshold).
 If you are running 1.3 and not seeing this, can you show the code you are
 using to create the table?

 On Tue, Mar 31, 2015 at 3:25 AM, jitesh129 jitesh...@gmail.com wrote:

 How can we implement a BroadcastHashJoin for spark with python?

 My SparkSQL inner joins are taking a lot of time since it is performing
 ShuffledHashJoin.

 Tables on which join is performed are stored as parquet files.

 Please help.

 Thanks and regards,
 Jitesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: spark.sql.Row manipulation

2015-03-31 Thread Michael Armbrust
You can do something like:

df.collect().map {
  case Row(name: String, age1: Int, age2: Int) => ...
}
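
For the specific layout in the question (name1, age1, town1, name2, age2,
town2), with the String/Int types assumed and joinedDF standing in for the
DataFrame produced by the join, that pattern match might look like:

    import org.apache.spark.sql.Row

    val nameAges = joinedDF.collect().map {
      case Row(name1: String, age1: Int, _, _, age2: Int, _) =>
        (name1, age1, age2)
    }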

On Tue, Mar 31, 2015 at 4:05 PM, roni roni.epi...@gmail.com wrote:

 I have 2 paraquet files with format e.g  name , age, town
 I read them  and then join them to get  all the names which are in both
 towns  .
 the resultant dataset is

 res4: Array[org.apache.spark.sql.Row] = Array([name1, age1,
 town1,name2,age2,town2])

 Name 1 and name 2 are same as I am joining .
 Now , I want to get only to the format (name , age1, age2)

 But I cant seem to getting to manipulate the spark.sql.row.

  Trying something like map(_.split(",")).map(a => (a(0),
  a(1).trim().toInt)) does not work.

 Can you suggest a way ?

 Thanks
 -R




Minimum slots assigment to Spark on Mesos

2015-03-31 Thread Stratos Dimopoulos
Hi All,

I am running Spark  MR on Mesos. Is there a configuration setting for
Spark to define the minimum required slots (similar to MapReduce's
mapred.mesos.total.reduce.slots.minimum and mapred.mesos.total.map.slots.
minimum)? The most related property I see is this: spark.scheduler.
minRegisteredResourcesRatio found on the documentation here:
http://spark.apache.org/docs/1.2.1/configuration.html#spark-properties
What I wanted to have is a fixed amount of trackers assigned for Spark so I
can share my cluster with MR. Any suggestion on parts of code or
documentation that I should check for a full list of available
configurations?

thanks,
Stratos


Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-31 Thread Ted Yu
Can you show us the output of DStream#print() if you have it ?
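
For reference, a minimal way to produce that output (assuming the mapped
stream from the logs below is named messages, and that this is wired in
before ssc.start()):

    messages.print()   // prints the first 10 elements of each batch to the driver log
    ssc.start()
    ssc.awaitTermination()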

Thanks

On Tue, Mar 31, 2015 at 2:55 AM, Nicolas Phung nicolas.ph...@gmail.com
wrote:

 Hello,

 @Akhil Das I'm trying to use the experimental API
 https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
 https://www.google.com/url?q=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Fblob%2Fmaster%2Fexamples%2Fscala-2.10%2Fsrc%2Fmain%2Fscala%2Forg%2Fapache%2Fspark%2Fexamples%2Fstreaming%2FDirectKafkaWordCount.scalasa=Dsntz=1usg=AFQjCNFOmScaSfP-2J4Zn56k86-jHUkYaw.
 I'm reusing the same code snippet to initialize my topicSet.

 @Cody Koeninger I don't see any previous error messages (see the full log
 at the end). To create the topic, I'm doing the following :

 kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
 --partitions 10 --topic toto

 kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
 --partitions 1 --topic toto-single

 I'm launching my Spark Streaming in local mode.

 @Ted Yu There's no log Couldn't connect to leader for topic, here's the
 full version :

 spark-submit --conf config.resource=application-integration.conf --class
 nextgen.Main assembly-0.1-SNAPSHOT.jar

 15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung
 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung
 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(nphung); users 
 with modify permissions: Set(nphung)
 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started
 15/03/31 10:47:13 INFO Remoting: Starting remoting
 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkDriver@int.local:44180]
 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://sparkDriver@int.local:44180]
 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on 
 port 44180.
 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker
 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster
 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at 
 /tmp/spark-local-20150331104713-2238
 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is 
 /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53
 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server
 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' 
 on port 50204.
 15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port 
 4040.
 15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040
 15/03/31 10:47:16 INFO SparkContext: Added JAR 
 file:/home/nphung/assembly-0.1-SNAPSHOT.jar at 
 http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 
 1427791636151
 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: 
 akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver
 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630
 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager
 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager 
 localhost:40630 with 265.1 MB RAM, BlockManagerId(driver, localhost, 40630)
 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager
 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to 
 hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195
 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties
 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden 
 to
 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is 
 overridden to
 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = 
 StorageLevel(false, false, false, false, 1)
 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated 
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
 false, false, false, 1)
 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated 
 

Re: Anyone has some simple example with spark-sql with spark 1.3

2015-03-31 Thread Vincent He
It works,thanks for your great help.

On Mon, Mar 30, 2015 at 10:07 PM, Denny Lee denny.g@gmail.com wrote:

 Hi Vincent,

 This may be a case that you're missing a semi-colon after your CREATE
 TEMPORARY TABLE statement.  I ran your original statement (missing the
 semi-colon) and got the same error as you did.  As soon as I added it in, I
 was good to go again:

 CREATE TEMPORARY TABLE jsonTable
 USING org.apache.spark.sql.json
 OPTIONS (
    path "/samples/people.json"
 );
 -- above needed a semi-colon so the temporary table could be created first
 SELECT * FROM jsonTable;

 HTH!
 Denny


 On Sun, Mar 29, 2015 at 6:59 AM Vincent He vincent.he.andr...@gmail.com
 wrote:

 No luck, it does not work, anyone know whether there some special setting
 for spark-sql cli so we do not need to write code to use spark sql? Anyone
 have some simple example on this? appreciate any help. thanks in advance.

 On Sat, Mar 28, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote:

 See
 https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html

 I haven't tried the SQL statements in above blog myself.

 Cheers

 On Sat, Mar 28, 2015 at 5:39 AM, Vincent He 
 vincent.he.andr...@gmail.com wrote:

 thanks for your information . I have read it, I can run sample with
 scala or python, but for spark-sql shell, I can not get an exmaple running
 successfully, can you give me an example I can run with ./bin/spark-sql
 without writing any code? thanks

 On Sat, Mar 28, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com wrote:

 Please take a look at
 https://spark.apache.org/docs/latest/sql-programming-guide.html

 Cheers



  On Mar 28, 2015, at 5:08 AM, Vincent He 
 vincent.he.andr...@gmail.com wrote:
 
 
  I am learning spark sql and try spark-sql example,  I running
 following code, but I got exception ERROR CliDriver:
 org.apache.spark.sql.AnalysisException: cannot recognize input near
 'CREATE' 'TEMPORARY' 'TABLE' in ddl statement; line 1 pos 17, I have two
 questions,
  1. Do we have a list of the statement supported in spark-sql ?
  2. Does spark-sql shell support hiveql ? If yes, how to set?
 
  The example I tried:
  CREATE TEMPORARY TABLE jsonTable
  USING org.apache.spark.sql.json
  OPTIONS (
path examples/src/main/resources/people.json
  )
  SELECT * FROM jsonTable
  The exception I got,
   CREATE TEMPORARY TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path examples/src/main/resources/people.json
)
SELECT * FROM jsonTable
;
  15/03/28 17:38:34 INFO ParseDriver: Parsing command: CREATE
 TEMPORARY TABLE jsonTable
  USING org.apache.spark.sql.json
  OPTIONS (
path examples/src/main/resources/people.json
  )
  SELECT * FROM jsonTable
  NoViableAltException(241@[654:1: ddlStatement : (
 createDatabaseStatement | switchDatabaseStatement | dropDatabaseStatement 
 |
 createTableStatement | dropTableStatement | truncateTableStatement |
 alterStatement | descStatement | showStatement | metastoreCheck |
 createViewStatement | dropViewStatement | createFunctionStatement |
 createMacroStatement | createIndexStatement | dropIndexStatement |
 dropFunctionStatement | dropMacroStatement | analyzeStatement |
 lockStatement | unlockStatement | lockDatabase | unlockDatabase |
 createRoleStatement | dropRoleStatement | grantPrivileges |
 revokePrivileges | showGrants | showRoleGrants | showRolePrincipals |
 showRoles | grantRole | revokeRole | setRole | showCurrentRole );])
  at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
  at org.antlr.runtime.DFA.predict(DFA.java:144)
  at
 org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2090)
  at
 org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1398)
  at
 org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1036)
  at
 org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:199)
  at
 org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
  at
 org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227)
  at
 org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:241)
  at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
  at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
  at
 scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
  at
 scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
  at
 scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
  at
 

Re: can't union two rdds

2015-03-31 Thread ankurjain.nitrr
Rdd union will result in  

  1 2 
  3 4 
  5 6 
  7 8 
  9 10 
11 12

What you are trying to do is a join.
There must be some logic/key to perform a join operation.

I think in your case you want the order (index) to be the joining key here.
An RDD is a distributed data structure and is not well suited to this by itself.

If the amount of data is small, you can use rdd.collect on both RDDs, iterate
over the two collected lists together, and produce the desired result (see the
sketch below).
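
A sketch of the index-as-key idea with two hypothetical RDDs rdd1 and rdd2 of
the same length:

    // RDD.zip requires the same number of partitions and elements per partition
    val pairs = rdd1.zip(rdd2)

    // otherwise, pair elements by position explicitly
    val byIndex = rdd1.zipWithIndex.map(_.swap)
      .join(rdd2.zipWithIndex.map(_.swap))
      .map { case (_, (a, b)) => (a, b) }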



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-t-union-two-rdds-tp22320p22323.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Actor not found

2015-03-31 Thread sparkdi
This is the whole output from the shell:

~/spark-1.3.0-bin-hadoop2.4$ sudo bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/03/30 19:00:40 INFO SecurityManager: Changing view acls to: root
15/03/30 19:00:40 INFO SecurityManager: Changing modify acls to: root
15/03/30 19:00:40 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/30 19:00:40 INFO HttpServer: Starting HTTP Server
15/03/30 19:00:40 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/30 19:00:40 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:47797
15/03/30 19:00:40 INFO Utils: Successfully started service 'HTTP class
server' on port 47797.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
  /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
15/03/30 19:00:42 INFO SparkContext: Running Spark version 1.3.0
15/03/30 19:00:42 INFO SecurityManager: Changing view acls to: root
15/03/30 19:00:42 INFO SecurityManager: Changing modify acls to: root
15/03/30 19:00:42 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/30 19:00:42 INFO Slf4jLogger: Slf4jLogger started
15/03/30 19:00:42 INFO Remoting: Starting remoting
15/03/30 19:00:43 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@vm:52574]
15/03/30 19:00:43 INFO Utils: Successfully started service 'sparkDriver' on
port 52574.
15/03/30 19:00:43 INFO SparkEnv: Registering MapOutputTracker
15/03/30 19:00:43 INFO SparkEnv: Registering BlockManagerMaster
15/03/30 19:00:43 INFO DiskBlockManager: Created local directory at
/tmp/spark-f71a8d86-6e49-4dfe-bb98-8e8581015acc/blockmgr-57532f5a-38db-4ba3-86d8-edef84f592e5
15/03/30 19:00:43 INFO MemoryStore: MemoryStore started with capacity 265.4
MB
15/03/30 19:00:43 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-95e0a143-0de3-4c96-861c-968c9fae2746/httpd-cb029cd6-4943-479d-9b56-e7397489d9ea
15/03/30 19:00:43 INFO HttpServer: Starting HTTP Server
15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/30 19:00:43 INFO AbstractConnector: Started
SocketConnector@0.0.0.0:48500
15/03/30 19:00:43 INFO Utils: Successfully started service 'HTTP file
server' on port 48500.
15/03/30 19:00:43 INFO SparkEnv: Registering OutputCommitCoordinator
15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/30 19:00:43 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/03/30 19:00:43 INFO Utils: Successfully started service 'SparkUI' on port
4040.
15/03/30 19:00:43 INFO SparkUI: Started SparkUI at http://vm:4040
15/03/30 19:00:43 INFO Executor: Starting executor ID driver on host
localhost
15/03/30 19:00:43 INFO Executor: Using REPL class URI:
http://10.11.204.80:47797
15/03/30 19:00:43 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@vm:52574/user/HeartbeatReceiver
15/03/30 19:00:43 ERROR OneForOneStrategy: Actor not found for:
ActorSelection[Anchor(akka://sparkDriver/deadLetters), Path(/)]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at

Re: deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Sean Bigdatafun
(resending...)

I was thinking of the same setup… But the more I think about this problem,
the more interesting it becomes.

If we allocate 50% total memory to Tachyon statically, then the Mesos
benefits of dynamically scheduling resources go away altogether.

Can Tachyon be resource managed by Mesos (dynamically)? Any thought or
comment?

Sean





 Hi Haoyuan,

 So on each mesos slave node I should allocate/section off some amount
 of memory for tachyon (let's say 50% of the total memory) and the rest
 for regular mesos tasks?

 This means, on each slave node I would have tachyon worker (+ hdfs
 configuration to talk to s3 or the hdfs datanode) and the mesos slave
 process. Is this correct?





-- 
--Sean


rdd.cache() not working ?

2015-03-31 Thread fightf...@163.com
Hi, all

Running the following code snippet through spark-shell, however, we cannot see any
cached storage partitions in the web UI.

Does this mean that the cache is not working? Because if we issue person.count again,
we cannot see any performance improvement. Hope anyone can explain this a little.

Best,

Sun.

   case class Person(id: Int, col1: String)

   val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt")
     .map(_.split(","))
     .map(p => Person(p(0).trim.toInt, p(1)))

   person.cache

   person.count



fightf...@163.com


How to setup a Spark Cluter?

2015-03-31 Thread bhushansc007
Hi All,

I am quite new to Spark. So please pardon me if it is a very basic question. 

I have setup a Hadoop cluster using Hortonwork's Ambari. It has 1 Master and
3 Worker nodes. Currently, it has HDFS, Yarn, MapReduce2, HBase and
ZooKeeper services installed. 

Now, I want to install Spark on it. How do I do that? I searched a lot
online, but there is no clear step-by-step installation guide to do that.
All I find is the standalone setup guides. Can someone provide steps? What
needs to be copied to each machine? Where and what config changes should be
made on each machine?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-setup-a-Spark-Cluter-tp22326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
guoqiang 's method works very well …

it only takes 1TB disk now.

thank you very much!



 On Mar 31, 2015, at 4:47 PM, GuoQiang Li wi...@qq.com wrote:
 
 You can try to enforce garbage collection:
 
 /** Run GC and make sure it actually has run */
 def runGC() {
   val weakRef = new WeakReference(new Object())
   val startTime = System.currentTimeMillis
   System.gc() // Make a best effort to run the garbage collection. It 
 *usually* runs GC.
   // Wait until a weak reference object has been GCed
   System.runFinalization()
   while (weakRef.get != null) {
 System.gc()
 System.runFinalization()
 Thread.sleep(200)
  if (System.currentTimeMillis - startTime > 1) {
    throw new Exception("automatically cleanup error")
 }
   }
 }
 
 
  -- Original Message --
  From: lisendong lisend...@163.com mailto:lisend...@163.com; 
  Sent: Tuesday, March 31, 2015, 3:47 PM
  To: Xiangrui Meng men...@gmail.com mailto:men...@gmail.com; 
  Cc: Xiangrui Meng m...@databricks.com mailto:m...@databricks.com; 
  user user@spark.apache.org mailto:user@spark.apache.org; Sean 
  Owen so...@cloudera.com mailto:so...@cloudera.com; GuoQiang 
  Li wi...@qq.com mailto:wi...@qq.com; 
  Subject: Re: different result from implicit ALS with explicit ALS
 
 I have update my spark source code to 1.3.1.
 
 the checkpoint works well. 
 
  BUT the shuffle data still could not be deleted automatically… the disk usage 
  is still 30TB…
 
 I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
 
 Do you know how to solve my problem?
 
 Sendong Li
 
 
 
  On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com 
  mailto:men...@gmail.com wrote:
 
 setCheckpointInterval was added in the current master and branch-1.3. Please 
 help check whether it works. It will be included in the 1.3.1 and 1.4.0 
 release. -Xiangrui
 
 On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com 
 mailto:lisend...@163.com wrote:
 hi, xiangrui:
 I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
 the code is :
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
  
 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
 PastedGraphic-2.tiff
 
 the checkpoint is very important in my situation, because my task will 
  produce 1TB shuffle data in each iteration; if the shuffle data is not 
  deleted in each iteration (using checkpoint()), the task will produce 30TB 
  data…
 
 
 So I change the ALS code, and re-compile by myself, but it seems the 
  checkpoint does not take effect, and the task still occupies 30TB of disk… ( I 
 only add two lines to the ALS.scala) :
 
 PastedGraphic-3.tiff
 
 
 
  and the driver's log seems strange, why the log is printed together...
 PastedGraphic-1.tiff
 
 thank you very much!
 
 
  On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com 
  mailto:lisend...@163.com wrote:
 
 Thank you very much for your opinion:)
 
 In our case, maybe it 's dangerous to treat un-observed item as negative 
 interaction(although we could give them small confidence, I think they are 
 still incredible...)
 
 I will do more experiments and give you feedback:)
 
 Thank you;)
 
 
  On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com 
  mailto:so...@cloudera.com wrote:
 
 I believe that's right, and is what I was getting at. yes the implicit
 formulation ends up implicitly including every possible interaction in
 its loss function, even unobserved ones. That could be the difference.
 
 This is mostly an academic question though. In practice, you have
 click-like data and should be using the implicit version for sure.
 
 However you can give negative implicit feedback to the model. You
 could consider no-click as a mild, observed, negative interaction.
 That is: supply a small negative value for these cases. Unobserved
 pairs are not part of the data set. I'd be careful about assuming the
 lack of an action carries signal.
 
 On Thu, Feb 26, 2015 at 3:07 PM, 163 lisend...@163.com 
 mailto:lisend...@163.com wrote:
 oh my god, I think I understood...
 In my case, there are three kinds of user-item pairs:
 
 Display and click pair(positive pair)
 Display but no-click pair(negative pair)
 No-display pair(unobserved pair)
 
 Explicit ALS only consider the first and the second kinds
 But implicit ALS consider all the three kinds of pair(and consider the 
 third
 kind as the second pair, because their preference value are all zero and
 confidence are all 1)
 
 So the result are different. right?
 
 Could you please give me some advice, which ALS should I use?
 If I use the implicit ALS, how to distinguish the second and the third 
 kind
 of pair:)
 
 My opinion is in my case, I should use explicit ALS ...
 
 Thank you so much
 
  On Feb 26, 2015, at 22:41, Xiangrui Meng m...@databricks.com 
  mailto:m...@databricks.com 

Re: refer to dictionary

2015-03-31 Thread Peng Xia
Hi Ted,

Thanks very much, yea, using broadcast is much faster.

Best,
Peng

On Tue, Mar 31, 2015 at 8:49 AM, Ted Yu yuzhih...@gmail.com wrote:

 You can use broadcast variable.

 See also this thread:

 http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variablesubj=How+Broadcast+variable+scale+



  On Mar 31, 2015, at 4:43 AM, Peng Xia sparkpeng...@gmail.com wrote:
 
  Hi,
 
  I have a RDD (rdd1)where each line is split into an array [a, b,
 c], etc.
  And I also have a local dictionary p (dict1) stores key value pair
 {a:1, b: 2, c:3}
  I want to replace the keys in the rdd with the its corresponding value
 in the dict:
  rdd1.map(lambda line: [dict1[item] for item in line])
 
  But this task is not distributed, I believe the reason is the dict1 is a
 local instance.
  Can any one provide suggestions on this to parallelize this?
 
 
  Thanks,
  Best,
  Peng
 



Ambiguous references to a field set in a partitioned table AND the data

2015-03-31 Thread nfo
Hi,

I save Parquet files in a partitioned table, so in a path looking like
/path/to/table/myfield=a/ .
But I also kept the field myfield in the Parquet data. Thus. when I query
the field, I get this error:

df.select("myfield").show(10)
Exception in thread main org.apache.spark.sql.AnalysisException:
Ambiguous references to myfield  (myfield#2,List()),(myfield#47,List());

Looking at the code, I could not find a way to explicitly specify which
column I'd want. DataFrame#columns returns strings. Even by loading the data
with a schema (StructType), I'm not sure I can do it.

Should I have to make sure that my partition field does not exist in the
data before saving ? Or is there a way to declare what column in the schema
I want to query ?
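
A sketch of the first option (dropping the duplicated field before the write),
assuming Spark 1.3 DataFrames and the names used above:

    val withoutPartitionField =
      df.select(df.columns.filter(_ != "myfield").map(name => df.col(name)): _*)
    withoutPartitionField.saveAsParquetFile("/path/to/table/myfield=a")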

Also, for the same reasons, if I try to persist() the data, I get this
error:

* Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
at parquet.bytes.BytesUtils.bytesToInt(BytesUtils.java:227)
at
parquet.column.statistics.IntStatistics.setMinMaxFromBytes(IntStatistics.java:46)
at
parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249)
at
parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:558)
at
parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:492)
at
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ambiguous-references-to-a-field-set-in-a-partitioned-table-AND-the-data-tp22325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why Shuffle Write is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Udit Mehta
I have noticed a similar issue when using spark streaming. The spark
shuffle write size increases to a large size(in GB) and then the app
crashes saying:
java.io.FileNotFoundException:
/data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
(No such file or directory)

I dont understand why the shuffle size increases to such a large value for
long running jobs.

Thanks,
Udiy

On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote:

 Thanks Saisai. I will try your solution, but still i don't understand why
 filesystem should be used where there is a plenty of memory available!



 On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Shuffle write will finally spill the data into file system as a bunch of
 files. If you want to avoid disk write, you can mount a ramdisk and
 configure spark.local.dir to this ram disk. So shuffle output will write
 to memory based FS, and will not introduce disk IO.

 Thanks
 Jerry

 2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com:

 Hi,

 I was looking at SparkUI, Executors, and I noticed that I have 597 MB
 for  Shuffle while I am using cached temp-table and the Spark had 2 GB
 free memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!

 Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks be
 done in memory?

 best,

 /Shahab






Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Sean Owen
Yep, it's not serializable:
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html

You can't return this from a distributed operation since that would
mean it has to travel over the network and you haven't supplied any
way to convert the thing into bytes.
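
For illustration, a minimal sketch of that approach (Scala, and assuming an
HBase 0.96+ client API; the cknid row-key handling follows the code quoted
below): pull plain, serializable values out of Result before it leaves the
Hadoop RDD, and return those instead.

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Convert one (row key, Result) pair into purely serializable Scala types.
def extract(pair: (ImmutableBytesWritable, Result)): (Int, Map[String, String]) = {
  val (key, result) = pair
  val cknid = Bytes.toString(key.get()).toInt
  // each cell becomes (column qualifier -> value), both as Strings
  val cells = result.rawCells().map { c =>
    Bytes.toString(CellUtil.cloneQualifier(c)) -> Bytes.toString(CellUtil.cloneValue(c))
  }.toMap
  (cknid, cells)
}

// usage sketch: hbaseRDD.map(extract), so no Result object is ever shipped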

On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com wrote:
 When I am trying to get the result from HBase and running the mapToPair function
 of the RDD it's giving the error
 java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

 Here is the code

 // private static JavaPairRDD<Integer, Result>
 //   getCompanyDataRDD(JavaSparkContext sc) throws IOException {
 //   return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
 //       TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
 //     .mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
 //
 //       public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
 //         System.out.println("In getCompanyDataRDD" + t._2);
 //
 //         String cknid = Bytes.toString(t._1.get());
 //         System.out.println("processing cknids is:" + cknid);
 //         Integer cknidInt = Integer.parseInt(cknid);
 //         Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
 //         return returnTuple;
 //       }
 //     });
 //   }

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcasting a parquet file using spark and python

2015-03-31 Thread Michael Armbrust
In Spark 1.3 I would expect this to happen automatically when the parquet
table is small (< 10 MB, configurable with
spark.sql.autoBroadcastJoinThreshold).
If you are running 1.3 and not seeing this, can you show the code you are
using to create the table?
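
For example, a sketch only (Scala shown for brevity; the 50 MB value and table
paths are made up, and the property can equally be set with a SQL SET command):

// raise the broadcast threshold (in bytes) so the small parquet table is broadcast
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

val small = sqlContext.parquetFile("small.parquet")
val big   = sqlContext.parquetFile("big.parquet")
// explain() should now show a broadcast join instead of a shuffled hash join
big.join(small, big("id") === small("id")).explain()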

On Tue, Mar 31, 2015 at 3:25 AM, jitesh129 jitesh...@gmail.com wrote:

 How can we implement a BroadcastHashJoin for spark with python?

 My SparkSQL inner joins are taking a lot of time since it is performing
 ShuffledHashJoin.

 Tables on which join is performed are stored as parquet files.

 Please help.

 Thanks and regards,
 Jitesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread Todd Nist
I am accessing ElasticSearch via elasticsearch-hadoop and attempting to
expose it via SparkSQL. I am using Spark 1.2.1, the latest supported by
elasticsearch-hadoop, and "org.elasticsearch" % "elasticsearch-hadoop" %
"2.1.0.BUILD-SNAPSHOT" of elasticsearch-hadoop. I’m
encountering an issue when I attempt to query the following JSON after
creating a temporary table from it. The JSON looks like this:

PUT /_template/device
{
  "template": "dev*",
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "metric": {
      "_timestamp": {
        "enabled": true,
        "stored": true,
        "path": "timestamp",
        "format": "yyyy-MM-dd'T'HH:mm:ssZZ"
      },
      "properties": {
        "pathId": {
          "type": "string"
        },
        "pathElements": {
          "properties": {
            "node": {
              "type": "string"
            },
            "value": {
              "type": "string"
            }
          }
        },
        "name": {
          "type": "string"
        },
        "value": {
          "type": "double"
        },
        "timestamp": {
          "type": "date",
          "store": true
        }
      }
    }
  }
}

Querying all columns works fine except for pathElements, which is a JSON
array. If it is added to the select, the query fails with a
java.util.NoSuchElementException:
key not found: node.

*Details*.

The program is pretty basic, looks like this:

/**
 * A simple sample to read and write to ES using elasticsearch-hadoop.
 */

package com.opsdatastore.elasticsearch.spark

import java.io.File


// Scala imports
import scala.collection.JavaConversions._
// Spark imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{SchemaRDD, SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

// OpsDataStore
import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory", "1g")
    conf.set("spark.kryoserializer.buffer.mb", "256")

    val sparkContext = new SparkContext(conf)
    sparkContext.addJar(Spark.JarPath + "jar")
    sparkContext
  }


  def main(args: Array[String]) {

    val sc = sparkInit

    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    // specific query, just read all for now
    sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}", "?q=*:*")

    /*
     * Read from ES and provide some insight with Spark & SparkSQL
     */
    val esData = sc.esRDD("device/metric")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end-start} ms")

    println("Create Metric Temporary Table for querying")
    val schemaRDD = sqlContext.sql(
      "CREATE TEMPORARY TABLE metric " +
      "USING org.elasticsearch.spark.sql " +
      "OPTIONS (resource 'device/metric')")

    System.out.println()
    System.out.println("#  Scheam Definition   #")
    System.out.println()
    schemaRDD.printSchema()

    System.out.println()
    System.out.println("#  Data from SparkSQL  #")
    System.out.println()

    sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM metric")
      .collect.foreach(println(_))
  }
}

So this works fine:

sc.esRDD("device/metric")
esData.collect.foreach(println(_))

And results in this:

15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at
ElasticSearchReadWrite.scala:67, took 4.948556 s
(AUxxDrs4cgadF5SlaMg0,Map(pathElements -> Buffer(Map(node -> State,
value -> PA), Map(node -> City, value -> Pittsburgh), Map(node ->
Street, value -> 12345 Westbrook Drive), Map(node -> level, value ->
main), Map(node -> device, value -> thermostat)), value ->
29.590943279257175, name -> Current Temperature, timestamp ->
2015-03-27T14:53:46+, path -> /PA/Pittsburgh/12345 Westbrook
Drive/main/theromostat-1))

Yet this fails:

sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value
FROM metric").collect.foreach(println(_))

With this exception:

Create Metric Temporary Table for
querying#  Scheam
Definition   #
root
#  Data from SparkSQL
#15/03/31 14:37:49
INFO BlockManager: Removing broadcast 015/03/31 14:37:49 INFO
BlockManager: 

java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Jeetendra Gangele
When I am trying to get the result from HBase and running the mapToPair
function of the RDD it's giving the error
java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

Here is the code

// private static JavaPairRDD<Integer, Result>
//   getCompanyDataRDD(JavaSparkContext sc) throws IOException {
//   return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
//       TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
//     .mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
//
//       public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
//         System.out.println("In getCompanyDataRDD" + t._2);
//
//         String cknid = Bytes.toString(t._1.get());
//         System.out.println("processing cknids is:" + cknid);
//         Integer cknidInt = Integer.parseInt(cknid);
//         Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
//         return returnTuple;
//       }
//     });
//   }


Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Anny Chen
Hi Akhil,

I tried editing the /etc/hosts on the master and on the workers, and seems
it is not working for me.

I tried adding hostname internal-ip and it didn't work. I then tried
adding internal-ip hostname and it didn't work either. I guess I should
also edit the spark-env.sh file?

Thanks!
Anny

On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can add an internal ip to public hostname mapping in your /etc/hosts
 file, if your forwarding is proper then it wouldn't be a problem there
 after.



 Thanks
 Best Regards

 On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote:

 Hi,

 For security reasons, we added a server between my aws Spark Cluster and
 local, so I couldn't connect to the cluster directly. To see the SparkUI
 and
 its related work's  stdout and stderr, I used dynamic forwarding and
 configured the SOCKS proxy. Now I could see the SparkUI using the
 internal
 ec2 ip, however when I click on the application UI (4040) or the worker's
 UI
 (8081), it still automatically uses the public DNS instead of internal ec2
 ip, which the browser now couldn't show.

 Is there a way that I could configure this? I saw that one could configure
 the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could
 help. Does anyone experience the same issue?

 Thanks a lot!
 Anny




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to setup a Spark Cluter?

2015-03-31 Thread Akhil Das
It's pretty simple: pick one machine as master (say machine A), and let's
call the workers B, C, and D

*Login to A:*

- Enable passwordless authentication (ssh-keygen)
   - Add A's ~/.ssh/id_rsa.pub to B,C,D's ~/.ssh/authorized_keys file

- Download spark binary (that supports your hadoop version) from
https://spark.apache.org/downloads.html (eg: wget
http://d3kbcqa49mib13.cloudfront.net/spark-1.3.0-bin-hadoop2.4.tgz)
- Extract it (tar xf spark*tgz)
- cd spark-1.3.0-bin-hadoop2.4;cp conf/spark-env.sh.template
conf/spark-env.sh
- vi conf/spark-env.sh : Now configure SPARK_MASTER_IP, SPARK_WORKER_CORES,
SPARK_WORKER_MEMORY as the resources you have.
- vi conf/slaves : Add B,C,D hostnames/ipaddress line by line

- cd ../;
- rsync -za spark-1.3.0-bin-hadoop2.4 B:
- rsync -za spark-1.3.0-bin-hadoop2.4 C:
- rsync -za spark-1.3.0-bin-hadoop2.4 D:
- cd spark-1.3.0-bin-hadoop2.4;sbin/start-all.sh

Now your cluster is up and running, just be careful with your firewall
entries. If you open up all ports then anyone can take over your cluster.
:) Read more : https://www.sigmoid.com/securing-apache-spark-cluster/





Thanks
Best Regards

On Tue, Mar 31, 2015 at 10:26 PM, bhushansc007 bhushansc...@gmail.com
wrote:

 Hi All,

 I am quite new to Spark. So please pardon me if it is a very basic
 question.

 I have setup a Hadoop cluster using Hortonwork's Ambari. It has 1 Master
 and
 3 Worker nodes. Currently, it has HDFS, Yarn, MapReduce2, HBase and
 ZooKeeper services installed.

 Now, I want to install Spark on it. How do I do that? I searched a lot
 online, but there is no clear step-by-step installation guide to do that.
 All I find is the standalone setup guides. Can someone provide steps? What
 needs to be copied to each machine? Where and what config changes should be
 made on each machine?

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-setup-a-Spark-Cluter-tp22326.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Petar Zecevic


Did you try setting the SPARK_MASTER_IP parameter in spark-env.sh?


On 31.3.2015. 19:19, Anny Chen wrote:

Hi Akhil,

I tried editing the /etc/hosts on the master and on the workers, and 
seems it is not working for me.


I tried adding hostname internal-ip and it didn't work. I then 
tried adding internal-ip hostname and it didn't work either. I 
guess I should also edit the spark-env.sh file?


Thanks!
Anny

On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das 
ak...@sigmoidanalytics.com mailto:ak...@sigmoidanalytics.com wrote:


You can add an internal ip to public hostname mapping in your
/etc/hosts file, if your forwarding is proper then it wouldn't be
a problem there after.



Thanks
Best Regards

On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com
mailto:anny9...@gmail.com wrote:

Hi,

For security reasons, we added a server between my aws Spark
Cluster and
local, so I couldn't connect to the cluster directly. To see
the SparkUI and
its related work's  stdout and stderr, I used dynamic
forwarding and
configured the SOCKS proxy. Now I could see the SparkUI using
the  internal
ec2 ip, however when I click on the application UI (4040) or
the worker's UI
(8081), it still automatically uses the public DNS instead of
internal ec2
ip, which the browser now couldn't show.

Is there a way that I could configure this? I saw that one
could configure
the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether
this could
help. Does anyone experience the same issue?

Thanks a lot!
Anny




--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
mailto:user-h...@spark.apache.org







Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Akhil Das
When you say you added internal-ip hostname, were you able to ping any
of these from the machine?

You could try setting SPARK_LOCAL_IP on all machines. But make sure you
will be able to bind to that host/ip specified there.


Thanks
Best Regards

On Tue, Mar 31, 2015 at 10:49 PM, Anny Chen anny9...@gmail.com wrote:

 Hi Akhil,

 I tried editing the /etc/hosts on the master and on the workers, and seems
 it is not working for me.

 I tried adding hostname internal-ip and it didn't work. I then tried
 adding internal-ip hostname and it didn't work either. I guess I should
 also edit the spark-env.sh file?

 Thanks!
 Anny

 On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can add an internal ip to public hostname mapping in your /etc/hosts
 file, if your forwarding is proper then it wouldn't be a problem there
 after.



 Thanks
 Best Regards

 On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote:

 Hi,

 For security reasons, we added a server between my aws Spark Cluster and
 local, so I couldn't connect to the cluster directly. To see the SparkUI
 and
 its related work's  stdout and stderr, I used dynamic forwarding and
 configured the SOCKS proxy. Now I could see the SparkUI using the
 internal
 ec2 ip, however when I click on the application UI (4040) or the
 worker's UI
 (8081), it still automatically uses the public DNS instead of internal
 ec2
 ip, which the browser now couldn't show.

 Is there a way that I could configure this? I saw that one could
 configure
 the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could
 help. Does anyone experience the same issue?

 Thanks a lot!
 Anny




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Nan Zhu
The example in 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
 might help

Best, 

-- 
Nan Zhu
http://codingcat.me


On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote:

 Yep, it's not serializable:
 https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html
 
 You can't return this from a distributed operation since that would
 mean it has to travel over the network and you haven't supplied any
 way to convert the thing into bytes.
 
 On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com 
 (mailto:gangele...@gmail.com) wrote:
  When I am trying to get the result from HBase and running the mapToPair function
  of the RDD it's giving the error
  java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
  
  Here is the code
  
  // private static JavaPairRDD<Integer, Result>
  //   getCompanyDataRDD(JavaSparkContext sc) throws IOException {
  //   return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
  //       TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
  //     .mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
  //
  //       public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
  //         System.out.println("In getCompanyDataRDD" + t._2);
  //
  //         String cknid = Bytes.toString(t._1.get());
  //         System.out.println("processing cknids is:" + cknid);
  //         Integer cknidInt = Integer.parseInt(cknid);
  //         Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
  //         return returnTuple;
  //       }
  //     });
  //   }
  
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 (mailto:user-unsubscr...@spark.apache.org)
 For additional commands, e-mail: user-h...@spark.apache.org 
 (mailto:user-h...@spark.apache.org)
 
 




Re: Spark-events does not exist error, while it does with all the req. rights

2015-03-31 Thread Marcelo Vanzin
Hmmm... could you try to set the log dir to
file:/home/hduser/spark/spark-events?

I checked the code and it might be the case that the behaviour changed
between 1.2 and 1.3...
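
For reference, a minimal sketch of setting it explicitly from the application
(the path is the one from the error message; spark.eventLog.* are the standard
property names, and an absolute file: URI also avoids the unexpanded "~" seen
in the earlier stack trace):

val conf = new org.apache.spark.SparkConf()
  .setAppName("event-log-sketch")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "file:/home/hduser/spark/spark-events")
val sc = new org.apache.spark.SparkContext(conf)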

On Mon, Mar 30, 2015 at 6:44 PM, Tom Hubregtsen thubregt...@gmail.com wrote:
 The stack trace for the first scenario and your suggested improvement is
 similar, with as only difference the first line (Sorry for not including
 this):
 Log directory /home/hduser/spark/spark-events does not exist.

 To verify your premises, I cd'ed into the directory by copy pasting the path
 listed in the error message (i, ii), created a text file, closed it an
 viewed it, and deleted it (iii). My findings were reconfirmed by my
 colleague. Any other ideas?

 Thanks,

 Tom


 On 30 March 2015 at 19:19, Marcelo Vanzin van...@cloudera.com wrote:

 So, the error below is still showing the invalid configuration.

 You mentioned in the other e-mails that you also changed the
 configuration, and that the directory really, really exists. Given the
 exception below, the only ways you'd get the error with a valid
 configuration would be if (i) the directory didn't exist, (ii) it
 existed but the user could not navigate to it or (iii) it existed but
 was not actually a directory.

 So please double-check all that.

 On Mon, Mar 30, 2015 at 5:11 PM, Tom Hubregtsen thubregt...@gmail.com
 wrote:
  Stack trace:
  15/03/30 17:37:30 INFO storage.BlockManagerMaster: Registered
  BlockManager
  Exception in thread main java.lang.IllegalArgumentException: Log
  directory
  ~/spark/spark-events does not exist.


 --
 Marcelo





-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why Shuffle Write is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Udit Mehta
Thanks for the reply.
This will reduce the shuffle write to disk to an extent but for a long
running job(multiple days), the shuffle write would still occupy a lot of
space on disk. Why do we need to store the data from older map tasks to
memory?

On Tue, Mar 31, 2015 at 1:19 PM, Bijay Pathak bijay.pat...@cloudwick.com
wrote:

 The Spark Sort-Based Shuffle (default from 1.1) keeps the data from
 each Map tasks to memory until they they can't fit after which they
 are sorted and spilled to disk. You can reduce the Shuffle write to
 disk by increasing spark.shuffle.memoryFraction(default 0.2).

 By writing the shuffle output to disk the Spark lineage can be
 truncated when the RDDs are already materialized as the side-effects
 of earlier shuffle.This is the under the hood optimization in Spark
 which is only possible because of shuffle output output being written
 to disk.

 You can set spark.shuffle.spill to false if you don't want to spill to
 the disk and assuming you have enough heap memory.

 On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote:
  I have noticed a similar issue when using spark streaming. The spark
 shuffle
  write size increases to a large size(in GB) and then the app crashes
 saying:
  java.io.FileNotFoundException:
 
 /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
  (No such file or directory)
 
  I dont understand why the shuffle size increases to such a large value
 for
  long running jobs.
 
  Thanks,
  Udiy
 
  On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote:
 
  Thanks Saisai. I will try your solution, but still i don't understand
 why
  filesystem should be used where there is a plenty of memory available!
 
 
 
  On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com
  wrote:
 
  Shuffle write will finally spill the data into file system as a bunch
 of
  files. If you want to avoid disk write, you can mount a ramdisk and
  configure spark.local.dir to this ram disk. So shuffle output will
 write
  to memory based FS, and will not introduce disk IO.
 
  Thanks
  Jerry
 
  2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com:
 
  Hi,
 
  I was looking at SparkUI, Executors, and I noticed that I have 597 MB
  for  Shuffle while I am using cached temp-table and the Spark had 2
 GB free
  memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!
 
  Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks
 be
  done in memory?
 
  best,
 
  /Shahab
 
 
 
 



Re: why Shuffle Write is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Bijay Pathak
The Spark sort-based shuffle (the default since 1.1) keeps the data from
each map task in memory until it can't fit, after which it is
sorted and spilled to disk. You can reduce the shuffle write to
disk by increasing spark.shuffle.memoryFraction (default 0.2).

By writing the shuffle output to disk, the Spark lineage can be
truncated when the RDDs are already materialized as the side-effect
of an earlier shuffle. This is an under-the-hood optimization in Spark
which is only possible because the shuffle output is written
to disk.

You can set spark.shuffle.spill to false if you don't want to spill to
disk, assuming you have enough heap memory.
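
As a concrete sketch of the settings mentioned above (the values are arbitrary
examples rather than recommendations, and the ramdisk path is hypothetical):

val conf = new org.apache.spark.SparkConf()
  .setAppName("shuffle-tuning-sketch")
  .set("spark.shuffle.memoryFraction", "0.4")   // default 0.2: more heap for shuffle buffers
  .set("spark.shuffle.spill", "false")          // only safe if the shuffle data really fits in memory
  .set("spark.local.dir", "/mnt/ramdisk")       // optionally point shuffle files at a memory-backed FS
val sc = new org.apache.spark.SparkContext(conf)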

On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote:
 I have noticed a similar issue when using spark streaming. The spark shuffle
 write size increases to a large size(in GB) and then the app crashes saying:
 java.io.FileNotFoundException:
 /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
 (No such file or directory)

 I dont understand why the shuffle size increases to such a large value for
 long running jobs.

 Thanks,
 Udiy

 On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote:

 Thanks Saisai. I will try your solution, but still i don't understand why
 filesystem should be used where there is a plenty of memory available!



 On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Shuffle write will finally spill the data into file system as a bunch of
 files. If you want to avoid disk write, you can mount a ramdisk and
 configure spark.local.dir to this ram disk. So shuffle output will write
 to memory based FS, and will not introduce disk IO.

 Thanks
 Jerry

 2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com:

 Hi,

 I was looking at SparkUI, Executors, and I noticed that I have 597 MB
 for  Shuffle while I am using cached temp-table and the Spark had 2 GB 
 free
 memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!

 Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks be
 done in memory?

 best,

 /Shahab





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Using 'fair' scheduler mode

2015-03-31 Thread asadrao
Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the
first query is a very expensive query (ex: ‘select *’ on a really big data
set) then any subsequent query seems to get blocked. I would have expected
the second query to run in parallel since I am using the ‘fair’ scheduler
mode, not ‘fifo’. I am submitting the query through the thrift server.
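
One hedged note (an assumption about the setup here, not a confirmed fix): even
with the scheduler mode set to FAIR, jobs submitted to the same pool still run
in FIFO order within that pool by default, so concurrent queries usually have
to be assigned to different pools. In plain Spark code that looks roughly like
the sketch below (pool names are made up; pools can be configured further in a
fairscheduler.xml allocation file):

// set per thread/session before submitting the expensive query
sc.setLocalProperty("spark.scheduler.pool", "batch")
// ... and from another thread, put the lighter queries elsewhere
sc.setLocalProperty("spark.scheduler.pool", "interactive")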



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Did anybody run Spark-perf on powerpc?

2015-03-31 Thread Tom
We verified it runs on x86, and are now trying to run it on powerPC. We
currently run into dependency trouble with sbt. I tried installing sbt by
hand and resolving all dependencies by hand, but must have made an error, as
I still get errors.

Original error:
Getting org.scala-sbt sbt 0.13.6 ...

:: problems summary ::
 WARNINGS
module not found: org.scala-sbt#sbt;0.13.6

 local: tried

  /home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

  -- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:

  /home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/jars/sbt.jar

 typesafe-ivy-releases: tried

 
https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

 Maven Central: tried

  https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom

  -- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:

  https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar

::

::  UNRESOLVED DEPENDENCIES ::

::

:: org.scala-sbt#sbt;0.13.6: not found

::


 ERRORS
Server access Error: Received fatal alert: decrypt_error
url=https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed:
java.security.cert.CertPathValidatorException: The certificate issued by
CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not
trusted; internal cause is: 
java.security.cert.CertPathValidatorException: Certificate chaining 
error
url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom

Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed:
java.security.cert.CertPathValidatorException: The certificate issued by
CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not
trusted; internal cause is: 
java.security.cert.CertPathValidatorException: Certificate chaining 
error
url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Did-anybody-run-Spark-perf-on-powerpc-tp22329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Query REST web service with Spark?

2015-03-31 Thread Burak Yavuz
Hi,

If I recall correctly, I've read people integrating REST calls to Spark
Streaming jobs in the user list. I don't imagine any cases for why it
shouldn't be possible.

Best,
Burak
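
For what it's worth, a minimal sketch of the usual pattern (plain scala.io
inside a transformation; the endpoint, RDD name, and response handling are all
placeholders):

import scala.io.Source

val enriched = recordsRDD.mapPartitions { keys =>
  keys.map { key =>
    // one HTTP GET per record, so this only makes sense while the call volume stays low
    val response = Source.fromURL(s"http://rest.example.com/lookup/$key").mkString
    (key, response)
  }
}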

On Tue, Mar 31, 2015 at 1:46 PM, Minnow Noir minnown...@gmail.com wrote:

 We have have some data on Hadoop that needs augmented with data only
 available to us via a REST service.  We're using Spark to search for, and
 correct, missing data. Even though there are a lot of records to scour for
 missing data, the total number of calls to the service is expected to be
 low, so it would be ideal to do the whole job in Spark as we scour the data.

 I don't see anything obvious in the API or on Google relating to making
 REST calls from a Spark job.  Is it possible?

 Thanks,

 Alec



Ambiguous references to a field set in a partitioned table AND the data

2015-03-31 Thread Nicolas Fouché
  Hi,


I save Parquet files in a partitioned table, so in /path/to/table/myfield=a/ .
But I also kept the field "myfield" in the Parquet data. Thus, when I query the
field, I get this error:


df.select("myfield").show(10)
Exception in thread "main" org.apache.spark.sql.AnalysisException: Ambiguous
references to myfield (myfield#2,List()),(myfield#47,List());


Looking at the code, I could not find a way to explicitly specify which column 
I'd want. DataFrame#columns returns strings. Even by loading the data with a 
schema (StructType), I'm not sure I can do it.


Should I have to make sure that my partition field does not exist in the data 
before saving ? Or is there a way to declare what column in the schema I want 
to query ?


Thanks.






pyspark error with zip

2015-03-31 Thread Charles Hayden
?

The following program fails in the zip step.

x = sc.parallelize([1, 2, 3, 1, 2, 3])
y = sc.parallelize([1, 2, 3])
z = x.distinct()
print x.zip(y).collect()


The error that is produced depends on whether multiple partitions have been 
specified or not.

I understand that

the two RDDs [must] have the same number of partitions and the same number of 
elements in each partition.

What is the best way to work around this restriction?

I have been performing the operation with the following code, but I am hoping 
to find something more efficient.

def safe_zip(left, right):
ix_left = left.zipWithIndex().map(lambda row: (row[1], row[0]))
ix_right = right.zipWithIndex().map(lambda row: (row[1], row[0]))
return ix_left.join(ix_right).sortByKey().values()




Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread Xiangrui Meng
Hey Guoqiang and Sendong,

Could you comment on the overhead of calling gc() explicitly? The shuffle
files should get cleaned in a few seconds after checkpointing, but it is
certainly possible to accumulates TBs of files in a few seconds. In this
case, calling gc() may work the same as waiting for a few seconds after
each checkpoint. Is it correct?

Best,
Xiangrui

On Tue, Mar 31, 2015 at 8:58 AM, lisendong lisend...@163.com wrote:

 guoqiang ’s method works very well …

 it only takes 1TB disk now.

 thank you very much!



 On Mar 31, 2015, at 4:47 PM, GuoQiang Li wi...@qq.com wrote:

 You can try to enforce garbage collection:

 /** Run GC and make sure it actually has run */
 def runGC() {
   val weakRef = new WeakReference(new Object())
   val startTime = System.currentTimeMillis
   System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
   // Wait until a weak reference object has been GCed
   System.runFinalization()
   while (weakRef.get != null) {
     System.gc()
     System.runFinalization()
     Thread.sleep(200)
     if (System.currentTimeMillis - startTime > 10000) {
       throw new Exception("automatically cleanup error")
     }
   }
 }


 -- Original Message --
 *From:* lisendong lisend...@163.com;
 *Sent:* Tuesday, Mar 31, 2015, 3:47 PM
 *To:* Xiangrui Meng men...@gmail.com;
 *Cc:* Xiangrui Meng m...@databricks.com; user user@spark.apache.org;
 Sean Owen so...@cloudera.com; GuoQiang Li wi...@qq.com;
 *Subject:* Re: different result from implicit ALS with explicit ALS

 I have update my spark source code to 1.3.1.

 the checkpoint works well.

 BUT the shuffle data still could not be delete automatically…the disk
 usage is still 30TB…

 I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.

 Do you know how to solve my problem?

 Sendong Li



 On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com wrote:

 setCheckpointInterval was added in the current master and branch-1.3.
 Please help check whether it works. It will be included in the 1.3.1 and
 1.4.0 release. -Xiangrui

 On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com wrote:

 hi, xiangrui:
 I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
 the code is :

 https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
 PastedGraphic-2.tiff

 the checkpoint is very important in my situation, because my task will
 produce 1TB shuffle data in each iteration, it the shuffle data is not
 deleted in each iteration(using checkpoint()), the task will produce 30TB
 data…


 So I change the ALS code, and re-compile by myself, but it seems the
 checkpoint does not take effects, and the task still occupy 30TB disk… ( I
 only add two lines to the ALS.scala) :

 PastedGraphic-3.tiff



 and the driver’s log seems strange, why the log is printed together...
 PastedGraphic-1.tiff

 thank you very much!


 On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com wrote:

 Thank you very much for your opinion:)

 In our case, maybe it 's dangerous to treat un-observed item as negative
 interaction(although we could give them small confidence, I think they are
 still incredible...)

 I will do more experiments and give you feedback:)

 Thank you;)


 On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com wrote:

 I believe that's right, and is what I was getting at. yes the implicit
 formulation ends up implicitly including every possible interaction in
 its loss function, even unobserved ones. That could be the difference.

 This is mostly an academic question though. In practice, you have
 click-like data and should be using the implicit version for sure.

 However you can give negative implicit feedback to the model. You
 could consider no-click as a mild, observed, negative interaction.
 That is: supply a small negative value for these cases. Unobserved
 pairs are not part of the data set. I'd be careful about assuming the
 lack of an action carries signal.

 On Thu, Feb 26, 2015 at 3:07 PM, 163 lisend...@163.com wrote:
 oh my god, I think I understood...
 In my case, there are three kinds of user-item pairs:

 Display and click pair(positive pair)
 Display but no-click pair(negative pair)
 No-display pair(unobserved pair)

 Explicit ALS only consider the first and the second kinds
 But implicit ALS consider all the three kinds of pair(and consider the
 third
 kind as the second pair, because their preference value are all zero and
 confidence are all 1)

 So the result are different. right?

 Could you please give me some advice, which ALS should I use?
 If I use the implicit ALS, how to distinguish the second and the third
 kind
 of pair:)

 My opinion is in my case, I should use explicit ALS ...

 Thank you so much

 On Feb 26, 2015, at 22:41, Xiangrui Meng m...@databricks.com wrote:

 Lisen, did you use all m-by-n pairs during training? Implicit model
 penalizes unobserved ratings, while explicit model doesn't. -Xiangrui

 On Feb 26, 

joining multiple parquet files

2015-03-31 Thread roni
Hi ,
 I have 4 parquet files and I want to find data which is common in all of
them
e.g

SELECT TableA.*, TableB.*, TableC.*, TableD.* FROM (TableB INNER JOIN TableA
ON TableB.aID= TableA.aID)
INNER JOIN TableC ON(TableB.cID= Tablec.cID)
INNER JOIN TableA ta ON(ta.dID= TableD.dID)
WHERE (DATE(TableC.date)=date(now()))


I can do a 2-file join like - val joinedVal =
g1.join(g2, g1.col("kmer") === g2.col("kmer"))

But I am trying to find common kmer strings  from 4 files.

Thanks

Roni
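
A hedged sketch of one way to extend that to four DataFrames (g1..g4 and the
kmer column follow the naming above; the file paths are placeholders). Chained
inner joins keep only the kmers present in all four:

val g1 = sqlContext.parquetFile("file1.parquet")
val g2 = sqlContext.parquetFile("file2.parquet")
val g3 = sqlContext.parquetFile("file3.parquet")
val g4 = sqlContext.parquetFile("file4.parquet")

val joined = g1.join(g2, g1.col("kmer") === g2.col("kmer"))
               .join(g3, g1.col("kmer") === g3.col("kmer"))
               .join(g4, g1.col("kmer") === g4.col("kmer"))

// keep a single copy of the common kmer strings
val commonKmers = joined.select(g1.col("kmer")).distinct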


Re: Unable to save dataframe with UDT created with sqlContext.createDataFrame

2015-03-31 Thread Xiangrui Meng
I cannot reproduce this error on master, but I'm not aware of any
recent bug fixes that are related. Could you build and try the current
master? -Xiangrui

On Tue, Mar 31, 2015 at 4:10 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 Hi all,

 A DataFrame with a user defined type (here mllib.Vector) created with
 sqlContext.createDataFrame can't be saved to a parquet file and raises a
 ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast
 to org.apache.spark.sql.Row error.

 Here is an example of code to reproduce this error :

 object TestDataFrame {

   def main(args: Array[String]): Unit = {
     //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
     val conf = new SparkConf().setAppName("RankingEval").setMaster("local[8]")
       .set("spark.executor.memory", "6g")

     val sc = new SparkContext(conf)
     val sqlContext = new SQLContext(sc)

     import sqlContext.implicits._

     val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))
     val dataDF = data.toDF

     dataDF.save("test1.parquet")

     val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)

     dataDF2.save("test2.parquet")
   }
 }


 Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how
 can it be solved ?


 Cheers,


 Jao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: can't union two rdds

2015-03-31 Thread roy
use zip



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-t-union-two-rdds-tp22320p22321.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark sql query fails with executor lost/ out of memory expection while caching a table

2015-03-31 Thread ankurjain.nitrr
Hi,

I am using spark 1.2.1

I am using thrift server to query my data.


While executing the query CACHE TABLE tablename,

it fails with the exception

Error: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 10.0 (TID 41, bbr-dev178): Execu
torLostFailure (executor 12 lost)

and sometimes

Error: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 8.0 (TID 33, bbr-dev178): java.la
ng.OutOfMemoryError: Java heap space


I understand that my executors are going out of memory during the caching
and therefore getting killed.

My question is:

Is there a way to make the thrift server spill the data to disk if it is not
able to keep the entire dataset in memory?
Can i change the Storage Level for spark sql thrift server for caching?

I don't want my executors to get lost and cache queries to get failed.








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-query-fails-with-executor-lost-out-of-memory-expection-while-caching-a-table-tp22322.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Ankur Chauhan
-BEGIN PGP SIGNED MESSAGE-
Hash: SHA1

Hi,

I am fairly new to the spark ecosystem and I have been trying to setup
a spark on mesos deployment. I can't seem to figure out the best
practices around HDFS and Tachyon. The documentation about Spark's
data-locality section seems to point that each of my mesos slave nodes
should also run a hdfs datanode. This seems fine but I can't seem to
figure out how I would go about pushing tachyon into the mix.

How should i organize my cluster?
Should tachyon be colocated on my mesos worker nodes? or should all
the spark jobs reach out to a separate hdfs/tachyon cluster.

- -- Ankur Chauhan
-BEGIN PGP SIGNATURE-

iQEcBAEBAgAGBQJVGy4bAAoJEOSJAMhvLp3L5bkH/0MECyZkh3ptWzmsNnSNfGWp
Oh93TUfD+foXO2ya9D+hxuyAxbjfXs/68aCWZsUT6qdlBQU9T1vX+CmPOnpY1KPN
NJP3af+VK0osaFPo6k28OTql1iTnvb9Nq+WDlohxBC/hZtoYl4cVxu8JmRlou/nb
/wfpp0ShmJnlxsoPa6mVdwzjUjVQAfEpuet3Ow5veXeA9X7S55k/h0ZQrZtO8eXL
jJsKaT8ne9WZPhZwA4PkdzTxkXF3JNveCIKPzNttsJIaLlvd0nLA/wu6QWmxskp6
iliGSmEk5P1zZWPPnk+TPIqbA0Ttue7PeXpSrbA9+pYiNT4R/wAneMvmpTABuR4=
=8ijP
-END PGP SIGNATURE-

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Haoyuan Li
Tachyon should be co-located with Spark in this case.

Best,

Haoyuan

On Tue, Mar 31, 2015 at 4:30 PM, Ankur Chauhan achau...@brightcove.com
wrote:

 -BEGIN PGP SIGNED MESSAGE-
 Hash: SHA1

 Hi,

 I am fairly new to the spark ecosystem and I have been trying to setup
 a spark on mesos deployment. I can't seem to figure out the best
 practices around HDFS and Tachyon. The documentation about Spark's
 data-locality section seems to point that each of my mesos slave nodes
 should also run a hdfs datanode. This seems fine but I can't seem to
 figure out how I would go about pushing tachyon into the mix.

 How should i organize my cluster?
 Should tachyon be colocated on my mesos worker nodes? or should all
 the spark jobs reach out to a separate hdfs/tachyon cluster.

 - -- Ankur Chauhan
 -BEGIN PGP SIGNATURE-

 iQEcBAEBAgAGBQJVGy4bAAoJEOSJAMhvLp3L5bkH/0MECyZkh3ptWzmsNnSNfGWp
 Oh93TUfD+foXO2ya9D+hxuyAxbjfXs/68aCWZsUT6qdlBQU9T1vX+CmPOnpY1KPN
 NJP3af+VK0osaFPo6k28OTql1iTnvb9Nq+WDlohxBC/hZtoYl4cVxu8JmRlou/nb
 /wfpp0ShmJnlxsoPa6mVdwzjUjVQAfEpuet3Ow5veXeA9X7S55k/h0ZQrZtO8eXL
 jJsKaT8ne9WZPhZwA4PkdzTxkXF3JNveCIKPzNttsJIaLlvd0nLA/wu6QWmxskp6
 iliGSmEk5P1zZWPPnk+TPIqbA0Ttue7PeXpSrbA9+pYiNT4R/wAneMvmpTABuR4=
 =8ijP
 -END PGP SIGNATURE-

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Haoyuan Li
AMPLab, EECS, UC Berkeley
http://www.cs.berkeley.edu/~haoyuan/


Re: deployment of spark on mesos and data locality in tachyon/hdfs

2015-03-31 Thread Ankur Chauhan
-BEGIN PGP SIGNED MESSAGE-
Hash: SHA1

Hi Haoyuan,

So on each mesos slave node I should allocate/section off some amount
of memory for tachyon (let's say 50% of the total memory) and the rest
for regular mesos tasks?

This means, on each slave node I would have tachyon worker (+ hdfs
configuration to talk to s3 or the hdfs datanode) and the mesos slave
process. Is this correct?

On 31/03/2015 16:43, Haoyuan Li wrote:
 Tachyon should be co-located with Spark in this case.
 
 Best,
 
 Haoyuan
 
 On Tue, Mar 31, 2015 at 4:30 PM, Ankur Chauhan
 achau...@brightcove.com mailto:achau...@brightcove.com wrote:
 
 Hi,
 
 I am fairly new to the spark ecosystem and I have been trying to
 setup a spark on mesos deployment. I can't seem to figure out the
 best practices around HDFS and Tachyon. The documentation about
 Spark's data-locality section seems to point that each of my mesos
 slave nodes should also run a hdfs datanode. This seems fine but I
 can't seem to figure out how I would go about pushing tachyon into
 the mix.
 
 How should i organize my cluster? Should tachyon be colocated on my
 mesos worker nodes? or should all the spark jobs reach out to a
 separate hdfs/tachyon cluster.
 
 -- Ankur Chauhan
 
 -

 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 mailto:user-unsubscr...@spark.apache.org For additional commands,
 e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 
 
 
 
 -- Haoyuan Li AMPLab, EECS, UC Berkeley 
 http://www.cs.berkeley.edu/~haoyuan/

- -- 
- -- Ankur Chauhan
-BEGIN PGP SIGNATURE-

iQEcBAEBAgAGBQJVGzKUAAoJEOSJAMhvLp3L3W4IAIVYiEKIZbC1a36/KWo94xYB
dvE4VXxF7z5FWmpuaHBEa+U1XWrR4cLVsQhocusOFn+oC7bstdltt3cGNAuwFSv6
Oogs4Sl1J4YZm8omKVdCkwD6Hv71HSntM8llz3qTW+Ljk2aKhfvNtp5nioQAm3e+
bs4ZKlCBij/xV3LbYYIePSS3lL0d9m1qEDJvi6jFcfm3gnBYeNeL9x92B5ylyth0
BGHnPN4sV/yopgrqOimLb12gSexHGNP1y6JBYy8NrHRY8SxkZ4sWKuyDnGDCOPOc
HC14Parf5Ly5FEz5g5WjF6HrXRdPlgr2ABxSLWOAB/siXsX9o/4yCy7NtDNcL6Y=
=f2xI
-END PGP SIGNATURE-

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Query REST web service with Spark?

2015-03-31 Thread Todd Nist
Here are a few ways to achieve what your loolking to do:

https://github.com/cjnolet/spark-jetty-server

Spark Job Server - https://github.com/spark-jobserver/spark-jobserver -

defines a REST API for Spark

Hue -

http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/

Spark Kernel project: https://github.com/ibm-et/spark-kernel

 The Spark Kernel's goal is to serve as the foundation for interactive
 applications. The project provides a client library in Scala that abstracts
 connecting to the kernel (containing a Spark Context), which can be
 embedded into a web application. We demonstrated this at StataConf when we
 embedded the Spark Kernel client into a Play application to provide an
 interactive web application that communicates to Spark via the Spark Kernel
 (hosting a SparkContext).


Hopefully one of those will give you what your looking for.

-Todd

On Tue, Mar 31, 2015 at 5:06 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 If I recall correctly, I've read people integrating REST calls to Spark
 Streaming jobs in the user list. I don't imagine any cases for why it
 shouldn't be possible.

 Best,
 Burak

 On Tue, Mar 31, 2015 at 1:46 PM, Minnow Noir minnown...@gmail.com wrote:

 We have have some data on Hadoop that needs augmented with data only
 available to us via a REST service.  We're using Spark to search for, and
 correct, missing data. Even though there are a lot of records to scour for
 missing data, the total number of calls to the service is expected to be
 low, so it would be ideal to do the whole job in Spark as we scour the data.

 I don't see anything obvious in the API or on Google relating to making
 REST calls from a Spark job.  Is it possible?

 Thanks,

 Alec





Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-03-31 Thread Xiangrui Meng
Hey Sean,

That is true for the explicit model, but not for the implicit one. The ALS-WR
paper doesn't cover the implicit model. In the implicit formulation, a
sub-problem (for v_j) is:

min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2

This is a sum for all i but not just the users who rate item j. In
this case, if we set X=m_j, the number of observed ratings for item j,
it is not really scale invariant. We have #users user vectors in the
least squares problem but only penalize lambda * #ratings. I was
suggesting using lambda * m directly for the implicit model to match the
number of vectors in the least squares problem. Well, this is my
theory; I haven't found any public work about it.

Best,
Xiangrui

On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen so...@cloudera.com wrote:
 I had always understood the formulation to be the first option you
 describe. Lambda is scaled by the number of items the user has rated /
 interacted with. I think the goal is to avoid fitting the tastes of
 prolific users disproportionately just because they have many ratings
 to fit. This is what's described in the ALS-WR paper we link to on the
 Spark web site, in equation 5
 (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)

 I think this also gets you the scale-invariance? For every additional
 rating from user i to product j, you add one new term to the
 squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the
 regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at least
 both increasing about linearly as ratings increase. If the
 regularization term is multiplied by the total number of users and
 products in the model, then it's fixed.

 I might misunderstand you and/or be speaking about something slightly
 different when it comes to invariance. But FWIW I had always
 understood the regularization to be multiplied by the number of
 explicit ratings.

 On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng men...@gmail.com wrote:
 Okay, I didn't realize that I changed the behavior of lambda in 1.3.
 to make it scale-invariant, but it is worth discussing whether this
 is a good change. In 1.2, we multiply lambda by the number ratings in
 each sub-problem. This makes it scale-invariant for explicit
 feedback. However, in implicit feedback model, a user's sub-problem
 contains all item factors. Then the question is whether we should
 multiply lambda by the number of explicit ratings from this user or by
 the total number of items. We used the former in 1.2 but changed to
 the latter in 1.3. So you should try a smaller lambda to get a similar
 result in 1.3.

 Sean and Shuo, which approach do you prefer? Do you know any existing
 work discussing this?

 Best,
 Xiangrui


 On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng men...@gmail.com wrote:
 This sounds like a bug ... Did you try a different lambda? It would be
 great if you can share your dataset or re-produce this issue on the
 public dataset. Thanks! -Xiangrui

 On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote:
 After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly
 smaller factors (and hence scores). For example, the first few product's
 factor values in 1.2.0 are (0.04821, -0.00674,  -0.0325). In 1.3.0, the
 first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This
 difference of several orders of magnitude is consistent throughout both 
 user
 and product. The recommendations from 1.2.0 are subjectively much better
 than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less
 memory.

 My first thought is that there is too much regularization in the 1.3.0
 results, but I'm using the same lambda parameter value. This is a snippet 
 of
 my scala code:
 .
 val rank = 75
 val numIterations = 15
 val alpha = 10
 val lambda = 0.01
 val model = ALS.trainImplicit(train_data, rank, numIterations,
 lambda=lambda, alpha=alpha)
 .

 The code and input data are identical across both versions. Did anything
 change between the two versions I'm not aware of? I'd appreciate any help!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why Shuffle Write is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Bijay Pathak
Hi Udit,

The persisted RDDs in memory are cleared by Spark using an LRU policy, and you
can also set the time to clear the persisted RDDs and meta-data by setting
*spark.cleaner.ttl* (default infinite). But I am not aware of any
property to clean the older shuffle writes from disk.

thanks,
bijay

On Tue, Mar 31, 2015 at 1:50 PM, Udit Mehta ume...@groupon.com wrote:

 Thanks for the reply.
 This will reduce the shuffle write to disk to an extent but for a long
 running job(multiple days), the shuffle write would still occupy a lot of
 space on disk. Why do we need to store the data from older map tasks to
 memory?

 On Tue, Mar 31, 2015 at 1:19 PM, Bijay Pathak bijay.pat...@cloudwick.com
 wrote:

 The Spark Sort-Based Shuffle (default from 1.1) keeps the data from
 each Map tasks to memory until they they can't fit after which they
 are sorted and spilled to disk. You can reduce the Shuffle write to
 disk by increasing spark.shuffle.memoryFraction(default 0.2).

 By writing the shuffle output to disk the Spark lineage can be
 truncated when the RDDs are already materialized as the side-effects
 of earlier shuffle.This is the under the hood optimization in Spark
 which is only possible because of shuffle output output being written
 to disk.

 You can set spark.shuffle.spill to false if you don't want to spill to
 the disk and assuming you have enough heap memory.

 On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote:
  I have noticed a similar issue when using spark streaming. The spark
 shuffle
  write size increases to a large size(in GB) and then the app crashes
 saying:
  java.io.FileNotFoundException:
 
 /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
  (No such file or directory)
 
  I dont understand why the shuffle size increases to such a large value
 for
  long running jobs.
 
  Thanks,
  Udiy
 
  On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com
 wrote:
 
  Thanks Saisai. I will try your solution, but still i don't understand
 why
  filesystem should be used where there is a plenty of memory available!
 
 
 
  On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com
  wrote:
 
  Shuffle write will finally spill the data into file system as a bunch
 of
  files. If you want to avoid disk write, you can mount a ramdisk and
  configure spark.local.dir to this ram disk. So shuffle output will
 write
  to memory based FS, and will not introduce disk IO.
 
  Thanks
  Jerry
 
  2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com:
 
  Hi,
 
  I was looking at SparkUI, Executors, and I noticed that I have 597 MB
  for  Shuffle while I am using cached temp-table and the Spark had 2
 GB free
  memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!
 
  Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks
 be
  done in memory?
 
  best,
 
  /Shahab
 
 
 
 





Re: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array

2015-03-31 Thread Todd Nist
So in looking at this a bit more, I gather the root cause is the fact that
the nested fields are represented as rows within rows, is that correct?  If
I don't know the size of the json array (it varies), using
x.getAs[Row](0).getString(0) is not really a valid solution.

Is the solution to apply a lateral view + explode to this?

I have attempted to change to a lateral view, but looks like my syntax is
off:

sqlContext.sql(
  "SELECT path, `timestamp`, name, value, pe.value FROM metric " +
  "lateral view explode(pathElements) a AS pe")
  .collect.foreach(println(_))
Which results in:

15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread main java.lang.RuntimeException: [1.68] failure:
``UNION'' expected but identifier view found

SELECT path,`timestamp`, name, value, pe.value FROM metric lateral
view explode(pathElements) a AS pe
   ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
at 
com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is this the right approach?  Is this syntax available in 1.2.1:

SELECT
  v1.name, v2.city, v2.state
FROM people
  LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1
 as name, address
  LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2
 as city, state;
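
For reference, the ``UNION'' expected but identifier view found failure above appears to come from the plain SQLContext parser, which in 1.2.1 does not understand HiveQL-only constructs such as LATERAL VIEW; routing the statement through a HiveContext may be worth trying. A minimal sketch, assuming the ElasticSearch-backed data can also be registered as the temp table metric on that HiveContext:

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
// "metric" is assumed to be registered as a temp table on hiveCtx as well
hiveCtx.sql(
  """SELECT path, `timestamp`, name, value, pe.value
    |FROM metric
    |LATERAL VIEW explode(pathElements) a AS pe""".stripMargin)
  .collect()
  .foreach(println)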


-Todd

On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist tsind...@gmail.com wrote:

 I am accessing ElasticSearch via the elasticsearch-hadoop and attempting
 to expose it via SparkSQL. I am using spark 1.2.1, latest supported by
 elasticsearch-hadoop, and org.elasticsearch % elasticsearch-hadoop %
 2.1.0.BUILD-SNAPSHOT of elasticsearch-hadoop. I’m
 encountering an issue when I attempt to query the following json after
 creating a temporary table from it. The json looks like this:

 PUT /_template/device
 {
   template: dev*,
   

--driver-memory parameter doesn't work for spark-submmit on yarn?

2015-03-31 Thread Shuai Zheng
Hi All,

 

Below is the my shell script:

 

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G
--master yarn-client --class com.***.FinancialEngineExecutor
/home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties 

 

My driver will load some resources and then broadcast them to all executors.

 

That resource is only 600 MB in serialized form, but I always get an out-of-memory
exception; it looks like the right amount of memory isn't being allocated to my
driver.

 

Exception in thread main java.lang.OutOfMemoryError: Java heap space

at java.lang.reflect.Array.newArray(Native Method)

at java.lang.reflect.Array.newInstance(Array.java:70)

at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)

at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)

at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

at
com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

 

Am I doing anything wrong here?

 

And no matter what value I set for --driver-memory (from 512M to 20G),
it always gives me an error on the same line (the line that tries to load a 600 MB
Java serialization file). So it looks like the script doesn't allocate the right
memory to the driver in my case?
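
One hedged way to narrow this down is to have the driver print the heap it actually received, right before loading the serialized file; if it reports roughly 500 MB no matter what the flag says, the setting never reached the driver JVM. A minimal illustrative sketch (DriverHeapCheck is a stand-in, not the original FinancialEngineExecutor):

object DriverHeapCheck {
  def main(args: Array[String]): Unit = {
    // roughly the -Xmx the driver JVM was started with
    val maxHeapMb = Runtime.getRuntime.maxMemory / (1024L * 1024L)
    println(s"Driver max heap: $maxHeapMb MB") // expect ~5000 MB for --driver-memory 5G
    // ... then load and broadcast the 600 MB serialized resources as before
  }
}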

 

Regards,

 

Shuai



Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-03-31 Thread Ted Yu
Jeetendra:
Please extract the information you need from Result and return the
extracted portion - instead of returning Result itself.
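
A minimal Scala sketch of that approach: copy the plain values you need out of each Result while still inside the transformation, so only serializable tuples cross the wire. Here hbaseConf stands in for the configuration built in the original code, and the column family "cf" / qualifier "name" are placeholders:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val companyPairs = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
  .map { case (key, result) =>
    val cknid = Bytes.toString(key.get()).toInt
    // extract only the bytes needed; the resulting (Int, String) pair serializes fine
    val name = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")))
    (cknid, name)
  }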

Cheers

On Tue, Mar 31, 2015 at 1:14 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

 The example in
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
  might
 help

 Best,

 --
 Nan Zhu
 http://codingcat.me

 On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote:

 Yep, it's not serializable:
 https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html

 You can't return this from a distributed operation since that would
 mean it has to travel over the network and you haven't supplied any
 way to convert the thing into bytes.

 On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 When I try to get the result from HBase and run the mapToPair
 function on the RDD, it gives the error
 java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

 Here is the code

 // private static JavaPairRDD<Integer, Result> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
 //     return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
 //             TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
 //         .mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
 //
 //             public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
 //                 System.out.println("In getCompanyDataRDD" + t._2);
 //
 //                 String cknid = Bytes.toString(t._1.get());
 //                 System.out.println("processing cknids is:" + cknid);
 //                 Integer cknidInt = Integer.parseInt(cknid);
 //                 Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
 //                 return returnTuple;
 //             }
 //         });
 // }


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Query REST web service with Spark?

2015-03-31 Thread Minnow Noir
We have some data on Hadoop that needs to be augmented with data only
available to us via a REST service.  We're using Spark to search for, and
correct, missing data. Even though there are a lot of records to scour for
missing data, the total number of calls to the service is expected to be
low, so it would be ideal to do the whole job in Spark as we scour the data.

I don't see anything obvious in the API or on Google relating to making
REST calls from a Spark job.  Is it possible?
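
It should be possible; a REST call made inside a task is just ordinary JVM code, so nothing Spark-specific is required. A hedged sketch, where recordsRdd, the URL, and the response handling are all illustrative placeholders (mapPartitions is used so any per-connection setup can be amortized across a partition):

import scala.io.Source

val augmented = recordsRdd.mapPartitions { records =>
  records.map { rec =>
    // one HTTP GET per record that needs augmenting; the total call count stays low as noted above
    val resp = Source.fromURL(s"http://rest.example.com/lookup?id=$rec", "UTF-8").mkString
    (rec, resp)
  }
}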

Thanks,

Alec


spark.sql.Row manipulation

2015-03-31 Thread roni
I have 2 Parquet files with the format e.g. name, age, town.
I read them and then join them to get all the names which are in both
towns.
the resultant dataset is

res4: Array[org.apache.spark.sql.Row] = Array([name1, age1,
town1,name2,age2,town2])

Name1 and name2 are the same since I am joining on them.
Now, I want to get down to the format (name, age1, age2).

But I can't seem to manipulate the spark.sql.Row.

Trying something like map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
does not work.

Can you suggest a way ?
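
One possible direction, sketched under the assumption that the joined result (called joined below) keeps the layout shown above, (name1, age1, town1, name2, age2, town2), with the age columns as integers: address the Row fields by position with the typed getters instead of splitting strings.

val nameWithBothAges = joined.map { row =>
  // (name, age1, age2); fall back to row(1) / row(4) if the ages are not Ints
  (row.getString(0), row.getInt(1), row.getInt(4))
}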

Thanks
-R


Re: How to configure SparkUI to use internal ec2 ip

2015-03-31 Thread Anny Chen
Thanks Petar  and Akhil for the suggestion.

Actually I changed the SPARK_MASTER_IP to the internal-ip, deleted the
export SPARK_PUBLIC_DNS=xx line in the spark-env.sh and also edited
the /etc/hosts as Akhil suggested, and now it is working! However I don't
know which change actually made it work.

Thanks!
Anny

On Tue, Mar 31, 2015 at 10:26 AM, Petar Zecevic petar.zece...@gmail.com
wrote:


 Did you try setting the SPARK_MASTER_IP parameter in spark-env.sh?



 On 31.3.2015. 19:19, Anny Chen wrote:

 Hi Akhil,

  I tried editing the /etc/hosts on the master and on the workers, and
 it seems it is not working for me.

  I tried adding hostname internal-ip and it didn't work. I then tried
 adding internal-ip hostname and it didn't work either. I guess I should
 also edit the spark-env.sh file?

  Thanks!
 Anny

 On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

  You can add an internal ip to public hostname mapping in your
 /etc/hosts file, if your forwarding is proper then it wouldn't be a problem
 there after.



  Thanks
 Best Regards

 On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote:

 Hi,

 For security reasons, we added a server between my aws Spark Cluster and
 local, so I couldn't connect to the cluster directly. To see the SparkUI
 and
 its related work's  stdout and stderr, I used dynamic forwarding and
 configured the SOCKS proxy. Now I could see the SparkUI using the
 internal
 ec2 ip, however when I click on the application UI (4040) or the
 worker's UI
 (8081), it still automatically uses the public DNS instead of internal
 ec2
 ip, which the browser now couldn't show.

 Is there a way that I could configure this? I saw that one could
 configure
 the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could
 help. Does anyone experience the same issue?

 Thanks a lot!
 Anny




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org