Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1941#discussion_r62832479
--- Diff:
flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
---
@@ -81,25 +134,51 @@ public void configure(Configuration parameters) {
* @throws IOException
*/
@Override
- public void open(InputSplit ignored) throws IOException {
+ public void open(InputSplit inputSplit) throws IOException {
+ hasNext = true;
try {
- establishConnection();
- statement = dbConn.createStatement(resultSetType,
resultSetConcurrency);
- resultSet = statement.executeQuery(query);
+ if (inputSplit != null && parameterValues != null) {
+ for (int i = 0; i <
parameterValues[inputSplit.getSplitNumber()].length; i++) {
+ Object param =
parameterValues[inputSplit.getSplitNumber()][i];
+ if (param instanceof String) {
+ statement.setString(i + 1,
(String) param);
+ } else if (param instanceof Long) {
+ statement.setLong(i + 1, (Long)
param);
+ } else if (param instanceof Integer) {
+ statement.setInt(i + 1,
(Integer) param);
+ } else if (param instanceof Double) {
+ statement.setDouble(i + 1,
(Double) param);
+ } else if (param instanceof Boolean) {
+ statement.setBoolean(i + 1,
(Boolean) param);
+ } else if (param instanceof Float) {
+ statement.setFloat(i + 1,
(Float) param);
+ } else if (param instanceof BigDecimal)
{
+ statement.setBigDecimal(i + 1,
(BigDecimal) param);
+ } else if (param instanceof Byte) {
+ statement.setByte(i + 1, (Byte)
param);
+ } else if (param instanceof Short) {
+ statement.setShort(i + 1,
(Short) param);
+ } else if (param instanceof Date) {
+ statement.setDate(i + 1, (Date)
param);
+ } else if (param instanceof Time) {
+ statement.setTime(i + 1, (Time)
param);
+ } else if (param instanceof Timestamp) {
+ statement.setTimestamp(i + 1,
(Timestamp) param);
+ } else if (param instanceof Array) {
+ statement.setArray(i + 1,
(Array) param);
+ } else {
+ //extends with other types if
needed
+ throw new
IllegalArgumentException("open() failed. Parameter " + i + " of type " +
param.getClass() + " is not handled (yet)." );
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Executing '%s'
with parameters %s", queryTemplate,
Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
+ }
+ }
+ resultSet = statement.executeQuery();
} catch (SQLException se) {
close();
--- End diff --
No need to call `close()` here. This should be done by the object that manages
the InputFormat's life cycle.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---