Hello everyone,

I solved my issue by passing an Array[Byte] as a constructor parameter
instead of the HTableDescriptor itself. This way I can instantiate the
HTableDescriptor inside the open method of the OutputFormat using the
static method HTableDescriptor.parseFrom. In the end, marking conf, table
and connection as transient wouldn't have made any difference.
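
For anyone finding this thread later, here is a minimal sketch of the pattern,
written against plain JDK serialization so that it runs without HBase or Flink
on the classpath. FakeDescriptor and BytesBackedFormat are hypothetical
stand-ins (not real HBase or Flink classes) for HTableDescriptor and the
output format; only the idea of shipping bytes and rebuilding the descriptor
in open is taken from the fix described above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for HTableDescriptor: deliberately NOT Serializable,
// but convertible to and from a byte array.
final class FakeDescriptor(val tableName: String) {
  def toByteArray: Array[Byte] = tableName.getBytes(StandardCharsets.UTF_8)
}

object FakeDescriptor {
  def parseFrom(bytes: Array[Byte]): FakeDescriptor =
    new FakeDescriptor(new String(bytes, StandardCharsets.UTF_8))
}

// Hypothetical stand-in for the output format. Its only constructor state is
// a byte array, which Java serialization handles natively.
class BytesBackedFormat(descriptorBytes: Array[Byte]) extends Serializable {
  // Rebuilt on the worker in open(); never shipped with the object.
  @transient private var descriptor: FakeDescriptor = null

  def open(): Unit = {
    descriptor = FakeDescriptor.parseFrom(descriptorBytes)
  }

  def tableName: String = descriptor.tableName
}

object SerializationDemo {
  // Round-trips a value through Java serialization, roughly what happens
  // when a function object is shipped to the cluster.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val format = new BytesBackedFormat(new FakeDescriptor("events").toByteArray)
    val shipped = roundTrip(format) // would throw if a non-serializable field were captured
    shipped.open()
    println(shipped.tableName)
  }
}
```

With the real classes, the analogous calls would presumably be
tableDescriptor.toByteArray on the client side and
HTableDescriptor.parseFrom(bytes) inside open; I am assuming the HBase 1.x
client API here, so check the signatures against your version.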


2017-08-27 14:22 GMT+02:00 Federico D'Ambrosio <

> Hi,
> could you elaborate, please? Marking conf, connection and table as
> transient wouldn't help because of the presence of the HTableDescriptor
> reference?
> 2017-08-27 12:44 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:
>> It looks like everything needs to be serializable in your case. An
>> alternative would be to mark certain non-serializable things as transient,
>> but as far as I can see this is not possible in your case.
>> On 27. Aug 2017, at 11:02, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>> Hi,
>> I'm trying to write to HBase using writeOutputFormat with a custom HBase
>> format inspired by this example
>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java>
>> in flink-hbase (mind you, I'm using Scala instead of Java), and I'm
>> encountering the error reported in the subject of this mail.
>> Now, the OutputFormat I'm using is the following:
>> abstract class HBaseOutputFormat[T](tableDescriptor: HTableDescriptor, 
>> confPath : Path) extends OutputFormat[T]{
>>   private val LOG = LoggerFactory.getLogger(this.getClass)
>>   var conf : org.apache.hadoop.conf.Configuration = _
>>   var connection : Connection = _
>>   var table : Table = _
>>   var taskNumber : String = _
>>   @throws[IOException]
>>   def configure(parameters: Configuration): Unit = {
>>     conf = HBaseConfiguration.create()
>>     conf.addResource(confPath.getPath)
>>     connection = ConnectionFactory.createConnection(conf)
>>   }
>>   @throws[IOException]
>>   def close(): Unit = {
>>     table.close()
>>   }
>>   @throws[IOException]
>>   def open(taskNumber: Int, numTasks: Int): Unit = {
>>     this.taskNumber = String.valueOf(taskNumber)
>>     val admin = connection.getAdmin
>>     if(!admin.tableExists(tableDescriptor.getTableName))
>>       admin.createTable(tableDescriptor)
>>     table = connection.getTable(tableDescriptor.getTableName)
>>   }
>> }
>> which is extended by the actual format used, which implements the
>> writeRecord method
>> class HBaseBatchFormat(tableDescriptor: HTableDescriptor, confPath : Path)
>>   extends HBaseOutputFormat[BatchContainer](tableDescriptor, confPath)
>> with BatchContainer being
>> case class BatchContainer(batch: Iterable[(String, String, String, Int)]) 
>> extends Serializable
>> I'd like to ask you: what needs to be Serializable? As far as I see,
>> conf, connection and table are not Serializable and so they are surely part
>> of the issue. Are the constructor parameters, especially tableDescriptor
>> which is not Serializable, to be considered in this case? Should all the
>> methods implemented from the OutputFormat interface contain only
>> Serializable variables?
>> Thank you for your attention,
>> Federico
