Thomas,

Could you please explain how Gson should be made a transient member? Our
operator code is below.

public class HbaseTableUpdate<T> extends AbstractHBasePutOutputOperator<Tenant> implements Serializable {
 private static final transient Logger logger = LoggerFactory.getLogger(HbaseTableUpdate.class);
 public static final int DEFAULT_BATCH_SIZE = 1000;
 private int batchSize = DEFAULT_BATCH_SIZE;
 protected int unCommittedSize = 0;
 public static final byte[] AUDIT_CF_LOG = "cf".getBytes();
 public static final byte[] AUDIT_MESSAGE = "msg".getBytes();
 public static final byte[] AUDIT_STATUS = "sts".getBytes();
 public static final byte[] AUDIT_RETRY_CNT = "rtc".getBytes();
 //Configuration conf = HBaseConfiguration.create();
 //AuditLogDAO auditLogDAO = new AuditLogDAO(conf);
 Gson gson = new Gson();
 //ObjectMapper mapper = new ObjectMapper();

 public HbaseTableUpdate() {
     store = new HBaseStore();
     store.setTableName("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/tenant");
 }



 @Override
 public void processTuple(Tenant tenant) {
     HTable table = store.getTable();
     Put put = operationPut(tenant);
     try {
         table.put(put);
         if( ++unCommittedSize >= batchSize )
         {
             table.flushCommits();
             unCommittedSize = 0;
         }
     } catch (RetriesExhaustedWithDetailsException e) {
         logger.error("Could not output tuple", e);
         DTThrowable.rethrow(e);
     } catch (InterruptedIOException e) {
         logger.error("Could not output tuple", e);
         DTThrowable.rethrow(e);
     }
     catch (IOException io) {

     }

 }

 @Override
 public void endWindow()
 {
     try
     {
         if( unCommittedSize > 0 ) {
             store.getTable().flushCommits();
             unCommittedSize = 0;
         }
     }
     catch (RetriesExhaustedWithDetailsException e) {
         logger.error("Could not output tuple", e);
         DTThrowable.rethrow(e);
     } catch (InterruptedIOException e) {
         logger.error("Could not output tuple", e);
         DTThrowable.rethrow(e);
     }
     catch (IOException io) {

     }
 }

 @Override
 public Put operationPut(Tenant tenant) {
     String rowKey = tenant.getVolumeName() + System.currentTimeMillis();
     AuditLog log;
     String msgjson = gson.toJson(tenant);

     if (StringUtils.isNotEmpty(tenant.getGl())) {
         log = new AuditLog(msgjson, Status.VALIDATION_SUCCESS.toString());
     } else {
         log = new AuditLog(msgjson, Status.VALIDATION_FAILED.toString());
     }

     Put p = new Put(Bytes.toBytes(rowKey));
     p.addColumn(AUDIT_CF_LOG, AUDIT_MESSAGE, Bytes.toBytes(log.getMessage()));
     p.addColumn(AUDIT_CF_LOG, AUDIT_STATUS, Bytes.toBytes(log.getStatus()));
     p.addColumn(AUDIT_CF_LOG, AUDIT_RETRY_CNT, Bytes.toBytes(log.getUpdateCount()));
     return p;
 }
}



Thanks!!



On Thu, Oct 20, 2016 at 4:53 PM, Thomas Weise <[email protected]> wrote:

> Please make sure that your Gson parser is a transient member of the
> operator.
>
> On Thu, Oct 20, 2016 at 2:33 PM, Bandaru, Srinivas <
> [email protected]> wrote:
>
>> Hi,
>>
>> We are building an Apex (DataTorrent) application to write into HBase.
>> We are getting the error below while launching the application. Has anyone
>> had a similar issue?
>>
>>
>>
>> 2016-10-20 16:05:50,675 INFO org.apache.hadoop.service.AbstractService:
>> Service com.datatorrent.stram.StreamingAppMasterService failed in state
>> INITED; cause: com.esotericsoftware.kryo.KryoException: Encountered
>> unregistered class ID: 95
>>
>> Serialization trace:
>>
>> systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
>>
>> instanceCreators (com.google.gson.internal.ConstructorConstructor)
>>
>> constructorConstructor (com.google.gson.Gson)
>>
>> gson (com.example.datatorrent.HbaseTableUpdate)
>>
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID: 95
>>
>> Serialization trace:
>>
>> systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
>>
>> instanceCreators (com.google.gson.internal.ConstructorConstructor)
>>
>> constructorConstructor (com.google.gson.Gson)
>>
>> gson (com.example.datatorrent.HbaseTableUpdate)
>>
>>
>>
>>
>>
>> Thanks,
>>
>> *Srinivas Bandaru*
>>
>>
>>
>
>
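
For reference, the suggestion quoted above (keep the Gson parser as a
transient member) could be sketched roughly as follows. This is only an
illustration under stated assumptions, not the actual operator: JsonHelper,
SketchOperator and TransientSketch are hypothetical names, JsonHelper stands
in for Gson, and plain Java serialization stands in for the Kryo
serialization seen in the trace. Both skip transient fields by default, so
the helper's internals never reach the serializer. In the real operator the
equivalent would presumably be declaring the field as
"private transient Gson gson;" and creating it in the operator's setup
callback rather than in a field initializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Gson: a helper that is NOT serializable.
class JsonHelper {
    String toJson(String name) {
        return "{\"name\":\"" + name + "\"}";
    }
}

// Hypothetical stand-in for the operator; only the field handling matters.
class SketchOperator implements Serializable {
    private static final long serialVersionUID = 1L;

    // Regular field: part of the serialized operator state.
    int batchSize = 1000;

    // transient: skipped when the operator is serialized.
    private transient JsonHelper json;

    // Plays the role of the operator's setup callback: (re)create the
    // helper on the side that actually processes tuples.
    void setup() {
        json = new JsonHelper();
    }

    boolean hasHelper() {
        return json != null;
    }

    String convert(String name) {
        return json.toJson(name);
    }
}

class TransientSketch {
    // Serialize and deserialize the operator, roughly what a platform does
    // when it ships an operator from the client to a worker.
    static SketchOperator roundTrip(SketchOperator op) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(op);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchOperator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SketchOperator op = new SketchOperator();
        op.setup();
        SketchOperator copy = roundTrip(op);      // succeeds despite JsonHelper
        System.out.println(copy.hasHelper());     // false: transient field was skipped
        copy.setup();                             // helper is rebuilt after deserialization
        System.out.println(copy.convert("t1"));
    }
}
```

Without the transient keyword, the round trip in this sketch would fail
because JsonHelper is not serializable. That is a different failure from the
"unregistered class ID" error in the trace, but the cause is the same kind:
the serializer walks into the helper's internals.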
