Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1407#discussion_r98680344 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java --- @@ -202,40 +246,60 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory ResultSetMetaData rsmd = resultSet.getMetaData(); for (int i = 2; i <= rsmd.getColumnCount(); i++) { String resultColumnName = rsmd.getColumnName(i).toLowerCase(); + String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName); + String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey); + if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) { + // If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. 
+ resultColumnCurrentMax = statePropertyMap.get(resultColumnName); + } + int type = rsmd.getColumnType(i); + if (isDynamicTableName) { + // We haven't pre-populated the column type map if the table name is dynamic, so do it here + columnTypeMap.put(fullyQualifiedStateKey, type); + } try { - String newMaxValue = getMaxValueFromRow(resultSet, i, type, statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName()); + String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName()); if (newMaxValue != null) { - statePropertyMap.put(resultColumnName, newMaxValue); + statePropertyMap.put(fullyQualifiedStateKey, newMaxValue); } } catch (ParseException | IOException pie) { // Fail the whole thing here before we start creating flow files and such throw new ProcessException(pie); } + } } else { // Something is very wrong here, one row (even if count is zero) should be returned throw new SQLException("No rows returned from metadata query: " + selectQuery); } - } catch (SQLException e) { - logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); - throw new ProcessException(e); - } - final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); + final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); - // Generate SQL statements to read "pages" of data - for (int i = 0; i < numberOfFetches; i++) { - FlowFile sqlFlowFile; + // Generate SQL statements to read "pages" of data + for (int i = 0; i < numberOfFetches; i++) { + Integer limit = partitionSize == 0 ? null : partitionSize; + Integer offset = partitionSize == 0 ? null : i * partitionSize; + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); + FlowFile sqlFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess); + sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); + session.transfer(sqlFlowFile, REL_SUCCESS); + } + + if (fileToProcess != null) { + session.remove(fileToProcess); + } + } catch (SQLException e) { + if (fileToProcess != null) { + logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess}); + session.transfer(fileToProcess, REL_FAILURE); --- End diff -- agreed, will add the message (not the stack trace) as an attribute.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to do so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---