lizhizhou commented on code in PR #6844:
URL: https://github.com/apache/nifi/pull/6844#discussion_r1104150138
##########
nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/QueryIoTDBRecord.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+
+@EventDriven
+@SupportsBatching
+@Tags({"iotdb", "insert", "tablet"})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription(
+        "This is a processor that reads the sql query from the incoming FlowFile and using it to "
+                + "query the result from IoTDB using native interface."
+ + "Then it use the configured 'Record Writer' to generate the flowfile ") +public class QueryIoTDBRecord extends AbstractIoTDB { + + public static final String IOTDB_EXECUTED_QUERY = "iotdb.executed.query"; + + private static final int DEFAULT_IOTDB_FETCH_SIZE = 10000; + + public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. " + + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor IOTDB_QUERY = new PropertyDescriptor.Builder() + .name("iotdb-query") + .displayName("IoTDB Query") + .description("The IoTDB query to execute. " + + "Note: If there are incoming connections, then the query is created from incoming FlowFile's content otherwise" + + " it is created from this property.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final Integer FETCH_SIZE = 100000; + public static final PropertyDescriptor IOTDB_QUERY_FETCH_SIZE = new PropertyDescriptor.Builder() + .name("iotdb-query-chunk-size") + .displayName("Fetch Size") + .description("Chunking can be used to return results in a stream of smaller batches " + + "(each has a partial results up to a chunk size) rather than as a single response. " + + "Chunking queries can return an unlimited number of rows. Note: Chunking is enable when result chunk size is greater than 0") + .defaultValue(String.valueOf(DEFAULT_IOTDB_FETCH_SIZE)) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.createLongValidator(0, FETCH_SIZE, true)) + .required(true) + .build(); + + private static RecordSetWriterFactory recordSetWriterFactory; + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); + propertyDescriptors.add(IOTDB_QUERY); + propertyDescriptors.add(IOTDB_QUERY_FETCH_SIZE); + propertyDescriptors.add(RECORD_WRITER_FACTORY); + return Collections.unmodifiableList(propertyDescriptors); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IoTDBConnectionException { + super.onScheduled(context); + // Either input connection or scheduled query is required + if ( ! context.getProperty(IOTDB_QUERY).isSet() + && ! 
+        if ( ! context.getProperty(IOTDB_QUERY).isSet()
+                && ! context.hasIncomingConnection() ) {
+            String error = "The IoTDB Query processor requires input connection or scheduled IoTDB query";
+            getLogger().error(error);
+            throw new ProcessException(error);
+        }
+        recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
+
+        String query;
+        Charset charset;
+        FlowFile outgoingFlowFile;
+        int fetchSize = DEFAULT_IOTDB_FETCH_SIZE;
+        // If there are incoming connections, prepare query params from flow file
+        if ( context.hasIncomingConnection() ) {
+            FlowFile incomingFlowFile = processSession.get();
+
+            if ( incomingFlowFile == null && context.hasNonLoopConnection() ) {
+                return;
+            }
+            fetchSize = context.getProperty(IOTDB_QUERY_FETCH_SIZE).evaluateAttributeExpressions(incomingFlowFile).asInteger();
+            charset = Charset.forName("UTF-8");
+            if ( (incomingFlowFile != null ? incomingFlowFile.getSize() : 0) == 0 ) {
+                if ( context.getProperty(IOTDB_QUERY).isSet() ) {
+                    query = context.getProperty(IOTDB_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue();
+                } else {
+                    String message = "FlowFile query is empty and no scheduled query is set";
+                    getLogger().error(message);
+                    incomingFlowFile = processSession.putAttribute(incomingFlowFile, "iotdb.error.message", message);
+                    processSession.transfer(incomingFlowFile, REL_FAILURE);
+                    return;
+                }
+            } else {
+                try {
+                    query = getQuery(processSession, charset, incomingFlowFile);
+                } catch (IOException ioe) {
+                    getLogger().error("Exception while reading from FlowFile " + ioe.getLocalizedMessage(), ioe);
+                    throw new ProcessException(ioe);
+                }
+            }
+            outgoingFlowFile = incomingFlowFile;
+        } else {
+            outgoingFlowFile = processSession.create();
+            query = context.getProperty(IOTDB_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue();
+        }
+
+        try {
+            SessionDataSet dataSet = session.get().executeQueryStatement(query);
+            List<Map<String, Object>> result = new ArrayList<>();
+            dataSet.setFetchSize(fetchSize);
+            List<String> fieldType = dataSet.getColumnTypes();
+            List<String> fieldNames = dataSet.getColumnNames();
+            List<RecordField> recordFields = new ArrayList<>();
+            for (int i = 0; i < fieldNames.size(); i++) {
+                recordFields.add(new RecordField(fieldNames.get(i), getType(fieldType.get(i)).getDataType()));
+            }
+            final RecordSchema schema = new SimpleRecordSchema(recordFields);
+            PipedInputStream inputStream = new PipedInputStream();
+            final PipedOutputStream outputStream = new PipedOutputStream(inputStream);

Review Comment:
   Refine the code
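   One possible refinement, sketched below: rather than wiring up a PipedInputStream/PipedOutputStream pair (which needs a second thread to drain the pipe, or it can deadlock on the pipe buffer), the result rows could be streamed straight into the FlowFile content with processSession.write(...). This is only a sketch against the names already in the diff (dataSet, schema, fieldNames, recordSetWriterFactory, outgoingFlowFile); it assumes the first result column is Time, and error handling and attribute updates are elided:

   ```java
   // Hypothetical drop-in for the PipedInputStream/PipedOutputStream block above;
   // uses only classes this file already imports.
   final FlowFile flowFileRef = outgoingFlowFile; // the lambda needs an effectively final reference
   outgoingFlowFile = processSession.write(outgoingFlowFile, out -> {
       try (RecordSetWriter writer = recordSetWriterFactory.createWriter(getLogger(), schema, out, flowFileRef)) {
           writer.beginRecordSet();
           while (dataSet.hasNext()) {
               RowRecord row = dataSet.next();
               Map<String, Object> values = new HashMap<>();
               values.put(fieldNames.get(0), row.getTimestamp()); // assumes column 0 is Time
               List<Field> fields = row.getFields();
               for (int i = 0; i < fields.size(); i++) {
                   Field field = fields.get(i);
                   values.put(fieldNames.get(i + 1), field.getObjectValue(field.getDataType()));
               }
               writer.write(new MapRecord(schema, values));
           }
           writer.finishRecordSet();
       } catch (Exception e) {
           throw new IOException("Failed writing IoTDB query results", e);
       }
   });
   ```

   Writing through the session callback this way is the pattern most record-producing NiFi processors seem to follow, and it would also make the result list and the ByteArrayOutputStream import unnecessary.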