How did the RDD.union work
Hi,

I have a problem using the union method of RDD. I have a function like

    def hbaseQuery(area: String): RDD[Result] = ???

When I use

    hbaseQuery("aa").union(hbaseQuery("bb")).count()

it returns 0. However, when I use it like this:

    sc.parallelize(hbaseQuery("aa").collect.toList ::: hbaseQuery("bb").collect.toList).count()

it returns the right value. Obviously I do have an action after my transformations, so why did it not work?

-- qiaou
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)
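One way to see why the two versions can disagree: `union` is lazy, so both `hbaseQuery` calls run (and configure their input) before `count()` starts any job. In the `collect` version, the `"aa"` result is fully materialized before the `"bb"` query is even built, because Scala evaluates the left operand of a right-associative operator like `:::` first. The sketch below is a self-contained simulation with no Spark or HBase. `LazyQuery`, `query` and the in-memory `table` are hypothetical stand-ins, and the shared mutable map assumes that the query helper reuses one configuration object.

```scala
import scala.collection.mutable

// Simulation only (no Spark/HBase): lazy queries read a SHARED mutable
// settings object when they are evaluated, the way an RDD reads its input
// configuration only when a job actually runs.
object LazyVsEager {
  // Hypothetical stand-in for the HBase table: row keys prefixed by area.
  val table: Seq[String] = Seq("aa001", "aa002", "bb001")

  // Hypothetical stand-in for a configuration object returned by a shared helper.
  val sharedConf: mutable.Map[String, String] = mutable.Map.empty

  // A lazy "RDD": a thunk that scans the table only when forced.
  final case class LazyQuery(run: () => Seq[String]) {
    def union(other: LazyQuery): LazyQuery = LazyQuery(() => run() ++ other.run())
    def collect(): Seq[String] = run()
    def count(): Int = run().size
  }

  // Mirrors hbaseQuery(area): writes the scan prefix into the shared conf,
  // then returns a query that reads the conf later.
  def query(area: String): LazyQuery = {
    sharedConf("scan.prefix") = area
    LazyQuery(() => table.filter(_.startsWith(sharedConf("scan.prefix"))))
  }

  // Both queries are built before anything runs, so both read "bb".
  def lazyUnionCount(): Int = query("aa").union(query("bb")).count()

  // Each query is executed before the next one touches the conf.
  def eagerConcatCount(): Int = {
    val aa = query("aa").collect().toList // "aa" is scanned right now
    val bb = query("bb").collect().toList // conf changes only after "aa" finished
    (aa ::: bb).size
  }
}
```

The lazy union counts the `"bb"` row twice (2) while the eager version returns the expected 3. The real job returned 0 rather than a duplicated count, so this only illustrates that the lazy version reads the wrong scan settings, not the exact number it produced.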
Re: How did the RDD.union work
Could you provide the code of hbaseQuery? Maybe it doesn't support being executed in parallel.

Best Regards,
Shixiong Zhu

On 2014-11-12 14:32 GMT+08:00, qiaou <qiaou8...@gmail.com> wrote:
Re: How did the RDD.union work
OK, here is the code:

    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
    import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.rdd.RDD

    // startId, endId, appKey, imei, RowKeyUtils, HBaseConfigUtil and
    // SparkUtil are defined elsewhere in the application.
    def hbaseQuery: (String) => RDD[Result] = {
      val generateRdd = (area: String) => {
        // Row-key range for this area
        val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}"
        val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}"
        println(s"startRowKey:${startRowKey}")
        println(s"stopRowKey :${stopRowKey}")
        val scan = new Scan()
        scan.setStartRow(Bytes.toBytes(startRowKey))
        scan.setStopRow(Bytes.toBytes(stopRowKey))
        // Optional column-value filters
        val filterList: FilterList = new FilterList()
        if (appKey != null && !appKey.equals("_")) {
          val appKeyFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
            Bytes.toBytes("clientInfo"), Bytes.toBytes("optKey"), CompareOp.EQUAL, Bytes.toBytes(appKey))
          filterList.addFilter(appKeyFilter)
        }
        if (imei != null && !imei.equals("_")) {
          val imeiFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
            Bytes.toBytes("clientInfo"), Bytes.toBytes("optImei"), CompareOp.EQUAL, Bytes.toBytes(imei))
          filterList.addFilter(imeiFilter)
        }
        if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
          scan.setFilter(filterList)
        }
        scan.setCaching(1)
        // The input table and the serialized scan reach TableInputFormat through the conf
        val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack")
        hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
        SparkUtil.getSingleSparkContext()
          .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
          .map { case (_: ImmutableBytesWritable, result: Result) => result }
      }
      return generateRdd
    }

-- qiaou

On Wednesday, November 12, 2014, at 2:50 PM, Shixiong Zhu wrote:
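In the code above, every call to `hbaseQuery` writes `TableInputFormat.INPUT_TABLE` and `TableInputFormat.SCAN` into whatever `HBaseConfigUtil.getHBaseConfiguration` returns. If that helper hands back one cached instance (an assumption; its source is not in the thread), the second call overwrites the scan stored by the first, even though the first RDD has already been created with that object. The dependency-free sketch below models this with a hypothetical `ConfigHelper` and a plain mutable map in place of Hadoop's `Configuration`; `"hbase.mapreduce.scan"` is the key string behind `TableInputFormat.SCAN`, and the scan value is a placeholder.

```scala
import scala.collection.mutable

// ASSUMPTION: HBaseConfigUtil.getHBaseConfiguration returns one cached
// instance. ConfigHelper is a hypothetical stand-in for it; a mutable map
// stands in for org.apache.hadoop.conf.Configuration.
object ConfigHelper {
  private val cached = mutable.Map[String, String]("hbase.zookeeper.quorum" -> "zk1")
  def getConfiguration: mutable.Map[String, String] = cached
}

object AliasingDemo {
  // The key string behind TableInputFormat.SCAN.
  val ScanKey = "hbase.mapreduce.scan"

  // Mirrors hbaseConf.set(TableInputFormat.SCAN, ...) inside hbaseQuery(area).
  def configureFor(area: String): mutable.Map[String, String] = {
    val conf = ConfigHelper.getConfiguration
    conf(ScanKey) = s"scan-for-$area" // placeholder for the Base64-encoded Scan
    conf
  }

  def main(args: Array[String]): Unit = {
    val confA = configureFor("aa")
    val confB = configureFor("bb")
    println(confA eq confB) // true: both "RDDs" hold the same object
    println(confA(ScanKey)) // scan-for-bb: the "aa" scan was overwritten
  }
}
```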
Re: How did the RDD.union work
This works! But can you explain why it has to be written this way?

-- qiaou

On Wednesday, November 12, 2014, at 3:18 PM, Shixiong Zhu wrote:
> You need to create a new configuration for each RDD. Therefore,
> `val hbaseConf = HBaseConfigUtil.getHBaseConfiguration`
> should be changed to
> `val hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)`
>
> Best Regards,
> Shixiong Zhu
>
> On 2014-11-12 14:53 GMT+08:00, qiaou <qiaou8...@gmail.com> wrote:
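The suggested change works because `new Configuration(other)` (Hadoop's `org.apache.hadoop.conf.Configuration` copy constructor) makes an independent copy, so each RDD gets its own `TableInputFormat.SCAN` value while still inheriting the shared connection settings. Below is a minimal sketch of that copy-then-set pattern; `SimpleConf` is a hypothetical stand-in for `Configuration` so the example has no Hadoop dependency.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration: just enough
// to show what a copy constructor like `new Configuration(other)` provides.
final class SimpleConf private (private val props: mutable.Map[String, String]) {
  def this() = this(mutable.Map.empty[String, String])
  // Copy constructor: the new instance gets its own copy of the properties.
  def this(other: SimpleConf) = this(other.props.clone())
  def set(key: String, value: String): Unit = props(key) = value
  def get(key: String): Option[String] = props.get(key)
}

object PerRddConfDemo {
  // Hypothetical shared base conf, playing the role of HBaseConfigUtil.getHBaseConfiguration.
  val base = new SimpleConf()
  base.set("hbase.zookeeper.quorum", "zk1")

  // The suggested fix: copy the shared conf, then set per-RDD keys on the copy.
  def confFor(area: String): SimpleConf = {
    val conf = new SimpleConf(base)
    conf.set("hbase.mapreduce.scan", s"scan-for-$area") // placeholder scan value
    conf
  }
}
```

Each call now returns a distinct object carrying its own scan, and the shared base is never modified, so creating the `"bb"` RDD cannot change what the `"aa"` RDD reads.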
Re: How did the RDD.union work
The `conf` object will be sent to other nodes via Broadcast. Here is the Scaladoc of Broadcast:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast

It says: "In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later)."

Best Regards,
Shixiong Zhu

On 2014-11-12 15:20 GMT+08:00, qiaou <qiaou8...@gmail.com> wrote:
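The Scaladoc caveat is about timing: whoever receives a copy of the value after it has been modified sees the new contents, while whoever received it earlier keeps the old ones. Per the reply above, Spark ships the `conf` through a broadcast internally rather than through an explicit `sc.broadcast` call, but the hazard is the same. In this dependency-free sketch, `ship` is a hypothetical stand-in for serializing a value to a node; it simply snapshots the map.

```scala
import scala.collection.mutable

// Dependency-free illustration of the Broadcast caveat (no Spark involved).
object MutateAfterShip {
  // Hypothetical stand-in for shipping a value to a node: the node ends up
  // with an immutable copy of the contents as they are at that moment.
  def ship(value: mutable.Map[String, String]): Map[String, String] = value.toMap

  def run(): (String, String) = {
    val settings = mutable.Map("scan.prefix" -> "aa")
    val earlyNode = ship(settings) // receives the value before the change
    settings("scan.prefix") = "bb" // the value is modified after "broadcast"
    val lateNode = ship(settings)  // e.g. a node that gets the value later
    (earlyNode("scan.prefix"), lateNode("scan.prefix"))
  }
}
```

The two "nodes" disagree about `scan.prefix`, which is the inconsistency the Scaladoc warns about. Creating a fresh configuration per RDD, as suggested earlier in the thread, avoids mutating anything that has already been handed to Spark.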