I wrote a demo, but still no response, no error, no log. My HBase is 0.98, Hadoop 2.3, Spark 1.4.
And I run it in yarn-client mode. Any idea? Thanks.

package com.lujinhong.sparkdemo

import org.apache.spark._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object SparkConnectHbase2 extends Serializable {
  def main(args: Array[String]) {
    new SparkConnectHbase2().toHbase()
  }
}

class SparkConnectHbase2 extends Serializable {
  def toHbase() {
    val conf = new SparkConf().setAppName("ljh_ml3")
    val sc = new SparkContext(conf)
    sc.parallelize(Array(601, 701, 801, 901)).foreachPartition({ partition =>
      // One connection per partition, not per element.
      val configuration = HBaseConfiguration.create()
      configuration.set("hbase.zookeeper.property.clientPort", "2181")
      configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
      configuration.set("hbase.master", "192.168.1.66:60000")
      val table = new HTable(configuration, "ljh_test4")
      // foreachPartition hands the closure an Iterator, so iterate over the
      // elements; the original code built a single Put from the iterator itself.
      partition.foreach { a =>
        val put = new Put(Bytes.toBytes(a.toString))
        put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(a + "value"))
        table.put(put)
      }
      table.flushCommits()
      table.close()
    })
  }
}
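The replies quoted below recommend per-partition connections and a final flush. Another route, which sidesteps HTable management in closures entirely, is Spark's Hadoop-output support with HBase's TableOutputFormat. This is only a sketch, not the poster's code; it assumes the hbase-server artifact (which provides TableOutputFormat) is on the classpath:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkConnectHbaseViaOutputFormat {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("ljh_ml3"))

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.zookeeper.quorum", "192.168.1.66")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "ljh_test4")

    // The Job object is used only to populate the output-format settings.
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    sc.parallelize(Array(601, 701, 801, 901))
      .map { a =>
        val put = new Put(Bytes.toBytes(a.toString))
        put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(a + "value"))
        (new ImmutableBytesWritable, put)
      }
      .saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}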
> On Oct 28, 2015, at 10:23, Fengdong Yu <fengdo...@everstring.com> wrote:
>
> Also, please move the HBase-related code into the Scala object; this will resolve the serialization issue and avoid opening the connection repeatedly.
>
> And remember to close the table after the final flush.
>
>
>> On Oct 28, 2015, at 10:13 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> For #2, have you checked the task log(s) to see if there was some clue?
>>
>> You may want to use foreachPartition to reduce the number of flushes.
>>
>> In the future, please remove the color coding - it is not easy to read.
>>
>> Cheers
>>
>> On Tue, Oct 27, 2015 at 6:53 PM, jinhong lu <lujinho...@gmail.com> wrote:
>> Hi Ted,
>>
>> Thanks for your help. I checked the jar; it is on the classpath. Now the problem is:
>>
>> 1. The following code runs fine, and it puts the result into HBase:
>>
>> val res = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.first()
>> val configuration = HBaseConfiguration.create();
>> configuration.set("hbase.zookeeper.property.clientPort", "2181");
>> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>> configuration.set("hbase.master", "192.168.1.66:60000");
>> val table = new HTable(configuration, "ljh_test3");
>> var put = new Put(Bytes.toBytes(res.toKey()));
>> put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>> table.put(put);
>> table.flushCommits()
>>
>> 2. But if I change first() to foreach:
>>
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>>   val configuration = HBaseConfiguration.create();
>>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>   configuration.set("hbase.master", "192.168.1.66:60000");
>>   val table = new HTable(configuration, "ljh_test3");
>>   var put = new Put(Bytes.toBytes(res.toKey()));
>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>>   table.put(put);
>> })
>>
>> the application hangs, and the last log lines are:
>>
>> 15/10/28 09:30:33 INFO DAGScheduler: Missing parents for ResultStage 2: List()
>> 15/10/28 09:30:33 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98), which is now runnable
>> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(7032) called with curMem=264045, maxMem=278302556
>> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.9 KB, free 265.2 MB)
>> 15/10/28 09:30:33 INFO MemoryStore: ensureFreeSpace(3469) called with curMem=271077, maxMem=278302556
>> 15/10/28 09:30:33 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.4 KB, free 265.1 MB)
>> 15/10/28 09:30:33 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.120.69.53:43019 (size: 3.4 KB, free: 265.4 MB)
>> 15/10/28 09:30:33 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
>> 15/10/28 09:30:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[6] at values at TrainModel3.scala:98)
>> 15/10/28 09:30:33 INFO YarnScheduler: Adding task set 2.0 with 1 tasks
>> 15/10/28 09:30:33 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, gdc-dn147-formal.i.nease.net, PROCESS_LOCAL, 1716 bytes)
>> 15/10/28 09:30:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on gdc-dn147-formal.i.nease.net:59814 (size: 3.4 KB, free: 1060.3 MB)
>> 15/10/28 09:30:34 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to gdc-dn147-formal.i.nease.net:52904
>> 15/10/28 09:30:34 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 154 bytes
>>
>> 3. Besides, when I take the configuration and HTable out of the foreach:
>>
>> val configuration = HBaseConfiguration.create();
>> configuration.set("hbase.zookeeper.property.clientPort", "2181");
>> configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>> configuration.set("hbase.master", "192.168.1.66:60000");
>> val table = new HTable(configuration, "ljh_test3");
>>
>> lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>>   var put = new Put(Bytes.toBytes(res.toKey()));
>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.positiveCount));
>>   table.put(put);
>> })
>> table.flushCommits()
>>
>> I get a serialization problem:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>>         at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
>>         at com.chencai.spark.ml.TrainModel3$.train(TrainModel3.scala:100)
>>         at com.chencai.spark.ml.TrainModel3$.main(TrainModel3.scala:115)
>>         at com.chencai.spark.ml.TrainModel3.main(TrainModel3.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
>> Serialization stack:
>>         - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml)
>>         - field (class: com.chencai.spark.ml.TrainModel3$$anonfun$train$5, name: configuration$1, type: class org.apache.hadoop.conf.Configuration)
>>         - object (class com.chencai.spark.ml.TrainModel3$$anonfun$train$5, <function1>)
>>         at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>>         ... 21 more
>>> On Oct 28, 2015, at 09:26, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>> Jinghong:
>>> In one of the earlier threads on storing data to HBase, it was found that the htrace jar was not on the classpath, leading to write failures.
>>>
>>> Can you check whether you are facing the same problem?
>>>
>>> Cheers
>>>
>>> On Tue, Oct 27, 2015 at 5:11 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>> Jinghong:
>>> The hadmin variable is not used. You can omit that line.
>>>
>>> Which HBase release are you using?
>>>
>>> As Deng said, don't flush per row.
>>>
>>> Cheers
>>>
>>> On Oct 27, 2015, at 3:21 AM, Deng Ching-Mallete <och...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> It would be more efficient if you configure the table and flush the commits per partition instead of per element in the RDD. The latter works fine because you only have 4 elements, but it won't bode well for large data sets, IMO.
>>>>
>>>> Thanks,
>>>> Deng
>>>>
>>>> On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinho...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I write my result to HDFS, and it works well:
>>>>
>>>> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values
>>>> model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath);
>>>>
>>>> But when I write to HBase, the application hangs with no log and no response; it just stays there, and nothing is written to HBase:
>>>>
>>>> val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res =>
>>>>   val configuration = HBaseConfiguration.create();
>>>>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>>>   configuration.set("hbase.master", "192.168.1.66:60000");
>>>>   val hadmin = new HBaseAdmin(configuration);
>>>>   val table = new HTable(configuration, "ljh_test3");
>>>>   var put = new Put(Bytes.toBytes(res.toKey()));
>>>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount));
>>>>   table.put(put);
>>>>   table.flushCommits()
>>>> })
>>>>
>>>> And then I tried writing some simple data to HBase; that works too:
>>>>
>>>> sc.parallelize(Array(1, 2, 3, 4)).foreach({ res =>
>>>>   val configuration = HBaseConfiguration.create();
>>>>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>>>>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>>>>   configuration.set("hbase.master", "192.168.1.66:60000");
>>>>   val hadmin = new HBaseAdmin(configuration);
>>>>   val table = new HTable(configuration, "ljh_test3");
>>>>   var put = new Put(Bytes.toBytes(res));
>>>>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>>>>   table.put(put);
>>>>   table.flushCommits()
>>>> })
>>>>
>>>> What is the problem with the second snippet? Thanks a lot.
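Regarding Ted's htrace point above: a quick way to check whether the htrace classes are visible on the executors is to probe for them from a job. The class name below is an assumption for the htrace-core 2.x that early HBase 0.98 releases shipped (htrace 3.0 renamed the package to org.htrace, and 3.1+ to org.apache.htrace), so adjust it to match the jar you actually deploy:

// Runs one task per partition, which normally lands on several executors,
// and reports whether the (assumed) htrace class is loadable there.
val report = sc.parallelize(1 to 8, 8).map { _ =>
  try {
    Class.forName("org.cloudera.htrace.Trace") // assumed htrace-core 2.x class
    "htrace found"
  } catch {
    case _: ClassNotFoundException => "htrace MISSING"
  }
}.collect()
report.foreach(println)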