Hi to all,
I'm writing a job that uses Apache Phoenix.

At first I used PhoenixOutputFormat as the (Hadoop) OutputFormat, but it's
not well suited to the Table API because it cannot handle generic
objects like Rows (it needs a DBWritable object that must already be
present at compile time). So I looked into the code of
PhoenixOutputFormat, and it's basically nothing more than a
JDBCOutputFormat.

However, to make it work I had to slightly modify the Flink
JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() call on
the PreparedStatement, e.g.:

    upload.executeBatch();
    dbConn.commit();

For the moment I've just created a dedicated PhoenixJdbcOutputFormat, which
is a copy of the JDBCOutputFormat code with this change applied (the
original class couldn't simply be extended because all of its fields are
private).
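
To make the change concrete, here is a minimal sketch of the flush step in isolation. It is not the actual Flink code: the class name BatchFlushSketch and the method flush are made up for illustration, and only the dbConn and upload names come from the snippet above. It assumes the connection has auto-commit turned off (which, as far as I understand, is what a Phoenix connection does by default), so without an explicit commit the batched upserts are never made visible.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper, not part of Flink or Phoenix.
public final class BatchFlushSketch {

    private BatchFlushSketch() {}

    /**
     * Sends the buffered batch and then commits it.
     * Returns the per-statement update counts reported by executeBatch().
     */
    public static int[] flush(Connection dbConn, PreparedStatement upload) throws SQLException {
        int[] counts = upload.executeBatch();
        // The JDBC spec does not allow commit() in auto-commit mode,
        // so commit explicitly only when auto-commit is off.
        if (!dbConn.getAutoCommit()) {
            dbConn.commit();
        }
        return counts;
    }
}
```

The getAutoCommit() check is my own addition: it keeps the extra commit harmless for drivers that run in auto-commit mode, which is the main concern if this went into the generic JDBCOutputFormat.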

What do you think about this? Should I open a ticket to add a connection
commit after executeBatch (in order to be compatible with Phoenix), or
something else (e.g. create a Phoenix connector that basically extends
JDBCOutputFormat and overrides 2 methods, which would also require changing
the visibility of its fields to protected)?
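
For the second option, this is roughly the shape I have in mind, as a sketch only. GenericJdbcWriter and PhoenixJdbcWriter are hypothetical stand-ins, not existing Flink classes, and I show a single overridable flush() hook rather than the exact methods that would need overriding in JDBCOutputFormat.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical classes for illustration; not the Flink API.
public final class ExtensibleWriterSketch {

    private ExtensibleWriterSketch() {}

    /** Stand-in for a JDBC output format whose state is visible to subclasses. */
    public static class GenericJdbcWriter {
        protected final Connection dbConn;
        protected final PreparedStatement upload;

        public GenericJdbcWriter(Connection dbConn, PreparedStatement upload) {
            this.dbConn = dbConn;
            this.upload = upload;
        }

        /** Default behaviour: send the batch and rely on the connection's commit mode. */
        protected void flush() throws SQLException {
            upload.executeBatch();
        }

        /** Flushes any pending rows, then releases the JDBC resources. */
        public void close() throws SQLException {
            flush();
            upload.close();
            dbConn.close();
        }
    }

    /** Phoenix variant: same flush, followed by an explicit commit. */
    public static class PhoenixJdbcWriter extends GenericJdbcWriter {

        public PhoenixJdbcWriter(Connection dbConn, PreparedStatement upload) {
            super(dbConn, upload);
        }

        @Override
        protected void flush() throws SQLException {
            super.flush();
            dbConn.commit();
        }
    }
}
```

With protected fields and one hook like this, the Phoenix-specific part shrinks to a few lines instead of a full copy of the class.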

Best,
Flavio
