Re: how to retain part of the features in LogisticRegressionModel (spark2.0)

2017-03-19 Thread jinhong lu
Thanks, Dhanesh. And what about the features question?

> On 19 Mar 2017, at 19:08, Dhanesh Padmanabhan wrote:
> 
> Dhanesh

Thanks,
lujinhong



Re: how to retain part of the features in LogisticRegressionModel (spark2.0)

2017-03-19 Thread jinhong lu
By the way, I found that in Spark 2.1 I can use setFamily() to choose binomial or
multinomial, but how can I do the same thing in Spark 2.0.2?
If it is not supported, which one does Spark 2.0.2 use, binomial or multinomial?
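For reference, a minimal sketch of the Spark 2.1 call (my illustration, not from the thread); as far as I can tell, the DataFrame-based LogisticRegression in 2.0.x only supports the binomial case, and the multinomial family was added in 2.1:

    import org.apache.spark.ml.classification.LogisticRegression

    // Spark 2.1+: family can be "auto", "binomial" or "multinomial"
    val lrMulti = new LogisticRegression().setFamily("multinomial")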

> On 19 Mar 2017, at 18:12, jinhong lu wrote:
> 
> 
> I train my LogisticRegressionModel like this, and I want the model to retain
> only some of the features (e.g. 500 of them), not all of them. What should I do?
> I use .setElasticNetParam(1.0), but all the features are still in
> lrModel.coefficients.
> 
> import org.apache.spark.ml.classification.LogisticRegression
> val data = spark.read.format("libsvm").option("numFeatures","").load("/tmp/data/training_data3")
> val Array(trainingData, testData) = data.randomSplit(Array(0.5, 0.5), seed = 1234L)
> 
> val lr = new LogisticRegression()
> val lrModel = lr.fit(trainingData)
> println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
> 
> val predictions = lrModel.transform(testData)
> predictions.show()
> 
> 
> Thanks, 
> lujinhong
> 

Thanks,
lujinhong



how to retain part of the features in LogisticRegressionModel (spark2.0)

2017-03-19 Thread jinhong lu

I train my LogisticRegressionModel like this, and I want the model to retain only
some of the features (e.g. 500 of them), not all of them. What should I do?
I use .setElasticNetParam(1.0), but all the features are still in
lrModel.coefficients.

  import org.apache.spark.ml.classification.LogisticRegression
  val data = spark.read.format("libsvm").option("numFeatures","").load("/tmp/data/training_data3")
  val Array(trainingData, testData) = data.randomSplit(Array(0.5, 0.5), seed = 1234L)

  val lr = new LogisticRegression()
  val lrModel = lr.fit(trainingData)
  println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

  val predictions = lrModel.transform(testData)
  predictions.show()
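A minimal sketch of one way to end up with a sparse model (my assumption, not a confirmed answer): setElasticNetParam(1.0) only selects the L1/L2 mix, and the default regParam is 0.0, so no penalty is applied and no coefficient is driven to zero. With a positive regParam the coefficients vector still has one entry per feature, but the "retained" features are the non-zero entries:

    // sketch: pure L1 penalty; the regParam value is a placeholder to be tuned
    val lrL1 = new LogisticRegression()
      .setElasticNetParam(1.0)   // L1 only
      .setRegParam(0.1)          // needs to be > 0 for any coefficient to become exactly 0
    val lrModelL1 = lrL1.fit(trainingData)

    // indices of the features the model actually kept (non-zero coefficients)
    val keptFeatures = lrModelL1.coefficients.toArray.zipWithIndex
      .collect { case (w, i) if w != 0.0 => i }
    println(s"non-zero coefficients: ${keptFeatures.length}")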


Thanks, 
lujinhong



Re: how to construct parameter for model.transform() from datafile

2017-03-13 Thread jinhong lu
Can anyone help?

> On 13 Mar 2017, at 19:38, jinhong lu wrote:
> 
> After training the model, I got results that look like this:
> 
> 
>   scala> predictionResult.show()
> 
>   +-----+--------------------+--------------------+--------------------+----------+
>   |label|            features|       rawPrediction|         probability|prediction|
>   +-----+--------------------+--------------------+--------------------+----------+
>   |  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|       0.0|
>   |  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|       0.0|
>   |  0.0|(144109,[100],[24...|[-146.81612388602...|[9.73704654529197...|       1.0|
> 
> And then I transform() the data with this code:
> 
>   import org.apache.spark.ml.linalg.Vectors
>   import org.apache.spark.ml.linalg.Vector
>   import scala.collection.mutable
> 
>   def lineToVector(line: String): Vector = {
>     val seq = new mutable.Queue[(Int, Double)]
>     val content = line.split(" ")
>     for (s <- content) {
>       val index = s.split(":")(0).toInt
>       val value = s.split(":")(1).toDouble
>       seq += ((index, value))
>     }
>     Vectors.sparse(144109, seq)
>   }
> 
>   val df = sc.sequenceFile[org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text]("/data/gamein/gameall_sdc/wh/gameall.db/edt_udid_label_format/ds=20170312/001006_0")
>     .map(line => line._2)
>     .map(line => (line.toString.split("\t")(0), lineToVector(line.toString.split("\t")(1))))
>     .toDF("udid", "features")
>   val predictionResult = model.transform(df)
>   predictionResult.show()
> 
> 
> But I got an error that looks like this:
> 
> Caused by: java.lang.IllegalArgumentException: requirement failed: You may 
> not write an element to index 804201 because the declared size of your vector 
> is 144109
>  at scala.Predef$.require(Predef.scala:224)
>  at org.apache.spark.ml.linalg.Vectors$.sparse(Vectors.scala:219)
>  at lineToVector(:55)
>  at $anonfun$4.apply(:50)
>  at $anonfun$4.apply(:50)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>  at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:84)
>  at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> 
> So I changed
> 
>   return Vectors.sparse(144109, seq)
> 
> to 
> 
>   return Vectors.sparse(804202, seq)
> 
> Another error occurs:
> 
>   Caused by: java.lang.IllegalArgumentException: requirement failed: The 
> columns of A don't match the number of elements of x. A: 144109, x: 804202
> at scala.Predef$.require(Predef.scala:224)
> at org.apache.spark.ml.linalg.BLAS$.gemv(BLAS.scala:521)
> at 
> org.apache.spark.ml.linalg.Matrix$class.multiply(Matrices.scala:110)
> at org.apache.spark.ml.linalg.DenseMatrix.multiply(Matrices.scala:176)
> 
> what should I do?
>> On 13 Mar 2017, at 16:31, jinhong lu wrote:
>> 
>> Hi, all:
>> 
>> I got these training data:
>> 
>>  0 31607:17
>>  0 111905:36
>>  0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1 
>> 30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2 
>> 112109:4 123305:48 142509:1
>>  0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10
>>  0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3 15207:19 
>> 31607:19
>>  0 19109:7 29705:4 123305:32
>>  0 15309:1 43005:1 108509:1
>>  1 604:1 6401:1 6503:1 15207:4 31607:40
>>  0 1807:19
>>  0 301:14 501:1 1502:14 2507:12 123305:4
>>  0 607:14 19109:460 123305:448
>>  0 5406:14 7209:4 10509:3 19109:6 24706:10 26106:4 31409:1 123305:48 
>> 128209:1
>>  1 1606:1 2306:3 3905:19 4408:3

Re: how to construct parameter for model.transform() from datafile

2017-03-13 Thread jinhong lu
After training the model, I got results that look like this:


scala> predictionResult.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|       0.0|
|  0.0|(144109,[100],[2.0])|[-12.246737725034...|[0.96061209556737...|       0.0|
|  0.0|(144109,[100],[24...|[-146.81612388602...|[9.73704654529197...|       1.0|

And then I transform() the data with this code:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector
import scala.collection.mutable

def lineToVector(line: String): Vector = {
  val seq = new mutable.Queue[(Int, Double)]
  val content = line.split(" ")
  for (s <- content) {
    val index = s.split(":")(0).toInt
    val value = s.split(":")(1).toDouble
    seq += ((index, value))
  }
  Vectors.sparse(144109, seq)
}

val df = sc.sequenceFile[org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text]("/data/gamein/gameall_sdc/wh/gameall.db/edt_udid_label_format/ds=20170312/001006_0")
  .map(line => line._2)
  .map(line => (line.toString.split("\t")(0), lineToVector(line.toString.split("\t")(1))))
  .toDF("udid", "features")
val predictionResult = model.transform(df)
predictionResult.show()


But I got an error that looks like this:

 Caused by: java.lang.IllegalArgumentException: requirement failed: You may not 
write an element to index 804201 because the declared size of your vector is 
144109
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.linalg.Vectors$.sparse(Vectors.scala:219)
  at lineToVector(:55)
  at $anonfun$4.apply(:50)
  at $anonfun$4.apply(:50)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:84)
  at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)

So I changed

return Vectors.sparse(144109, seq)

to 

return Vectors.sparse(804202, seq)

Another error occurs:

Caused by: java.lang.IllegalArgumentException: requirement failed: The 
columns of A don't match the number of elements of x. A: 144109, x: 804202
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.linalg.BLAS$.gemv(BLAS.scala:521)
  at 
org.apache.spark.ml.linalg.Matrix$class.multiply(Matrices.scala:110)
  at org.apache.spark.ml.linalg.DenseMatrix.multiply(Matrices.scala:176)

what should I do?
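One possible fix, sketched under my own assumptions (not confirmed in the thread): the sparse vector size must equal the feature dimension the model was trained with (144109 here), so indices at or above that dimension in the new data have to be dropped (or the model retrained with a larger numFeatures):

    import org.apache.spark.ml.linalg.{Vector, Vectors}

    val numFeatures = 144109  // must match the dimension used at training time

    def lineToVectorFixed(line: String): Vector = {
      val pairs = line.split(" ").map { s =>
        val Array(i, v) = s.split(":")
        (i.toInt, v.toDouble)
      }.filter(_._1 < numFeatures)  // drop feature indices the model has never seen
      Vectors.sparse(numFeatures, pairs)
    }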
> On 13 Mar 2017, at 16:31, jinhong lu wrote:
> 
> Hi, all:
> 
> I got these training data:
> 
>   0 31607:17
>   0 111905:36
>   0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1 
> 30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2 
> 112109:4 123305:48 142509:1
>   0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10
>   0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3 15207:19 
> 31607:19
>   0 19109:7 29705:4 123305:32
>   0 15309:1 43005:1 108509:1
>   1 604:1 6401:1 6503:1 15207:4 31607:40
>   0 1807:19
>   0 301:14 501:1 1502:14 2507:12 123305:4
>   0 607:14 19109:460 123305:448
>   0 5406:14 7209:4 10509:3 19109:6 24706:10 26106:4 31409:1 123305:48 
> 128209:1
>   1 1606:1 2306:3 3905:19 4408:3 4506:8 8707:3 19109:50 24809:1 26509:2 
> 27709:2 56509:8 122705:62 123305:31 124005:2
> 
> And then I train the model by spark:
> 
>   import org.apache.spark.ml.classification.NaiveBayes
>   import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
>   import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
>   import org.apache.spark.sql.SparkSession
> 
>   val spark = 
> SparkSession.builder.appName("NaiveBayesExampl

how to construct parameter for model.transform() from datafile

2017-03-13 Thread jinhong lu
Hi, all:

I have this training data:

0 31607:17
0 111905:36
0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1 
30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2 
112109:4 123305:48 142509:1
0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10
0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3 15207:19 
31607:19
0 19109:7 29705:4 123305:32
0 15309:1 43005:1 108509:1
1 604:1 6401:1 6503:1 15207:4 31607:40
0 1807:19
0 301:14 501:1 1502:14 2507:12 123305:4
0 607:14 19109:460 123305:448
0 5406:14 7209:4 10509:3 19109:6 24706:10 26106:4 31409:1 123305:48 
128209:1
1 1606:1 2306:3 3905:19 4408:3 4506:8 8707:3 19109:50 24809:1 26509:2 
27709:2 56509:8 122705:62 123305:31 124005:2

And then I train the model with Spark:

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate()
val data = spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data3")
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)
//val model = new NaiveBayes().fit(trainingData)
val model = new NaiveBayes().setThresholds(Array(10.0, 1.0)).fit(trainingData)
val predictions = model.transform(testData)
predictions.show()


OK, I have got my model from the code above, but how can I use this model to
predict the classification of other data like this:

ID1 509:2 5102:4 25909:1 31709:4 121905:19
ID2 800201:1
ID3 116005:4
ID4 800201:1
ID5 19109:1  21708:1 23208:1 49809:1 88609:1
ID6 800201:1
ID7 43505:7 106405:7

I know I can use the transform() method, but how do I construct the parameter
for the transform() method?
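A minimal sketch of one way to build the argument (the column names, the 144109 feature dimension and the whitespace parsing are my assumptions): transform() expects a DataFrame with a Vector column named "features" of the same dimension used during training, so the lines have to be parsed into such a DataFrame first:

    import org.apache.spark.ml.linalg.{Vector, Vectors}

    val numFeatures = 144109  // assumed: same dimension as the training data

    // "ID1 509:2 5102:4 ..." -> (id, sparse feature vector)
    def parseLine(line: String): (String, Vector) = {
      val parts = line.split(" ")
      val feats = parts.tail.map { s =>
        val Array(i, v) = s.split(":")
        (i.toInt, v.toDouble)
      }
      (parts.head, Vectors.sparse(numFeatures, feats))
    }

    val rdd = spark.sparkContext.textFile("/tmp/ljhn1829/aplus/new_data")  // placeholder path
      .map(parseLine)
    val toScore = spark.createDataFrame(rdd).toDF("id", "features")

    model.transform(toScore).show()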





Thanks,
lujinhong





ml package data types

2017-03-09 Thread jinhong lu
Hi, 

Is there any documentation for ml package data types? 
just like the mllib package here : 
https://spark.apache.org/docs/latest/mllib-data-types.html 


Or is it the same for ml and mllib?
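As far as I know (my understanding, not an answer from the list), the DataFrame-based ml package has its own local linear algebra types under org.apache.spark.ml.linalg since 2.0, mirroring the mllib ones; a minimal sketch:

    import org.apache.spark.ml.linalg.{Matrices, Vectors}

    // local vectors and matrices for the DataFrame-based API (ml.linalg, not mllib.linalg)
    val dv = Vectors.dense(1.0, 0.0, 3.0)
    val sv = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
    val dm = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))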

Thanks,
lujinhong



mllib based on dataset or dataframe

2016-07-10 Thread jinhong lu
Hi,
Since Dataset will be the major API in Spark 2.0, why is MLlib DataFrame-based, with
"future development will focus on the DataFrame-based API"?

Is there any plan to change MLlib from DataFrame-based to Dataset-based?


Thanks,
lujinhong





Re: spark to hbase

2015-10-27 Thread jinhong lu
I wrote a demo, but there is still no response, no error, no log.

My HBase is 0.98, Hadoop 2.3, Spark 1.4.

And I run in yarn-client mode. Any idea? Thanks.


package com.lujinhong.sparkdemo

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object SparkConnectHbase2 extends Serializable {

  def main(args: Array[String]) {
new SparkConnectHbase2().toHbase();
  }

}

class SparkConnectHbase2 extends Serializable {

  def toHbase() {
val conf = new SparkConf().setAppName("ljh_ml3");
val sc = new SparkContext(conf)

val tmp = sc.parallelize(Array(601, 701, 801, 901)).foreachPartition({ a => 
  val configuration = HBaseConfiguration.create();
  configuration.set("hbase.zookeeper.property.clientPort", "2181");
  configuration.set("hbase.zookeeper.quorum", “192.168.1.66");
  configuration.set("hbase.master", “192.168.1.66:6");
  val table = new HTable(configuration, "ljh_test4");
  var put = new Put(Bytes.toBytes(a+""));
  put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(a + 
"value"));
  table.put(put);
  table.flushCommits();
})

  }

}


> On 28 Oct 2015, at 10:23, Fengdong Yu wrote:
> 
> Also, please move the HBase-related code out of the Scala object; this will resolve
> the serialization issue and avoid opening the connection repeatedly.
> 
> And remember to close the table after the final flush.
> 
> 
> 
>> On Oct 28, 2015, at 10:13 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>> 
>> For #2, have you checked task log(s) to see if there was some clue ?
>> 
>> You may want to use foreachPartition to reduce the number of flushes.
>> 
>> In the future, please remove color coding - it is not easy to read.
>> 
>> Cheers
>> 
>> On Tue, Oct 27, 2015 at 6:53 PM, jinhong lu <lujinho...@gmail.com> wrote:
>> Hi, Ted
>> 
>> thanks for your help.
>> 
>> I check the jar, it is in classpath, and now the problem is :
>> 
>> 1. The following code runs fine, and it puts the result into HBase:
>> 
>>   val res = 
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>>  TrainFeature())(seqOp, combOp).values.first()
>>  val configuration = HBaseConfiguration.create();
>>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>   configuration.set("hbase.master", "192.168.1.66:6");
>>   val table = new HTable(configuration, "ljh_test3");
>>   var put = new Put(Bytes.toBytes(res.toKey()));
>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), 
>> Bytes.toBytes(res.positiveCount));
>>   table.put(put);
>>   table.flushCommits()
>> 
>> 2. But if I change the first() call to foreach:
>> 
>>   
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>>  TrainFeature())(seqOp, combOp).values.foreach({res=>
>>   val configuration = HBaseConfiguration.create();
>>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>   configuration.set("hbase.master", "192.168.1.66:6");
>>   val table = new HTable(configuration, "ljh_test3");
>>   var put = new Put(Bytes.toBytes(res.toKey()));
>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), 
>> Bytes.toBytes(res.positiveCount));
>>   table.put(put);
>> 
>> })
>> 
>> the application hangs, and the last log lines are:
>> 
>> 15/10/28 09:30:33 INFO DAGScheduler: Missing parents for ResultStage 2: 
>> List()
>> 15/10/28 09:30:33 INFO DAGScheduler: Submitting ResultStage 2 
>> (MapPartitionsRDD[6] at values at TrainModel3.scala:98), which is now 
>> runnable
>> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(7032) called with 
>> curMem=264045, maxMem=278302556
>> 15/10/28 09:30:33 INFO MemoryStore: Block broad

Re: spark to hbase

2015-10-27 Thread jinhong lu
ala:868)
at com.chencai.spark.ml.TrainModel3$.train(TrainModel3.scala:100)
at com.chencai.spark.ml.TrainModel3$.main(TrainModel3.scala:115)
at com.chencai.spark.ml.TrainModel3.main(TrainModel3.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: 
org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, 
value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, 
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, 
hdfs-site.xml, hbase-default.xml, hbase-site.xml)
- field (class: com.chencai.spark.ml.TrainModel3$$anonfun$train$5, 
name: configuration$1, type: class org.apache.hadoop.conf.Configuration)
- object (class 
com.chencai.spark.ml.TrainModel3$$anonfun$train$5, )
at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more




> On 28 Oct 2015, at 09:26, Ted Yu wrote:
> 
> Jinhong:
> In one of the earlier threads on storing data to HBase, it was found that the htrace
> jar was not on the classpath, leading to write failures.
> 
> Can you check whether you are facing the same problem ?
> 
> Cheers
> 
> On Tue, Oct 27, 2015 at 5:11 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> Jinhong:
> The hadmin variable is not used; you can omit that line.
> 
> Which hbase release are you using ?
> 
> As Deng said, don't flush per row. 
> 
> Cheers
> 
> On Oct 27, 2015, at 3:21 AM, Deng Ching-Mallete <och...@apache.org> wrote:
> 
>> Hi,
>> 
>> It would be more efficient if you configure the table and flush the commits
>> by partition instead of per element in the RDD. The latter works fine
>> because you only have 4 elements, but it won't bode well for large data sets
>> IMO.
>> 
>> Thanks,
>> Deng
>> 
>> On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinho...@gmail.com> wrote:
>> 
>> 
>> Hi, 
>> 
>> I write my result to HDFS, and it works well:
>> 
>> val model = 
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>>  TrainFeature())(seqOp, combOp).values
>>  model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + 
>> a.positiveCount)).saveAsTextFile(modelDataPath);
>> 
>> But when I want to write to HBase, the application hangs: no log, no response,
>> it just stays there, and nothing is written to HBase:
>> 
>> val model = 
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>>  TrainFeature())(seqOp, combOp).values.foreach({ res =>
>>   val configuration = HBaseConfiguration.create();
>>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>   configuration.set("hbase.master", "192.168.1:6");
>>   val hadmin = new HBaseAdmin(configuration);
>>   val table = new HTable(configuration, "ljh_test3");
>>   var put = new Put(Bytes.toBytes(res.toKey()));
>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), 
>> Bytes.toBytes(res.totalCount + res.positiveCount));
>>   table.put(put);
>>   table.flushCommits()
>> })
>> 
>> And then I tried to write some simple data to HBase, and that works too:
>> 
>> sc.parallelize(Array(1,2,3,4)).foreach({ res =>
>> val configuration = HBaseConfiguration.create();
>> configuration.set("hbase.zookeeper.property.clientPort", "2181");
>> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>> configuration.set("hbase.master", "192.168.1:6");
>> val hadmin = new HBaseAdmin(configuration);
>> val table = new HTable(configuration, "ljh_test3");
>> var put = new Put(Bytes.toBytes(res));
>> put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>> table.put(put);
>> table.flushCommits()
>> })
>> 
>> What is the problem with the 2nd code? Thanks a lot.
>> 
>> 
> 



spark to hbase

2015-10-27 Thread jinhong lu

Hi, 

I write my result to HDFS, and it works well:

val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
  .aggregateByKey(new TrainFeature())(seqOp, combOp).values
model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath);

But when I want to write to HBase, the application hangs: no log, no response, it
just stays there, and nothing is written to HBase:

val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
  .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
    val configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.property.clientPort", "2181");
    configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
    configuration.set("hbase.master", "192.168.1:6");
    val hadmin = new HBaseAdmin(configuration);
    val table = new HTable(configuration, "ljh_test3");
    var put = new Put(Bytes.toBytes(res.toKey()));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount));
    table.put(put);
    table.flushCommits()
})

And then I tried to write some simple data to HBase, and that works too:

sc.parallelize(Array(1,2,3,4)).foreach({ res =>
val configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
configuration.set("hbase.master", "192.168.1:6");
val hadmin = new HBaseAdmin(configuration);
val table = new HTable(configuration, "ljh_test3");
var put = new Put(Bytes.toBytes(res));
put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
table.put(put);
table.flushCommits()
})

What is the problem with the 2nd code? Thanks a lot.
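A hedged sketch of the pattern suggested elsewhere in this thread (foreachPartition, flush once per partition, close the table); anything beyond the code already shown above is my assumption:

    lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
      .aggregateByKey(new TrainFeature())(seqOp, combOp).values
      .foreachPartition { iter =>
        // created on the executor, so no non-serializable Configuration is captured by the closure
        val configuration = HBaseConfiguration.create()
        configuration.set("hbase.zookeeper.property.clientPort", "2181")
        configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
        val table = new HTable(configuration, "ljh_test3")
        iter.foreach { res =>
          val put = new Put(Bytes.toBytes(res.toKey()))
          put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount))
          table.put(put)
        }
        table.flushCommits()
        table.close()
      }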






NoSuchMethodException : com.google.common.io.ByteStreams.limit

2015-10-23 Thread jinhong lu
Hi, I run Spark to write data to HBase, but get a NoSuchMethodError:

15/10/23 18:45:21 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
dn18-formal.i.nease.net): java.lang.NoSuchMethodError: 
com.google.common.io.ByteStreams.limit(Ljava/io/InputStream;J)Ljava/io/InputStream;

I found the guava jar in the hadoop/hbase directory and its version is 12.0, but
com.google.common.io.ByteStreams.limit only exists since Guava 14.0, so the
NoSuchMethodError occurs.

I tried to run spark-submit with --jars, but got the same error. I also tried to add
  configuration.set("spark.executor.extraClassPath", "/home/ljh")
  configuration.set("spark.driver.userClassPathFirst","true");
to my code, but still the same.

How can I solve this? How can I remove the guava jar in hadoop/hbase from the
classpath? Why does it not use the guava jar in the Spark directory?
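A side note I am not certain applies here: spark.executor.extraClassPath and spark.driver.userClassPathFirst are Spark settings, so setting them on the HBase Configuration object inside the job has no effect. A minimal sketch of where they would normally go (the jar path is a placeholder; driver-side options usually have to be passed to spark-submit itself, e.g. --conf spark.driver.userClassPathFirst=true, because the driver JVM is already running when the application code executes):

    import org.apache.spark.{SparkConf, SparkContext}

    // sketch: ask executors to prefer the user-supplied guava over the cluster's copy
    val conf = new SparkConf()
      .setAppName("guava-classpath-demo")
      .set("spark.executor.userClassPathFirst", "true")                    // assumes a Spark version that supports it
      .set("spark.executor.extraClassPath", "/home/ljh/guava-14.0.1.jar")  // placeholder path
    val sc = new SparkContext(conf)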

Here is my code:

rdd.foreach({ res =>
  val configuration = HBaseConfiguration.create();

  configuration.set("hbase.zookeeper.property.clientPort", "2181");
  configuration.set("hbase.zookeeper.quorum", “ip.66");
  configuration.set("hbase.master", “ip:6");
  configuration.set("spark.executor.extraClassPath", "/home/ljh")
  configuration.set("spark.driver.userClassPathFirst","true");
  val hadmin = new HBaseAdmin(configuration);
  configuration.clear();
  configuration.addResource("/home/hadoop/conf/core-default.xml")
  configuration.addResource("/home/hadoop/conf/core-site.xml")
  configuration.addResource("/home/hadoop/conf/mapred-default.xml")
  configuration.addResource("/home/hadoop/conf/mapred-site.xml")
  configuration.addResource("/home/hadoop/conf/yarn-default.xml")
  configuration.addResource("/home/hadoop/conf/yarn-site.xml")
  configuration.addResource("/home/hadoop/conf/hdfs-default.xml")
  configuration.addResource("/home/hadoop/conf/hdfs-site.xml")
  configuration.addResource("/home/hadoop/conf/hbase-default.xml")
  configuration.addResource("/home/ljhn1829/hbase-site.xml")
  val table = new HTable(configuration, "ljh_test2");
  var put = new Put(Bytes.toBytes(res.toKey()));
  put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), 
Bytes.toBytes(res.totalCount + "\t" + res.positiveCount));
  table.put(put);
  table.flushCommits()
})

and the error message:


15/10/23 19:06:42 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
gdc-dn126-formal.i.nease.net): java.lang.NoSuchMethodError: 
com.google.common.io.ByteStreams.limit(Ljava/io/InputStream;J)Ljava/io/InputStream;
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.nextBatchStream(ExternalAppendOnlyMap.scala:420)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:392)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:207)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63)
at 
org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:83)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/10/23 19:06:42 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 
gdc-dn166-formal.i.nease.net, PROCESS_LOCAL, 1277 bytes)
15/10/23 19:06:42 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
gdc-dn166-formal.i.nease.net:3838 (size: 3.2 KB, free: 1060.3 MB)
15/10/23 19:06:42 ERROR YarnScheduler: Lost executor 1 on 
gdc-dn126-formal.i.nease.net: remote Rpc client disassociated
15/10/23 19:06:42 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkexecu...@gdc-dn126-formal.i.nease.net:1656] has failed, 
address is now gated for [5000] ms. Reason is: [Disassociated].
15/10/23 19:06:42 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 1.0
15/10/23 1