Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1677#discussion_r112848809
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 ---
    @@ -0,0 +1,1058 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RowRecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.stream.IntStream;
    +
    +
    +@EventDriven
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"sql", "record", "jdbc", "put", "database", "update", "insert", 
"delete"})
    +@CapabilityDescription("The PutDatabaseRecord processor uses a specified 
RecordReader to input (possibly multiple) records from an incoming flow file. 
These records are translated to SQL "
    +        + "statements and executed as a single batch. If any errors occur, 
the flow file is routed to failure or retry, and if the records are transmitted 
successfully, the incoming flow file is "
    +        + "routed to success.  The type of statement executed by the 
processor is specified via the Statement Type property, which accepts some 
hard-coded values such as INSERT, UPDATE, and DELETE, "
    +        + "as well as 'Use statement.type Attribute', which causes the 
processor to get the statement type from a flow file attribute.  IMPORTANT: If 
the Statement Type is UPDATE, then the incoming "
    +        + "records must not alter the value(s) of the primary keys (or 
user-specified Update Keys). If such records are encountered, the UPDATE 
statement issued to the database may do nothing "
    +        + "(if no existing records with the new primary key values are 
found), or could inadvertently corrupt the existing data (by changing records 
for which the new values of the primary keys "
    +        + "exist).")
    +@ReadsAttribute(attribute = PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, 
description = "If 'Use statement.type Attribute' is selected for the Statement 
Type property, the value of this attribute "
    +        + "will be used to determine the type of statement (INSERT, 
UPDATE, DELETE, SQL, etc.) to generate and execute.")
    +public class PutDatabaseRecord extends AbstractProcessor {
    +
    +    static final String UPDATE_TYPE = "UPDATE";
    +    static final String INSERT_TYPE = "INSERT";
    +    static final String DELETE_TYPE = "DELETE";
    +    static final String SQL_TYPE = "SQL";   // Not an allowable value in 
the Statement Type property, must be set by attribute
    +    static final String USE_ATTR_TYPE = "Use statement.type Attribute";
    +
    +    static final String STATEMENT_TYPE_ATTRIBUTE = "statement.type";
    +
    +    static final AllowableValue IGNORE_UNMATCHED_FIELD = new 
AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
    +            "Any field in the document that cannot be mapped to a column 
in the database is ignored");
    +    static final AllowableValue FAIL_UNMATCHED_FIELD = new 
AllowableValue("Fail on Unmatched Fields", "Fail on Unmatched Fields",
    +            "If the document has any field that cannot be mapped to a 
column in the database, the FlowFile will be routed to the failure 
relationship");
    +    static final AllowableValue IGNORE_UNMATCHED_COLUMN = new 
AllowableValue("Ignore Unmatched Columns",
    +            "Ignore Unmatched Columns",
    +            "Any column in the database that does not have a field in the 
document will be assumed to not be required.  No notification will be logged");
    +    static final AllowableValue WARNING_UNMATCHED_COLUMN = new 
AllowableValue("Warn on Unmatched Columns",
    +            "Warn on Unmatched Columns",
    +            "Any column in the database that does not have a field in the 
document will be assumed to not be required.  A warning will be logged");
    +    static final AllowableValue FAIL_UNMATCHED_COLUMN = new 
AllowableValue("Fail on Unmatched Columns",
    +            "Fail on Unmatched Columns",
    +            "A flow will fail if any column in the database that does not 
have a field in the document.  An error will be logged");
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Successfully created FlowFile from SQL query 
result set.")
    +            .build();
    +
    +    static final Relationship REL_RETRY = new Relationship.Builder()
    --- End diff --
    
    `REL_RETRY` is declared here but never referenced anywhere in the processor. Do we still want this relationship, or should it be removed (or wired into the failure-handling logic)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to