[ https://issues.apache.org/jira/browse/NIFI-1575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196179#comment-15196179 ]
ASF GitHub Bot commented on NIFI-1575: -------------------------------------- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/261#discussion_r56234935 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java --- @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." + + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. 
FlowFile attribute 'querydbtable.row.count' indicates how many rows were selected.") +@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for " + + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor " + + "to fetch only those records that have max values greater than the retained values. This can be used for " + + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor " + + "per the State Management documentation") +@WritesAttribute(attribute = "querydbtable.row.count") +public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { + + public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; + + public static final String SQL_PREPROCESS_STRATEGY_NONE = "None"; + public static final String SQL_PREPROCESS_STRATEGY_ORACLE = "Oracle"; + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("SQL query execution failed. 
Incoming FlowFile will be penalized and routed to this relationship.") + .build(); + private final Set<Relationship> relationships; + + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain a connection to the database.") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the database table to be queried.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder() + .name("Columns to Return") + .description("A comma-separated list of column names to be used in the query. If your database requires " + + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no " + + "column names are supplied, all columns in the specified table will be returned.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder() + .name("Maximum-value Columns") + .description("A comma-separated list of column names. The processor will keep track of the maximum value " + + "for each column that has been returned since the processor started running. This can be used to " + + "retrieve only those rows that have been added/updated since the last retrieval. 
Note that some " + + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these " + + "types should not be listed in this property, and will result in error(s) during processing.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running SQL select query " + + ", zero means there is no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor SQL_PREPROCESS_STRATEGY = new PropertyDescriptor.Builder() + .name("SQL Pre-processing Strategy") + .description("The strategy to employ when generating the SQL for querying the table. A strategy may include " + + "custom or database-specific code, such as the treatment of time/date formats.") + .required(true) + .allowableValues(SQL_PREPROCESS_STRATEGY_NONE, SQL_PREPROCESS_STRATEGY_ORACLE) + .defaultValue("None") + .build(); + + + private final List<PropertyDescriptor> propDescriptors; + + protected final Map<String, Integer> columnTypeMap = new HashMap<>(); + + public QueryDatabaseTable() { + final Set<Relationship> r = new HashSet<>(); + r.add(REL_SUCCESS); + r.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(r); + + final List<PropertyDescriptor> pds = new ArrayList<>(); + pds.add(DBCP_SERVICE); + pds.add(TABLE_NAME); + pds.add(COLUMN_NAMES); + pds.add(MAX_VALUE_COLUMN_NAMES); + pds.add(QUERY_TIMEOUT); + pds.add(SQL_PREPROCESS_STRATEGY); + propDescriptors = Collections.unmodifiableList(pds); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propDescriptors; + } + 
+ @OnScheduled + public void setup(final ProcessContext context) { + // Try to fill the columnTypeMap with the types of the desired max-value columns + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String tableName = context.getProperty(TABLE_NAME).getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); + + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + + // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible + // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read + // approach as in Apache Drill + String query = getSelectFromClause(tableName, maxValueColumnNames).append(" WHERE 1 = 0").toString(); + ResultSet resultSet = st.executeQuery(query); + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + int numCols = resultSetMetaData.getColumnCount(); + if (numCols > 0) { + columnTypeMap.clear(); + for (int i = 1; i <= numCols; i++) { + String colName = resultSetMetaData.getColumnName(i).toLowerCase(); + int colType = resultSetMetaData.getColumnType(i); + columnTypeMap.put(colName, colType); + } + + } else { + throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames); + } + + } catch (SQLException e) { + throw new ProcessException("Unable to communicate with database in order to determine column types", e); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession session = sessionFactory.createSession(); + FlowFile fileToProcess = null; + + final ProcessorLog logger = getLogger(); + + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String tableName = 
context.getProperty(TABLE_NAME).getValue(); + final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); + final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue(); + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + + try { + stateMap = stateManager.getState(Scope.CLUSTER); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform " + + "query until this is accomplished.", ioe); + context.yield(); + return; + } + // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually + // set as the current state map (after the session has been committed) + final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap()); + + final String selectQuery = getQuery(tableName, columnNames, getColumns(maxValueColumnNames), stateMap, preProcessStrategy); + final StopWatch stopWatch = new StopWatch(true); + + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + st.setQueryTimeout(queryTimeout); // timeout in seconds + + final LongHolder nrOfRows = new LongHolder(0L); + + fileToProcess = session.create(); + fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + logger.debug("Executing query {}", new Object[]{selectQuery}); + final ResultSet resultSet = st.executeQuery(selectQuery); + // Max values will be updated in the state property map by the callback + final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, preProcessStrategy); + 
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector)); + + } catch (final SQLException e) { + throw new ProcessException("Error during database query or conversion of records to Avro", e); + } + } + }); + + if (nrOfRows.get() > 0) { + // set attribute how many rows were selected + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString()); + + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{fileToProcess, nrOfRows.get()}); + session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", + stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(fileToProcess, REL_SUCCESS); + } else { + // If there were no rows returned, don't send the flowfile + session.remove(fileToProcess); + context.yield(); + } + + } catch (final Throwable /*ProcessException | SQLException*/ e) { --- End diff -- D'oh! I did that while debugging and forgot to change it back :( > Add ability to query DB table using "last value" from column > ------------------------------------------------------------ > > Key: NIFI-1575 > URL: https://issues.apache.org/jira/browse/NIFI-1575 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Reporter: Matt Burgess > Assignee: Matt Burgess > Fix For: 0.6.0 > > > It would be a useful feature for a processor to be able to query from a > database table for only those records that have been added or become available since the > last time the query was executed. For example, if the processor could keep > the max value of a timestamp column for the last result set returned, the > next time the query was issued, it could use that timestamp value and column > to only return records whose timestamps were greater than "the last time". 
> This shouldn't be limited to timestamps of course; it would be useful for any > strictly-increasing value (like primary key ID) but would be up to the user > to select the table and column. The processor would simply keep the state for > the specified column's max value. > Proposed is a "QueryDBTable" processor which would have properties to > specify the table name and the column in the table for which to keep state > information about the last maximum value retrieved. Subsequent queries of the > table use this value to filter for rows whose value for the specified column > is greater than the "last maximum value". Upon each successful query, the > "last maximum value" is updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)