[ 
https://issues.apache.org/jira/browse/NIFI-4517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613898#comment-16613898
 ] 

ASF GitHub Bot commented on NIFI-4517:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2945#discussion_r217485802
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
 ---
    @@ -0,0 +1,483 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.FragmentAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.db.DatabaseAdapter;
    +import org.apache.nifi.processors.standard.sql.SqlWriter;
    +import org.apache.nifi.processors.standard.util.JdbcCommon;
    +import org.apache.nifi.util.StopWatch;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.stream.IntStream;
    +
    +
    +public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchProcessor {
    +
    +    public static final String RESULT_TABLENAME = "tablename";
    +    public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
    +
    +    public static final String FRAGMENT_ID = 
FragmentAttributes.FRAGMENT_ID.key();
    +    public static final String FRAGMENT_INDEX = 
FragmentAttributes.FRAGMENT_INDEX.key();
    +
    +    public static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Fetch Size")
    +            .description("The number of result rows to be fetched from the 
result set at a time. This is a hint to the database driver and may not be "
    +                    + "honored and/or exact. If the value specified is 
zero, then the hint is ignored.")
    +            .defaultValue("0")
    +            .required(true)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new 
PropertyDescriptor.Builder()
    +            .name("qdbt-max-rows")
    +            .displayName("Max Rows Per Flow File")
    +            .description("The maximum number of result rows that will be 
included in a single FlowFile. This will allow you to break up very large "
    +                    + "result sets into multiple FlowFiles. If the value 
specified is zero, then all rows are returned in a single FlowFile.")
    +            .defaultValue("0")
    +            .required(true)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("qdbt-output-batch-size")
    +            .displayName("Output Batch Size")
    +            .description("The number of output FlowFiles to queue before 
committing the process session. When set to zero, the session will be committed 
when all result set rows "
    +                    + "have been processed and the output FlowFiles are 
ready for transfer to the downstream relationship. For large result sets, this 
can cause a large burst of FlowFiles "
    +                    + "to be transferred at the end of processor 
execution. If this property is set, then when the specified number of FlowFiles 
are ready for transfer, then the session will "
    +                    + "be committed, thus releasing the FlowFiles to the 
downstream relationship. NOTE: The maxvalue.* and fragment.count attributes 
will not be set on FlowFiles when this "
    +                    + "property is set.")
    +            .defaultValue("0")
    +            .required(true)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_FRAGMENTS = new 
PropertyDescriptor.Builder()
    +            .name("qdbt-max-frags")
    +            .displayName("Maximum Number of Fragments")
    +            .description("The maximum number of fragments. If the value 
specified is zero, then all fragments are returned. " +
    +                    "This prevents OutOfMemoryError when this processor 
ingests huge table. NOTE: Setting this property can result in data loss, as the 
incoming results are "
    +                    + "not ordered, and fragments may end at arbitrary 
boundaries where rows are not included in the result set.")
    +            .defaultValue("0")
    +            .required(true)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .required(false)
    +                
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
    +                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +                
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        maxValueProperties = getDefaultMaxValueProperties(context, null);
    +    }
    +
    +    @OnStopped
    +    public void stop() {
    +        // Reset the column type map in case properties change
    +        setupComplete.set(false);
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
    +        // Fetch the column/table info once
    +        if (!setupComplete.get()) {
    +            super.setup(context);
    +        }
    +        ProcessSession session = sessionFactory.createSession();
    +        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
    +
    +        final ComponentLog logger = getLogger();
    +
    +        final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    --- End diff --
    
    Wouldn't it be better to get this once during `@OnScheduled`?


> Allow SQL results to be output as records in any supported format
> -----------------------------------------------------------------
>
>                 Key: NIFI-4517
>                 URL: https://issues.apache.org/jira/browse/NIFI-4517
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>            Priority: Major
>
> ExecuteSQL and QueryDatabaseTable currently only output Avro, and the schema 
> is only available embedded within the flow file, not as an attribute that 
> record-aware processors can handle.
> ExecuteSQL and QueryDatabaseTable processors should be updated with a 
> RecordSetWriter implementation. This will allow output using any writer 
> format (Avro, JSON, CSV, Free-form text, etc.), as well as all the other 
> features therein (such as writing the schema to an attribute), and will avoid 
> the need for a ConvertAvroToXYZ or ConvertRecord processor downstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to