For #2, have you checked the task log(s) to see if there is some clue? You may want to use foreachPartition to reduce the number of flushes.
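Something along these lines is what I have in mind -- an untested sketch that reuses the table name, quorum and aggregation pipeline from your snippets below. The Configuration and HTable are created inside foreachPartition on the executor, so nothing non-serializable is captured by the closure, and flushCommits() runs once per partition instead of once per row:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes

    lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
      .aggregateByKey(new TrainFeature())(seqOp, combOp).values
      .foreachPartition { iter =>
        // One Configuration/HTable per partition, created on the executor side.
        val configuration = HBaseConfiguration.create()
        configuration.set("hbase.zookeeper.property.clientPort", "2181")
        configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
        configuration.set("hbase.master", "192.168.1.66:60000")
        val table = new HTable(configuration, "ljh_test3")
        table.setAutoFlush(false)   // buffer puts client-side (deprecated but present in 0.98/1.x clients)
        try {
          iter.foreach { res =>
            val put = new Put(Bytes.toBytes(res.toKey()))
            put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount))
            table.put(put)
          }
          table.flushCommits()      // one flush per partition, not per row
        } finally {
          table.close()
        }
      }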
In the future, please remove color coding - it is not easy to read.

Cheers

On Tue, Oct 27, 2015 at 6:53 PM, jinhong lu <lujinho...@gmail.com> wrote:

> Hi, Ted
>
> Thanks for your help.
>
> I checked the jar, it is in the classpath, and now the problem is:
>
> 1. The following code runs fine, and it puts the result into hbase:
>
>     val res = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>       .aggregateByKey(new TrainFeature())(seqOp, combOp).values.first()
>     val configuration = HBaseConfiguration.create();
>     configuration.set("hbase.zookeeper.property.clientPort", "2181");
>     configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>     configuration.set("hbase.master", "192.168.1.66:60000");
>     val table = new HTable(configuration, "ljh_test3");
>     var put = new Put(Bytes.toBytes(res.toKey()));
>     put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>     table.put(put);
>     table.flushCommits()
>
> 2. But if I change the first() function to foreach:
>
>     lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>       .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>         val configuration = HBaseConfiguration.create();
>         configuration.set("hbase.zookeeper.property.clientPort", "2181");
>         configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>         configuration.set("hbase.master", "192.168.1.66:60000");
>         val table = new HTable(configuration, "ljh_test3");
>         var put = new Put(Bytes.toBytes(res.toKey()));
>         put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>         table.put(put);
>       })
>
> the application hangs, and the last log is:
>
> 15/10/28 09:30:33 INFO DAGScheduler: Missing parents for ResultStage 2: List()
> 15/10/28 09:30:33 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98), which is now runnable
> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(7032) called with curMem=264045, maxMem=278302556
> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.9 KB, free 265.2 MB)
> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(3469) called with curMem=271077, maxMem=278302556
> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.4 KB, free 265.1 MB)
> 15/10/28 09:30:33 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.120.69.53:43019 (size: 3.4 KB, free: 265.4 MB)
> 15/10/28 09:30:33 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
> 15/10/28 09:30:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98)
> 15/10/28 09:30:33 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
> 15/10/28 09:30:33 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, gdc-dn147-formal.i.nease.net, PROCESS_LOCAL, 1716 bytes)
> 15/10/28 09:30:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on gdc-dn147-formal.i.nease.net:59814 (size: 3.4 KB, free: 1060.3 MB)
> 15/10/28 09:30:34 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to gdc-dn147-formal.i.nease.net:52904
> 15/10/28 09:30:34 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 154 bytes
>
> 3. Besides, if I take the configuration and HTable out of the foreach:
>
>     val configuration = HBaseConfiguration.create();
>     configuration.set("hbase.zookeeper.property.clientPort", "2181");
>     configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>     configuration.set("hbase.master", "192.168.1.66:60000");
>     val table = new HTable(configuration, "ljh_test3");
>
>     lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>       .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>         var put = new Put(Bytes.toBytes(res.toKey()));
>         put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>         table.put(put);
>       })
>     table.flushCommits()
>
> I get a serialization problem:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
>         at com.chencai.spark.ml.TrainModel3$.train(TrainModel3.scala:100)
>         at com.chencai.spark.ml.TrainModel3$.main(TrainModel3.scala:115)
>         at com.chencai.spark.ml.TrainModel3.main(TrainModel3.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
> Serialization stack:
>         - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml)
>         - field (class: com.chencai.spark.ml.TrainModel3$$anonfun$train$5, name: configuration$1, type: class org.apache.hadoop.conf.Configuration)
>         - object (class com.chencai.spark.ml.TrainModel3$$anonfun$train$5, <function1>)
>         at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>         ... 21 more
>
>
> On Oct 28, 2015, at 09:26, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Jinghong:
> In one of the earlier threads on storing data to hbase, it was found that the htrace jar was not on the classpath, leading to write failure.
>
> Can you check whether you are facing the same problem?
>
> Cheers
>
> On Tue, Oct 27, 2015 at 5:11 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Jinghong:
>> The hadmin variable is not used. You can omit that line.
>>
>> Which hbase release are you using?
>>
>> As Deng said, don't flush per row.
>>
>> Cheers
>>
>> On Oct 27, 2015, at 3:21 AM, Deng Ching-Mallete <och...@apache.org> wrote:
>>
>> Hi,
>>
>> It would be more efficient if you configure the table and flush the commits by partition instead of per element in the RDD. The latter works fine because you only have 4 elements, but it won't bode well for large data sets IMO.
>>
>> Thanks,
>> Deng
>>
>> On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinho...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I write my result to hdfs, and it works well:
>>>
>>>     val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>>       .aggregateByKey(new TrainFeature())(seqOp, combOp).values
>>>     model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath);
>>>
>>> But when I want to write to hbase, the application hangs, no log, no response, it just stays there, and nothing is written to hbase:
>>>
>>>     val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction)
>>>       .aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>>>         val configuration = HBaseConfiguration.create();
>>>         configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>>         configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>>         configuration.set("hbase.master", "192.168.1:60000");
>>>         val hadmin = new HBaseAdmin(configuration);
>>>         val table = new HTable(configuration, "ljh_test3");
>>>         var put = new Put(Bytes.toBytes(res.toKey()));
>>>         put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount));
>>>         table.put(put);
>>>         table.flushCommits()
>>>       })
>>>
>>> And then I tried to write some simple data to hbase, and that works well too:
>>>
>>>     sc.parallelize(Array(1, 2, 3, 4)).foreach({ res =>
>>>       val configuration = HBaseConfiguration.create();
>>>       configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>>       configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>>       configuration.set("hbase.master", "192.168.1:60000");
>>>       val hadmin = new HBaseAdmin(configuration);
>>>       val table = new HTable(configuration, "ljh_test3");
>>>       var put = new Put(Bytes.toBytes(res));
>>>       put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>>>       table.put(put);
>>>       table.flushCommits()
>>>     })
>>>
>>> What is the problem with the 2nd code? Thanks a lot.
>>>
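P.S. If the per-partition approach still gives you trouble, another option is to let Spark write through HBase's TableOutputFormat instead of managing HTable instances yourself. A rough, untested sketch, reusing the `model` RDD and the table/quorum settings from the quoted code (adjust to your setup):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.zookeeper.quorum", "192.168.1.66")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "ljh_test3")

    // The Job object only carries output-format metadata; no MapReduce job is launched.
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    model.map { res =>
      val put = new Put(Bytes.toBytes(res.toKey()))
      put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount))
      (new ImmutableBytesWritable(Bytes.toBytes(res.toKey())), put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)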