For #2, have you checked the task logs to see if there is any clue?

You may want to use foreachPartition to reduce the number of flushes.
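
A minimal sketch of that idea, assuming the same HBase client API as in your snippets (HTable, put.add). The names HBaseWriteSketch and writeCounts, the (String, Long) element type, and the batch size of 1000 are illustrative assumptions, not something taken from your code:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

object HBaseWriteSketch {
  // Hypothetical helper: `rows` holds (row key, count) pairs.
  def writeCounts(rows: RDD[(String, Long)], tableName: String): Unit = {
    rows.foreachPartition { partition =>
      // Created on the executor, once per partition, so neither the
      // Configuration nor the HTable is captured by the closure.
      val configuration = HBaseConfiguration.create()
      configuration.set("hbase.zookeeper.property.clientPort", "2181")
      configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
      val table = new HTable(configuration, tableName)
      try {
        // Send the puts in batches instead of one call per row.
        // The batch size is an arbitrary assumption.
        partition.grouped(1000).foreach { batch =>
          val puts = batch.map { case (key, count) =>
            val put = new Put(Bytes.toBytes(key))
            put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(count))
            put
          }
          table.put(puts.asJava)
        }
        // Flush once per partition, then release the connection.
        table.flushCommits()
      } finally {
        table.close()
      }
    }
  }
}
```

It could be called as HBaseWriteSketch.writeCounts(model.map(res => (res.toKey(), res.positiveCount.toLong)), "ljh_test3"), assuming toKey() returns a String and positiveCount is an integral type. Since the Configuration and the HTable are created inside the partition closure, nothing non-serializable is shipped from the driver, which is what failed in #3.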

In the future, please remove color coding - it is not easy to read.

Cheers

On Tue, Oct 27, 2015 at 6:53 PM, jinhong lu <lujinho...@gmail.com> wrote:

> Hi, Ted
>
> thanks for your help.
>
> I checked the jar and it is on the classpath. Now the problem is:
>
> 1. The following code runs fine and writes the result to HBase:
>
>   val res = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>     .aggregateByKey(new TrainFeature())(seqOp, combOp).values.first()
>   val configuration = HBaseConfiguration.create();
>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>   configuration.set("hbase.master", "192.168.1.66:60000");
>   val table = new HTable(configuration, "ljh_test3");
>   var put = new Put(Bytes.toBytes(res.toKey()));
>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>   table.put(put);
>   table.flushCommits()
>
> 2. But if I change first() to foreach:
>
>   lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>     .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>       val configuration = HBaseConfiguration.create();
>       configuration.set("hbase.zookeeper.property.clientPort", "2181");
>       configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>       configuration.set("hbase.master", "192.168.1.66:60000");
>       val table = new HTable(configuration, "ljh_test3");
>       var put = new Put(Bytes.toBytes(res.toKey()));
>       put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>       table.put(put);
>   })
>
> the application hangs, and the last log lines are:
>
> 15/10/28 09:30:33 INFO DAGScheduler: Missing parents for ResultStage 2: List()
> 15/10/28 09:30:33 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98), which is now runnable
> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(7032) called with curMem=264045, maxMem=278302556
> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.9 KB, free 265.2 MB)
> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(3469) called with curMem=271077, maxMem=278302556
> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.4 KB, free 265.1 MB)
> 15/10/28 09:30:33 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.120.69.53:43019 (size: 3.4 KB, free: 265.4 MB)
> 15/10/28 09:30:33 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
> 15/10/28 09:30:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98)
> 15/10/28 09:30:33 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
> 15/10/28 09:30:33 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, gdc-dn147-formal.i.nease.net, PROCESS_LOCAL, 1716 bytes)
> 15/10/28 09:30:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on gdc-dn147-formal.i.nease.net:59814 (size: 3.4 KB, free: 1060.3 MB)
> 15/10/28 09:30:34 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to gdc-dn147-formal.i.nease.net:52904
> 15/10/28 09:30:34 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 154 bytes
>
> 3. Besides, I moved the configuration and HTable out of foreach:
>
> val configuration = HBaseConfiguration.create();
> configuration.set("hbase.zookeeper.property.clientPort", "2181");
> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
> configuration.set("hbase.master", "192.168.1.66:60000");
> val table = new HTable(configuration, "ljh_test3");
>
> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>   .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>     var put = new Put(Bytes.toBytes(res.toKey()));
>     put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>     table.put(put);
> })
> table.flushCommits()
>
> and got a serialization error:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
>         at com.chencai.spark.ml.TrainModel3$.train(TrainModel3.scala:100)
>         at com.chencai.spark.ml.TrainModel3$.main(TrainModel3.scala:115)
>         at com.chencai.spark.ml.TrainModel3.main(TrainModel3.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
> Serialization stack:
>         - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml)
>         - field (class: com.chencai.spark.ml.TrainModel3$$anonfun$train$5, name: configuration$1, type: class org.apache.hadoop.conf.Configuration)
>         - object (class com.chencai.spark.ml.TrainModel3$$anonfun$train$5, <function1>)
>         at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>         ... 21 more
>
>
>
>
>
> On Oct 28, 2015, at 09:26, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Jinhong:
> In one of the earlier threads on storing data to HBase, it was found that
> the htrace jar was not on the classpath, leading to write failures.
>
> Can you check whether you are facing the same problem?
>
> Cheers
>
> On Tue, Oct 27, 2015 at 5:11 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Jinhong:
>> The hadmin variable is not used. You can omit that line.
>>
>> Which HBase release are you using?
>>
>> As Deng said, don't flush per row.
>>
>> Cheers
>>
>> On Oct 27, 2015, at 3:21 AM, Deng Ching-Mallete <och...@apache.org>
>> wrote:
>>
>> Hi,
>>
>> It would be more efficient to configure the table and flush the commits
>> once per partition instead of once per element in the RDD. The latter works
>> fine because you only have 4 elements, but it won't bode well for large
>> data sets, IMO.
>>
>> Thanks,
>> Deng
>>
>> On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinho...@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I wrote my result to HDFS and it worked fine:
>>>
>>> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>>   .aggregateByKey(new TrainFeature())(seqOp, combOp).values
>>> model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount))
>>>   .saveAsTextFile(modelDataPath);
>>>
>>> But when I try to write to HBase, the application hangs: no log, no
>>> response, it just sits there, and nothing is written to HBase:
>>>
>>> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>>   .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>>>     val configuration = HBaseConfiguration.create();
>>>     configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>>     configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>>     configuration.set("hbase.master", "192.168.1:60000");
>>>     val hadmin = new HBaseAdmin(configuration);
>>>     val table = new HTable(configuration, "ljh_test3");
>>>     var put = new Put(Bytes.toBytes(res.toKey()));
>>>     put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount));
>>>     table.put(put);
>>>     table.flushCommits()
>>> })
>>>
>>> Then I tried writing some simple data to HBase, and that worked too:
>>>
>>> sc.parallelize(Array(1,2,3,4)).foreach({ res =>
>>> val configuration = HBaseConfiguration.create();
>>> configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>> configuration.set("hbase.master", "192.168.1:60000");
>>> val hadmin = new HBaseAdmin(configuration);
>>> val table = new HTable(configuration, "ljh_test3");
>>> var put = new Put(Bytes.toBytes(res));
>>> put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>>> table.put(put);
>>> table.flushCommits()
>>> })
>>>
>>> What is the problem with the second snippet? Thanks a lot.
>>>
>>>
>
>
