private HTable table;
You should declare the table variable within the apply() method.
BTW which HBase release are you using?
I see you implement the write batching yourself. You can make use of the
following HTable method:
public void setWriteBufferSize(long writeBufferSize) throws IOException
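The flush-every-500-puts loop in the code below re-implements what the client-side write buffer already does once you call setAutoFlush(false) and give it a buffer size. As a standalone sketch of that flush-on-threshold behaviour (plain JDK, with a hypothetical BufferedSink standing in for the real HTable):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for HTable's client-side write buffer: records are held
// locally and sent in one batched round-trip once a threshold is crossed.
class BufferedSink {
    private final List<String> buffer = new ArrayList<>();
    private final int threshold;
    int flushes = 0;        // batched round-trips made so far
    int flushedRecords = 0; // total records shipped

    BufferedSink(int threshold) { this.threshold = threshold; }

    void put(String record) {
        buffer.add(record);
        if (buffer.size() >= threshold) flush();
    }

    // Analogous to HTable.flushCommits(): ship whatever is buffered.
    void flush() {
        flushedRecords += buffer.size();
        buffer.clear();
        flushes++;
    }
}
```

With a threshold of 500, writing 1200 records costs three round-trips (500, 500, and a final 200 on the explicit flush) instead of 1200 — the same effect the manual puts.size() check below is after.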
Cheers
On Sun, Jun 21, 2015 at 11:16 PM, Nishant Patel nishant.k.pa...@gmail.com
wrote:
Hi,
Please find code as below.
dataFrame
        .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {

            private HTable table;
            private char ROWKEY_SEPERATOR = '\u';

            public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
                Configuration config = HBaseConfiguration.create();
                config.set("hbase.zookeeper.quorum",
                        );
                config.set("hbase.zookeeper.property.clientPort",
                        "???");
                config.set("zookeeper.znode.parent", );
                try {
                    table = new HTable(config, "table_name");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                List<Put> puts = new ArrayList<Put>();
                try {
                    while (rows.hasNext()) {
                        Row row = rows.next();
                        Map<String, Object> map = new HashMap<String, Object>();
                        String[] fieldNames = row.schema().fieldNames();
                        for (int i = 0; i < fieldNames.length; i++) {
                            map.put(fieldNames[i].toUpperCase(), row.get(i));
                        }
                        puts.add(mapToPut(map));
                        if (puts.size() >= 500) {
                            table.put(puts);
                            puts.clear();
                        }
                    }
                    table.put(puts);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                return BoxedUnit.UNIT;
            }

            private Put mapToPut(Map<String, Object> map) throws IOException {
                try {
                    Put put = new Put(getRowKey(map));
                    String value = null;
                    for (String key : map.keySet()) {
                        value = (map.get(key) == null ? "" : map.get(key).toString());
                        put.add(Bytes.toBytes("0"),
                                Bytes.toBytes(key),
                                Bytes.toBytes(value));
                    }
                    return put;
                } catch (Exception e) {
                    e.printStackTrace();
                    throw e;
                }
            }

            private byte[] getRowKey(Map<String, Object> map) {
                StringBuilder builder = new StringBuilder();
                return Bytes.toBytes(builder.toString());
            }
        });
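One more thing worth noting about the snippet above: the HTable is never flushed or closed, so a failing row leaks the connection and the last partial batch can be lost. The usual foreachPartition shape is open inside apply(), write, then close in a finally block. A JDK-only sketch of that lifecycle (hypothetical Sink standing in for HTable):

```java
import java.util.Iterator;

// Per-partition resource lifecycle: open the (non-serializable) connection
// inside the partition function, write every row, and close in finally so
// the connection is released even when a row throws.
class PartitionWriter {
    // Hypothetical stand-in for HTable.
    static class Sink {
        boolean open = true;
        int written = 0;
        void put(String record) { written++; }
        void close() { open = false; }  // analogous to table.close()
    }

    static Sink writePartition(Iterator<String> rows) {
        Sink sink = new Sink();         // created on the executor, per partition
        try {
            while (rows.hasNext()) {
                sink.put(rows.next());
            }
        } finally {
            sink.close();               // runs on success and on failure
        }
        return sink;
    }
}
```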
Regards,
Nishant
On Mon, Jun 22, 2015 at 11:08 AM, Ted Yu yuzhih...@gmail.com wrote:
Can you show us the code for loading Hive into hbase ?
There shouldn't be a 'return' statement in that code.
Cheers
On Jun 20, 2015, at 10:10 PM, Nishant Patel nishant.k.pa...@gmail.com
wrote:
Hi,
I am loading data from Hive table to Hbase after doing some manipulation.
I am getting error as 'Task not Serializable'.
My code is as below.
public class HiveToHbaseLoader implements Serializable {

    public static void main(String[] args) throws Exception {
        String hbaseTableName = args[0];
        String hiveQuery = args[1];

        SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
                .setMaster();
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());
        hiveContext.setConf("hive.metastore.uris",
                "?");
        DataFrame dataFrame = hiveContext.sql(hiveQuery);

        dataFrame
                .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
                    // Logic to load rows from Hive into HBase.
                });
    }
}
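The likely cause of the error below: scala.runtime.AbstractFunction1 does not itself implement Serializable (Spark's own Java Function interfaces do), so the anonymous function object cannot be shipped to executors — and holding the HTable as a field would make the closure unserializable in any case. The effect can be reproduced with plain JDK serialization (hypothetical Connection standing in for HTable):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {
    // Stand-in for HTable: holds live connection state, deliberately
    // not Serializable.
    static class Connection {}

    // Function object carrying the connection as a field: serializing the
    // closure drags Connection along and fails -- "Task not serializable".
    static class FnWithField implements Serializable {
        Connection table = new Connection();
    }

    // Function object that opens the connection inside apply(), on the
    // executor: nothing non-serializable travels with the closure.
    static class FnWithLocal implements Serializable {
        void apply() {
            Connection table = new Connection(); // per partition, never serialized
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

This mirrors Ted's advice above: implement Serializable on the function object, and declare the table inside apply() rather than as a field.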
Getting error as below.
Exception in thread main org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at