[FLINK-3292] Fix for Bug in flink-jdbc. Not all JDBC drivers supported. This closes #1551
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d97fcda6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d97fcda6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d97fcda6 Branch: refs/heads/master Commit: d97fcda6635b6821ac3f61c39e0fa156bc7c7fd4 Parents: 83b88c2 Author: Subhobrata Dey <sbc...@gmail.com> Authored: Wed Jan 27 17:00:37 2016 -0500 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Jan 28 14:37:53 2016 +0100 ---------------------------------------------------------------------- .../flink/api/java/io/jdbc/JDBCInputFormat.java | 16 +++++++++++++++- .../flink/api/java/io/jdbc/JDBCInputFormatTest.java | 3 +++ .../api/java/io/jdbc/JDBCOutputFormatTest.java | 3 +++ 3 files changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d97fcda6/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index eb3ac31..84eb309 100644 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -58,6 +58,8 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp private String drivername; private String dbURL; private String query; + private int resultSetType; + private int resultSetConcurrency; private transient Connection dbConn; private transient Statement statement; @@ -82,7 +84,7 @@ public class JDBCInputFormat<OUT extends Tuple> extends 
RichInputFormat<OUT, Inp public void open(InputSplit ignored) throws IOException { try { establishConnection(); - statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); + statement = dbConn.createStatement(resultSetType, resultSetConcurrency); resultSet = statement.executeQuery(query); } catch (SQLException se) { close(); @@ -308,6 +310,8 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp public JDBCInputFormatBuilder() { this.format = new JDBCInputFormat(); + this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY; + this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; } public JDBCInputFormatBuilder setUsername(String username) { @@ -335,6 +339,16 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp return this; } + public JDBCInputFormatBuilder setResultSetType(int resultSetType) { + format.resultSetType = resultSetType; + return this; + } + + public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) { + format.resultSetConcurrency = resultSetConcurrency; + return this; + } + public JDBCInputFormat finish() { if (format.username == null) { LOG.info("Username was not supplied separately."); http://git-wip-us.apache.org/repos/asf/flink/blob/d97fcda6/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java index b76f8b8..b1d43df 100644 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -23,6 +23,8 @@ import 
java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.sql.ResultSet; + import org.junit.Assert; @@ -172,6 +174,7 @@ public class JDBCInputFormatTest { .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:ebookshop") .setQuery("select * from books") + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) .finish(); jdbcInputFormat.open(null); Tuple5 tuple = new Tuple5(); http://git-wip-us.apache.org/repos/asf/flink/blob/d97fcda6/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java index 7d004f9..276518b 100644 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java @@ -23,6 +23,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.sql.ResultSet; import org.junit.Assert; @@ -199,6 +200,7 @@ public class JDBCOutputFormatTest { .setDrivername(driverPath) .setDBUrl(dbUrl) .setQuery("select * from " + sourceTable) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) .finish(); jdbcInputFormat.open(null); @@ -215,6 +217,7 @@ public class JDBCOutputFormatTest { .setDrivername(driverPath) .setDBUrl(dbUrl) .setQuery("select * from " + targetTable) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) .finish(); jdbcInputFormat.open(null);