Is there a way to clone a JavaRDD without persisting it
In my problem I have a number of intermediate JavaRDDs and would like to be able to look at their sizes without destroying the RDD for subsequent processing. persist() would let me do this, but these RDDs are big, persist seems expensive, and I am unsure which StorageLevel is needed. Is there a way to clone a JavaRDD, or does anyone have good ideas on how to do this?
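(For context, a minimal sketch in Scala; a JavaRDD behaves the same way. Actions such as count() do not consume or destroy an RDD, so without persist() the only cost of inspecting a size is that the lineage is recomputed for that action. The pipeline below is hypothetical.)

val intermediate = sc.textFile("data.txt").map(_.length) // hypothetical intermediate RDD
println("size = " + intermediate.count())   // triggers a computation; the RDD is not destroyed
val downstream = intermediate.filter(_ > 0) // subsequent processing still works
println(downstream.count())                 // recomputes the lineage again unless persisted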
Re: Pyspark Error when broadcast numpy array
Yes, your broadcast should be about 300M, much smaller than 2G; I didn't read your post carefully. Broadcast in Python has been improved a lot since 1.1. I think it will work in 1.1 or the upcoming 1.2 release; could you upgrade to 1.1?

Davies

On Tue, Nov 11, 2014 at 8:37 PM, bliuab bli...@cse.ust.hk wrote:

> Dear Liu: Thank you very much for your help. I will apply that patch. By the way, since I succeeded in broadcasting an array of size 30M, and the log said that such an array takes around 230MB of memory, I think the numpy array that leads to the error is much smaller than 2G.

On Wed, Nov 12, 2014 at 12:29 PM, Davies Liu-2 [via Apache Spark User List] wrote:

> This PR fixes the problem: https://github.com/apache/spark/pull/2659 cc @josh

On Tue, Nov 11, 2014 at 7:47 PM, bliuab wrote:

> In spark-1.0.2, I have come across an error when I try to broadcast a quite large numpy array (35M dimensions). The error, a java.lang.NegativeArraySizeException, and details are listed below. Moreover, when broadcasting a relatively smaller numpy array (30M dimensions), everything works fine. A 30M-dimension numpy array takes about 230M of memory, which is, in my opinion, not very large. As far as I have surveyed, it seems related to py4j, but I have no idea how to fix this. I would appreciate any hints.
>
> py4j.protocol.Py4JError: An error occurred while calling o23.broadcast.
> Trace:
> java.lang.NegativeArraySizeException
>     at py4j.Base64.decode(Base64.java:292)
>     at py4j.Protocol.getBytes(Protocol.java:167)
>     at py4j.Protocol.getObject(Protocol.java:276)
>     at py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
>     at py4j.commands.CallCommand.execute(CallCommand.java:77)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
> And the test code is as follows:
>
> conf = SparkConf().setAppName('brodyliu_LR').setMaster('spark://10.231.131.87:5051')
> conf.set('spark.executor.memory', '4000m')
> conf.set('spark.akka.timeout', '10')
> conf.set('spark.ui.port', '8081')
> conf.set('spark.cores.max', '150')
> #conf.set('spark.rdd.compress', 'True')
> conf.set('spark.default.parallelism', '300')
> # configure the spark environment
> sc = SparkContext(conf=conf, batchSize=1)
> vec = np.random.rand(3500)
> a = sc.broadcast(vec)
How did the RDD.union work
Hi:

I have a problem using the union method of RDD. I have a function like:

def hbaseQuery(area: String): RDD[Result] = ???

When I use hbaseQuery("aa").union(hbaseQuery("bb")).count(), it returns 0. However, when I use sc.parallelize(hbaseQuery("aa").collect.toList ::: hbaseQuery("bb").collect.toList).count(), it returns the right value. Obviously I have an action after my transformations, so why doesn't the union version work? FYI.

-- qiaou
Re: MLLIB usage: BLAS dependency warning
Could you try jar tf on the assembly jar and grep for netlib-native_system-linux-x86_64.so?

-Xiangrui

On Tue, Nov 11, 2014 at 7:11 PM, jpl jlefe...@soe.ucsc.edu wrote:

> Hi, I am having trouble using the BLAS libs with the MLlib functions. I am using org.apache.spark.mllib.clustering.KMeans (on a single machine) and running the spark-shell with the k-means example code (from https://spark.apache.org/docs/latest/mllib-clustering.html), which runs successfully, but I get the following warnings in the log:
>
> WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
> WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
>
> I compiled Spark 1.1.0 with: mvn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Pnetlib-lgpl -DskipTests clean package
>
> If anyone could clarify the steps to get the dependencies correctly installed and visible to Spark (from https://spark.apache.org/docs/latest/mllib-guide.html), that would be greatly appreciated. Using yum, I installed blas.x86_64, lapack.x86_64, gcc-gfortran.x86_64 and libgfortran.x86_64, and then downloaded Breeze and built it successfully with Maven. I verified that /usr/lib/libblas.so.3 and /usr/lib/liblapack.so.3 are present on the machine and that ldconfig -p lists them. I also tried adding /usr/lib/ to spark.executor.extraLibraryPath and verified it is present in the Spark web UI environment tab. I downloaded and compiled jblas with mvn clean install, which creates jblas-1.2.4-SNAPSHOT.jar, and then also tried adding that to spark.executor.extraClassPath, but I still get the same WARN messages. Maybe there are a few simple steps that I am missing? Thanks a lot.
Re: spark-shell exception while running in YARN mode
The SparkPi example gives the same error in yarn mode:

HADOOP_CONF_DIR=/home/gs/conf/current ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client ../examples/target/spark-examples_2.10-1.2.0-SNAPSHOT.jar

What could be wrong here?
Re: How did the RDD.union work
Could you provide the code of hbaseQuery? Maybe it doesn't support being executed in parallel.

Best Regards,
Shixiong Zhu
Re: Imbalanced shuffle read
When you call groupByKey(), try providing the number of partitions, like groupByKey(100), depending on your data/cluster size.

Thanks
Best Regards

On Wed, Nov 12, 2014 at 6:45 AM, ankits ankitso...@gmail.com wrote:

> I'm running a job that uses groupByKey(), so it generates a lot of shuffle data. Then it processes this and writes files to HDFS in a foreachPartition block. Looking at the foreachPartition stage details in the web console, all but one executor is idle (SUCCESS in 50-60 ms), and one is RUNNING with a huge shuffle read and takes a long time to finish. Can someone explain why the read is all on one node and how to parallelize this better?
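A minimal sketch of the suggestion (the name pairs and the count of 100 are illustrative):

val grouped = pairs.groupByKey(100) // spread the shuffle output over 100 reduce partitions
grouped.foreachPartition { iter =>
  iter.foreach { case (key, values) =>
    // write this partition's groups to HDFS
  }
}

Note that if most records share a single key, that key's values still land in one partition: raising the partition count helps when the imbalance comes from too few partitions, not when it comes from a skewed key distribution.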
Re: How did the RDD.union work
OK, here is the code:

def hbaseQuery: (String) => RDD[Result] = {
  val generateRdd = (area: String) => {
    val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}"
    val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}"
    println(s"startRowKey:${startRowKey}")
    println(s"stopRowKey :${stopRowKey}")
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(startRowKey))
    scan.setStopRow(Bytes.toBytes(stopRowKey))
    val filterList: FilterList = new FilterList()
    if (appKey != null && !appKey.equals("_")) {
      val appKeyFilter: SingleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("clientInfo"), Bytes.toBytes("optKey"), CompareOp.EQUAL, Bytes.toBytes(appKey))
      filterList.addFilter(appKeyFilter)
    }
    if (imei != null && !imei.equals("_")) {
      val imeiFilter: SingleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("clientInfo"), Bytes.toBytes("optImei"), CompareOp.EQUAL, Bytes.toBytes(imei))
      filterList.addFilter(imeiFilter)
    }
    if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
      scan.setFilter(filterList)
    }
    scan.setCaching(1)
    val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack")
    hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    SparkUtil.getSingleSparkContext()
      .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { case (_: ImmutableBytesWritable, result: Result) => result }
  }
  generateRdd
}

-- qiaou

On Wednesday, November 12, 2014 at 2:50 PM, Shixiong Zhu wrote:

> Could you provide the code of hbaseQuery? Maybe it doesn't support being executed in parallel.
spark sql - save to Parquet file - Unsupported datatype TimestampType
Hi friends,

I am trying to save a JSON file to Parquet and got the error "Unsupported datatype TimestampType". Doesn't Parquet support timestamps? Which Parquet version does Spark use? Is there any workaround? Here is the stack trace:

java.lang.RuntimeException: Unsupported datatype TimestampType
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:343)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:319)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:363)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:362)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:361)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:407)
    at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:151)
    at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:130)
    at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:204)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:76)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.saveAsParquetFile(JavaSchemaRDD.scala:42)

Thanks and Regards
Tridib
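One possible workaround, sketched in Scala (the row types RawEvent and ParquetEvent are hypothetical, not part of any API): convert timestamp columns to a Parquet-supported type such as Long epoch milliseconds before saving, and convert back after reading.

import java.sql.Timestamp

case class RawEvent(ts: Timestamp, msg: String)      // what you have
case class ParquetEvent(tsMillis: Long, msg: String) // what you write

val events: org.apache.spark.rdd.RDD[RawEvent] = ??? // your source data
val writable = events.map(e => ParquetEvent(e.ts.getTime, e.msg)) // Timestamp -> epoch millis

import sqlContext.createSchemaRDD // Spark 1.1 implicit: RDD of case classes -> SchemaRDD
writable.saveAsParquetFile("events.parquet")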
Re: groupBy for DStream
1. Use foreachRDD over the DStream; on each RDD inside it you can call groupBy().
2. DStream.count() returns a new DStream in which each RDD has a single element, generated by counting each RDD of this DStream.

Thanks
Best Regards

On Wed, Nov 12, 2014 at 2:49 AM, SK skrishna...@gmail.com wrote:

> Hi. 1) I don't see a groupBy() method for a DStream object; not sure why that is not supported. Currently I am using filter() to separate out the different groups. I would like to know if there is a way to convert a DStream object to a regular RDD so that I can apply RDD methods like groupBy. 2) The count() method for a DStream object returns a DStream[Long] instead of a simple Long (like RDD does). How can I extract the simple Long count value? I tried dstream(0) but got a compilation error that it does not take parameters. I also tried dstream[0], but that also resulted in a compilation error. I am not able to use the head() or take(0) methods for DStream either. Thanks.
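A minimal sketch of both points, assuming a DStream[(String, Int)] named stream (the names are illustrative):

stream.foreachRDD { rdd =>
  // inside foreachRDD you hold a plain RDD, so RDD methods such as groupBy are available
  val grouped = rdd.groupBy { case (key, _) => key }
  grouped.foreach(println)
}

// a DStream is unbounded, so there is no single Long; count() yields one count
// per batch, which can be extracted inside foreachRDD:
stream.count().foreachRDD { rdd =>
  val n: Long = rdd.first() // each batch's RDD holds exactly one element
  println("records in this batch: " + n)
}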
Re: Pyspark Error when broadcast numpy array
Dear Liu: Thank you for your reply. I will set up an experimental environment for spark-1.1 and test it.

On Wed, Nov 12, 2014 at 2:30 PM, Davies Liu-2 [via Apache Spark User List] wrote:

> Yes, your broadcast should be about 300M, much smaller than 2G; I didn't read your post carefully. Broadcast in Python has been improved a lot since 1.1. I think it will work in 1.1 or the upcoming 1.2 release; could you upgrade to 1.1?
Re: How did the RDD.union work
This works! But can you explain why it should be used like this?

-- qiaou

On Wednesday, November 12, 2014 at 3:18 PM, Shixiong Zhu wrote:

> You need to create a new configuration for each RDD. Therefore,
>
> val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
>
> should be changed to
>
> val hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)
>
> Best Regards, Shixiong Zhu
Re: ISpark class not found
Hi, I was also trying ISpark, but I couldn't even start the notebook; I am getting the following error:

ERROR:tornado.access:500 POST /api/sessions (127.0.0.1) 10.15ms referer=http://localhost:/notebooks/Scala/Untitled0.ipynb

How did you start the notebook?

Thanks and Regards,
Meethu M

On Wednesday, 12 November 2014 6:50 AM, Laird, Benjamin benjamin.la...@capitalone.com wrote:

> I've been experimenting with the ISpark extension to IScala (https://github.com/tribbloid/ISpark). Objects created in the REPL are not being loaded correctly on worker nodes, leading to a ClassNotFound exception. This does work correctly in spark-shell. I was curious if anyone has used ISpark and has encountered this issue. Thanks!
>
> Simple example:
>
> In [1]: case class Circle(rad:Float)
> In [2]: val rdd = sc.parallelize(1 to 1).map(i => Circle(i.toFloat)).take(10)
> 14/11/11 13:03:35 ERROR TaskResultGetter: Exception while getting task result
> com.esotericsoftware.kryo.KryoException: Unable to find class: [L$line5.$read$$iwC$$iwC$Circle;
>
> Full trace in my gist: https://gist.github.com/benjaminlaird/3e543a9a89fb499a3a14
About Join operator in PySpark
Hi all,

I have noticed that the join operator in PySpark is implemented with union and groupByKey instead of cogroup. This change can generate more shuffle stages. For example:

rdd1 = sc.makeRDD(...).partitionBy(2)
rdd2 = sc.makeRDD(...).partitionBy(2)
rdd3 = rdd1.join(rdd2).collect()

The code above, implemented in Scala, generates 2 shuffles, but generates 3 shuffles in PySpark. What was the initial design motivation of the join operator in PySpark? Any ideas for improving join performance in PySpark?

Andrew
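For reference, a rough Scala sketch of the union + groupByKey strategy described above (illustrative only, not PySpark's actual implementation). cogroup can exploit co-partitioning of the two inputs, while tagging and unioning forces the extra shuffle:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val a: RDD[(String, Int)] = ???    // left side (illustrative)
val b: RDD[(String, String)] = ??? // right side (illustrative)

// tag each side so the values can be separated again after grouping
val tagged = a.mapValues(v => Left(v): Either[Int, String])
  .union(b.mapValues(w => Right(w): Either[Int, String]))

val joined: RDD[(String, (Int, String))] = tagged.groupByKey().flatMapValues { vals =>
  val lefts = vals.collect { case Left(v) => v }
  val rights = vals.collect { case Right(w) => w }
  for (v <- lefts; w <- rights) yield (v, w)
}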
Re: Spark and Play
You can also build a Play 2.2.x + Spark 1.1.0 fat jar with sbt-assembly, e.g. for yarn-client support or for use with spark-shell when debugging:

play.Project.playScalaSettings

libraryDependencies ~= { _ map {
  case m if m.organization == "com.typesafe.play" =>
    m.exclude("commons-logging", "commons-logging")
  case m => m
}}

assemblySettings

test in assembly := {}

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*) => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
  case PathList("org", "slf4j", xs @ _*) => MergeStrategy.discard
  case "about.html" => MergeStrategy.rename
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}}

On Tue, Nov 11, 2014 at 3:04 PM, Mohammed Guller moham...@glassbeam.com wrote:

> Actually, it is possible to integrate Spark 1.1.0 with Play 2.2.x. Here is a sample build.sbt file:
>
> name := "xyz"
>
> version := "0.1"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies ++= Seq(
>   jdbc,
>   anorm,
>   cache,
>   "org.apache.spark" %% "spark-core" % "1.1.0",
>   "com.typesafe.akka" %% "akka-actor" % "2.2.3",
>   "com.typesafe.akka" %% "akka-slf4j" % "2.2.3",
>   "org.apache.spark" %% "spark-sql" % "1.1.0"
> )
>
> play.Project.playScalaSettings
>
> Mohammed
>
> -----Original Message-----
> From: Patrick Wendell [mailto:pwend...@gmail.com]
> Sent: Tuesday, November 11, 2014 2:06 PM
> To: Akshat Aranya
> Cc: user@spark.apache.org
> Subject: Re: Spark and Play
>
> Hi there,
>
> Because Akka versions are not binary compatible with one another, it might not be possible to integrate Play with Spark 1.1.0.
>
> - Patrick
>
> On Tue, Nov 11, 2014 at 8:21 AM, Akshat Aranya aara...@gmail.com wrote:
>> Hi, sorry if this has been asked before; I didn't find a satisfactory answer when searching. How can I integrate a Play application with Spark? I'm getting into issues of akka-actor versions: Play 2.2.x uses akka-actor 2.0, whereas Play 2.3.x uses akka-actor 2.3.4, neither of which works fine with Spark 1.1.0. Is there something I should do with libraryDependencies in my build.sbt to make it work? Thanks, Akshat
Re: How did the RDD.union work
The `conf` object will be sent to other nodes via Broadcast. Here is the scaladoc of Broadcast: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast

"In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later)."

Best Regards,
Shixiong Zhu
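In code, a minimal sketch of the resulting pattern (the names baseConf, sc, and rddFor are illustrative, not from the thread):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// each RDD must capture its own Configuration copy: mutating a shared conf after
// creating the first RDD would change the scan that RDD uses when it is finally computed
def rddFor(table: String, scanString: String) = {
  val conf = new Configuration(baseConf) // defensive copy of the shared base configuration
  conf.set(TableInputFormat.INPUT_TABLE, table)
  conf.set(TableInputFormat.SCAN, scanString)
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
}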
Re: Read a HDFS file from Spark source code
Hi Sean,

I was following this link: http://mund-consulting.com/Blog/Posts/file-operations-in-HDFS-using-java.aspx

But I was facing a FileSystem ambiguity error. I really don't have any idea how to go about doing this. Can you please help me get started?

On Wed, Nov 12, 2014 at 11:26 AM, Samarth Mailinglist mailinglistsama...@gmail.com wrote:

> Instead of a file path, use an HDFS URI. For example (in Python):
>
> data = sc.textFile("hdfs://localhost/user/someuser/data")

On Wed, Nov 12, 2014 at 10:12 AM, rapelly kartheek kartheek.m...@gmail.com wrote:

> Hi, I am trying to access a file in HDFS from the Spark source code. Basically, I am tweaking the Spark source code, and I need to access a file in HDFS from it. I am really not understanding how to go about doing this. Can someone please help me out in this regard? Thank you!! Karthik
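For direct file access from inside Spark's source (rather than via an RDD), here is a minimal sketch using the Hadoop FileSystem API; the namenode URI and path below are assumptions. If the "FileSystem ambiguity" is a compile-time clash between two classes named FileSystem, referring to the fully qualified org.apache.hadoop.fs.FileSystem may help.

import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs://localhost:8020"), conf) // hypothetical namenode URI
val in = fs.open(new Path("/user/someuser/data"))               // hypothetical file path
val reader = new BufferedReader(new InputStreamReader(in))
try {
  var line = reader.readLine()
  while (line != null) {
    println(line)
    line = reader.readLine()
  }
} finally {
  reader.close()
}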