Maybe this should be well documented. Is there any dedicated page for
Flink and JDBC connectors?

On Wed, Sep 6, 2017 at 4:12 PM, Fabian Hueske <> wrote:

> Great!
> If you want to, you can open a PR that adds
> if (!conn.getAutoCommit()) {
>   conn.setAutoCommit(true);
> }
> to
> Cheers, Fabian
> 2017-09-06 15:55 GMT+02:00 Flavio Pompermaier <>:
>> Hi Fabian,
>> thanks for the detailed answer. Obviously you are right :)
>> As stated by auto-commit is
>> disabled by default in Phoenix, but it can easily be enabled by appending
>> AutoCommit=true to the connection URL or, equivalently, by setting the
>> proper property in the conf object passed to the Phoenix
>> QueryUtil.getConnectionUrl method that autogenerates the connection URL,
>> i.e.:
>> ----------------------
>> Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
>> final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
>> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>> final Properties phoenixProps = PropertiesUtil.extractProperties(new
>> Properties(), jobConf);
>> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>> ----------------------
>> Now my job also works with the standard Flink JDBCOutputFormat.
>> Just to help other people willing to play with Phoenix and HBase, I paste
>> my simple test job below:
>> @Test
>>   public void testPhoenixOutputFormat() throws Exception {
>>     final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
>>     senv.enableCheckpointing(5000);
>>     DataStream<String> testStream = senv.fromElements("1,aaa,XXX",
>> "2,bbb,YYY", "3,ccc,ZZZ");
>>     // Set the target Phoenix table and the columns
>>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>>       private static final long serialVersionUID = 1L;
>>       @Override
>>       public Row map(String str) throws Exception {
>>         String[] split = str.split(Pattern.quote(","));
>>         Row ret = new Row(3);
>>         ret.setField(0, split[0]);
>>         ret.setField(1, split[1]);
>>         ret.setField(2, split[2]);
>>         return ret;
>>       }
>>     }).returns(new RowTypeInfo(BasicTypeInfo.STRI
>> INFO));
>>     Job job = Job.getInstance(HBaseConfiguration.create(),
>> "phoenix-mr-job");
>>     PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
>>     final org.apache.hadoop.conf.Configuration jobConf =
>> job.getConfiguration();
>>     jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>>     final String upsertStatement =
>> PhoenixConfigurationUtil.getUpsertStatement(jobConf);
>>     final Properties phoenixProps = PropertiesUtil.extractProperties(new
>> Properties(), jobConf);
>>     String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>>     rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
>>         .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.
>> getCanonicalName())
>>         .setDBUrl(connUrl)
>>         .setQuery(upsertStatement)
>>         .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR,
>> Types.VARCHAR})
>>         .finish());
>>     senv.execute();
>>   }
>> Best,
>> Flavio
>> On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske <> wrote:
>>> Hi,
>>> According to the JavaDocs of java.sql.Connection, commit() will throw an
>>> exception if the connection is in auto commit mode which should be the
>>> default.
>>> So adding this change to the JdbcOutputFormat seems a bit risky.
>>> Maybe the Phoenix JDBC connector does not enable auto commits by default
>>> (or doesn't support it). Can you check that, Flavio?
>>> If the Phoenix connector supports auto commits but does not activate them by
>>> default, we can enable it in
>>> If auto commits are not supported, we can add a check after execute()
>>> and call commit() only if Connection.getAutoCommit() returns false.
>>> Best, Fabian
>>> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier <>:
>>>> Hi to all,
>>>> I'm writing a job that uses Apache Phoenix.
>>>> At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but
>>>> it's not well suited to work with the Table API because it cannot handle
>>>> generic objects like Rows (it needs a DBWritable object that must
>>>> already be present at compile time). So I've looked into the code of the
>>>> PhoenixOutputFormat and it's basically nothing more than a
>>>> JDBCOutputFormat.
>>>> However, to make it work I had to slightly modify the Flink
>>>> JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() on the
>>>> PreparedStatement. E.g.:
>>>>     upload.executeBatch();
>>>>     dbConn.commit();
>>>> For the moment I've just created a dedicated PhoenixJdbcOutpuFormat
>>>> where I've added these 2 lines of code, starting from the code of the
>>>> JDBCOutputFormat (it couldn't be extended in this case because all fields
>>>> are private).
>>>> What do you think about this? Should I open a ticket to add a
>>>> connection commit after executeBatch (in order to be compatible with
>>>> Phoenix) or something else (e.g. create a Phoenix connector that basically
>>>> extends JDBCOutputConnector and overwrites 2 methods, also changing the
>>>> visibility of its fields to protected)?
>>>> Best,
>>>> Flavio
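
For readers of the archive, the check discussed in the thread (call commit()
only when Connection.getAutoCommit() returns false) could look roughly like the
sketch below. BatchFlusher and flush are hypothetical names used only for
illustration; they are not part of Flink or Phoenix, and this is not the code
that ended up in JDBCOutputFormat. It relies only on the standard java.sql API.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper (not part of Flink or Phoenix), shown only as a sketch. */
final class BatchFlusher {

    private BatchFlusher() {}

    /**
     * Executes the pending batch, then commits explicitly only when the
     * connection is NOT in auto-commit mode. Calling commit() on an
     * auto-commit connection may throw an SQLException, hence the guard.
     *
     * @return true if an explicit commit() was issued, false otherwise
     */
    static boolean flush(Connection conn, PreparedStatement upload) throws SQLException {
        upload.executeBatch();
        if (!conn.getAutoCommit()) {
            conn.commit();
            return true;
        }
        return false;
    }
}
```

With a driver that leaves auto-commit off by default (as reported for Phoenix
above), flush(dbConn, upload) would issue the explicit commit; with a driver
that defaults to auto-commit, it would skip it.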
