Re: Task Serialization Error on DataFrame.foreachPartition

2015-06-22 Thread Ted Yu
private HTable table;

You should declare the table variable within the apply() method.

BTW, which HBase release are you using?

I see you implement caching yourself. You can make use of the following
HTable method:

  public void setWriteBufferSize(long writeBufferSize) throws IOException {
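
For illustration, the two suggestions combined might look roughly like the sketch
below (written against the old HTable API; the table name, the write buffer size and
the rowToMap() helper are placeholders, not code from this thread):

  public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
      Configuration config = HBaseConfiguration.create();
      // ... same quorum / clientPort / znode.parent settings as in the posted code ...

      HTable table = null;  // local to apply(), so the function object carries no HTable field
      try {
          table = new HTable(config, "table_name");    // placeholder table name
          table.setAutoFlush(false);                   // let the client buffer writes
          table.setWriteBufferSize(4L * 1024 * 1024);  // placeholder: 4 MB write buffer
          while (rows.hasNext()) {
              Row row = rows.next();
              table.put(mapToPut(rowToMap(row)));      // rowToMap(): hypothetical helper building the Map
          }
          table.flushCommits();                        // push anything still sitting in the buffer
      } catch (IOException e) {
          throw new RuntimeException(e);
      } finally {
          if (table != null) {
              try { table.close(); } catch (IOException ignored) { }
          }
      }
      return BoxedUnit.UNIT;
  }

With the write buffer doing the batching, the manual 500-Put list and the table field
on the anonymous class can both go away.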

Cheers

On Sun, Jun 21, 2015 at 11:16 PM, Nishant Patel nishant.k.pa...@gmail.com
wrote:

 Hi,

 Please find the code below.

 dataFrame
     .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {

         private HTable table;

         private char ROWKEY_SEPERATOR = '\u';

         public BoxedUnit apply(scala.collection.Iterator<Row> rows) {

             Configuration config = HBaseConfiguration.create();

             config.set("hbase.zookeeper.quorum", "");
             config.set("hbase.zookeeper.property.clientPort", "???");
             config.set("zookeeper.znode.parent", "");

             try {
                 table = new HTable(config, "table_name");
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }

             List<Put> puts = new ArrayList<Put>();
             try {
                 while (rows.hasNext()) {
                     Row row = rows.next();
                     Map<String, Object> map = new HashMap<String, Object>();
                     String[] fieldNames = row.schema().fieldNames();
                     for (int i = 0; i < fieldNames.length; i++) {
                         map.put(fieldNames[i].toUpperCase(), row.get(i));
                     }
                     puts.add(mapToPut(map));
                     if (puts.size() >= 500) {
                         table.put(puts);
                         puts.clear();
                     }
                 }
                 table.put(puts);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
             return BoxedUnit.UNIT;
         }

         private Put mapToPut(Map<String, Object> map) throws IOException {
             try {
                 Put put = new Put(getRowKey(map));
                 String value = null;
                 for (String key : map.keySet()) {
                     value = (map.get(key) == null ? "" : map.get(key).toString());
                     put.add(Bytes.toBytes("0"), Bytes.toBytes(key), Bytes.toBytes(value));
                 }
                 return put;
             } catch (Exception e) {
                 e.printStackTrace();
                 throw e;
             }
         }

         private byte[] getRowKey(Map<String, Object> map) {

             StringBuilder builder = new StringBuilder();
             return Bytes.toBytes(builder.toString());
         }

     });

 Regards,
 Nishant

 On Mon, Jun 22, 2015 at 11:08 AM, Ted Yu yuzhih...@gmail.com wrote:

 Can you show us the code for loading Hive into HBase?

 There shouldn't be a 'return' statement in that code.

 Cheers



 On Jun 20, 2015, at 10:10 PM, Nishant Patel nishant.k.pa...@gmail.com
 wrote:

 Hi,

 I am loading data from a Hive table to HBase after doing some manipulation.

 I am getting an error: 'Task not Serializable'.

 My code is as below.

 public class HiveToHbaseLoader implements Serializable {

     public static void main(String[] args) throws Exception {

         String hbaseTableName = args[0];
         String hiveQuery = args[1];

         SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
                 .setMaster("");
         JavaSparkContext sc = new JavaSparkContext(conf);

         HiveContext hiveContext = new HiveContext(sc.sc());

         hiveContext.setConf("hive.metastore.uris", "?");

         DataFrame dataFrame = hiveContext.sql(hiveQuery);

         dataFrame
             .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {

                 // Logic to load a row from Hive into HBase.

             });
     }
 }


 Getting the error below.


 Exception in thread "main" org.apache.spark.SparkException: Task not
 serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 

Re: Task Serialization Error on DataFrame.foreachPartition

2015-06-21 Thread Ted Yu
Can you show us the code for loading Hive into HBase?

There shouldn't be a 'return' statement in that code.

Cheers



 On Jun 20, 2015, at 10:10 PM, Nishant Patel nishant.k.pa...@gmail.com wrote:
 
 Hi,
 
 I am loading data from a Hive table to HBase after doing some manipulation.
 
 I am getting an error: 'Task not Serializable'.
 
 My code is as below.
 
 public class HiveToHbaseLoader implements Serializable {

     public static void main(String[] args) throws Exception {

         String hbaseTableName = args[0];
         String hiveQuery = args[1];

         SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
                 .setMaster("");
         JavaSparkContext sc = new JavaSparkContext(conf);

         HiveContext hiveContext = new HiveContext(sc.sc());

         hiveContext.setConf("hive.metastore.uris", "?");

         DataFrame dataFrame = hiveContext.sql(hiveQuery);

         dataFrame
             .foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {

                 // Logic to load a row from Hive into HBase.

             });
     }
 }
 
 
 Getting the error below.
 
 
 Exception in thread "main" org.apache.spark.SparkException: Task not 
 serializable
 at 
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
 at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
 at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:875)
 at com.philips.bda.HiveToHbaseLoader.main(HiveToHbaseLoader.java:46)
 Caused by: java.io.NotSerializableException: 
 com.philips.bda.HiveToHbaseLoader$1
 Serialization stack:
 - object not serializable (class: com.philips.bda.HiveToHbaseLoader$1, 
 value: <function1>)
 at 
 org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
 at 
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
 at 
 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
 at 
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 ... 5 more
 
 
 -- 
 Regards,
 Nishant
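
As for the "Task not serializable" exception itself: the anonymous AbstractFunction1
subclass generated inside HiveToHbaseLoader does not implement java.io.Serializable,
so Spark's closure cleaner rejects it. One possible way around that (a sketch only,
not the solution eventually used in this thread; the class and method names are
invented) is to go through Spark's Java API, whose function interfaces already extend
Serializable:

  import java.util.Iterator;

  import org.apache.spark.api.java.function.VoidFunction;
  import org.apache.spark.sql.DataFrame;
  import org.apache.spark.sql.Row;

  public class HiveToHbasePartitionWriter {  // hypothetical class name

      static void writePartitions(DataFrame dataFrame) {
          // VoidFunction extends Serializable, so the anonymous class created here
          // can be serialized and shipped to the executors.
          dataFrame.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
              @Override
              public void call(Iterator<Row> rows) throws Exception {
                  // Open the HTable here, inside call(), so no HBase object is held
                  // by the function itself; then build and write the Puts as before.
                  while (rows.hasNext()) {
                      Row row = rows.next();
                      // ... map the row to a Put and buffer it ...
                  }
              }
          });
      }
  }

Keeping DataFrame.foreachPartition also works, provided the function handed to it is
serializable, for example a named class that extends AbstractFunction1 and implements
Serializable.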