Re:Re: Serialization issue when using HBase with Spark

2014-12-23 Thread yangliuyu
Thanks All.


Finally, the working code is below:


import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{BinaryComparator, FilterList, RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

import scala.collection.JavaConverters._

object PlayRecord {
  // DAY_MILLIS and getUserRowKeyRange are defined elsewhere in PlayRecord.
  def getUserActions(accounts: RDD[String], idType: Int, timeStart: Long, timeStop: Long,
                     cacheSize: Int, filterSongDays: Int,
                     filterPlaylistDays: Int): RDD[(String, (Int, Set[Long], Set[Long]))] = {
    accounts.mapPartitions(iterator => {
      if (iterator.nonEmpty) {
        // Per-partition setup: the HTable is created on the executor, never serialized.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "user_action")
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"),
          Bytes.toBytes("song_id"), CompareOp.EQUAL, new RegexStringComparator("^\\d+$")))
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"),
          Bytes.toBytes("module"), CompareOp.EQUAL,
          new BinaryComparator(Bytes.toBytes("displayed"))))
        iterator.map(id => {
          val scan = new Scan()
          scan.setCaching(cacheSize)
          scan.addFamily(Bytes.toBytes("stat"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("module"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("song_id"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("time"))
          val rowKeyRange = getUserRowKeyRange(id, idType, timeStart, timeStop)
          scan.setStartRow(rowKeyRange._1)
          scan.setStopRow(rowKeyRange._2)
          scan.setFilter(filterList)
          // One scan per user id, grouped by module ("listen" or "displayed").
          val userData = table.getScanner(scan).iterator().asScala.map(r => {
            val module = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("module")))
            val time = Bytes.toLong(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("time")))
            module match {
              case "listen" =>
                val songId = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("song_id")))
                (module, (time / DAY_MILLIS, songId))
              case "displayed" =>
                val playlistIds = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids")))
                (module, (time / DAY_MILLIS, playlistIds))
              case _ =>
                (module, (0L, ""))
            }
          }).toList.groupBy(_._1)
          val playRecordData = userData.get("listen")
          val playRecords = if (playRecordData.nonEmpty)
            playRecordData.get.map(_._2).groupBy(_._1).toList.sortBy(-_._1)
              .take(filterSongDays).flatMap(_._2).map(_._2.toLong).toSet
          else Set[Long]()
          val playlistDisplayData = userData.get("displayed")
          val playlistRecords = if (playlistDisplayData.nonEmpty)
            playlistDisplayData.get.map(_._2).groupBy(_._1).toList.sortBy(_._1)
              .take(filterPlaylistDays).flatMap(_._2).flatMap(_._2.split(',')).map(_.toLong).toSet
          else Set[Long]()
          val result = (id, (idType, playRecords, playlistRecords))
          // Cleanup: close the table only after the last element of this partition.
          if (!iterator.hasNext) {
            table.close()
          }
          result
        })
      } else {
        iterator.map(id => {
          (id, (idType, Set[Long](), Set[Long]()))
        })
      }
    })
  }
}


As Shixiong mentioned, following Sean Owen's post 
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/,
I close the table when iterator.hasNext is false; otherwise the application hangs. There is 
also another interesting project, 
http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/, which I will 
try later.
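The essential idiom, stripped out of the working code above (a simplified sketch; scanPerPartition and doScan are just placeholder names for illustration):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

// Sketch of the setup/cleanup pattern: the HTable is created once per partition
// and closed only after the lazy iterator has produced its last element.
def scanPerPartition[A: ClassTag](rdd: RDD[String], doScan: (HTable, String) => A): RDD[A] =
  rdd.mapPartitions(iterator => {
    if (iterator.isEmpty) Iterator.empty
    else {
      val table = new HTable(HBaseConfiguration.create(), "user_action") // setup
      iterator.map(id => {
        val value = doScan(table, id)
        if (!iterator.hasNext) table.close() // cleanup after the last element
        value
      })
    }
  })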





At 2014-12-15 17:52:47, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:
The reason for not using sc.newAPIHadoopRDD is that it only supports one scan each time.



I am not sure if that's true. You can use multiple scans as follows:


val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)


where convertScanToString is implemented as:


/**
 * Serializes an HBase scan into a string.
 * @param scan Scan to serialize.
 * @return Base64-encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
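

For completeness, a rough sketch of how this wires together end to end (not tested code; it assumes org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat and that each Scan carries its table name via Scan.SCAN_ATTRIBUTES_TABLE_NAME):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext

// Builds one RDD over several rowkey ranges of the same table.
def multiScanRDD(sc: SparkContext, tableName: String,
                 ranges: Seq[(Array[Byte], Array[Byte])]) = {
  val conf = HBaseConfiguration.create()
  val scanStrings = ranges.map { case (startRow, stopRow) =>
    val scan = new Scan(startRow, stopRow)
    // MultiTableInputFormat needs to know which table each scan targets.
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
    Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
  }
  conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)
  sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
}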


Thanks,
Aniket


On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu zsxw...@gmail.com wrote:

Just pointing out a bug in your code. You should not use `mapPartitions` like 
that. For details, I recommend the setup() and cleanup() section in Sean Owen's 
post: 
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/



Best Regards,

Shixiong Zhu


2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com:
In #1, class HTable is not serializable.
You also need to check your self-defined function getUserActions and make sure 
it is a member function of a class that implements the Serializable interface.
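
In other words, something along these lines (a hypothetical sketch only; UserActionJob and its members are made-up names): only serializable state is captured by the closure, and the HTable is created on the executors.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.spark.rdd.RDD

// Hypothetical holder: everything the closure captures (this instance and its
// fields) is serializable; the non-serializable HTable never leaves the executor.
class UserActionJob(timeStart: Long, timeStop: Long) extends Serializable {

  def getUserActions(table: HTable, userId: String): Set[Long] = {
    // ... scan `table` for this user's rows between timeStart and timeStop ...
    Set.empty[Long]
  }

  def run(users: RDD[String]): RDD[(String, Set[Long])] =
    users.mapPartitions(iterator => {
      if (iterator.isEmpty) Iterator.empty
      else {
        val table = new HTable(HBaseConfiguration.create(), "actions")
        iterator.map(userId => {
          val actions = getUserActions(table, userId)
          if (!iterator.hasNext) table.close()
          (userId, actions)
        })
      }
    })
}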

Sent from my iPad

On 2014-12-12, at 4:35 PM, yangliuyu yangli...@163.com wrote:


 The scenario is using an HTable instance to scan multiple rowkey ranges in Spark
 tasks, which looks like below:
 Option 1:
 val users = input

Serialization issue when using HBase with Spark

2014-12-12 Thread yangliuyu
The scenario is using an HTable instance to scan multiple rowkey ranges in Spark
tasks, which looks like below:
Option 1:
val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions(iterator => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }
    table.close()
    result
  })

But got the exception:
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
...
Caused by: java.io.NotSerializableException:
org.apache.hadoop.conf.Configuration
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

The reason for not using sc.newAPIHadoopRDD is that it only supports one scan each
time.
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]) 
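
(For context, TableInputFormat reads its single scan from the job configuration, roughly like the sketch below; this is an illustration assuming the standard TableInputFormat.INPUT_TABLE / TableInputFormat.SCAN properties and a Base64-serialized scan, similar to the convertScanToString helper shown earlier on this page.)

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "actions")
// Exactly one scan fits into the configuration, which is the limitation above.
val scan = new Scan()
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])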

And when using MultiTableInputFormat, it is not possible for the driver to put all
rowkeys into the HBaseConfiguration.
Option 2:
sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

It may be possible to divide all rowkey ranges into several parts and then use option 2, but I
prefer option 1. So is there any solution for option 1?




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Native library can not be loaded when using Mllib PCA

2014-06-06 Thread yangliuyu
Thanks Xiangrui,

I switched to an Ubuntu 14.04 server and it works after installing liblapack3gf
and libopenblas-base.

So it is an environment problem which is not related to MLlib.






Native library can not be loaded when using Mllib PCA

2014-06-05 Thread yangliuyu
Hi,

We're using MLlib (1.0.0 release version) on a k-means clustering problem.
We want to reduce the matrix column size before sending the points to the k-means
solver.
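
For reference, that kind of reduction can be sketched with the MLlib 1.0 RowMatrix API roughly as follows (a minimal illustration, not our actual code; k and numClusters are placeholders):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

// Project the points onto the top k principal components, then cluster.
def reduceAndCluster(points: RDD[Vector], k: Int, numClusters: Int) = {
  val mat = new RowMatrix(points)
  // computePrincipalComponents is the call that needs the native LAPACK backend.
  val pc = mat.computePrincipalComponents(k)
  val reduced = mat.multiply(pc).rows
  KMeans.train(reduced, numClusters, 20) // 20 iterations, arbitrary here
}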

It works on my Mac in local mode: spark-test-run-assembly-1.0.jar
contains my application code, the com.github.fommil netlib code and the
netlib-native*.so files (including jnilib and dll files):

spark-submit --class test.TestMllibPCA --master local[4] --executor-memory
3g --driver-memory 3g --driver-class-path
/data/user/dump/spark-test-run-assembly-1.0.jar
/data/user/dump/spark-test-run-assembly-1.0.jar
/data/user/dump/user_fav_2014_04_09.csv.head1w 

But if --driver-class-path is removed, the warning messages appear:
14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemLAPACK
14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeRefLAPACK

Alternatively, setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar
also solves the problem.

The matrix contains sparse data with 6778 rows and 2487 columns, and the time
consumed calculating PCA is 10s and 47s respectively, which suggests the
native library works well.

Then I wanted to test it on a Spark standalone cluster (on CentOS), but it
failed again.
After changing the JDK logging level to FINEST, I got these messages:
14/06/05 16:19:15 INFO JniLoader: JNI LIB =
netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting
jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so
to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemLAPACK
14/06/05 16:19:15 INFO JniLoader: JNI LIB =
netlib-native_ref-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting
jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so
to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeRefLAPACK
14/06/05 16:19:16 INFO LAPACK: Implementation provided by class
com.github.fommil.netlib.F2jLAPACK

libgfortran, atlas, blas, lapack and arpack are all installed and all of
the .so files are located under /usr/lib64; spark.executor.extraLibraryPath
is set to /usr/lib64 in conf/spark-defaults.conf, but none of them works. I
tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar but no
luck.

What should I try next?

Does the native library need to be visible to both the driver and the executors? In
local mode the problem seems to be a classpath issue, but for standalone
and YARN mode it gets more complex. A detailed document would be really helpful.

Thanks.


