Parquet Hive table becomes very slow on 1.3?
Hi all, We are using Parquet Hive tables, and we are upgrading to Spark 1.3. But we find that a simple COUNT(*) query is much slower (100x) than on Spark 1.2. I find most of the time is spent on the driver getting HDFS blocks. I see a large number of logs like the following printed:

15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{ fileLength=77153436 underConstruction=false blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}] lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275; getBlockSize()=77153436; corrupt=false; offset=0; locs=[10.152.116.169:50010, 10.153.125.184:50010, 10.152.116.172:50010]} isLastBlockComplete=true}
15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010

Comparing the printed logs with Spark 1.2: although the number of getBlockLocations calls is similar, each such operation only cost 20~30 ms there (it is 2000ms~3000ms now), and 1.2 didn't print the detailed LocatedBlocks info. Another finding is that if I read the Parquet file via Scala code from spark-shell as below, it looks fine; the computation returns the result as quickly as before.

sqlContext.parquetFile("data/myparquettable")

Any idea about it? Thank you! -- 郑旭东 Zheng, Xudong
Re: How to configure SparkUI to use internal ec2 ip
You can add an internal IP to public hostname mapping in your /etc/hosts file; if your forwarding is proper then it shouldn't be a problem thereafter. Thanks Best Regards On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote: Hi, For security reasons, we added a server between my AWS Spark cluster and local, so I couldn't connect to the cluster directly. To see the SparkUI and its related workers' stdout and stderr, I used dynamic forwarding and configured the SOCKS proxy. Now I can see the SparkUI using the internal ec2 IP; however, when I click on the application UI (4040) or the worker's UI (8081), it still automatically uses the public DNS instead of the internal ec2 IP, which the browser now couldn't show. Is there a way that I could configure this? I saw that one could configure the LOCAL_ADDRESS_IP in spark-env.sh, but I'm not sure whether this could help. Does anyone experience the same issue? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
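For example, an /etc/hosts mapping on the machine running the browser could look something like the following (the addresses and hostnames here are illustrative, not from the thread):

# internal ec2 IP -> public DNS name that the SparkUI links use
10.0.0.12  ec2-54-12-34-56.compute-1.amazonaws.com
10.0.0.13  ec2-54-12-34-57.compute-1.amazonaws.com

With the SOCKS proxy in place, the public names in the UI links then resolve to the internal addresses.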
Re: different result from implicit ALS with explicit ALS
I have updated my Spark source code to 1.3.1. The checkpoint works well, BUT the shuffle data still cannot be deleted automatically… the disk usage is still 30TB… I have set spark.cleaner.referenceTracking.blocking.shuffle to true. Do you know how to solve my problem? Sendong Li On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com wrote: setCheckpointInterval was added in the current master and branch-1.3. Please help check whether it works. It will be included in the 1.3.1 and 1.4.0 releases. -Xiangrui On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com wrote: hi, xiangrui: I found that the ALS of Spark 1.3.0 forgets to do checkpoint() in explicit ALS; the code is: https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala PastedGraphic-2.tiff The checkpoint is very important in my situation, because my task produces 1TB of shuffle data in each iteration; if the shuffle data is not deleted in each iteration (using checkpoint()), the task will produce 30TB of data… So I changed the ALS code and re-compiled it myself, but it seems the checkpoint does not take effect and the task still occupies 30TB of disk… (I only added two lines to ALS.scala): PastedGraphic-3.tiff And the driver's log seems strange: why is the log printed together... PastedGraphic-1.tiff thank you very much! On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com wrote: Thank you very much for your opinion :) In our case, maybe it's dangerous to treat un-observed items as negative interactions (although we could give them small confidence, I think they are still not credible...). I will do more experiments and give you feedback :) Thank you ;) On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com wrote: I believe that's right, and is what I was getting at. Yes, the implicit formulation ends up implicitly including every possible interaction in its loss function, even unobserved ones. That could be the difference. This is mostly an academic question, though. In practice, you have click-like data and should be using the implicit version for sure. However, you can give negative implicit feedback to the model. You could consider no-click as a mild, observed, negative interaction. That is: supply a small negative value for these cases. Unobserved pairs are not part of the data set. I'd be careful about assuming the lack of an action carries signal. On Thu, Feb 26, 2015 at 3:07 PM, 163 lisend...@163.com wrote: oh my god, I think I understood... In my case, there are three kinds of user-item pairs: Display and click pairs (positive pairs); Display but no-click pairs (negative pairs); No-display pairs (unobserved pairs). Explicit ALS only considers the first and the second kinds, but implicit ALS considers all three kinds of pairs (and treats the third kind like the second, because their preference values are all zero and confidences are all 1). So the results are different, right? Could you please give me some advice on which ALS I should use? If I use the implicit ALS, how do I distinguish the second and the third kind of pair :) My opinion is that in my case I should use explicit ALS... Thank you so much On Feb 26, 2015, at 22:41, Xiangrui Meng m...@databricks.com wrote: Lisen, did you use all m-by-n pairs during training?
Implicit model penalizes unobserved ratings, while explicit model doesn't. -Xiangrui On Feb 26, 2015 6:26 AM, Sean Owen so...@cloudera.com wrote: +user On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: I think I may have it backwards, and that you are correct to keep the 0 elements in train() in order to try to reproduce the same result. The second formulation is called 'weighted regularization' and is used for both implicit and explicit feedback, as far as I can see in the code. Hm, I'm actually not clear why these would produce different results. Different code paths are used, to be sure, but I'm not yet sure why they would give different results. In general you wouldn't use train() for data like this, though, and would never set alpha=0. On Thu, Feb 26, 2015 at 2:15 PM, lisendong lisend...@163.com wrote: I want to confirm the loss function you use (sorry, I'm not so familiar with Scala code so I did not understand the source code of MLlib). According to the papers: in your implicit feedback ALS, the loss function is (ICDM 2008): in the explicit feedback ALS, the loss function is (Netflix 2008): note that besides the difference of confidence
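For reference, here is a minimal sketch of the two MLlib calls being contrasted in this thread (the rank/iterations/lambda/alpha values and the `ratings` RDD are placeholders, not from the thread):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// ratings: RDD[Rating] of (user, product, rating or click count); placeholder input
// Explicit feedback: the loss is summed only over the observed (user, product) pairs.
val explicitModel = ALS.train(ratings, 10, 10, 0.01)
// Implicit feedback: every user-item pair enters the loss, with a confidence
// derived from the observed value via alpha; unobserved pairs get preference 0.
val implicitModel = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)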
Re: JettyUtils.createServletHandler Method not Found?
Yes, this private is checked at compile time and my class is in a subpackage of org.apache.spark.ui, so the visibility is not the issue, or at least not as far as I can tell. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JettyUtils-createServletHandler-Method-not-Found-tp22262p22313.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Error in Delete Table
Which Spark and Hive release are you using ? Thanks On Mar 27, 2015, at 2:45 AM, Masf masfwo...@gmail.com wrote: Hi. In HiveContext, when I put this statement DROP TABLE IF EXISTS TestTable If TestTable doesn't exist, spark returns an error: ERROR Hive: NoSuchObjectException(message:default.TestTable table not found) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29338) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29306) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result.read(ThriftHiveMetastore.java:29237) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1036) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1022) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy22.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:942) at org.apache.hadoop.hive.ql.exec.DDLTask.dropTableOrPartitions(DDLTask.java:3887) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:310) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1554) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1321) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1139) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.DropTable.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.hive.execution.DropTable.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.DropTable.execute(commands.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98) at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at GeoMain$.HiveExecution(GeoMain.scala:96) at GeoMain$.main(GeoMain.scala:17) at GeoMain.main(GeoMain.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thanks!! -- Regards. Miguel Ángel - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: log4j.properties in jar
Hello Udit, Yes, what you ask is possible. If you follow the Spark documentation and tutorial about how to build stand-alone applications, you can see that it is possible to build a stand-alone, über-JAR file that includes everything. For example, if you want to suppress some messages by modifying log4j in unit tests, you can do the following: http://stackoverflow.com/questions/27248997/how-to-suppress-spark-logging-in-unit-tests/2736#2736 Hope this helps. -- Emre Sevinç http://www.bigindustries.be/ On Mon, Mar 30, 2015 at 10:24 PM, Udit Mehta ume...@groupon.com wrote: Hi, Is it possible to put the log4j.properties in the application jar such that the driver and the executors use this log4j file. Do I need to specify anything while submitting my app so that this file is used? Thanks, Udit -- Emre Sevinc
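For reference, a minimal sketch (the property values and file names here are illustrative, not from the thread). A log4j.properties placed at the root of the assembled jar could look like:

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

Alternatively, instead of baking it into the jar, the file can be shipped at submit time:

spark-submit --files log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  my-app.jar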
Re: Some questions after playing a little with the new ml.Pipeline.
My guess is that the `createDataFrame` call is failing here. Can you check if the schema being passed to it includes the column name and type for the newly zipped `features`? Joseph probably knows this better, but AFAIK the DenseVector here will need to be marked as a VectorUDT while creating a DataFrame column. Thanks Shivaram On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Following your suggestion, I end up with the following implementation:

override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N*K*W*H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    //net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}

The idea is to mapPartitions over the underlying RDD of the DataFrame and create a new DataFrame by zipping the results. It seems to work, but when I try to save the RDD I get the following error: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: One workaround could be to convert the DataFrame into an RDD inside the transform function, then use mapPartitions/broadcast to work with the JNI calls, and then convert back. Thanks Shivaram On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm still struggling to make a pre-trained Caffe model transformer for DataFrames work. The main problem is that creating a Caffe model inside the UDF is very slow and consumes a lot of memory. Some of you suggested broadcasting the model. The problem with broadcasting is that I use a JNI interface to Caffe C++ with javacpp-presets and it is not serializable. Another possible approach is to use a UDF that can handle a whole partition instead of just a row, in order to minimize the Caffe model instantiation. Are there any ideas to solve one of these two issues? Best, Jao On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com wrote: I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now.
On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with the current master version of Spark:

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    } : Vector)
    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a Java API to the Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel, which is inefficient since creating a new model means loading a large model file, and it will transform only a single row at a time, whereas a Caffe network can process a batch of rows efficiently. In other words, is it possible to create a UDF that can operate on a partition in order to minimize the creation of a CaffeModel and to take
Re: Error in Delete Table
Hi Ted. Spark 1.2.0 and Hive 0.13.1. Regards. Miguel Angel. On Tue, Mar 31, 2015 at 10:37 AM, Ted Yu yuzhih...@gmail.com wrote: Which Spark and Hive release are you using ? Thanks On Mar 27, 2015, at 2:45 AM, Masf masfwo...@gmail.com wrote: Hi. In HiveContext, when I put this statement DROP TABLE IF EXISTS TestTable If TestTable doesn't exist, spark returns an error: ERROR Hive: NoSuchObjectException(message:default.TestTable table not found) [stack trace elided; identical to the original message above] Thanks!! -- Regards. Miguel Ángel -- Saludos. Miguel Ángel
Re: different result from implicit ALS with explicit ALS
Thank you, @GuoQiang. I will try to add runGC() to ALS.scala, and if it works for deleting the shuffle data, I will tell you :-) On Mar 31, 2015, at 4:47 PM, GuoQiang Li wi...@qq.com wrote: You can try to enforce garbage collection:

/** Run GC and make sure it actually has run */
def runGC() {
  val weakRef = new WeakReference(new Object())
  val startTime = System.currentTimeMillis
  System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
  // Wait until a weak reference object has been GCed
  System.runFinalization()
  while (weakRef.get != null) {
    System.gc()
    System.runFinalization()
    Thread.sleep(200)
    if (System.currentTimeMillis - startTime > 10000) {
      throw new Exception("automatically cleanup error")
    }
  }
}

------------------ Original Message ------------------ From: lisendong lisend...@163.com; Date: Mar 31, 2015 (Tue) 3:47 PM; To: Xiangrui Meng men...@gmail.com; Cc: Xiangrui Meng m...@databricks.com; user user@spark.apache.org; Sean Owen so...@cloudera.com; GuoQiang Li wi...@qq.com; Subject: Re: different result from implicit ALS with explicit ALS

I have updated my Spark source code to 1.3.1. The checkpoint works well, BUT the shuffle data still cannot be deleted automatically… the disk usage is still 30TB… I have set spark.cleaner.referenceTracking.blocking.shuffle to true. Do you know how to solve my problem? Sendong Li On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com wrote: setCheckpointInterval was added in the current master and branch-1.3. Please help check whether it works. It will be included in the 1.3.1 and 1.4.0 releases. -Xiangrui On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com wrote: hi, xiangrui: I found that the ALS of Spark 1.3.0 forgets to do checkpoint() in explicit ALS; the code is: https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala PastedGraphic-2.tiff The checkpoint is very important in my situation, because my task produces 1TB of shuffle data in each iteration; if the shuffle data is not deleted in each iteration (using checkpoint()), the task will produce 30TB of data… So I changed the ALS code and re-compiled it myself, but it seems the checkpoint does not take effect and the task still occupies 30TB of disk… (I only added two lines to ALS.scala): PastedGraphic-3.tiff And the driver's log seems strange: why is the log printed together... PastedGraphic-1.tiff thank you very much! On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com wrote: Thank you very much for your opinion :) In our case, maybe it's dangerous to treat un-observed items as negative interactions (although we could give them small confidence, I think they are still not credible...). I will do more experiments and give you feedback :) Thank you ;) On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com wrote: I believe that's right, and is what I was getting at. Yes, the implicit formulation ends up implicitly including every possible interaction in its loss function, even unobserved ones. That could be the difference. This is mostly an academic question, though. In practice, you have click-like data and should be using the implicit version for sure. However, you can give negative implicit feedback to the model.
You could consider no-click as a mild, observed, negative interaction. That is: supply a small negative value for these cases. Unobserved pairs are not part of the data set. I'd be careful about assuming the lack of an action carries signal. On Thu, Feb 26, 2015 at 3:07 PM, 163 lisend...@163.com wrote: oh my god, I think I understood... In my case, there are three kinds of user-item pairs: Display and click pairs (positive pairs); Display but no-click pairs (negative pairs); No-display pairs (unobserved pairs). Explicit ALS only considers the first and the second kinds, but implicit ALS considers all three kinds of pairs (and treats the third kind like the second, because their preference values are all zero and confidences are all 1). So the results are different, right? Could you please give me some advice on which ALS I should use? If I use the implicit ALS, how do I distinguish the second and the third kind of pair :) My opinion is that in my case I should use explicit ALS... Thank you so much On Feb 26, 2015, at 22:41, Xiangrui Meng
workers no route to host
Hi, I set up a standalone cluster of 5 machines (tmaster, tslave1,2,3,4) with spark-1.3.0-cdh5.4.0-snapshot. When I execute sbin/start-all.sh, the master is OK, but I can't see the web UI. Moreover, the worker logs look like this:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
/data/PlatformDep/cdh5/dist/bin/compute-classpath.sh: line 164: hadoop: command not found
Spark Command: java -cp :/data/PlatformDep/cdh5/dist/sbin/../conf:/data/PlatformDep/cdh5/dist/lib/spark-assembly-1.3.0-cdh5.4.0-SNAPSHOT-hadoop2.6.0-cdh5.4.0-SNAPSHOT.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-rdbms-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-api-jdo-3.2.1.jar:/data/PlatformDep/cdh5/dist/lib/datanucleus-core-3.2.2.jar: -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.128.16:7071 --webui-port 8081
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/31 06:47:22 INFO Worker: Registered signal handlers for [TERM, HUP, INT]
15/03/31 06:47:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/31 06:47:23 INFO SecurityManager: Changing view acls to: dcadmin
15/03/31 06:47:23 INFO SecurityManager: Changing modify acls to: dcadmin
15/03/31 06:47:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dcadmin); users with modify permissions: Set(dcadmin)
15/03/31 06:47:23 INFO Slf4jLogger: Slf4jLogger started
15/03/31 06:47:23 INFO Remoting: Starting remoting
15/03/31 06:47:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@tslave2:60815]
15/03/31 06:47:24 INFO Utils: Successfully started service 'sparkWorker' on port 60815.
15/03/31 06:47:24 INFO Worker: Starting Spark worker tslave2:60815 with 2 cores, 3.0 GB RAM
15/03/31 06:47:24 INFO Worker: Running Spark version 1.3.0
15/03/31 06:47:24 INFO Worker: Spark home: /data/PlatformDep/cdh5/dist
15/03/31 06:47:24 INFO Server: jetty-8.y.z-SNAPSHOT
15/03/31 06:47:24 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/03/31 06:47:24 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/03/31 06:47:24 INFO WorkerWebUI: Started WorkerWebUI at http://tslave2:8081
15/03/31 06:47:24 INFO Worker: Connecting to master akka.tcp://sparkMaster@192.168.128.16:7071/user/Master...
15/03/31 06:47:24 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@tslave2:60815] - [akka.tcp://sparkMaster@192.168.128.16:7071]: Error [Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.128.16:7071] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: No route to host]

(the same AssociationError is printed three more times)

The worker machines can ping the master machine successfully. The hosts file is like this:

192.168.128.16 tmaster tmaster
192.168.128.17 tslave1 tslave1
192.168.128.18 tslave2 tslave2
192.168.128.19 tslave3 tslave3
192.168.128.20 tslave4 tslave4

Hope someone could help. Thanks
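(For reference, not from the thread: in standalone mode the address the master binds to comes from conf/spark-env.sh on the master, and a "No route to host" during association is typically a firewall/iptables rule on the master's port rather than DNS, since ping works here. A hypothetical spark-env.sh matching this layout would be:)

# conf/spark-env.sh on the master -- illustrative; must match the spark:// URL the workers use
export SPARK_MASTER_IP=192.168.128.16
export SPARK_MASTER_PORT=7071

(Checking that port 7071 is reachable from a slave, e.g. with telnet 192.168.128.16 7071, would confirm or rule this out.)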
Re: Some questions after playing a little with the new ml.Pipeline.
Following your suggestion, I end up with the following implementation :

override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N*K*W*H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    //net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}

The idea is to mapPartitions of the underlying RDD of the DataFrame and create a new DataFrame by zipping the results. It seems to work but when I try to save the RDD I got the following error : org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: One workaround could be to convert a DataFrame into a RDD inside the transform function and then use mapPartitions/broadcast to work with the JNI calls and then convert back to RDD. Thanks Shivaram On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm still struggling to make a pre-trained caffe model transformer for dataframe works. The main problem is that creating a caffe model inside the UDF is very slow and consumes memories. Some of you suggest to broadcast the model. The problem with broadcasting is that I use a JNI interface to caffe C++ with javacpp-preset and it is not serializable. Another possible approach is to use a UDF that can handle a whole partitions instead of just a row in order to minimize the caffe model instantiation. Is there any ideas to solve one of these two issues ? Best, Jao On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com wrote: I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now. On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with current master version of spark

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    } : Vector)
    dataSet.withColumn(map(outputCol), deepCNNFeature(col(map(inputCol))))
  }
}

where CaffeModel is a java api to Caffe C++ model. The problem here is that for every row it will create a new instance of CaffeModel which is inefficient since creating a new model means loading a large model file. And it will transform only a single row at a time. Or a Caffe network can process a batch of rows efficiently. In other words, is it possible to create an UDF that can operatats on a partition in order to minimize the creation of a CaffeModel and to take advantage of the Caffe network batch processing ? On Tue, Mar 3, 2015 at 7:26 AM, Joseph Bradley jos...@databricks.com wrote: I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like
Re: Some questions after playing a little with the new ml.Pipeline.
In my transformSchema I do specify that the output column type is a VectorUDT :

override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
  val map = this.paramMap ++ paramMap
  checkInputColumn(schema, map(inputCol), ArrayType(FloatType, false))
  addOutputColumn(schema, map(outputCol), new VectorUDT)
}

The output of printSchema is as follows:

|-- cnnFeature: vector (nullable = false)

On Tue, Mar 31, 2015 at 9:55 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: My guess is that the `createDataFrame` call is failing here. Can you check if the schema being passed to it includes the column name and type for the newly zipped `features`? Joseph probably knows this better, but AFAIK the DenseVector here will need to be marked as a VectorUDT while creating a DataFrame column. Thanks Shivaram On Tue, Mar 31, 2015 at 12:50 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Following your suggestion, I end up with the following implementation:

override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
  val schema = transformSchema(dataSet.schema, paramMap, logging = true)
  val map = this.paramMap ++ paramMap

  val features = dataSet.select(map(inputCol)).mapPartitions { rows =>
    Caffe.set_mode(Caffe.CPU)
    val net = CaffeUtils.floatTestNetwork(SparkFiles.get(topology), SparkFiles.get(weight))
    val inputBlobs: FloatBlobVector = net.input_blobs()
    val N: Int = 1
    val K: Int = inputBlobs.get(0).channels()
    val H: Int = inputBlobs.get(0).height()
    val W: Int = inputBlobs.get(0).width()
    inputBlobs.get(0).Reshape(N, K, H, W)
    val dataBlob = new FloatPointer(N*K*W*H)
    val inputCPUData = inputBlobs.get(0).mutable_cpu_data()
    val feat = rows.map { case Row(a: Iterable[Float]) =>
      dataBlob.put(a.toArray, 0, a.size)
      caffe_copy_float(N*K*W*H, dataBlob, inputCPUData)
      val resultBlobs: FloatBlobVector = net.ForwardPrefilled()
      val resultDim = resultBlobs.get(0).channels()
      logInfo(s"Output dimension $resultDim")
      val resultBlobData = resultBlobs.get(0).cpu_data()
      val output = new Array[Float](resultDim)
      resultBlobData.get(output)
      Vectors.dense(output.map(_.toDouble))
    }
    //net.deallocate()
    feat
  }

  val newRowData = dataSet.rdd.zip(features).map { case (old, feat) =>
    val oldSeq = old.toSeq
    Row.fromSeq(oldSeq :+ feat)
  }
  dataSet.sqlContext.createDataFrame(newRowData, schema)
}

The idea is to mapPartitions over the underlying RDD of the DataFrame and create a new DataFrame by zipping the results. It seems to work, but when I try to save the RDD I get the following error: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row On Mon, Mar 30, 2015 at 6:40 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: One workaround could be to convert the DataFrame into an RDD inside the transform function, then use mapPartitions/broadcast to work with the JNI calls, and then convert back. Thanks Shivaram On Mon, Mar 30, 2015 at 8:37 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, I'm still struggling to make a pre-trained Caffe model transformer for DataFrames work. The main problem is that creating a Caffe model inside the UDF is very slow and consumes a lot of memory. Some of you suggested broadcasting the model. The problem with broadcasting is that I use a JNI interface to Caffe C++ with javacpp-presets and it is not serializable. Another possible approach is to use a UDF that can handle a whole partition instead of just a row, in order to minimize the Caffe model instantiation. Are there any ideas to solve one of these two issues? Best, Jao On Tue, Mar 3, 2015 at 10:04 PM, Joseph Bradley jos...@databricks.com wrote: I see. I think your best bet is to create the cnnModel on the master and then serialize it to send to the workers. If it's big (1M or so), then you can broadcast it and use the broadcast variable in the UDF. There is not a great way to do something equivalent to mapPartitions with UDFs right now. On Tue, Mar 3, 2015 at 4:36 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Here is my current implementation with the current master version of Spark:

class DeepCNNFeature extends Transformer with HasInputCol with HasOutputCol ... {

  override def transformSchema(...) { ... }

  override def transform(dataSet: DataFrame, paramMap: ParamMap): DataFrame = {
    transformSchema(dataSet.schema, paramMap, logging = true)
    val map = this.paramMap ++ paramMap
    val deepCNNFeature = udf((v: Vector) => {
      val cnnModel = new CaffeModel
      cnnModel.transform(v)
    } : Vector)
    dataSet.withColumn(map(outputCol),
Unable to save dataframe with UDT created with sqlContext.createDataFrame
Hi all, A DataFrame with a user defined type (here mllib.Vector) created with sqlContext.createDataFrame can't be saved to a parquet file; it raises a ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row. Here is an example of code to reproduce this error:

object TestDataFrame {

  def main(args: Array[String]): Unit = {
    //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    val conf = new SparkConf()
      .setAppName("RankingEval")
      .setMaster("local[8]")
      .set("spark.executor.memory", "6g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))
    val dataDF = data.toDF

    dataDF.save("test1.parquet")

    val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)
    dataDF2.save("test2.parquet")
  }
}

Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how can it be solved? Cheers, Jao
Re: spark there is no space on the disk
Yes, we have just modified the configuration, and everything works fine. Thanks very much for the help. On Thu, Mar 19, 2015 at 5:24 PM, Ted Yu yuzhih...@gmail.com wrote: For YARN, possibly this one ?

<property>
  <name>yarn.nodemanager.local-dirs</name>
  <value>/hadoop/yarn/local</value>
</property>

Cheers On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin van...@cloudera.com wrote: IIRC you have to set that configuration on the Worker processes (for standalone). The app can't override it (only for a client-mode driver). YARN has a similar configuration, but I don't know the name (shouldn't be hard to find, though). On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu dav...@databricks.com wrote: Is it possible that `spark.local.dir` is overridden by others? The docs say: NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia sparkpeng...@gmail.com wrote: Hi Sean, Thanks very much for your reply. I tried to configure it with the code below:

sf = SparkConf().setAppName("test").set("spark.executor.memory", "45g").set("spark.cores.max", 62).set("spark.local.dir", "C:\\tmp")

But I still get the error. Do you know how I can configure this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not the driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi, I was running a logistic regression algorithm on an 8-node Spark cluster; each node has 8 cores and 56 GB RAM (each node is running a Windows system), and the drive Spark is installed on has 1.9 TB capacity. The dataset I was training on has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at
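For reference, a sketch of where this setting actually lives, per Marcelo's point above (the path is illustrative, not from the thread): on a standalone cluster it goes in the worker's environment, e.g. conf/spark-env.sh on every worker, because the application's SparkConf cannot override it:

# conf/spark-env.sh on each worker -- illustrative path; point it at a large volume
# (comma-separated list to spread scratch files across several disks)
export SPARK_LOCAL_DIRS=/mnt/big-disk/spark-tmp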
Re: Can't run spark-submit with an application jar on a Mesos cluster
Well, those are only the logs of the slaves at the Mesos level. I'm not sure from your reply whether you can ssh into a specific slave or not; if you can, you should look at the actual output of the application (Spark in this case) on a slave, e.g. in /tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/4/runs/e3cf195d-525b-4148-aa38-1789d378a948/std{err,out} The actual UUIDs and the run number (in this example '4') in the path can differ from slave node to slave node. Look into those stderr and stdout files and you'll probably have your answer as to why it is failing. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-run-spark-submit-with-an-application-jar-on-a-Mesos-cluster-tp22277p22319.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark streaming with Kafka, multiple partitions fail, single partition ok
Hello, @Akhil Das I'm trying to use the experimental API https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala. I'm reusing the same code snippet to initialize my topicSet. @Cody Koeninger I don't see any previous error messages (see the full log at the end). To create the topics, I'm doing the following:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic toto
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic toto-single

I'm launching my Spark Streaming job in local mode. @Ted Yu There's no "Couldn't connect to leader for topic" log; here's the full version:

spark-submit --conf config.resource=application-integration.conf --class nextgen.Main assembly-0.1-SNAPSHOT.jar

15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nphung); users with modify permissions: Set(nphung) 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started 15/03/31 10:47:13 INFO Remoting: Starting remoting 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@int.local:44180] 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@int.local:44180] 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on port 44180. 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150331104713-2238 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' on port 50204. 15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040 15/03/31 10:47:16 INFO SparkContext: Added JAR file:/home/nphung/assembly-0.1-SNAPSHOT.jar at http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 1427791636151 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager localhost:40630 with 265.1 MB RAM, BlockManagerId(driver, localhost, 40630) 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden to 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is overridden to 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@7fd8c559 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval
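For reference, the direct-stream setup from that example looks roughly like this (the broker address is a placeholder, the topic name matches the commands above, and `ssc` is assumed to be the StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
// one RDD partition is created per Kafka partition of the topic
val topicsSet = Set("toto")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)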
Broadcasting a parquet file using spark and python
How can we implement a BroadcastHashJoin in Spark with Python? My SparkSQL inner joins are taking a lot of time since they are executed as ShuffledHashJoins. The tables on which the join is performed are stored as parquet files. Please help. Thanks and regards, Jitesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
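(For reference, and as an assumption rather than an answer confirmed in this thread: Spark SQL picks a BroadcastHashJoin automatically when its size estimate for one side of the join is below spark.sql.autoBroadcastJoinThreshold, so raising that threshold is one commonly suggested way to avoid the shuffle. An illustrative sketch:)

// Illustrative sketch: raise the auto-broadcast threshold to ~100 MB
// (the default is about 10 MB) so the smaller table is broadcast instead of shuffled.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")

(The same SET statement can be issued from PySpark's sqlContext.sql.)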
Re: refer to dictionary
You can use a broadcast variable. See also this thread: http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variable&subj=How+Broadcast+variable+scale+ On Mar 31, 2015, at 4:43 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi, I have an RDD (rdd1) where each line is split into an array [a, b, c], etc. And I also have a local dictionary (dict1) that stores the key-value pairs {a: 1, b: 2, c: 3}. I want to replace the keys in the RDD with their corresponding values in the dict: rdd1.map(lambda line: [dict1[item] for item in line]) But this task is not distributed; I believe the reason is that dict1 is a local instance. Can anyone provide suggestions on how to parallelize this? Thanks, Best, Peng - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
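For illustration, the broadcast-variable pattern looks like this (shown in Scala; the PySpark sc.broadcast API is analogous, and rdd1 is assumed here to be an RDD of token sequences):

val dict1 = Map("a" -> 1, "b" -> 2, "c" -> 3)
val bcDict = sc.broadcast(dict1) // shipped once per executor, not once per task
// look up every token of every line through the broadcast dictionary
val replaced = rdd1.map(line => line.map(bcDict.value))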
Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0
I had always understood the formulation to be the first option you describe. Lambda is scaled by the number of items the user has rated / interacted with. I think the goal is to avoid fitting the tastes of prolific users disproportionately just because they have many ratings to fit. This is what's described in the ALS-WR paper we link to on the Spark web site, in equation 5 (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf) I think this also gets you the scale-invariance? For every additional rating from user i to product j, you add one new term to the squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the regularization term by lambda * (|u_i|^2 + |m_j|^2) They are at least both increasing about linearly as ratings increase. If the regularization term is multiplied by the total number of users and products in the model, then it's fixed. I might misunderstand you and/or be speaking about something slightly different when it comes to invariance. But FWIW I had always understood the regularization to be multiplied by the number of explicit ratings. On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng men...@gmail.com wrote: Okay, I didn't realize that I changed the behavior of lambda in 1.3. to make it scale-invariant, but it is worth discussing whether this is a good change. In 1.2, we multiply lambda by the number ratings in each sub-problem. This makes it scale-invariant for explicit feedback. However, in implicit feedback model, a user's sub-problem contains all item factors. Then the question is whether we should multiply lambda by the number of explicit ratings from this user or by the total number of items. We used the former in 1.2 but changed to the latter in 1.3. So you should try a smaller lambda to get a similar result in 1.3. Sean and Shuo, which approach do you prefer? Do you know any existing work discussing this? Best, Xiangrui On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng men...@gmail.com wrote: This sounds like a bug ... Did you try a different lambda? It would be great if you can share your dataset or re-produce this issue on the public dataset. Thanks! -Xiangrui On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote: After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly smaller factors (and hence scores). For example, the first few product's factor values in 1.2.0 are (0.04821, -0.00674, -0.0325). In 1.3.0, the first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This difference of several orders of magnitude is consistent throughout both user and product. The recommendations from 1.2.0 are subjectively much better than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less memory. My first thought is that there is too much regularization in the 1.3.0 results, but I'm using the same lambda parameter value. This is a snippet of my scala code: . val rank = 75 val numIterations = 15 val alpha = 10 val lambda = 0.01 val model = ALS.trainImplicit(train_data, rank, numIterations, lambda=lambda, alpha=alpha) . The code and input data are identical across both versions. Did anything change between the two versions I'm not aware of? I'd appreciate any help! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
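For reference, equation 5 of that ALS-WR paper is the weighted-lambda-regularization objective (reconstructed here from memory, so treat it as a paraphrase), where $n_{u_i}$ and $n_{m_j}$ count the ratings of user $i$ and item $j$:

$$
f(U, M) = \sum_{(i,j) \in I} \left( r_{ij} - \mathbf{u}_i^{\top} \mathbf{m}_j \right)^2
  + \lambda \left( \sum_{i} n_{u_i} \lVert \mathbf{u}_i \rVert^2 + \sum_{j} n_{m_j} \lVert \mathbf{m}_j \rVert^2 \right)
$$

with $I$ the set of observed ratings; this is the "lambda scaled by the number of ratings" form discussed above.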
RE: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array
You can use the HiveContext instead of SQLContext, which should support all the HiveQL, including lateral view explode. SQLContext is not supporting that yet. BTW, nice coding format in the email. Yong Date: Tue, 31 Mar 2015 18:18:19 -0400 Subject: Re: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array From: tsind...@gmail.com To: user@spark.apache.org So in looking at this a bit more, I gather the root cause is the fact that the nested fields are represented as rows within rows, is that correct? If I don't know the size of the json array (it varies), using x.getAs[Row](0).getString(0) is not really a valid solution. Is the solution to apply a lateral view + explode to this? I have attempted to change to a lateral view, but looks like my syntax is off: sqlContext.sql( SELECT path,`timestamp`, name, value, pe.value FROM metric lateral view explode(pathElements) a AS pe) .collect.foreach(println(_)) Which results in: 15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0 Exception in thread main java.lang.RuntimeException: [1.68] failure: ``UNION'' expected but identifier view found SELECT path,`timestamp`, name, value, pe.value FROM metric lateral view explode(pathElements) a AS pe ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31) at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83) at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83) at 
org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303) at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97) at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)Is this the right approach? Is this syntax available in 1.2.1: SELECT v1.name, v2.city, v2.state FROM people LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 as name, address LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2 as city, state; -Todd On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist tsind...@gmail.com wrote: I am accessing ElasticSearch via the
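A minimal sketch of the HiveContext route suggested above (table and field names taken from this thread; it assumes the data-source DDL and the elasticsearch-hadoop resource register the same way through a HiveContext on 1.2.1):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    hiveContext.sql(
      """CREATE TEMPORARY TABLE metric
        |USING org.elasticsearch.spark.sql
        |OPTIONS (resource 'device/metric')""".stripMargin)

    // LATERAL VIEW explode is HiveQL, so it parses under HiveContext but not SQLContext.
    hiveContext.sql(
      """SELECT path, `timestamp`, name, value, pe.node, pe.value
        |FROM metric LATERAL VIEW explode(pathElements) elems AS pe""".stripMargin)
      .collect().foreach(println)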
Re: Can't run spark-submit with an application jar on a Mesos cluster
Thanks hbogert. There it is plain as day; it can't find my spark binaries. I thought it was enough to set SPARK_EXECUTOR_URI in my spark-env.sh since this is all that's necessary to run spark-shell.sh against a mesos master, but I also had to set spark.executor.uri in my spark-defaults.conf (or in my app itself). Thanks again for your help to troubleshoot this problem. jclouds@development-5159-d3d:/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/1/runs/latest$ cat stderr I0329 20:34:26.107267 10026 exec.cpp:132] Version: 0.21.1 I0329 20:34:26.109591 10031 exec.cpp:206] Executor registered on slave 20150322-040336-606645514-5050-2744-S1 sh: 1: /home/jclouds/spark-1.3.0-bin-hadoop2.4/bin/spark-class: not found jclouds@development-5159-d3d:/tmp/mesos/slaves/20150322-040336-606645514-5050-2744-S1/frameworks/20150322-040336-606645514-5050-2744-0037/executors/1/runs/latest$ cat stdout Registered executor on 10.217.7.180 Starting task 1 Forked command at 10036 sh -c ' /home/jclouds/spark-1.3.0-bin-hadoop2.4/bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@development-5159-d9.c.learning-spark.internal:54746/user/CoarseGrainedScheduler --executor-id 20150322-040336-606645514-5050-2744-S1 --hostname 10.217.7.180 --cores 10 --app-id 20150322-040336-606645514-5050-2744-0037' Command exited with status 127 (pid: 10036) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-run-spark-submit-with-an-application-jar-on-a-Mesos-cluster-tp22277p22331.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
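For reference, a minimal sketch of the setting that fixed this (the master URL and tarball URI are placeholders; the equivalent spark-defaults.conf entry is a spark.executor.uri line pointing at the same tarball):

    val conf = new org.apache.spark.SparkConf()
      .setMaster("mesos://master-host:5050")   // placeholder Mesos master
      .set("spark.executor.uri",
           "hdfs://namenode:8020/dist/spark-1.3.0-bin-hadoop2.4.tgz")  // placeholder path
    val sc = new org.apache.spark.SparkContext(conf)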
Spark SQL saveAsParquet failed after a few waves
Hi, I am using the spark-1.3 prebuilt release with hadoop2.4 support and Hadoop 2.4.0. I wrote a Spark application (LoadApp) to generate data in each task and load the data into HDFS as Parquet files (using saveAsParquet() in Spark SQL). When a few waves (1 or 2) are used in a job, LoadApp can finish after a few failures and retries. But when more waves (3) are involved in a job, the job terminates abnormally. All the failures I hit are: "java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN" and the stack traces are: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN at parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137) at parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129) at parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173) at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152) at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112) at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73) at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:634) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:648) at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:648) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I have no idea what happened, since jobs may fail or succeed seemingly without reason. Thanks. Yijie Shen
Re: Actor not found
Thanks for the log. It's really helpful. I created a JIRA to explain why it will happen: https://issues.apache.org/jira/browse/SPARK-6640 However, does this error always happen in your environment? Best Regards, Shixiong Zhu 2015-03-31 22:36 GMT+08:00 sparkdi shopaddr1...@dubna.us: This is the whole output from the shell: ~/spark-1.3.0-bin-hadoop2.4$ sudo bin/spark-shell Spark assembly has been built with Hive, including Datanucleus jars on classpath log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/30 19:00:40 INFO SecurityManager: Changing view acls to: root 15/03/30 19:00:40 INFO SecurityManager: Changing modify acls to: root 15/03/30 19:00:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 15/03/30 19:00:40 INFO HttpServer: Starting HTTP Server 15/03/30 19:00:40 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/30 19:00:40 INFO AbstractConnector: Started SocketConnector@0.0.0.0:47797 15/03/30 19:00:40 INFO Utils: Successfully started service 'HTTP class server' on port 47797. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.3.0 /_/ Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75) Type in expressions to have them evaluated. Type :help for more information. 15/03/30 19:00:42 INFO SparkContext: Running Spark version 1.3.0 15/03/30 19:00:42 INFO SecurityManager: Changing view acls to: root 15/03/30 19:00:42 INFO SecurityManager: Changing modify acls to: root 15/03/30 19:00:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 15/03/30 19:00:42 INFO Slf4jLogger: Slf4jLogger started 15/03/30 19:00:42 INFO Remoting: Starting remoting 15/03/30 19:00:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@vm:52574] 15/03/30 19:00:43 INFO Utils: Successfully started service 'sparkDriver' on port 52574. 15/03/30 19:00:43 INFO SparkEnv: Registering MapOutputTracker 15/03/30 19:00:43 INFO SparkEnv: Registering BlockManagerMaster 15/03/30 19:00:43 INFO DiskBlockManager: Created local directory at /tmp/spark-f71a8d86-6e49-4dfe-bb98-8e8581015acc/blockmgr-57532f5a-38db-4ba3-86d8-edef84f592e5 15/03/30 19:00:43 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/03/30 19:00:43 INFO HttpFileServer: HTTP File server directory is /tmp/spark-95e0a143-0de3-4c96-861c-968c9fae2746/httpd-cb029cd6-4943-479d-9b56-e7397489d9ea 15/03/30 19:00:43 INFO HttpServer: Starting HTTP Server 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/30 19:00:43 INFO AbstractConnector: Started SocketConnector@0.0.0.0:48500 15/03/30 19:00:43 INFO Utils: Successfully started service 'HTTP file server' on port 48500. 15/03/30 19:00:43 INFO SparkEnv: Registering OutputCommitCoordinator 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/30 19:00:43 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/03/30 19:00:43 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
15/03/30 19:00:43 INFO SparkUI: Started SparkUI at http://vm:4040 15/03/30 19:00:43 INFO Executor: Starting executor ID driver on host localhost 15/03/30 19:00:43 INFO Executor: Using REPL class URI: http://10.11.204.80:47797 15/03/30 19:00:43 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@vm:52574/user/HeartbeatReceiver 15/03/30 19:00:43 ERROR OneForOneStrategy: Actor not found for: ActorSelection[Anchor(akka://sparkDriver/deadLetters), Path(/)] akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) at akka.actor.ActorCell.create(ActorCell.scala:596) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by:
Anatomy of RDD: Deep dive into RDD data structure
Hi, I recently gave a talk on the RDD data structure which gives an in-depth understanding of Spark internals. You can watch it on YouTube https://www.youtube.com/watch?v=WVdyuVwWcBc. The slides are on SlideShare http://www.slideshare.net/datamantra/anatomy-of-rdd and the code is on GitHub https://github.com/phatak-dev/anatomy-of-rdd. Regards, Madhukara Phatak http://datamantra.io/
Re: different result from implicit ALS with explicit ALS
In my experiment, if I do not call gc() explicitly, the shuffle files will not be cleaned until the whole job finishes… I don't know why; maybe the RDDs are never garbage collected implicitly. In my situation, a full GC in the driver takes about 10 seconds, so I start a thread in the driver to do GC like this (do GC every 120 seconds): while (true) { System.gc(); Thread.sleep(120 * 1000); } It works well now. Do you have more elegant ways to clean the shuffle files? Best Regards, Sendong Li On Apr 1, 2015, at 5:09 AM, Xiangrui Meng men...@gmail.com wrote: Hey Guoqiang and Sendong, Could you comment on the overhead of calling gc() explicitly? The shuffle files should get cleaned a few seconds after checkpointing, but it is certainly possible to accumulate TBs of files in a few seconds. In this case, calling gc() may work the same as waiting for a few seconds after each checkpoint. Is that correct? Best, Xiangrui On Tue, Mar 31, 2015 at 8:58 AM, lisendong lisend...@163.com wrote: GuoQiang's method works very well … it only takes 1TB of disk now. Thank you very much! On Mar 31, 2015, at 4:47 PM, GuoQiang Li wi...@qq.com wrote: You can try to enforce garbage collection: /** Run GC and make sure it actually has run */ def runGC() { val weakRef = new WeakReference(new Object()) val startTime = System.currentTimeMillis System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC. // Wait until a weak reference object has been GCed System.runFinalization() while (weakRef.get != null) { System.gc() System.runFinalization() Thread.sleep(200) if (System.currentTimeMillis - startTime > 10000) { throw new Exception("automatically cleanup error") } } } -- Original message -- From: lisendong lisend...@163.com; Sent: Tuesday, Mar 31, 2015, 3:47 PM; To: Xiangrui Meng men...@gmail.com; Cc: Xiangrui Meng m...@databricks.com; user user@spark.apache.org; Sean Owen so...@cloudera.com; GuoQiang Li wi...@qq.com; Subject: Re: different result from implicit ALS with explicit ALS I have updated my Spark source code to 1.3.1. The checkpoint works well. BUT the shuffle data still could not be deleted automatically… the disk usage is still 30TB… I have set spark.cleaner.referenceTracking.blocking.shuffle to true. Do you know how to solve my problem? Sendong Li On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com wrote: setCheckpointInterval was added in the current master and branch-1.3. Please help check whether it works. It will be included in the 1.3.1 and 1.4.0 releases. 
-Xiangrui On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com wrote: hi, xiangrui: I found the ALS of Spark 1.3.0 forgets to do checkpoint() in explicit ALS: the code is: https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala PastedGraphic-2.tiff The checkpoint is very important in my situation, because my task produces 1TB of shuffle data in each iteration; if the shuffle data is not deleted in each iteration (using checkpoint()), the task will produce 30TB of data… So I changed the ALS code and re-compiled it myself, but it seems the checkpoint does not take effect, and the task still occupies 30TB of disk… (I only added two lines to ALS.scala): PastedGraphic-3.tiff And the driver's log seems strange; why is the log printed together... PastedGraphic-1.tiff Thank you very much! On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com wrote: Thank you very much for your opinion :) In our case, maybe it's dangerous to treat an un-observed item as a negative interaction (although we could give them small confidence, I think they are still not credible...) I will do more experiments and give you feedback :) Thank you ;) On Feb 26, 2015, at 11:16 PM, Sean Owen so...@cloudera.com wrote: I believe that's right, and is what I was getting at. Yes, the implicit formulation ends up implicitly including every possible interaction in its loss function, even unobserved ones. That could be the difference. This is mostly an academic question though. In practice, you have click-like data and should be using the implicit version for sure. However you can give negative implicit feedback to the model. You could consider no-click as a mild, observed, negative interaction. That is: supply a small negative value for these cases. Unobserved pairs are not part of the data set.
Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0
I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-6637. Since we don't have a clear answer about how the scaling should be handled. Maybe the best solution for now is to switch back to the 1.2 scaling. -Xiangrui On Tue, Mar 31, 2015 at 2:50 PM, Sean Owen so...@cloudera.com wrote: Ah yeah I take your point. The squared error term is over the whole user-item matrix, technically, in the implicit case. I suppose I am used to assuming that the 0 terms in this matrix are weighted so much less (because alpha is usually large-ish) that they're almost not there, but they are. So I had just used the explicit formulation. I suppose the result is kind of scale invariant, but not exactly. I had not prioritized this property since I had generally built models on the full data set and not a sample, and had assumed that lambda would need to be retuned over time as the input grew anyway. So, basically I don't know anything more than you do, sorry! On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng men...@gmail.com wrote: Hey Sean, That is true for explicit model, but not for implicit. The ALS-WR paper doesn't cover the implicit model. In implicit formulation, a sub-problem (for v_j) is: min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2 This is a sum for all i but not just the users who rate item j. In this case, if we set X=m_j, the number of observed ratings for item j, it is not really scale invariant. We have #users user vectors in the least squares problem but only penalize lambda * #ratings. I was suggesting using lambda * m directly for implicit model to match the number of vectors in the least squares problem. Well, this is my theory. I don't find any public work about it. Best, Xiangrui On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen so...@cloudera.com wrote: I had always understood the formulation to be the first option you describe. Lambda is scaled by the number of items the user has rated / interacted with. I think the goal is to avoid fitting the tastes of prolific users disproportionately just because they have many ratings to fit. This is what's described in the ALS-WR paper we link to on the Spark web site, in equation 5 (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf) I think this also gets you the scale-invariance? For every additional rating from user i to product j, you add one new term to the squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the regularization term by lambda * (|u_i|^2 + |m_j|^2) They are at least both increasing about linearly as ratings increase. If the regularization term is multiplied by the total number of users and products in the model, then it's fixed. I might misunderstand you and/or be speaking about something slightly different when it comes to invariance. But FWIW I had always understood the regularization to be multiplied by the number of explicit ratings. On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng men...@gmail.com wrote: Okay, I didn't realize that I changed the behavior of lambda in 1.3. to make it scale-invariant, but it is worth discussing whether this is a good change. In 1.2, we multiply lambda by the number ratings in each sub-problem. This makes it scale-invariant for explicit feedback. However, in implicit feedback model, a user's sub-problem contains all item factors. Then the question is whether we should multiply lambda by the number of explicit ratings from this user or by the total number of items. 
We used the former in 1.2 but changed to the latter in 1.3. So you should try a smaller lambda to get a similar result in 1.3. Sean and Shuo, which approach do you prefer? Do you know any existing work discussing this? Best, Xiangrui On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng men...@gmail.com wrote: This sounds like a bug ... Did you try a different lambda? It would be great if you can share your dataset or re-produce this issue on the public dataset. Thanks! -Xiangrui On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote: After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly smaller factors (and hence scores). For example, the first few product's factor values in 1.2.0 are (0.04821, -0.00674, -0.0325). In 1.3.0, the first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This difference of several orders of magnitude is consistent throughout both user and product. The recommendations from 1.2.0 are subjectively much better than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less memory. My first thought is that there is too much regularization in the 1.3.0 results, but I'm using the same lambda parameter value. This is a snippet of my scala code: . val rank = 75 val numIterations = 15 val alpha = 10 val lambda = 0.01 val model = ALS.trainImplicit(train_data, rank, numIterations,
Re: Broadcasting a parquet file using spark and python
Hi Michael, Thanks for your response. I am running 1.2.1. Is there any workaround to achieve the same with 1.2.1? Thanks, Jitesh On Wed, Apr 1, 2015 at 12:25 AM, Michael Armbrust mich...@databricks.com wrote: In Spark 1.3 I would expect this to happen automatically when the parquet table is small (< 10MB, configurable with spark.sql.autoBroadcastJoinThreshold). If you are running 1.3 and not seeing this, can you show the code you are using to create the table? On Tue, Mar 31, 2015 at 3:25 AM, jitesh129 jitesh...@gmail.com wrote: How can we implement a BroadcastHashJoin for spark with python? My SparkSQL inner joins are taking a lot of time since they perform a ShuffledHashJoin. Tables on which the join is performed are stored as parquet files. Please help. Thanks and regards, Jitesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcasting-a-parquet-file-using-spark-and-python-tp22315.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
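One thing that may be worth trying on 1.2.1 (my suggestion, not from the thread): the threshold property already exists there, but the planner only broadcasts when it knows the table is small (e.g. from Hive metastore statistics), so this may or may not kick in for raw parquet files:

    // Raise the size below which Spark SQL will broadcast a table (value in bytes).
    sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")  // 100 MB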
Re: spark.sql.Row manipulation
You can do something like: df.collect().map { case Row(name: String, age1: Int, age2: Int) => ... } On Tue, Mar 31, 2015 at 4:05 PM, roni roni.epi...@gmail.com wrote: I have 2 parquet files with format e.g. name, age, town. I read them and then join them to get all the names which are in both towns. The resultant dataset is res4: Array[org.apache.spark.sql.Row] = Array([name1, age1, town1,name2,age2,town2]) Name 1 and name 2 are the same as I am joining. Now, I want to get it into the format (name, age1, age2). But I can't seem to manage to manipulate the org.apache.spark.sql.Row. Trying something like map(_.split(",")).map(a => (a(0), a(1).trim().toInt)) does not work. Can you suggest a way? Thanks -R
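Spelling that out against the six-field rows in the question (a sketch; it assumes the ages are Int and that joined is the Array[Row] shown above as res4):

    import org.apache.spark.sql.Row

    // Each row looks like [name1, age1, town1, name2, age2, town2], with name1 == name2.
    val result = joined.map {
      case Row(name: String, age1: Int, _: String, _: String, age2: Int, _: String) =>
        (name, age1, age2)
    }
    result.foreach(println)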
Minimum slots assigment to Spark on Mesos
Hi All, I am running Spark MR on Mesos. Is there a configuration setting for Spark to define the minimum required slots (similar to MapReduce's mapred.mesos.total.reduce.slots.minimum and mapred.mesos.total.map.slots.minimum)? The most closely related property I see is spark.scheduler.minRegisteredResourcesRatio, found in the documentation here: http://spark.apache.org/docs/1.2.1/configuration.html#spark-properties What I want is a fixed amount of slots assigned to Spark so I can share my cluster with MR. Any suggestion on parts of the code or documentation that I should check for a full list of available configurations? thanks, Stratos
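I can't point to a Spark analogue of those MapReduce minimums, but capping what Spark takes from Mesos is possible; a sketch (values and master URL are placeholders):

    // In coarse-grained Mesos mode, spark.cores.max bounds the total cores
    // Spark will claim cluster-wide, leaving the rest for other frameworks.
    val conf = new org.apache.spark.SparkConf()
      .setMaster("mesos://master-host:5050")
      .set("spark.cores.max", "16")
      .set("spark.executor.memory", "4g")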
Re: Spark streaming with Kafka, multiple partitions fail, single partition ok
Can you show us the output of DStream#print() if you have it? Thanks On Tue, Mar 31, 2015 at 2:55 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, @Akhil Das I'm trying to use the experimental API https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala. I'm reusing the same code snippet to initialize my topicSet. @Cody Koeninger I don't see any previous error messages (see the full log at the end). To create the topic, I'm doing the following: kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic toto kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic toto-single I'm launching my Spark Streaming in local mode. @Ted Yu There's no "Couldn't connect to leader for topic" log; here's the full version: spark-submit --conf config.resource=application-integration.conf --class nextgen.Main assembly-0.1-SNAPSHOT.jar 15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nphung); users with modify permissions: Set(nphung) 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started 15/03/31 10:47:13 INFO Remoting: Starting remoting 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@int.local:44180] 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@int.local:44180] 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on port 44180. 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150331104713-2238 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' on port 50204. 15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040 15/03/31 10:47:16 INFO SparkContext: Added JAR file:/home/nphung/assembly-0.1-SNAPSHOT.jar at http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 1427791636151 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager localhost:40630 with 265.1 MB RAM, BlockManagerId(driver, localhost, 40630) 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden to 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is overridden to 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated
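For reference, a minimal sketch of the direct-stream setup under discussion, following the linked DirectKafkaWordCount example (assumes the spark-shell or app sc; the broker address is a placeholder, and the topic name is the 10-partition topic created in this thread):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(sc, Seconds(2))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topicsSet = Set("toto")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    messages.map(_._2).print()   // the DStream#print() output asked for above
    ssc.start()
    ssc.awaitTermination()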
Re: Anyone has some simple example with spark-sql with spark 1.3
It works, thanks for your great help. On Mon, Mar 30, 2015 at 10:07 PM, Denny Lee denny.g@gmail.com wrote: Hi Vincent, This may be a case of a missing semicolon after your CREATE TEMPORARY TABLE statement. I ran your original statement (missing the semicolon) and got the same error as you did. As soon as I added it in, I was good to go again: CREATE TEMPORARY TABLE jsonTable USING org.apache.spark.sql.json OPTIONS ( path "/samples/people.json" ); -- above needed a semicolon so the temporary table could be created first SELECT * FROM jsonTable; HTH! Denny On Sun, Mar 29, 2015 at 6:59 AM Vincent He vincent.he.andr...@gmail.com wrote: No luck, it does not work. Does anyone know whether there is some special setting for the spark-sql CLI so we do not need to write code to use Spark SQL? Does anyone have a simple example of this? Appreciate any help, thanks in advance. On Sat, Mar 28, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote: See https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html I haven't tried the SQL statements in the above blog myself. Cheers On Sat, Mar 28, 2015 at 5:39 AM, Vincent He vincent.he.andr...@gmail.com wrote: Thanks for your information. I have read it; I can run the samples with Scala or Python, but for the spark-sql shell I cannot get an example running successfully. Can you give me an example I can run with ./bin/spark-sql without writing any code? thanks On Sat, Mar 28, 2015 at 7:35 AM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at https://spark.apache.org/docs/latest/sql-programming-guide.html Cheers On Mar 28, 2015, at 5:08 AM, Vincent He vincent.he.andr...@gmail.com wrote: I am learning Spark SQL and trying a spark-sql example. Running the following code, I got the exception ERROR CliDriver: org.apache.spark.sql.AnalysisException: cannot recognize input near 'CREATE' 'TEMPORARY' 'TABLE' in ddl statement; line 1 pos 17. I have two questions: 1. Do we have a list of the statements supported in spark-sql? 2. Does the spark-sql shell support HiveQL? If yes, how do I set that? 
The example I tried: CREATE TEMPORARY TABLE jsonTable USING org.apache.spark.sql.json OPTIONS ( path examples/src/main/resources/people.json ) SELECT * FROM jsonTable The exception I got, CREATE TEMPORARY TABLE jsonTable USING org.apache.spark.sql.json OPTIONS ( path examples/src/main/resources/people.json ) SELECT * FROM jsonTable ; 15/03/28 17:38:34 INFO ParseDriver: Parsing command: CREATE TEMPORARY TABLE jsonTable USING org.apache.spark.sql.json OPTIONS ( path examples/src/main/resources/people.json ) SELECT * FROM jsonTable NoViableAltException(241@[654:1: ddlStatement : ( createDatabaseStatement | switchDatabaseStatement | dropDatabaseStatement | createTableStatement | dropTableStatement | truncateTableStatement | alterStatement | descStatement | showStatement | metastoreCheck | createViewStatement | dropViewStatement | createFunctionStatement | createMacroStatement | createIndexStatement | dropIndexStatement | dropFunctionStatement | dropMacroStatement | analyzeStatement | lockStatement | unlockStatement | lockDatabase | unlockDatabase | createRoleStatement | dropRoleStatement | grantPrivileges | revokePrivileges | showGrants | showRoleGrants | showRolePrincipals | showRoles | grantRole | revokeRole | setRole | showCurrentRole );]) at org.antlr.runtime.DFA.noViableAlt(DFA.java:158) at org.antlr.runtime.DFA.predict(DFA.java:144) at org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2090) at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1398) at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1036) at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:199) at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166) at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:241) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at
Re: can't union two rdds
RDD union will result in 1 2 3 4 5 6 7 8 9 10 11 12. What you are trying to do is a join. There must be some logic/key to perform a join operation; I think in your case you want the order (index) to be the joining key. An RDD is a distributed data structure and is not a natural fit for your case. If the amount of data is small, you can use rdd.collect on both, then iterate over the two lists together and produce the desired result. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/can-t-union-two-rdds-tp22320p22323.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
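If the data is too big to collect, the same index-as-key idea can be done distributed; a minimal sketch with zipWithIndex (my own example values):

    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
    val rdd2 = sc.parallelize(Seq(7, 8, 9, 10, 11, 12))

    // Pair each element with its position, flip to (index, value), join on the index.
    val byIndex1 = rdd1.zipWithIndex().map(_.swap)
    val byIndex2 = rdd2.zipWithIndex().map(_.swap)
    val paired = byIndex1.join(byIndex2).sortByKey().values   // (1,7), (2,8), ...
    paired.collect().foreach(println)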
Re: deployment of spark on mesos and data locality in tachyon/hdfs
(resending...) I was thinking the same setup… But the more I think about this problem, the more interesting it gets. If we allocate 50% of total memory to Tachyon statically, then the Mesos benefits of dynamically scheduling resources go away altogether. Can Tachyon be resource-managed by Mesos (dynamically)? Any thought or comment? Sean Hi Haoyuan, So on each mesos slave node I should allocate/section off some amount of memory for tachyon (let's say 50% of the total memory) and the rest for regular mesos tasks? This means, on each slave node I would have a tachyon worker (+ hdfs configuration to talk to s3 or the hdfs datanode) and the mesos slave process. Is this correct? -- --Sean
rdd.cache() not working?
Hi, all. Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that cache is not working? Because if we issue person.count again, we do not see any performance improvement. Hope someone can explain this a little. Best, Sun. case class Person(id: Int, col1: String) val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1))) person.cache person.count fightf...@163.com
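One note worth adding here (mine, not from the thread): cache() is lazy, so nothing appears under the Storage tab until the first action materializes the partitions. A sketch reusing the person RDD defined above:

    person.cache()   // only marks the RDD for caching; nothing is stored yet
    person.count()   // the first action computes and stores the cached partitions
    person.count()   // should now be served from memory and show up under Storage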
How to set up a Spark Cluster?
Hi All, I am quite new to Spark, so please pardon me if this is a very basic question. I have set up a Hadoop cluster using Hortonworks' Ambari. It has 1 master and 3 worker nodes. Currently, it has the HDFS, Yarn, MapReduce2, HBase and ZooKeeper services installed. Now, I want to install Spark on it. How do I do that? I searched a lot online, but there is no clear step-by-step installation guide for that. All I find is the standalone setup guides. Can someone provide steps? What needs to be copied to each machine? Where and what config changes should be made on each machine? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-setup-a-Spark-Cluter-tp22326.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: refer to dictionary
Hi Ted, Thanks very much, yeah, using broadcast is much faster. Best, Peng On Tue, Mar 31, 2015 at 8:49 AM, Ted Yu yuzhih...@gmail.com wrote: You can use a broadcast variable. See also this thread: http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variablesubj=How+Broadcast+variable+scale+ On Mar 31, 2015, at 4:43 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi, I have an RDD (rdd1) where each line is split into an array [a, b, c], etc. And I also have a local dictionary (dict1) that stores the key-value pairs {a: 1, b: 2, c: 3}. I want to replace the keys in the RDD with their corresponding values in the dict: rdd1.map(lambda line: [dict1[item] for item in line]) But this task is not distributed; I believe the reason is that dict1 is a local instance. Can anyone suggest a way to parallelize this? Thanks, Best, Peng
Ambiguous references to a field set in a partitioned table AND the data
Hi, I save Parquet files in a partitioned table, so in a path looking like /path/to/table/myfield=a/. But I also kept the field myfield in the Parquet data. Thus, when I query the field, I get this error: df.select("myfield").show(10) Exception in thread main org.apache.spark.sql.AnalysisException: Ambiguous references to myfield (myfield#2,List()),(myfield#47,List()); Looking at the code, I could not find a way to explicitly specify which column I'd want. DataFrame#columns returns strings. Even by loading the data with a schema (StructType), I'm not sure I can do it. Do I have to make sure that my partition field does not exist in the data before saving? Or is there a way to declare which column in the schema I want to query? Also, for the same reasons, if I try to persist() the data, I get this error: Caused by: java.lang.ArrayIndexOutOfBoundsException: 3 at parquet.bytes.BytesUtils.bytesToInt(BytesUtils.java:227) at parquet.column.statistics.IntStatistics.setMinMaxFromBytes(IntStatistics.java:46) at parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249) at parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:558) at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:492) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ambiguous-references-to-a-field-set-in-a-partitioned-table-AND-the-data-tp22325.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: why Shuffle Write is not zero when everything is cached and there is enough memory?
I have noticed a similar issue when using Spark Streaming. The Spark shuffle write size increases to a large size (in GB) and then the app crashes saying: java.io.FileNotFoundException: /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f (No such file or directory) I don't understand why the shuffle size increases to such a large value for long-running jobs. Thanks, Udiy On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote: Thanks Saisai. I will try your solution, but I still don't understand why the filesystem should be used when there is plenty of memory available! On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Shuffle write will finally spill the data to the file system as a bunch of files. If you want to avoid disk writes, you can mount a ramdisk and configure spark.local.dir to point to this ram disk. Then shuffle output will be written to a memory-based FS and will not introduce disk IO. Thanks Jerry 2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com: Hi, I was looking at SparkUI, Executors, and I noticed that I have 597 MB for Shuffle while I am using a cached temp-table and Spark had 2 GB of free memory (the number under Memory Used is 597 MB / 2.6 GB)?! Shouldn't Shuffle Write be zero and everything (map/reduce tasks) be done in memory? best, /Shahab
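A sketch of Jerry's ramdisk suggestion (mount point and size are placeholders; the tmpfs itself has to be created outside Spark, on every worker):

    // Assumes something like this was run on each worker beforehand:
    //   sudo mkdir -p /mnt/ramdisk && sudo mount -t tmpfs -o size=8g tmpfs /mnt/ramdisk
    val conf = new org.apache.spark.SparkConf()
      .set("spark.local.dir", "/mnt/ramdisk")   // shuffle files now land on the memory-backed FS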
Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
Yep, it's not serializable: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html You can't return this from a distributed operation since that would mean it has to travel over the network and you haven't supplied any way to convert the thing into bytes. On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com wrote: When I am trying to get the result from HBase and running the mapToPair function of the RDD, it's giving the error java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result Here is the code:

// private static JavaPairRDD<Integer, Result> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
//   return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class,
//     Result.class).mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
//
//     public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
//       System.out.println("In getCompanyDataRDD" + t._2);
//
//       String cknid = Bytes.toString(t._1.get());
//       System.out.println("processing cknids is:" + cknid);
//       Integer cknidInt = Integer.parseInt(cknid);
//       Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
//       return returnTuple;
//     }
//   });
// }

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
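The usual fix (my sketch, not from the thread, written in Scala for brevity) is to map each Result to plain serializable values before it leaves the scan; the column family and qualifier below are hypothetical, and hbaseConf stands in for the HBase Configuration the original code builds:

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes

    val family = Bytes.toBytes("cf")        // hypothetical column family
    val qualifier = Bytes.toBytes("name")   // hypothetical qualifier

    val rows = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result])
      .map { case (key, result) =>
        // Extract plain types right away; Result itself never crosses the wire.
        val id = Bytes.toString(key.get()).toInt
        val name = Bytes.toString(result.getValue(family, qualifier))
        (id, name)
      }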
SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array
I am accessing ElasticSearch via the elasticsearch-hadoop and attempting to expose it via SparkSQL. I am using spark 1.2.1, latest supported by elasticsearch-hadoop, and org.elasticsearch % elasticsearch-hadoop % 2.1.0.BUILD-SNAPSHOT of elasticsearch-hadoop. I’m encountering an issue when I attempt to query the following json after creating a temporary table from it. The json looks like this: PUT /_template/device { template: dev*, settings: { number_of_shards: 1 }, mappings: { metric: { _timestamp : { enabled : true, stored : true, path : timestamp, format : -MM-dd'T'HH:mm:ssZZ }, properties: { pathId: { type: string }, pathElements: { properties: { node: { type: string }, value: { type: string } } }, name: { type: string }, value: { type: double }, timestamp: { type: date, store: true } } } } } Querying all columns work fine except for the pathElements which is a json array. If this is added to the select it fails with ajava.util.NoSuchElementException: key not found: node. *Details*. The program is pretty basic, looks like this: /** * A simple sample to read and write to ES using elasticsearch-hadoop. */ package com.opsdatastore.elasticsearch.spark import java.io.File // Scala imports import scala.collection.JavaConversions._ // Spark imports import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SchemaRDD,SQLContext} // ES imports import org.elasticsearch.spark._ import org.elasticsearch.spark.sql._ // OpsDataStore import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch} object ElasticSearchReadWrite { /** * Spark specific configuration */ def sparkInit(): SparkContext = { val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master) conf.set(es.nodes, ElasticSearch.Nodes) conf.set(es.port, ElasticSearch.HttpPort.toString()) conf.set(es.index.auto.create, true); conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); conf.set(spark.executor.memory,1g) conf.set(spark.kryoserializer.buffer.mb,256) val sparkContext = new SparkContext(conf) sparkContext.addJar(Spark.JarPath + jar)) sparkContext } def main(args: Array[String]) { val sc = sparkInit val sqlContext = new SQLContext(sc) import sqlContext._ val start = System.currentTimeMillis() // specific query, just read all for now sc.esRDD(s${ElasticSearch.Index}/${ElasticSearch.Type}, ?q=*:*) /* * Read from ES and provide some insight with Spark SparkSQL */ val esData = sc.esRDD(device/metric) esData.collect.foreach(println(_)) val end = System.currentTimeMillis() println(sTotal time: ${end-start} ms) println(Create Metric Temporary Table for querying) val schemaRDD = sqlContext.sql( CREATE TEMPORARY TABLE metric + USING org.elasticsearch.spark.sql + OPTIONS (resource 'device/metric') ) System.out.println() System.out.println(# Scheam Definition #) System.out.println() schemaRDD.printSchema() System.out.println() System.out.println(# Data from SparkSQL #) System.out.println() sqlContext.sql(SELECT path, pathElements, `timestamp`, name, value FROM metric).collect.foreach(println(_)) } } So this works fine: sc.esRDD(**device/metric) esData.collect.foreach(println(_)) And results in this: 15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 4.948556 s (AUxxDrs4cgadF5SlaMg0,Map(pathElements - Buffer(Map(node - State, value - PA), Map(node - City, value - Pittsburgh), Map(node - Street, value - 12345 Westbrook Drive), Map(node - level, value - main), Map(node - 
device, value -> thermostat)), value -> 29.590943279257175, name -> Current Temperature, timestamp -> 2015-03-27T14:53:46+, path -> /PA/Pittsburgh/12345 Westbrook Drive/main/theromostat-1))

Yet this fails:

sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM metric").collect.foreach(println(_))

With this exception:

Create Metric Temporary Table for querying
# Schema Definition #
root
# Data from SparkSQL #
15/03/31 14:37:49 INFO BlockManager: Removing broadcast 0
15/03/31 14:37:49 INFO BlockManager:
java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
When I am trying to get the result from HBase and running the mapToPair function of the RDD, it's giving the error java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result. Here is the code:

// private static JavaPairRDD<Integer, Result> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
//     return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class,
//             Result.class).mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
//
//         public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
//             System.out.println("In getCompanyDataRDD" + t._2);
//
//             String cknid = Bytes.toString(t._1.get());
//             System.out.println("processing cknids is:" + cknid);
//             Integer cknidInt = Integer.parseInt(cknid);
//             Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
//             return returnTuple;
//         }
//     });
// }
Re: How to configure SparkUI to use internal ec2 ip
Hi Akhil, I tried editing the /etc/hosts on the master and on the workers, and seems it is not working for me. I tried adding hostname internal-ip and it didn't work. I then tried adding internal-ip hostname and it didn't work either. I guess I should also edit the spark-env.sh file? Thanks! Anny On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can add an internal ip to public hostname mapping in your /etc/hosts file, if your forwarding is proper then it wouldn't be a problem there after. Thanks Best Regards On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote: Hi, For security reasons, we added a server between my aws Spark Cluster and local, so I couldn't connect to the cluster directly. To see the SparkUI and its related work's stdout and stderr, I used dynamic forwarding and configured the SOCKS proxy. Now I could see the SparkUI using the internal ec2 ip, however when I click on the application UI (4040) or the worker's UI (8081), it still automatically uses the public DNS instead of internal ec2 ip, which the browser now couldn't show. Is there a way that I could configure this? I saw that one could configure the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could help. Does anyone experience the same issue? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to setup a Spark Cluter?
It's pretty simple: pick one machine as master (say machine A), and let's call the workers B, C, and D.

*Login to A:*
- Enable password-less authentication (ssh-keygen)
- Add A's ~/.ssh/id_rsa.pub to B, C, and D's ~/.ssh/authorized_keys file
- Download a spark binary (that supports your hadoop version) from https://spark.apache.org/downloads.html (eg: wget http://d3kbcqa49mib13.cloudfront.net/spark-1.3.0-bin-hadoop2.4.tgz)
- Extract it (tar xf spark*tgz)
- cd spark-1.3.0-bin-hadoop2.4; cp conf/spark-env.sh.template conf/spark-env.sh
- vi conf/spark-env.sh : now configure SPARK_MASTER_IP, SPARK_WORKER_CORES, SPARK_WORKER_MEMORY to match the resources you have (an example is sketched after this message)
- vi conf/slaves : add B, C, and D's hostnames/ip addresses line by line
- cd ../
- rsync -za spark-1.3.0-bin-hadoop2.4 B:
- rsync -za spark-1.3.0-bin-hadoop2.4 C:
- rsync -za spark-1.3.0-bin-hadoop2.4 D:
- cd spark-1.3.0-bin-hadoop2.4; sbin/start-all.sh

Now your cluster is up and running, just be careful with your firewall entries. If you open up all ports then anyone can take over your cluster. :) Read more: https://www.sigmoid.com/securing-apache-spark-cluster/ Thanks Best Regards On Tue, Mar 31, 2015 at 10:26 PM, bhushansc007 bhushansc...@gmail.com wrote: Hi All, I am quite new to Spark. So please pardon me if it is a very basic question. I have setup a Hadoop cluster using Hortonwork's Ambari. It has 1 Master and 3 Worker nodes. Currently, it has HDFS, Yarn, MapReduce2, HBase and ZooKeeper services installed. Now, I want to install Spark on it. How do I do that? I searched a lot online, but there is no clear step-by-step installation guide to do that. All I find is the standalone setup guides. Can someone provide steps? What needs to be copied to each machine? Where and what config changes should be made on each machine? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-setup-a-Spark-Cluter-tp22326.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
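For reference, a minimal conf/spark-env.sh for the setup above might look like the following; the address and resource numbers are placeholders, substitute your master's IP and each worker's actual hardware:

export SPARK_MASTER_IP=10.0.0.1
export SPARK_WORKER_CORES=8
export SPARK_WORKER_MEMORY=24g

conf/slaves then simply lists B, C, and D's hostnames, one per line.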
Re: How to configure SparkUI to use internal ec2 ip
Did you try setting the SPARK_MASTER_IP parameter in spark-env.sh? On 31.3.2015. 19:19, Anny Chen wrote: Hi Akhil, I tried editing the /etc/hosts on the master and on the workers, and seems it is not working for me. I tried adding hostname internal-ip and it didn't work. I then tried adding internal-ip hostname and it didn't work either. I guess I should also edit the spark-env.sh file? Thanks! Anny On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das ak...@sigmoidanalytics.com mailto:ak...@sigmoidanalytics.com wrote: You can add an internal ip to public hostname mapping in your /etc/hosts file, if your forwarding is proper then it wouldn't be a problem there after. Thanks Best Regards On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com mailto:anny9...@gmail.com wrote: Hi, For security reasons, we added a server between my aws Spark Cluster and local, so I couldn't connect to the cluster directly. To see the SparkUI and its related work's stdout and stderr, I used dynamic forwarding and configured the SOCKS proxy. Now I could see the SparkUI using the internal ec2 ip, however when I click on the application UI (4040) or the worker's UI (8081), it still automatically uses the public DNS instead of internal ec2 ip, which the browser now couldn't show. Is there a way that I could configure this? I saw that one could configure the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could help. Does anyone experience the same issue? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org mailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org mailto:user-h...@spark.apache.org
Re: How to configure SparkUI to use internal ec2 ip
When you say you added internal-ip hostname, were you able to ping any of these from the machine? You could try setting SPARK_LOCAL_IP on all machines, but make sure you will be able to bind to the host/ip specified there. Thanks Best Regards On Tue, Mar 31, 2015 at 10:49 PM, Anny Chen anny9...@gmail.com wrote: Hi Akhil, I tried editing the /etc/hosts on the master and on the workers, and seems it is not working for me. I tried adding hostname internal-ip and it didn't work. I then tried adding internal-ip hostname and it didn't work either. I guess I should also edit the spark-env.sh file? Thanks! Anny On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can add an internal ip to public hostname mapping in your /etc/hosts file, if your forwarding is proper then it wouldn't be a problem there after. Thanks Best Regards On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote: Hi, For security reasons, we added a server between my aws Spark Cluster and local, so I couldn't connect to the cluster directly. To see the SparkUI and its related work's stdout and stderr, I used dynamic forwarding and configured the SOCKS proxy. Now I could see the SparkUI using the internal ec2 ip, however when I click on the application UI (4040) or the worker's UI (8081), it still automatically uses the public DNS instead of internal ec2 ip, which the browser now couldn't show. Is there a way that I could configure this? I saw that one could configure the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could help. Does anyone experience the same issue? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
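A minimal sketch of the spark-env.sh entries under discussion, using 10.0.0.x addresses as stand-ins for the real internal ec2 IPs (set these on every node and restart the daemons):

export SPARK_MASTER_IP=10.0.0.1   # the master's internal address
export SPARK_LOCAL_IP=10.0.0.2    # this node's internal address to bind to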
Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
The example in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala might help Best, -- Nan Zhu http://codingcat.me On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote: Yep, it's not serializable: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html You can't return this from a distributed operation since that would mean it has to travel over the network and you haven't supplied any way to convert the thing into bytes. On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com wrote: When I am trying to get the result from HBase and running the mapToPair function of the RDD, it's giving the error java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result. Here is the code:

// private static JavaPairRDD<Integer, Result> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
//     return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class,
//             Result.class).mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
//
//         public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
//             System.out.println("In getCompanyDataRDD" + t._2);
//
//             String cknid = Bytes.toString(t._1.get());
//             System.out.println("processing cknids is:" + cknid);
//             Integer cknidInt = Integer.parseInt(cknid);
//             Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
//             return returnTuple;
//         }
//     });
// }

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark-events does not exist error, while it does with all the req. rights
Hmmm... could you try to set the log dir to file:/home/hduser/spark/spark-events? I checked the code and it might be the case that the behaviour changed between 1.2 and 1.3... On Mon, Mar 30, 2015 at 6:44 PM, Tom Hubregtsen thubregt...@gmail.com wrote: The stack trace for the first scenario and your suggested improvement is similar, with the only difference being the first line (sorry for not including this): Log directory /home/hduser/spark/spark-events does not exist. To verify your premises, I cd'ed into the directory by copy-pasting the path listed in the error message (i, ii), created a text file, closed it and viewed it, and deleted it (iii). My findings were reconfirmed by my colleague. Any other ideas? Thanks, Tom On 30 March 2015 at 19:19, Marcelo Vanzin van...@cloudera.com wrote: So, the error below is still showing the invalid configuration. You mentioned in the other e-mails that you also changed the configuration, and that the directory really, really exists. Given the exception below, the only ways you'd get the error with a valid configuration would be if (i) the directory didn't exist, (ii) it existed but the user could not navigate to it or (iii) it existed but was not actually a directory. So please double-check all that. On Mon, Mar 30, 2015 at 5:11 PM, Tom Hubregtsen thubregt...@gmail.com wrote: Stack trace: 15/03/30 17:37:30 INFO storage.BlockManagerMaster: Registered BlockManager Exception in thread main java.lang.IllegalArgumentException: Log directory ~/spark/spark-events does not exist. -- Marcelo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
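A minimal sketch of the event-log settings with an explicit file: scheme, on the assumption that an unexpanded ~ in the configured path is the culprit (Spark does not perform shell tilde expansion on config values):

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "file:/home/hduser/spark/spark-events") // absolute path, no ~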
Re: why Shuffle Write is not zero when everything is cached and there is enough memory?
Thanks for the reply. This will reduce the shuffle write to disk to an extent, but for a long-running job (multiple days), the shuffle write would still occupy a lot of space on disk. Why do we need to store the data from older map tasks to memory? On Tue, Mar 31, 2015 at 1:19 PM, Bijay Pathak bijay.pat...@cloudwick.com wrote: The Spark Sort-Based Shuffle (default from 1.1) keeps the data from each map task in memory until it can't fit, after which it is sorted and spilled to disk. You can reduce the shuffle write to disk by increasing spark.shuffle.memoryFraction (default 0.2). By writing the shuffle output to disk, the Spark lineage can be truncated when the RDDs are already materialized as the side-effect of an earlier shuffle. This is an under-the-hood optimization in Spark which is only possible because the shuffle output is written to disk. You can set spark.shuffle.spill to false if you don't want to spill to the disk, assuming you have enough heap memory. On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote: I have noticed a similar issue when using spark streaming. The spark shuffle write size increases to a large size(in GB) and then the app crashes saying: java.io.FileNotFoundException: /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f (No such file or directory) I dont understand why the shuffle size increases to such a large value for long running jobs. Thanks, Udiy On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote: Thanks Saisai. I will try your solution, but still i don't understand why filesystem should be used where there is a plenty of memory available! On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Shuffle write will finally spill the data into file system as a bunch of files. If you want to avoid disk write, you can mount a ramdisk and configure spark.local.dir to this ram disk. So shuffle output will write to memory based FS, and will not introduce disk IO. Thanks Jerry 2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com: Hi, I was looking at SparkUI, Executors, and I noticed that I have 597 MB for Shuffle while I am using cached temp-table and the Spark had 2 GB free memory (the number under Memory Used is 597 MB /2.6 GB) ?!!! Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks be done in memory? best, /Shahab
Re: why Shuffle Write is not zero when everything is cached and there is enough memory?
The Spark Sort-Based Shuffle (default from 1.1) keeps the data from each map task in memory until it can't fit, after which it is sorted and spilled to disk. You can reduce the shuffle write to disk by increasing spark.shuffle.memoryFraction (default 0.2). By writing the shuffle output to disk, the Spark lineage can be truncated when the RDDs are already materialized as the side-effect of an earlier shuffle. This is an under-the-hood optimization in Spark which is only possible because the shuffle output is written to disk. You can set spark.shuffle.spill to false if you don't want to spill to the disk, assuming you have enough heap memory. On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote: I have noticed a similar issue when using spark streaming. The spark shuffle write size increases to a large size(in GB) and then the app crashes saying: java.io.FileNotFoundException: /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f (No such file or directory) I dont understand why the shuffle size increases to such a large value for long running jobs. Thanks, Udiy On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote: Thanks Saisai. I will try your solution, but still i don't understand why filesystem should be used where there is a plenty of memory available! On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Shuffle write will finally spill the data into file system as a bunch of files. If you want to avoid disk write, you can mount a ramdisk and configure spark.local.dir to this ram disk. So shuffle output will write to memory based FS, and will not introduce disk IO. Thanks Jerry 2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com: Hi, I was looking at SparkUI, Executors, and I noticed that I have 597 MB for Shuffle while I am using cached temp-table and the Spark had 2 GB free memory (the number under Memory Used is 597 MB /2.6 GB) ?!!! Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks be done in memory? best, /Shahab - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
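To make the knobs above concrete, a sketch of the settings mentioned; the values are illustrative, not recommendations, and spark.shuffle.spill=false is only safe with ample heap:

val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")  // default 0.2; more room before spilling
  .set("spark.shuffle.spill", "false")         // avoid spilling entirely, at the risk of OOM
  .set("spark.local.dir", "/mnt/ramdisk")      // shuffle files land on a ramdisk mount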
Using 'fair' scheduler mode
Hi, I am using the Spark ‘fair’ scheduler mode. I have noticed that if the first query is a very expensive query (ex: ‘select *’ on a really big data set) then any subsequent query seems to get blocked. I would have expected the second query to run in parallel, since I am using the ‘fair’ scheduler mode, not ‘fifo’. I am submitting the queries through the thrift server. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-fair-scheduler-mode-tp22328.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
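One thing worth checking: fair scheduling only interleaves jobs within a single SparkContext, and jobs land in the default pool unless one is assigned. A minimal sketch of assigning a pool per session (the pool name is made up):

val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.scheduler.pool", "adhoc") // per thread, before submitting the job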
Did anybody run Spark-perf on powerpc?
We verified it runs on x86, and are now trying to run it on PowerPC. We currently run into dependency trouble with sbt. I tried installing sbt by hand and resolving all dependencies by hand, but must have made an error, as I still get errors. Original error:

Getting org.scala-sbt sbt 0.13.6 ...
:: problems summary ::
WARNINGS
module not found: org.scala-sbt#sbt;0.13.6
local: tried
/home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml
-- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:
/home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/jars/sbt.jar
typesafe-ivy-releases: tried
https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml
Maven Central: tried
https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom
-- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:
https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar
:: UNRESOLVED DEPENDENCIES ::
org.scala-sbt#sbt;0.13.6: not found
ERRORS
Server access Error: Received fatal alert: decrypt_error url=https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml
Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed: java.security.cert.CertPathValidatorException: The certificate issued by CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not trusted; internal cause is: java.security.cert.CertPathValidatorException: Certificate chaining error url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom
Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed: java.security.cert.CertPathValidatorException: The certificate issued by CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not trusted; internal cause is: java.security.cert.CertPathValidatorException: Certificate chaining error url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-anybody-run-Spark-perf-on-powerpc-tp22329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
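The PKIX and decrypt_error messages point at the IBM JVM's trust store rather than at sbt itself. One way to test that theory is to import the DigiCert root CA into the JVM's cacerts by hand; the file name and alias below are illustrative:

keytool -import -trustcacerts -alias digicert-root \
  -file DigiCertGlobalRootCA.pem \
  -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit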
Re: Query REST web service with Spark?
Hi, If I recall correctly, I've read people integrating REST calls to Spark Streaming jobs in the user list. I don't imagine any cases for why it shouldn't be possible. Best, Burak On Tue, Mar 31, 2015 at 1:46 PM, Minnow Noir minnown...@gmail.com wrote: We have some data on Hadoop that needs to be augmented with data only available to us via a REST service. We're using Spark to search for, and correct, missing data. Even though there are a lot of records to scour for missing data, the total number of calls to the service is expected to be low, so it would be ideal to do the whole job in Spark as we scour the data. I don't see anything obvious in the API or on Google relating to making REST calls from a Spark job. Is it possible? Thanks, Alec
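Since a Spark transformation is ordinary JVM code, one hedged sketch of the idea is a REST lookup inside mapPartitions using only the JDK; the service URL and the RDD of keys are invented for illustration:

import java.net.URL
import scala.io.Source

// records: an existing RDD[String] of keys that need augmentation
val augmented = records.mapPartitions { iter =>
  iter.map { id =>
    // one blocking HTTP call per record; acceptable when the call volume is low
    val body = Source.fromURL(new URL("http://rest.example.com/lookup/" + id)).mkString
    (id, body)
  }
}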
Ambiguous references to a field set in a partitioned table AND the data
Hi, I save Parquet files in a partitioned table, so in /path/to/table/myfield=a/. But I also kept the field myfield in the Parquet data. Thus, when I query the field, I get this error:

df.select("myfield").show(10)
Exception in thread main org.apache.spark.sql.AnalysisException: Ambiguous references to myfield (myfield#2,List()),(myfield#47,List());

Looking at the code, I could not find a way to explicitly specify which column I want. DataFrame#columns returns strings. Even by loading the data with a schema (StructType), I'm not sure I can do it. Do I have to make sure that my partition field does not exist in the data before saving? Or is there a way to declare which column in the schema I want to query? Thanks.
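Until there's a way to disambiguate, one workaround is to keep the value only in the directory layout. A sketch of stripping the column before the partitioned save, assuming the 1.3 DataFrame API:

import org.apache.spark.sql.functions.col

val withoutField = df.select(df.columns.filter(_ != "myfield").map(col): _*)
withoutField.save("/path/to/table/myfield=a/") // the directory name still carries the value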
pyspark error with zip
The following program fails in the zip step.

x = sc.parallelize([1, 2, 3, 1, 2, 3])
y = sc.parallelize([1, 2, 3])
z = x.distinct()
print x.zip(y).collect()

The error that is produced depends on whether multiple partitions have been specified or not. I understand that the two RDDs [must] have the same number of partitions and the same number of elements in each partition. What is the best way to work around this restriction? I have been performing the operation with the following code, but I am hoping to find something more efficient.

def safe_zip(left, right):
    ix_left = left.zipWithIndex().map(lambda row: (row[1], row[0]))
    ix_right = right.zipWithIndex().map(lambda row: (row[1], row[0]))
    return ix_left.join(ix_right).sortByKey().values()
Re: different result from implicit ALS with explicit ALS
Hey Guoqiang and Sendong, Could you comment on the overhead of calling gc() explicitly? The shuffle files should get cleaned in a few seconds after checkpointing, but it is certainly possible to accumulate TBs of files in a few seconds. In this case, calling gc() may work the same as waiting for a few seconds after each checkpoint. Is that correct? Best, Xiangrui On Tue, Mar 31, 2015 at 8:58 AM, lisendong lisend...@163.com wrote: GuoQiang's method works very well ... it only takes 1TB disk now. thank you very much! On Mar 31, 2015, at 4:47 PM, GuoQiang Li wi...@qq.com wrote: You can try to enforce garbage collection:

/** Run GC and make sure it actually has run */
def runGC() {
  val weakRef = new WeakReference(new Object())
  val startTime = System.currentTimeMillis
  System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
  // Wait until a weak reference object has been GCed
  System.runFinalization()
  while (weakRef.get != null) {
    System.gc()
    System.runFinalization()
    Thread.sleep(200)
    if (System.currentTimeMillis - startTime > 10000) {
      throw new Exception("automatically cleanup error")
    }
  }
}

-- Original Message -- From: lisendong lisend...@163.com Sent: Tuesday, Mar 31, 2015, 3:47 PM To: Xiangrui Meng men...@gmail.com Cc: Xiangrui Meng m...@databricks.com; user user@spark.apache.org; Sean Owen so...@cloudera.com; GuoQiang Li wi...@qq.com Subject: Re: different result from implicit ALS with explicit ALS I have updated my spark source code to 1.3.1. The checkpoint works well. BUT the shuffle data still could not be deleted automatically... the disk usage is still 30TB... I have set spark.cleaner.referenceTracking.blocking.shuffle to true. Do you know how to solve my problem? Sendong Li On Mar 31, 2015, at 12:11 AM, Xiangrui Meng men...@gmail.com wrote: setCheckpointInterval was added in the current master and branch-1.3. Please help check whether it works. It will be included in the 1.3.1 and 1.4.0 release. -Xiangrui On Mon, Mar 30, 2015 at 7:27 AM, lisendong lisend...@163.com wrote: hi, xiangrui: I found the ALS of spark 1.3.0 forgets to do checkpoint() in explicit ALS. The code is: https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala PastedGraphic-2.tiff The checkpoint is very important in my situation, because my task will produce 1TB shuffle data in each iteration; if the shuffle data is not deleted in each iteration (using checkpoint()), the task will produce 30TB data... So I changed the ALS code and re-compiled it myself, but it seems the checkpoint does not take effect, and the task still occupies 30TB of disk... (I only added two lines to ALS.scala): PastedGraphic-3.tiff And the driver's log seems strange, why is the log printed together... PastedGraphic-1.tiff thank you very much! On Feb 26, 2015, at 11:33 PM, 163 lisend...@163.com wrote: Thank you very much for your opinion :) In our case, maybe it's dangerous to treat un-observed items as negative interactions (although we could give them small confidence, I think they are still not credible...) I will do more experiments and give you feedback :) Thank you ;) On Feb 26, 2015, at 23:16, Sean Owen so...@cloudera.com wrote: I believe that's right, and is what I was getting at. Yes, the implicit formulation ends up implicitly including every possible interaction in its loss function, even unobserved ones. That could be the difference. This is mostly an academic question though. In practice, you have click-like data and should be using the implicit version for sure.
However you can give negative implicit feedback to the model. You could consider no-click as a mild, observed, negative interaction. That is: supply a small negative value for these cases. Unobserved pairs are not part of the data set. I'd be careful about assuming the lack of an action carries signal. On Thu, Feb 26, 2015 at 3:07 PM, 163 lisend...@163.com wrote: oh my god, I think I understood... In my case, there are three kinds of user-item pairs: display and click pairs (positive pairs), display but no-click pairs (negative pairs), and no-display pairs (unobserved pairs). Explicit ALS only considers the first and the second kinds, but implicit ALS considers all three kinds of pairs (and treats the third kind like the second, because their preference values are all zero and confidences are all 1). So the results are different, right? Could you please give me some advice on which ALS I should use? If I use the implicit ALS, how do I distinguish the second and the third kind of pair :) My opinion is that in my case I should use explicit ALS... Thank you so much. On Feb 26, 2015, at 22:41, Xiangrui Meng m...@databricks.com wrote: Lisen, did you use all m-by-n pairs during training? Implicit model penalizes unobserved ratings, while explicit model doesn't. -Xiangrui On Feb 26,
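Pulling the checkpointing part of this thread together, a sketch of the intended wiring; the directory is hypothetical, and setCheckpointInterval is the setter the thread says was added on master/branch-1.3:

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")
val als = new org.apache.spark.ml.recommendation.ALS()
  .setCheckpointInterval(2) // checkpoint every 2 iterations so old shuffle files can be dropped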
joining multiple parquet files
Hi, I have 4 parquet files and I want to find the data which is common in all of them, e.g.

SELECT TableA.*, TableB.*, TableC.*, TableD.*
FROM (TableB INNER JOIN TableA ON TableB.aID = TableA.aID)
INNER JOIN TableC ON (TableB.cID = TableC.cID)
INNER JOIN TableA ta ON (ta.dID = TableD.dID)
WHERE (DATE(TableC.date) = date(now()))

I can do a 2-file join like

val joinedVal = g1.join(g2, g1.col("kmer") === g2.col("kmer"))

but I am trying to find common kmer strings from 4 files. Thanks Roni
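A sketch of chaining the two-file join shown above across all four frames, assuming g1 through g4 each carry a kmer column:

val joined = g1
  .join(g2, g1.col("kmer") === g2.col("kmer"))
  .join(g3, g1.col("kmer") === g3.col("kmer"))
  .join(g4, g1.col("kmer") === g4.col("kmer"))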
Re: Unable to save dataframe with UDT created with sqlContext.createDataFrame
I cannot reproduce this error on master, but I'm not aware of any recent bug fixes that are related. Could you build and try the current master? -Xiangrui On Tue, Mar 31, 2015 at 4:10 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, a DataFrame with a user defined type (here mllib.Vector) created with sqlContext.createDataFrame can't be saved to a parquet file and raises a ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.sql.Row error. Here is an example of code to reproduce this error:

object TestDataFrame {
  def main(args: Array[String]): Unit = {
    //System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    val conf = new SparkConf().setAppName("RankingEval").setMaster("local[8]")
      .set("spark.executor.memory", "6g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val data = sc.parallelize(Seq(LabeledPoint(1, Vectors.zeros(10))))
    val dataDF = data.toDF

    dataDF.save("test1.parquet")

    val dataDF2 = sqlContext.createDataFrame(dataDF.rdd, dataDF.schema)
    dataDF2.save("test2.parquet")
  }
}

Is this related to https://issues.apache.org/jira/browse/SPARK-5532 and how can it be solved? Cheers, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: can't union two rdds
use zip -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/can-t-union-two-rdds-tp22320p22321.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark sql query fails with executor lost/ out of memory expection while caching a table
Hi, I am using spark 1.2.1 and the thrift server to query my data. While executing the query CACHE TABLE tablename, it fails with the exception:

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 41, bbr-dev178): ExecutorLostFailure (executor 12 lost)

and sometimes:

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 33, bbr-dev178): java.lang.OutOfMemoryError: Java heap space

I understand that my executors are going out of memory during the caching and are therefore getting killed. My question is: is there a way to make the thrift server spill the data to disk if it is not able to keep the entire dataset in memory? Can I change the storage level used by the spark sql thrift server for caching? I don't want my executors to get lost and cache queries to fail. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-query-fails-with-executor-lost-out-of-memory-expection-while-caching-a-table-tp22322.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
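From application code (rather than through the thrift server's CACHE TABLE), a storage level that spills is available. A hedged sketch against the 1.2 API, assuming the table is already registered:

import org.apache.spark.storage.StorageLevel

val t = sqlContext.sql("SELECT * FROM tablename")
t.persist(StorageLevel.MEMORY_AND_DISK) // partitions that don't fit go to disk instead of failing
t.registerTempTable("tablename_cached")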
deployment of spark on mesos and data locality in tachyon/hdfs
Hi, I am fairly new to the spark ecosystem and I have been trying to set up a spark on mesos deployment. I can't seem to figure out the best practices around HDFS and Tachyon. The documentation about Spark's data-locality section seems to point out that each of my mesos slave nodes should also run an hdfs datanode. This seems fine, but I can't seem to figure out how I would go about pushing tachyon into the mix. How should I organize my cluster? Should tachyon be colocated on my mesos worker nodes, or should all the spark jobs reach out to a separate hdfs/tachyon cluster? -- Ankur Chauhan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: deployment of spark on mesos and data locality in tachyon/hdfs
Tachyon should be co-located with Spark in this case. Best, Haoyuan On Tue, Mar 31, 2015 at 4:30 PM, Ankur Chauhan achau...@brightcove.com wrote: Hi, I am fairly new to the spark ecosystem and I have been trying to set up a spark on mesos deployment. I can't seem to figure out the best practices around HDFS and Tachyon. The documentation about Spark's data-locality section seems to point out that each of my mesos slave nodes should also run an hdfs datanode. This seems fine, but I can't seem to figure out how I would go about pushing tachyon into the mix. How should I organize my cluster? Should tachyon be colocated on my mesos worker nodes, or should all the spark jobs reach out to a separate hdfs/tachyon cluster? -- Ankur Chauhan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Haoyuan Li AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
Re: deployment of spark on mesos and data locality in tachyon/hdfs
Hi Haoyuan, So on each mesos slave node I should allocate/section off some amount of memory for tachyon (let's say 50% of the total memory) and the rest for regular mesos tasks? This means, on each slave node I would have a tachyon worker (+ hdfs configuration to talk to s3 or the hdfs datanode) and the mesos slave process. Is this correct? On 31/03/2015 16:43, Haoyuan Li wrote: Tachyon should be co-located with Spark in this case. Best, Haoyuan On Tue, Mar 31, 2015 at 4:30 PM, Ankur Chauhan achau...@brightcove.com wrote: Hi, I am fairly new to the spark ecosystem and I have been trying to set up a spark on mesos deployment. I can't seem to figure out the best practices around HDFS and Tachyon. The documentation about Spark's data-locality section seems to point out that each of my mesos slave nodes should also run an hdfs datanode. This seems fine, but I can't seem to figure out how I would go about pushing tachyon into the mix. How should I organize my cluster? Should tachyon be colocated on my mesos worker nodes, or should all the spark jobs reach out to a separate hdfs/tachyon cluster? -- Ankur Chauhan -- Haoyuan Li AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/ -- Ankur Chauhan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Query REST web service with Spark?
Here are a few ways to achieve what you're looking to do:
- https://github.com/cjnolet/spark-jetty-server
- Spark Job Server - https://github.com/spark-jobserver/spark-jobserver - defines a REST API for Spark
- Hue - http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/
- Spark Kernel project: https://github.com/ibm-et/spark-kernel - The Spark Kernel's goal is to serve as the foundation for interactive applications. The project provides a client library in Scala that abstracts connecting to the kernel (containing a Spark Context), which can be embedded into a web application. We demonstrated this at StataConf when we embedded the Spark Kernel client into a Play application to provide an interactive web application that communicates with Spark via the Spark Kernel (hosting a SparkContext).

Hopefully one of those will give you what you're looking for. -Todd On Tue, Mar 31, 2015 at 5:06 PM, Burak Yavuz brk...@gmail.com wrote: Hi, If I recall correctly, I've read people integrating REST calls to Spark Streaming jobs in the user list. I don't imagine any cases for why it shouldn't be possible. Best, Burak On Tue, Mar 31, 2015 at 1:46 PM, Minnow Noir minnown...@gmail.com wrote: We have some data on Hadoop that needs to be augmented with data only available to us via a REST service. We're using Spark to search for, and correct, missing data. Even though there are a lot of records to scour for missing data, the total number of calls to the service is expected to be low, so it would be ideal to do the whole job in Spark as we scour the data. I don't see anything obvious in the API or on Google relating to making REST calls from a Spark job. Is it possible? Thanks, Alec
Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0
Hey Sean, That is true for explicit model, but not for implicit. The ALS-WR paper doesn't cover the implicit model. In implicit formulation, a sub-problem (for v_j) is: min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2 This is a sum for all i but not just the users who rate item j. In this case, if we set X=m_j, the number of observed ratings for item j, it is not really scale invariant. We have #users user vectors in the least squares problem but only penalize lambda * #ratings. I was suggesting using lambda * m directly for implicit model to match the number of vectors in the least squares problem. Well, this is my theory. I don't find any public work about it. Best, Xiangrui On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen so...@cloudera.com wrote: I had always understood the formulation to be the first option you describe. Lambda is scaled by the number of items the user has rated / interacted with. I think the goal is to avoid fitting the tastes of prolific users disproportionately just because they have many ratings to fit. This is what's described in the ALS-WR paper we link to on the Spark web site, in equation 5 (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf) I think this also gets you the scale-invariance? For every additional rating from user i to product j, you add one new term to the squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the regularization term by lambda * (|u_i|^2 + |m_j|^2) They are at least both increasing about linearly as ratings increase. If the regularization term is multiplied by the total number of users and products in the model, then it's fixed. I might misunderstand you and/or be speaking about something slightly different when it comes to invariance. But FWIW I had always understood the regularization to be multiplied by the number of explicit ratings. On Mon, Mar 30, 2015 at 5:51 PM, Xiangrui Meng men...@gmail.com wrote: Okay, I didn't realize that I changed the behavior of lambda in 1.3. to make it scale-invariant, but it is worth discussing whether this is a good change. In 1.2, we multiply lambda by the number ratings in each sub-problem. This makes it scale-invariant for explicit feedback. However, in implicit feedback model, a user's sub-problem contains all item factors. Then the question is whether we should multiply lambda by the number of explicit ratings from this user or by the total number of items. We used the former in 1.2 but changed to the latter in 1.3. So you should try a smaller lambda to get a similar result in 1.3. Sean and Shuo, which approach do you prefer? Do you know any existing work discussing this? Best, Xiangrui On Fri, Mar 27, 2015 at 11:27 AM, Xiangrui Meng men...@gmail.com wrote: This sounds like a bug ... Did you try a different lambda? It would be great if you can share your dataset or re-produce this issue on the public dataset. Thanks! -Xiangrui On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote: After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly smaller factors (and hence scores). For example, the first few product's factor values in 1.2.0 are (0.04821, -0.00674, -0.0325). In 1.3.0, the first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This difference of several orders of magnitude is consistent throughout both user and product. The recommendations from 1.2.0 are subjectively much better than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less memory. 
My first thought is that there is too much regularization in the 1.3.0 results, but I'm using the same lambda parameter value. This is a snippet of my scala code:

val rank = 75
val numIterations = 15
val alpha = 10
val lambda = 0.01
val model = ALS.trainImplicit(train_data, rank, numIterations, lambda=lambda, alpha=alpha)

The code and input data are identical across both versions. Did anything change between the two versions I'm not aware of? I'd appreciate any help! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
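For readers following the lambda discussion above: with n_i the number of observed ratings from user i and n the total number of items, the per-user regularization terms being compared in this thread are

lambda * n_i * \|u_i\|_2^2   (1.2: scaled by user i's observed ratings)
lambda * n   * \|u_i\|_2^2   (1.3, implicit model: scaled by the total number of items)

which is why the same numeric lambda regularizes much more strongly in 1.3, and a smaller value is needed to approximate the 1.2 results.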
Re: why Shuffle Write is not zero when everything is cached and there is enough memory?
Hi Udit, The persisted RDDs in memory are cleared by Spark using an LRU policy, and you can also set the time after which persisted RDDs and meta-data are cleared by setting spark.cleaner.ttl (default: infinite). But I am not aware of any property to clean older shuffle writes from disk. thanks, bijay On Tue, Mar 31, 2015 at 1:50 PM, Udit Mehta ume...@groupon.com wrote: Thanks for the reply. This will reduce the shuffle write to disk to an extent, but for a long-running job (multiple days), the shuffle write would still occupy a lot of space on disk. Why do we need to store the data from older map tasks to memory? On Tue, Mar 31, 2015 at 1:19 PM, Bijay Pathak bijay.pat...@cloudwick.com wrote: The Spark Sort-Based Shuffle (default from 1.1) keeps the data from each map task in memory until it can't fit, after which it is sorted and spilled to disk. You can reduce the shuffle write to disk by increasing spark.shuffle.memoryFraction (default 0.2). By writing the shuffle output to disk, the Spark lineage can be truncated when the RDDs are already materialized as the side-effect of an earlier shuffle. This is an under-the-hood optimization in Spark which is only possible because the shuffle output is written to disk. You can set spark.shuffle.spill to false if you don't want to spill to the disk, assuming you have enough heap memory. On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote: I have noticed a similar issue when using spark streaming. The spark shuffle write size increases to a large size(in GB) and then the app crashes saying: java.io.FileNotFoundException: /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f (No such file or directory) I dont understand why the shuffle size increases to such a large value for long running jobs. Thanks, Udiy On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote: Thanks Saisai. I will try your solution, but still i don't understand why filesystem should be used where there is a plenty of memory available! On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com wrote: Shuffle write will finally spill the data into file system as a bunch of files. If you want to avoid disk write, you can mount a ramdisk and configure spark.local.dir to this ram disk. So shuffle output will write to memory based FS, and will not introduce disk IO. Thanks Jerry 2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com: Hi, I was looking at SparkUI, Executors, and I noticed that I have 597 MB for Shuffle while I am using cached temp-table and the Spark had 2 GB free memory (the number under Memory Used is 597 MB /2.6 GB) ?!!! Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks be done in memory? best, /Shahab
Re: SparkSql - java.util.NoSuchElementException: key not found: node when access JSON Array
So in looking at this a bit more, I gather the root cause is the fact that the nested fields are represented as rows within rows, is that correct? If I don't know the size of the json array (it varies), using x.getAs[Row](0).getString(0) is not really a valid solution. Is the solution to apply a lateral view + explode to this? I have attempted to change to a lateral view, but it looks like my syntax is off:

sqlContext.sql(
  "SELECT path, `timestamp`, name, value, pe.value FROM metric " +
  "lateral view explode(pathElements) a AS pe")
  .collect.foreach(println(_))

Which results in:

15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread main java.lang.RuntimeException: [1.68] failure: ``UNION'' expected but identifier view found
SELECT path,`timestamp`, name, value, pe.value FROM metric lateral view explode(pathElements) a AS pe
                                                                   ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is this the right approach? Is this syntax available in 1.2.1:

SELECT v1.name, v2.city, v2.state FROM people
LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 as name, address
LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2 as city, state;

-Todd On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist tsind...@gmail.com wrote: I am accessing ElasticSearch via the elasticsearch-hadoop connector and attempting to expose it via SparkSQL. I am using spark 1.2.1, the latest supported by elasticsearch-hadoop, and org.elasticsearch % elasticsearch-hadoop % 2.1.0.BUILD-SNAPSHOT of elasticsearch-hadoop. I'm encountering an issue when I attempt to query the following json after creating a temporary table from it. The json looks like this:

PUT /_template/device
{
  "template": "dev*",
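The parser failure is consistent with LATERAL VIEW being HiveQL: the plain SQLContext parser in 1.2.x rejects it, while HiveContext accepts it. A sketch, assuming a Hive-enabled Spark build (the temporary table would need to be created through the same HiveContext):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql(
  "SELECT path, `timestamp`, name, value, pe.value FROM metric " +
  "LATERAL VIEW explode(pathElements) a AS pe").collect.foreach(println(_))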
--driver-memory parameter doesn't work for spark-submmit on yarn?
Hi All, Below is my shell script:

/home/hadoop/spark/bin/spark-submit --driver-memory=5G --executor-memory=40G --master yarn-client --class com.***.FinancialEngineExecutor /home/hadoop/lib/my.jar s3://bucket/vriscBatchConf.properties

My driver will load some resources and then broadcast them to all executors. Those resources are only 600MB in serialized form, but I always get an out of memory exception; it looks like the right amount of memory is not being allocated to my driver.

Exception in thread main java.lang.OutOfMemoryError: Java heap space
at java.lang.reflect.Array.newArray(Native Method)
at java.lang.reflect.Array.newInstance(Array.java:70)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at com.***.executor.support.S3FileUtils.loadCache(S3FileUtils.java:68)

Am I doing anything wrong here? And no matter how much I set for --driver-memory (from 512M to 20G), it always gives me an error on the same line (the line that tries to load a 600MB java serialization file). So it looks like the script doesn't allocate the right memory to the driver in my case? Regards, Shuai
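One quick way to check whether the flag ever reaches the driver JVM is to print the heap ceiling at the top of main; if it stays near the default regardless of --driver-memory, the setting is being lost before the JVM starts (in yarn-client mode the driver JVM launches before SparkConf values set in code are read, so only the command-line flag or the SPARK_DRIVER_MEMORY environment variable can change it):

println(s"Driver max heap: ${Runtime.getRuntime.maxMemory / 1024 / 1024} MB")

It may also be worth trying the space-separated form (--driver-memory 5G), in case this version of spark-submit doesn't parse the --key=value form.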
Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
Jeetendra: Please extract the information you need from Result and return the extracted portion - instead of returning Result itself. Cheers On Tue, Mar 31, 2015 at 1:14 PM, Nan Zhu zhunanmcg...@gmail.com wrote: The example in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala might help Best, -- Nan Zhu http://codingcat.me On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote: Yep, it's not serializable: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html You can't return this from a distributed operation since that would mean it has to travel over the network and you haven't supplied any way to convert the thing into bytes. On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele gangele...@gmail.com wrote: When I am trying to get the result from HBase and running the mapToPair function of the RDD, it's giving the error java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result. Here is the code:

// private static JavaPairRDD<Integer, Result> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
//     return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(), TableInputFormat.class, ImmutableBytesWritable.class,
//             Result.class).mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
//
//         public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
//             System.out.println("In getCompanyDataRDD" + t._2);
//
//             String cknid = Bytes.toString(t._1.get());
//             System.out.println("processing cknids is:" + cknid);
//             Integer cknidInt = Integer.parseInt(cknid);
//             Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
//             return returnTuple;
//         }
//     });
// }

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
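A sketch of that extraction, mapping each Result to plain serializable values inside the first transformation (assuming hbaseRdd is the newAPIHadoopRDD from the question; the column family and qualifier names are invented):

import org.apache.hadoop.hbase.util.Bytes

val companyData = hbaseRdd.map { case (key, result) =>
  val cknid = Integer.parseInt(Bytes.toString(key.get()))
  val name = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")))
  (cknid, name) // an Int/String pair serializes fine, unlike Result
}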
Query REST web service with Spark?
We have some data on Hadoop that needs to be augmented with data only available to us via a REST service. We're using Spark to search for, and correct, missing data. Even though there are a lot of records to scour for missing data, the total number of calls to the service is expected to be low, so it would be ideal to do the whole job in Spark as we scour the data. I don't see anything obvious in the API or on Google relating to making REST calls from a Spark job. Is it possible? Thanks, Alec
spark.sql.Row manipulation
I have 2 parquet files with the format e.g. name, age, town. I read them and then join them to get all the names which are in both towns. The resultant dataset is:

res4: Array[org.apache.spark.sql.Row] = Array([name1, age1, town1, name2, age2, town2])

name1 and name2 are the same, as I am joining. Now I want to get only to the format (name, age1, age2), but I can't seem to manipulate the spark.sql.Row. Trying something like map(_.split(",")).map(a => (a(0), a(1).trim().toInt)) does not work. Can you suggest a way? Thanks -R
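Rows are positional, so instead of splitting strings, one sketch is to pick fields by index from the joined rows before collecting (assuming joined is the joined SchemaRDD/DataFrame, the columns follow the [name, age, town, name, age, town] layout above, and age is an integer; swap getInt for getString if it is typed differently):

val nameAges = joined.map(r => (r.getString(0), r.getInt(1), r.getInt(4)))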
Re: How to configure SparkUI to use internal ec2 ip
Thanks Petar and Akhil for the suggestion. Actually I changed the SPARK_MASTER_IP to the internal-ip, deleted the export SPARK_PUBLIC_DNS=xx line in the spark-env.sh and also edited the /etc/hosts as Akhil suggested, and now it is working! However I don't know which change actually makes it work. Thanks! Anny On Tue, Mar 31, 2015 at 10:26 AM, Petar Zecevic petar.zece...@gmail.com wrote: Did you try setting the SPARK_MASTER_IP parameter in spark-env.sh? On 31.3.2015. 19:19, Anny Chen wrote: Hi Akhil, I tried editing the /etc/hosts on the master and on the workers, and seems it is not working for me. I tried adding hostname internal-ip and it didn't work. I then tried adding internal-ip hostname and it didn't work either. I guess I should also edit the spark-env.sh file? Thanks! Anny On Mon, Mar 30, 2015 at 11:15 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can add an internal ip to public hostname mapping in your /etc/hosts file, if your forwarding is proper then it wouldn't be a problem there after. Thanks Best Regards On Tue, Mar 31, 2015 at 9:18 AM, anny9699 anny9...@gmail.com wrote: Hi, For security reasons, we added a server between my aws Spark Cluster and local, so I couldn't connect to the cluster directly. To see the SparkUI and its related work's stdout and stderr, I used dynamic forwarding and configured the SOCKS proxy. Now I could see the SparkUI using the internal ec2 ip, however when I click on the application UI (4040) or the worker's UI (8081), it still automatically uses the public DNS instead of internal ec2 ip, which the browser now couldn't show. Is there a way that I could configure this? I saw that one could configure the LOCAL_ADDRESS_IP in the spark-env.sh, but not sure whether this could help. Does anyone experience the same issue? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-SparkUI-to-use-internal-ec2-ip-tp22311.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org