Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187357216 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " + + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. 
" + + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " + + "Each record can contain multiple fields with each field separated by the 'Field Separator' property." + ) +@WritesAttributes({ + @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), + @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), + }) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { + + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successful DeepLearning4j results are routed to this relationship").build(); + + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed DeepLearning4j results are routed to this relationship").build(); + + protected Gson gson = new Gson(); + + private static final Set<Relationship> relationships; + private static final List<PropertyDescriptor> propertyDescriptors; + static { + final Set<Relationship> tempRelationships = new HashSet<>(); + tempRelationships.add(REL_SUCCESS); + tempRelationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(tempRelationships); + final List<PropertyDescriptor> tempDescriptors = new ArrayList<>(); + tempDescriptors.add(MODEL_FILE); + tempDescriptors.add(RECORD_DIMENSIONS); + tempDescriptors.add(CHARSET); + tempDescriptors.add(FIELD_SEPARATOR); + tempDescriptors.add(RECORD_SEPARATOR); + propertyDescriptors = Collections.unmodifiableList(tempDescriptors); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public void onTrigger(final 
ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); + if ( flowFile.getSize() == 0 ) { + String message = "FlowFile query is empty"; + getLogger().error(message); + flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message); + session.transfer(flowFile, REL_FAILURE); + return; + } + + String input = null; + try { + input = getFlowFileContents(session, charset, flowFile); + String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue(); + String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue(); + + int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator); + + if ( getLogger().isDebugEnabled() ) { + getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions }); + } + + MultiLayerNetwork model = getModel(context); + + long startTimeMillis = System.currentTimeMillis(); + + String [] inputRecords = input.split(recordSeparator); + + List<INDArray> features = Arrays.stream(inputRecords).map( + record -> { + double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble( + field -> Double.parseDouble(field)).toArray(); + + INDArray featureInput = Nd4j.create(parameters, dimensions); + + if ( getLogger().isDebugEnabled() ) { + getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ", + new Object[] {record, parameters, dimensions, featureInput}); + } + + return featureInput; + + }).collect(Collectors.toList()); + + INDArray allFeatures = Nd4j.vstack(features); + + INDArray results = model.output(allFeatures); + + double [][] partitionedResults = new double[inputRecords.length][]; + for (int row = 0; row < inputRecords.length; 
row++) { + INDArray result = results.getRow(row); + partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector(); + } + + String jsonResult = gson.toJson(partitionedResults); + int [] shape = results.shape(); + String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length)); + + if ( getLogger().isDebugEnabled() ) { + getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}", + new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape}); + } + + try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) { + flowFile = session.importFrom(bais, flowFile); + } + + session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape); + + final long endTimeMillis = System.currentTimeMillis(); + + session.transfer(flowFile, REL_SUCCESS); + + session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context), + (endTimeMillis - startTimeMillis)); + } catch (Exception exception) { + flowFile = populateErrorAttributes(session, flowFile, exception.getMessage()); + getLogger().error("Failed to process data due to {} for input {}", + new Object[]{exception.getLocalizedMessage(), input}, exception); + session.transfer(flowFile, REL_FAILURE); + context.yield(); --- End diff -- I don't think context.yield() is appropriate here. There is no condition that is occurring that would result in the processor being unable to perform its task for some period of time, after which it would work again. That's what context.yield() is for. In this case, it is likely that the processor is misconfigured or that the data is invalid, correct? So in that case, I would just route to failure and continue on.
---