Great!

If you want to, you can open a PR that adds

if (!conn.getAutoCommit()) {
  conn.setAutoCommit(true);
}

to JDBCOutputFormat.open().
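
For illustration only, here is a minimal sketch of the two options discussed in this thread: enabling auto-commit once when the connection is opened, or committing after a batch only when auto-commit is off. The class AutoCommitSketch, its method names, and the stub connection are hypothetical; none of them are part of Flink's JDBCOutputFormat, and the stub exists only so that the sketch runs without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper for illustration; not part of Flink's JDBCOutputFormat. */
class AutoCommitSketch {

    /** Option 1: switch auto-commit on once, right after the connection is opened. */
    static void ensureAutoCommit(Connection conn) throws SQLException {
        if (!conn.getAutoCommit()) {
            conn.setAutoCommit(true);
        }
    }

    /** Option 2: commit explicitly after a batch, but only when auto-commit is off. */
    static void commitIfNeeded(Connection conn) throws SQLException {
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    }

    /** Demo-only stub connection so the sketch runs without a database. */
    static Connection stubConnection(boolean[] autoCommit, int[] commitCalls) {
        return (Connection) Proxy.newProxyInstance(
            Connection.class.getClassLoader(),
            new Class<?>[] {Connection.class},
            (proxy, method, args) -> {
                switch (method.getName()) {
                    case "getAutoCommit": return autoCommit[0];
                    case "setAutoCommit": autoCommit[0] = (Boolean) args[0]; return null;
                    case "commit": commitCalls[0]++; return null;
                    default: throw new UnsupportedOperationException(method.getName());
                }
            });
    }

    public static void main(String[] argv) throws SQLException {
        boolean[] autoCommit = {false}; // mimics a driver with auto-commit off by default
        int[] commitCalls = {0};
        Connection conn = stubConnection(autoCommit, commitCalls);

        commitIfNeeded(conn);   // auto-commit is off, so commit() is called
        ensureAutoCommit(conn); // flips auto-commit on
        commitIfNeeded(conn);   // auto-commit is on, so commit() is skipped

        System.out.println("autoCommit=" + autoCommit[0] + ", commits=" + commitCalls[0]);
    }
}
```

The second option avoids the SQLException that Connection.commit() raises when the connection is in auto-commit mode, which is the risk raised earlier in the thread.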

Cheers, Fabian



2017-09-06 15:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Hi Fabian,
> thanks for the detailed answer. Obviously you are right :)
> As stated at https://phoenix.apache.org/tuning.html, auto-commit is
> disabled by default in Phoenix, but it can easily be enabled by appending
> AutoCommit=true to the connection URL or, equivalently, by setting the
> corresponding property in the conf object passed to the Phoenix
> QueryUtil.getConnectionUrl method, which generates the connection URL,
> i.e.:
>
> ----------------------
> Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
> // jobConf is the Hadoop Configuration backing the job
> final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>     PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
> final Properties phoenixProps =
>     PropertiesUtil.extractProperties(new Properties(), jobConf);
> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
> ----------------------
>
> Now my job also works with the standard Flink JDBCOutputFormat.
> To help other people who want to play with Phoenix and HBase, I paste my
> simple test job below:
>
> @Test
>   public void testPhoenixOutputFormat() throws Exception {
>
>     final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
>     senv.enableCheckpointing(5000);
>     DataStream<String> testStream =
>         senv.fromElements("1,aaa,XXX", "2,bbb,YYY", "3,ccc,ZZZ");
>
>     // Set the target Phoenix table and the columns
>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public Row map(String str) throws Exception {
>         String[] split = str.split(Pattern.quote(","));
>         Row ret = new Row(3);
>         ret.setField(0, split[0]);
>         ret.setField(1, split[1]);
>         ret.setField(2, split[2]);
>         return ret;
>       }
>     }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
>
>     Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
>     PhoenixMapReduceUtil.setOutput(job, "MY_TABLE", "FIELD_1,FIELD2,FIELD_3");
>     final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
>     jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>         PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>     final String upsertStatement =
>         PhoenixConfigurationUtil.getUpsertStatement(jobConf);
>     final Properties phoenixProps =
>         PropertiesUtil.extractProperties(new Properties(), jobConf);
>     String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>
>     rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
>         .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.getCanonicalName())
>         .setDBUrl(connUrl)
>         .setQuery(upsertStatement)
>         .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR, Types.VARCHAR})
>         .finish());
>
>     senv.execute();
>   }
>
> Best,
> Flavio
>
> On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> According to the JavaDocs of java.sql.Connection, commit() will throw an
>> exception if the connection is in auto-commit mode, which should be the
>> default.
>> So adding this change to the JDBCOutputFormat seems a bit risky.
>>
>> Maybe the Phoenix JDBC connector does not enable auto-commit by default
>> (or doesn't support it). Can you check that, Flavio?
>> If the Phoenix connector supports auto-commit but does not activate it by
>> default, we can enable it in JDBCOutputFormat.open().
>> If auto-commit is not supported, we can add a check after execute() and
>> call commit() only if Connection.getAutoCommit() returns false.
>>
>> Best, Fabian
>>
>>
>> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi to all,
>>> I'm writing a job that uses Apache Phoenix.
>>>
>>> At first I used the PhoenixOutputFormat as the (Hadoop) OutputFormat, but
>>> it's not well suited to the Table API because it cannot handle generic
>>> objects like Rows (it needs a DBWritable object that must already be
>>> present at compile time). So I looked into the code of the
>>> PhoenixOutputFormat, and it's basically nothing more than a
>>> JDBCOutputFormat.
>>>
>>> However, to make it work I had to slightly modify the Flink
>>> JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() on the
>>> PreparedStatement, e.g.:
>>>
>>>     upload.executeBatch();
>>>     dbConn.commit();
>>>
>>> For the moment I've just created a dedicated PhoenixJdbcOutputFormat,
>>> copying the code of the JDBCOutputFormat and adding these 2 lines (it
>>> couldn't be extended in this case because all fields are private).
>>>
>>> What do you think about this? Should I open a ticket to add a connection
>>> commit after executeBatch (in order to be compatible with Phoenix), or
>>> something else (e.g. create a Phoenix connector that basically extends
>>> JDBCOutputFormat and overrides 2 methods, also changing the visibility of
>>> its fields to protected)?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>
>
>
