Thomas,
Can you please highlight how the Gson has to be a transient member ? Below
is our operator code.
public class HbaseTableUpdate<T> extends
AbstractHBasePutOutputOperator<Tenant> implements Serializable {
private static final transient Logger logger =
LoggerFactory.getLogger(HbaseTableUpdate.class);
public static final int DEFAULT_BATCH_SIZE = 1000;
private int batchSize = DEFAULT_BATCH_SIZE;
protected int unCommittedSize = 0;
public static final byte[] AUDIT_CF_LOG = "cf".getBytes();
public static final byte[] AUDIT_MESSAGE = "msg".getBytes();
public static final byte[] AUDIT_STATUS = "sts".getBytes();
public static final byte[] AUDIT_RETRY_CNT = "rtc".getBytes();
//Configuration conf = HBaseConfiguration.create();
//AuditLogDAO auditLogDAO = new AuditLogDAO(conf);
Gson gson = new Gson();
//ObjectMapper mapper = new ObjectMapper();
public HbaseTableUpdate() {
store = new HBaseStore();
store.setTableName("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/tenant");
}
@Override
public void processTuple(Tenant tenant) {
HTable table = store.getTable();
Put put = operationPut(tenant);
try {
table.put(put);
if( ++unCommittedSize >= batchSize )
{
table.flushCommits();
unCommittedSize = 0;
}
} catch (RetriesExhaustedWithDetailsException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
} catch (InterruptedIOException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
}
catch (IOException io) {
}
}
@Override
public void endWindow()
{
try
{
if( unCommittedSize > 0 ) {
store.getTable().flushCommits();
unCommittedSize = 0;
}
}
catch (RetriesExhaustedWithDetailsException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
} catch (InterruptedIOException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
}
catch (IOException io) {
}
}
@Override
public Put operationPut(Tenant tenant) {
String rowKey = tenant.getVolumeName() + System.currentTimeMillis();
AuditLog log;
String msgjson = gson.toJson(tenant);
if (StringUtils.isNotEmpty(tenant.getGl())) {
log = new AuditLog(msgjson, Status.VALIDATION_SUCCESS.toString());
} else {
log = new AuditLog(msgjson, Status.VALIDATION_FAILED.toString());
}
Put p = new Put(Bytes.toBytes(rowKey));
p.addColumn(AUDIT_CF_LOG, AUDIT_MESSAGE, Bytes.toBytes(log.getMessage()));
p.addColumn(AUDIT_CF_LOG, AUDIT_STATUS, Bytes.toBytes(log.getStatus()));
p.addColumn(AUDIT_CF_LOG, AUDIT_RETRY_CNT,
Bytes.toBytes(log.getUpdateCount()));
return p;
}
Thanks!!
On Thu, Oct 20, 2016 at 4:53 PM, Thomas Weise <[email protected]> wrote:
> Please make sure that your Gson parser is a transient member of the
> operator.
>
> On Thu, Oct 20, 2016 at 2:33 PM, Bandaru, Srinivas <
> [email protected]> wrote:
>
>> Hi,
>>
>> We are building a an apex (Datatorrent) application to write into Hbase.
>> Getting the below error while launching the application. Is anyone had the
>> similar issue?
>>
>>
>>
>> 2016-10-20 16:05:50,675 INFO org.apache.hadoop.service.AbstractService:
>> Service com.datatorrent.stram.StreamingAppMasterService failed in state
>> INITED; cause: com.esotericsoftware.kryo.KryoException: Encountered
>> unregistered class ID: 95
>>
>> Serialization trace:
>>
>> systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
>>
>> instanceCreators (com.google.gson.internal.ConstructorConstructor)
>>
>> constructorConstructor (com.google.gson.Gson)
>>
>> gson (com.example.datatorrent.HbaseTableUpdate)
>>
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID: 95
>>
>> Serialization trace:
>>
>> systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
>>
>> instanceCreators (com.google.gson.internal.ConstructorConstructor)
>>
>> constructorConstructor (com.google.gson.Gson)
>>
>> gson (com.example.datatorrent.HbaseTableUpdate)
>>
>>
>>
>>
>>
>> Thanks,
>>
>> *Srinivas Bandaru*
>>
>>
>> This e-mail, including attachments, may include confidential and/or
>> proprietary information, and may be used only by the person or entity
>> to which it is addressed. If the reader of this e-mail is not the intended
>> recipient or his or her authorized agent, the reader is hereby notified
>> that any dissemination, distribution or copying of this e-mail is
>> prohibited. If you have received this e-mail in error, please notify the
>> sender by replying to this message and delete this e-mail immediately.
>>
>
>