Re:Re: Serialization issue when using HBase with Spark
Thanks all. Finally, the working code is below:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.{BinaryComparator, FilterList, RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

object PlayRecord {
  def getUserActions(accounts: RDD[String], idType: Int, timeStart: Long, timeStop: Long,
                     cacheSize: Int, filterSongDays: Int, filterPlaylistDays: Int): RDD[(String, (Int, Set[Long], Set[Long]))] = {
    accounts.mapPartitions(iterator => {
      if (iterator.nonEmpty) {
        // Connection and filters are created once per partition, never shipped from the driver.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "user_action")
        // MUST_PASS_ONE: keep rows that are numeric song plays OR "displayed" playlist events.
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("song_id"),
          CompareOp.EQUAL, new RegexStringComparator("^\\d+$")))
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("module"),
          CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("displayed"))))
        iterator.map(id => {
          val scan = new Scan()
          scan.setCaching(cacheSize)
          scan.addFamily(Bytes.toBytes("stat"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("module"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("song_id"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("time"))
          val rowKeyRange = getUserRowKeyRange(id, idType, timeStart, timeStop)
          scan.setStartRow(rowKeyRange._1)
          scan.setStopRow(rowKeyRange._2)
          scan.setFilter(filterList)
          val userData = table.getScanner(scan).iterator().asScala.map(r => {
            val module = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("module")))
            val time = Bytes.toLong(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("time")))
            module match {
              case "listen" =>
                val songId = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("song_id")))
                (module, (time / DAY_MILLIS, songId))
              case "displayed" =>
                val playlistIds = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids")))
                (module, (time / DAY_MILLIS, playlistIds))
              case _ =>
                (module, (0L, ""))
            }
          }).toList.groupBy(_._1)
          val playRecordData = userData.get("listen")
          val playRecords =
            if (playRecordData.nonEmpty)
              playRecordData.get.map(_._2).groupBy(_._1).toList.sortBy(-_._1)
                .take(filterSongDays).flatMap(_._2).map(_._2.toLong).toSet
            else Set[Long]()
          val playlistDisPlayData = userData.get("displayed")
          val playlistRecords =
            if (playlistDisPlayData.nonEmpty)
              playlistDisPlayData.get.map(_._2).groupBy(_._1).toList.sortBy(_._1)
                .take(filterPlaylistDays).flatMap(_._2).flatMap(_._2.split(',')).map(_.toLong).toSet
            else Set[Long]()
          val result = (id, (idType, playRecords, playlistRecords))
          // Close the table only once the last element of this partition has been produced.
          if (!iterator.hasNext) {
            table.close()
          }
          result
        })
      } else {
        iterator.map(id => {
          (id, (idType, Set[Long](), Set[Long]()))
        })
      }
    })
  }
}

As Shixiong mentioned via Sean Owen's post (http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/), I close the table when iterator.hasNext is false; otherwise the application will hang. There is also another interesting project, http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/, which I will try later.

At 2014-12-15 17:52:47, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:

> The reason for not using sc.newAPIHadoopRDD is that it only supports one scan each time.

I am not sure that's true. You can use multiple scans as follows:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)

where convertScanToString is implemented as:

/**
 * Serializes an HBase scan into a string.
 * @param scan Scan to serialize.
 * @return Base64 encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

Thanks,
Aniket

On Mon, Dec 15, 2014 at 13:31:03, Shixiong Zhu zsxw...@gmail.com wrote:

Just pointing out a bug in your code: you should not use `mapPartitions` like that. For details, I recommend the setup() and cleanup() section of Sean Owen's post: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com:

In #1, class HTable is not serializable. You also need to check your self-defined function getUserActions and make sure it is a member function of a class that implements the Serializable interface.

Sent from my iPad

On December 12, 2014, at 4:35 PM, yangliuyu yangli...@163.com wrote:

> The scenario is using an HTable instance to scan multiple rowkey ranges in Spark tasks, which looks like below:
>
> Option 1:
> val users = input
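To make Aniket's multiple-scan suggestion above concrete, below is a minimal sketch of wiring several Scan ranges into a single newAPIHadoopRDD call via MultiTableInputFormat. The table name "user_action", the "stat" family, and the rowkey ranges are placeholders, and the HBase 0.98-era ProtobufUtil/Base64 helpers are assumed:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext

object MultiScanExample {
  // Same helper as above: serialize a Scan into a Base64 string for the job conf.
  private def convertScanToString(scan: Scan): String = {
    val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  // Hypothetical helper: read all given (startRow, stopRow) ranges in one job.
  def readRanges(sc: SparkContext, ranges: Seq[(Array[Byte], Array[Byte])]) = {
    val conf = HBaseConfiguration.create()
    val scans = ranges.map { case (start, stop) =>
      val scan = new Scan(start, stop)
      // MultiTableInputFormat needs the table name attached to each scan.
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("user_action"))
      scan.addFamily(Bytes.toBytes("stat"))
      scan
    }
    conf.setStrings(MultiTableInputFormat.SCANS, scans.map(convertScanToString): _*)
    sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
  }
}

Each input split then corresponds to one scan range, so the rowkeys themselves never have to be enumerated in the HBaseConfiguration, only the range boundaries.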
Serialization issue when using HBase with Spark
The scenario is using an HTable instance to scan multiple rowkey ranges in Spark tasks, which looks like below:

Option 1:

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions(iterator => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }
    table.close()
    result
  })

But got the exception:

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(<console>:60)
        ...
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

The reason for not using sc.newAPIHadoopRDD is that it only supports one scan each time:

val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

And when using MultiTableInputFormat, it is not possible for the driver to put all rowkeys into the HBaseConfiguration.

Option 2:

sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

It may be possible to divide all rowkey ranges into several parts and then use option 2, but I prefer option 1. So is there any solution for option 1?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
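For reference, a minimal sketch of the pattern the replies above converge on: build the non-serializable HBaseConfiguration and HTable inside mapPartitions, and close the table only after the partition's iterator has been fully consumed. Scala's iterator.map is lazy, so calling table.close() immediately after building the result iterator, as option 1 does, would close the table before any row is actually fetched. getUserActions, timeStart, timeStop, and the "actions" table are the names from the post above:

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions { iterator =>
    // Created per partition on the executor, so nothing non-serializable is captured by the closure.
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    iterator.map { userId =>
      val actions = getUserActions(table, userId, timeStart, timeStop)
      // Cleanup once the last element of this partition has been produced.
      if (!iterator.hasNext) table.close()
      (userId, actions)
    }
  }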
Re: Native library can not be loaded when using Mllib PCA
Thanks Xiangrui. I switched to an Ubuntu 14.04 server and it works after installing liblapack3gf and libopenblas-base. So it is an environment problem and is not related to MLlib.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042p7113.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Native library can not be loaded when using Mllib PCA
Hi,

We're using MLlib (1.0.0 release version) for a k-means clustering problem. We want to reduce the matrix column size before sending the points to the k-means solver.

It works on my Mac in local mode. spark-test-run-assembly-1.0.jar contains my application code, the com.github.fommil netlib code, and the netlib-native*.so files (including the jnilib and dll files):

spark-submit --class test.TestMllibPCA --master local[4] \
  --executor-memory 3g --driver-memory 3g \
  --driver-class-path /data/user/dump/spark-test-run-assembly-1.0.jar \
  /data/user/dump/spark-test-run-assembly-1.0.jar /data/user/dump/user_fav_2014_04_09.csv.head1w

But if --driver-class-path is removed, the warning messages appear:

14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

Setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar can also solve the problem. The matrix contains sparse data with 6778 rows and 2487 columns, and the time spent calculating the PCA is 10s and 47s respectively, which suggests the native library works well when it is loaded.

Then I wanted to test it on a Spark standalone cluster (on CentOS), but it failed again. After changing the JDK logging level to FINEST, I got these messages:

14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_ref-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
14/06/05 16:19:16 INFO LAPACK: Implementation provided by class com.github.fommil.netlib.F2jLAPACK

libgfortran, atlas, blas, lapack and arpack are all installed and all of the .so files are located under /usr/lib64. spark.executor.extraLibraryPath is set to /usr/lib64 in conf/spark-defaults.conf, but none of these works. I also tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar, but no luck. What should I try next? Does the native library need to be visible to both the driver and the executors? In local mode the problem seems to be a classpath issue, but for standalone and YARN mode it gets more complex. A detailed document on this would be really helpful. Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
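For context, a minimal sketch of the dimensionality-reduction step under discussion, using MLlib 1.0's RowMatrix API. The CSV parsing, the number of components (50), and the k-means parameters are hypothetical placeholders; computePrincipalComponents is the call that goes through netlib-java, so it is what triggers the LAPACK warnings shown above when no native implementation can be loaded:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object PcaThenKMeans {
  def run(sc: SparkContext): Unit = {
    // Hypothetical input: one comma-separated feature vector per line.
    val rows = sc.textFile("/data/user/dump/user_fav_2014_04_09.csv.head1w")
      .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
    rows.cache()

    val mat = new RowMatrix(rows)
    // Reduce from ~2487 columns to 50 principal components; LAPACK (native or F2J fallback) is used here.
    val pc = mat.computePrincipalComponents(50)
    val projected = mat.multiply(pc).rows

    // Cluster the reduced points.
    val model = KMeans.train(projected, 10, 20)
    println(s"Clustering cost: ${model.computeCost(projected)}")
  }
}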