The `conf` object will be sent to other nodes via Broadcast.
Here is the scaladoc of Broadcast:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast
In addition, the object v should not be modified after it is broadcast in
order to ensure that all nodes get the same value of the broadcast variable
(e.g. if the variable is shipped to a new node later).
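To make that concrete, here is a minimal plain-Scala sketch (no Spark; `Conf` and `broadcastSnapshot` are hypothetical stand-ins, not Spark's `Broadcast` API) of why mutating a value after broadcasting it is unsafe: nodes that already received the serialized snapshot keep the old value, while a node shipped the variable later would see the new one, so nodes can disagree.

```scala
// Hypothetical stand-in for a broadcast value (not Spark's Broadcast API):
case class Conf(var table: String)

// "Broadcasting" snapshots the value once, the way Spark serializes v at
// broadcast time and ships that serialized copy to executors.
def broadcastSnapshot(v: Conf): Conf = v.copy()

val conf = Conf("table-A")
val shipped = broadcastSnapshot(conf) // executors receive this snapshot
conf.table = "table-B"                // later driver-side mutation

// Already-shipped copies still see the old value, so nodes can diverge:
assert(shipped.table == "table-A")
assert(conf.table == "table-B")
```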
Best Regards,
Shixiong Zhu
2014-11-12 15:20 GMT+08:00 qiaou qiaou8...@gmail.com:
this work!
but can you explain why should use like this?
--
qiaou
Sent with Sparrow http://www.sparrowmailapp.com/?sig
On Wednesday, November 12, 2014 at 3:18 PM, Shixiong Zhu wrote:
You need to create a new configuration for each RDD. Therefore,
`val hbaseConf = HBaseConfigUtil.getHBaseConfiguration` should be changed to
`val hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)`.
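To see why the per-RDD copy matters, here is a minimal plain-Scala sketch (no Spark/HBase; the mutable `Map` is a hypothetical stand-in for the shared Hadoop `Configuration`, and the thunks stand in for lazy RDDs that only read the conf when an action forces them):

```scala
import scala.collection.mutable

// Hypothetical stand-in for the shared Hadoop Configuration singleton:
val base = mutable.Map("table" -> "asrLogFeedBack")

// Broken version: every call mutates the SAME conf; the thunk (like a lazy
// RDD) only reads it when forced (like count()), so both queries end up
// seeing the last-written scan range.
def brokenQuery(area: String): () => String = {
  base("scan") = area
  () => base("scan")
}
val a = brokenQuery("aa")
val b = brokenQuery("bb")
assert(a() == "bb" && b() == "bb")   // both see "bb"

// Fixed version: copy the conf per query, mirroring new Configuration(base):
def fixedQuery(area: String): () => String = {
  val conf = base.clone()            // fresh copy per RDD
  conf("scan") = area
  () => conf("scan")
}
val fa = fixedQuery("aa")
val fb = fixedQuery("bb")
assert(fa() == "aa" && fb() == "bb") // each query keeps its own scan
```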
Best Regards,
Shixiong Zhu
2014-11-12 14:53 GMT+08:00 qiaou qiaou8...@gmail.com:
ok here is the code
def hbaseQuery: (String) => RDD[Result] = {
  val generateRdd = (area: String) => {
    val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}"
    val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}"
    println(s"startRowKey:${startRowKey}")
    println(s"stopRowKey :${stopRowKey}")
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(startRowKey))
    scan.setStopRow(Bytes.toBytes(stopRowKey))
    val filterList: FilterList = new FilterList()
    if (appKey != null && !appKey.equals("_")) {
      val appKeyFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
          Bytes.toBytes(optKey), CompareOp.EQUAL, Bytes.toBytes(appKey))
      filterList.addFilter(appKeyFilter)
    }
    if (imei != null && !imei.equals("_")) {
      val imeiFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
          Bytes.toBytes(optImei), CompareOp.EQUAL, Bytes.toBytes(imei))
      filterList.addFilter(imeiFilter)
    }
    if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
      scan.setFilter(filterList)
    }
    scan.setCaching(1)
    val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
    hbaseConf.set(TableInputFormat.INPUT_TABLE, asrLogFeedBack)
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    SparkUtil.getSingleSparkContext()
      .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result]).map {
        case (_: ImmutableBytesWritable, result: Result) => result
      }
  }
  generateRdd
}
--
qiaou
Sent with Sparrow http://www.sparrowmailapp.com/?sig
On Wednesday, November 12, 2014 at 2:50 PM, Shixiong Zhu wrote:
Could you provide the code of hbaseQuery? Maybe it doesn't support parallel
execution.
Best Regards,
Shixiong Zhu
2014-11-12 14:32 GMT+08:00 qiaou qiaou8...@gmail.com:
Hi:
I have a problem with the union method of RDD. It goes like this: I have a
function

def hbaseQuery(area: String): RDD[Result] = ???

When I use hbaseQuery("aa").union(hbaseQuery("bb")).count(), it returns 0.
However, when I use sc.parallelize(hbaseQuery("aa").collect.toList
::: hbaseQuery("bb").collect.toList).count(), it returns the right value.
Obviously I have an action after my transformations, so why does the union
not work?
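For reference, the count I expect from a union: when the two datasets are built independently, the union's size is the sum of the two sizes (plain Scala Lists as a stand-in for the two RDDs):

```scala
val aa = List("r1", "r2") // stand-in for hbaseQuery("aa").collect.toList
val bb = List("r3")       // stand-in for hbaseQuery("bb").collect.toList

// Concatenation (like RDD.union) should count every element of both:
assert((aa ::: bb).size == aa.size + bb.size)
```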
fyi
--
qiaou
Sent with Sparrow http://www.sparrowmailapp.com/?sig