This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new fe95013 NIFI-8046: Fix issue with ResultSetRecordSet on DB2 fe95013 is described below commit fe950131c35756dabd677fb21b436a1f85eabced Author: Matthew Burgess <mattyb...@apache.org> AuthorDate: Fri Nov 27 19:23:44 2020 +0100 NIFI-8046: Fix issue with ResultSetRecordSet on DB2 This closes #8046. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../serialization/record/ResultSetRecordSet.java | 22 +++++++++++++--------- .../standard/AbstractQueryDatabaseTable.java | 2 +- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index e6425af..ff0d142 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -58,14 +58,17 @@ public class ResultSetRecordSet implements RecordSet, Closeable { public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { this.rs = rs; - moreRows = rs.next(); - this.schema = createSchema(rs, readerSchema); - - rsColumnNames = new HashSet<>(); - final ResultSetMetaData metadata = rs.getMetaData(); - for (int i = 0; i < metadata.getColumnCount(); i++) { - rsColumnNames.add(metadata.getColumnLabel(i + 1)); + this.rsColumnNames = new HashSet<>(); + RecordSchema tempSchema; + try { + tempSchema = createSchema(rs, readerSchema); + moreRows = rs.next(); + } catch(SQLException se) { + // Tried to create the schema with a ResultSet without calling next() first (probably for DB2), now try the other way around + moreRows = rs.next(); + tempSchema = createSchema(rs, readerSchema); } + this.schema = tempSchema; } @Override @@ -91,7 +94,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { try { if (moreRows) { final Record record = createRecord(rs); - moreRows = rs.next(); + moreRows = !rs.isClosed() && rs.next(); return record; } else { return null; @@ -146,7 +149,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return value; } - private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { + private RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { final ResultSetMetaData metadata = rs.getMetaData(); final int numCols = metadata.getColumnCount(); final List<RecordField> fields = new ArrayList<>(numCols); @@ -168,6 +171,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final RecordField field = new RecordField(fieldName, dataType, nullable); fields.add(field); + rsColumnNames.add(metadata.getColumnLabel(column)); } return new SimpleRecordSchema(fields); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java index 1df0ae2..2310664 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java @@ -316,7 +316,7 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd); sqlWriter.updateCounters(session); - logger.info("{} contains {} records; transferring to 'success'", + logger.debug("{} contains {} records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()}); session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));