ok here is the code def hbaseQuery:(String)=>RDD[Result] = { val generateRdd = (area:String)=>{ val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}AAAAAAAAAAAAAAAA" val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}GGGGGGGGGGGGGGGG" println(s"startRowKey:${startRowKey}") println(s"stopRowKey :${stopRowKey}")
val scan = new Scan() scan.setStartRow(Bytes.toBytes(startRowKey)) scan.setStopRow(Bytes.toBytes(stopRowKey)) val filterList: FilterList = new FilterList() if (appKey != null && !appKey.equals("_")) { val appKeyFilter: SingleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("clientInfo"), Bytes.toBytes("optKey"), CompareOp.EQUAL, Bytes.toBytes(appKey)) filterList.addFilter(appKeyFilter) } if (imei != null && !imei.equals("_")) { val imeiFilter: SingleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("clientInfo"), Bytes.toBytes("optImei"), CompareOp.EQUAL, Bytes.toBytes(imei)) filterList.addFilter(imeiFilter) } if (filterList.getFilters != null && filterList.getFilters.size() > 0) { scan.setFilter(filterList) } scan.setCaching(10000) val hbaseConf = HBaseConfigUtil.getHBaseConfiguration hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack") hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)) SparkUtil.getSingleSparkContext() .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]).map { case (_: ImmutableBytesWritable, result: Result) => { result } } } return generateRdd } -- qiaou 已使用 Sparrow (http://www.sparrowmailapp.com/?sig) 在 2014年11月12日 星期三,下午2:50,Shixiong Zhu 写道: > Could you provide the code of hbaseQuery? It maybe doesn't support to execute > in parallel. > > Best Regards, > Shixiong Zhu > > > > > 2014-11-12 14:32 GMT+08:00 qiaou <qiaou8...@gmail.com > (mailto:qiaou8...@gmail.com)>: > > Hi: > > I got a problem with using the union method of RDD > > things like this > > I get a function like > > def hbaseQuery(area:string):RDD[Result]= ??? > > when i use hbaseQuery('aa').union(hbaseQuery(‘bb’)).count() it > > returns 0 > > however when use like this sc.parallize(hbaseQuery('aa’).collect.toList > > ::: hbaseQuery(’bb’).collect.toList).count() it return the right value > > obviously i have got an action after my transformation action ,but why it > > did not work > > fyi > > > > -- > > qiaou > > 已使用 Sparrow (http://www.sparrowmailapp.com/?sig) > > >