markap14 commented on code in PR #6589: URL: https://github.com/apache/nifi/pull/6589#discussion_r1007004979
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java: ########## @@ -0,0 +1,117 @@ +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ResponseMetadata; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.http.SdkHttpMetadata; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +abstract public class AwsMLFetcherProcessor<SERVICE extends AmazonWebServiceClient> + extends AbstractAWSCredentialsProviderProcessor<SERVICE> { + public static final String AWS_TASK_ID_PROPERTY = "awsTaskId"; + public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation"; + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Upon successful completion, the original FlowFile will be routed to this relationship.") + 
.autoTerminateDefault(true) + .build(); + public static final Relationship REL_IN_PROGRESS = new Relationship.Builder() + .name("in progress") + .description("The job is currently still being processed") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Job successfully finished. FlowFile will be routed to this relation.") + .build(); + public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder() + .name("partial success") + .description("Retrieving results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. " + + "It is generally expected to retry this relationship.") + .build(); Review Comment: I'm not sure that "Partial Success" makes sense here. If something was partially successful it wouldn't really make sense to retry. Perhaps "throttled" would make sense for the Relationship name. ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java: ########## @@ -0,0 +1,117 @@ +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ResponseMetadata; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.http.SdkHttpMetadata; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import 
org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +abstract public class AwsMLFetcherProcessor<SERVICE extends AmazonWebServiceClient> + extends AbstractAWSCredentialsProviderProcessor<SERVICE> { + public static final String AWS_TASK_ID_PROPERTY = "awsTaskId"; + public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation"; + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Upon successful completion, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + public static final Relationship REL_IN_PROGRESS = new Relationship.Builder() + .name("in progress") + .description("The job is currently still being processed") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Job successfully finished. FlowFile will be routed to this relation.") + .build(); + public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder() + .name("partial success") + .description("Retrieving results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. 
" + + "It is generally expected to retry this relationship.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("The job failed, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + protected final ObjectMapper mapper = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + PROXY_CONFIGURATION_SERVICE, + PROXY_HOST, + PROXY_HOST_PORT, + PROXY_USERNAME, + PROXY_PASSWORD)); + + public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason"; + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_ORIGINAL, + REL_SUCCESS, + REL_IN_PROGRESS, + REL_FAILURE + ))); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected SERVICE createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + throw new UnsupportedOperationException("Tried to create client in a deprecated way."); + } + + @Override + protected void init(ProcessorInitializationContext context) { + SimpleModule module = new SimpleModule(); + module.addDeserializer(ResponseMetadata.class, new AwsResponseMetadataDeserializer()); + SimpleModule module2 = new SimpleModule(); + module.addDeserializer(SdkHttpMetadata.class, new SdkHttpMetadataDeserializer()); + mapper.registerModule(module); + mapper.registerModule(module2); + } + + + protected void writeToFlowFile(ProcessSession session, FlowFile flowFile, Object response) { + session.write(flowFile, out -> { + try 
(BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) { + bufferedWriter.write(mapper.writeValueAsString(response)); + bufferedWriter.newLine(); Review Comment: Why add the newline? ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java: ########## @@ -0,0 +1,117 @@ +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ResponseMetadata; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.http.SdkHttpMetadata; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +abstract public class AwsMLFetcherProcessor<SERVICE extends AmazonWebServiceClient> + extends AbstractAWSCredentialsProviderProcessor<SERVICE> { + public static final String AWS_TASK_ID_PROPERTY = "awsTaskId"; + public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation"; + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + 
.required(true) + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Upon successful completion, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + public static final Relationship REL_IN_PROGRESS = new Relationship.Builder() + .name("in progress") + .description("The job is currently still being processed") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Job successfully finished. FlowFile will be routed to this relation.") + .build(); + public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder() + .name("partial success") + .description("Retrieving results failed for some reason, but the issue is likely to resolve on its own, such as Provisioned Throughput Exceeded or a Throttling failure. " + + "It is generally expected to retry this relationship.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("The job failed, the original FlowFile will be routed to this relationship.") + .autoTerminateDefault(true) + .build(); + protected final ObjectMapper mapper = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + PROXY_CONFIGURATION_SERVICE, + PROXY_HOST, + PROXY_HOST_PORT, + PROXY_USERNAME, + PROXY_PASSWORD)); + + public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason"; + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_ORIGINAL, + REL_SUCCESS, + 
REL_IN_PROGRESS, + REL_FAILURE + ))); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected SERVICE createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + throw new UnsupportedOperationException("Tried to create client in a deprecated way."); + } + + @Override + protected void init(ProcessorInitializationContext context) { + SimpleModule module = new SimpleModule(); + module.addDeserializer(ResponseMetadata.class, new AwsResponseMetadataDeserializer()); + SimpleModule module2 = new SimpleModule(); Review Comment: We should avoid naming conventions such as `module` and `module2` - perhaps this should be `awsResponseModule` and `sdkHttpModule` or something of that nature? ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java: ########## @@ -0,0 +1,63 @@ +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult; +import com.amazonaws.services.polly.model.TaskStatus; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Turn text into lifelike speech using deep learning. 
Checking Polly synthesis job's status.") Review Comment: This should be more descriptive of what the processor is actually doing. "Retrieves the current status of an Amazon Polly job" perhaps? ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java: ########## @@ -0,0 +1,148 @@ +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.AmazonWebServiceResult; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult> extends AbstractAWSCredentialsProviderProcessor<SERVICE> { + protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId"; + public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder() + .name("json-payload") + .displayName("JSON Payload") + .description("JSON Payload 
that represent an AWS ML Request. See more details in AWS API documentation.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + JSON_PAYLOAD, + MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + PROXY_CONFIGURATION_SERVICE, + PROXY_HOST, + PROXY_HOST_PORT, + PROXY_USERNAME, + PROXY_PASSWORD)); + private final ObjectMapper mapper = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + RESPONSE response; + try { + response = sendRequest(buildRequest(session, context, flowFile), context); + } catch (Exception e) { + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Exception was thrown during sending AWS ML request.", e); + return; + } + + try { + writeToFlowFile(session, flowFile, response); Review Comment: We should not overwrite the contents of the FlowFile with the response. There will be use cases where we want the original content of the FlowFile. Instead, we should create a new child FlowFile and write the response to that. Then route the original one to an 'original' relationship. This way, the flow developer can access both the original content and the response. 
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java: ########## @@ -0,0 +1,63 @@ +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult; +import com.amazonaws.services.polly.model.TaskStatus; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Turn text into lifelike speech using deep learning. Checking Polly synthesis job's status.") +public class PollyFetcher extends AwsMLFetcherProcessor<AmazonPollyClient> { + + @Override + protected AmazonPollyClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonPollyClient) AmazonPollyClientBuilder.standard().withCredentials(credentialsProvider).build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + GetSpeechSynthesisTaskResult speechSynthesisTask = getSynthesisTask(flowFile); Review Comment: Given the above comments about the "Partially Successful" relationship, I would expect to have a relationship for exceeding request limits. 
We should also catch any other general exception that could be thrown from the client and route to 'failure'. ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java: ########## @@ -0,0 +1,38 @@ +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.ml.AwsMlProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Turn text into lifelike speech using deep learning.") +@SeeAlso({PollyFetcher.class}) Review Comment: The fetcher should also have a `@SeeAlso` that mentions this processor as well. 
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java: ########## @@ -0,0 +1,38 @@ +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.ml.AwsMlProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Turn text into lifelike speech using deep learning.") +@SeeAlso({PollyFetcher.class}) +public class PollyProcessor extends AwsMlProcessor<AmazonPollyClient, StartSpeechSynthesisTaskRequest, StartSpeechSynthesisTaskResult> { Review Comment: We never use the word 'Processor' in a processor's name. We need to follow the convention of <Verb> <Noun>. So probably should use `StartAwsPollyJob` or `StartPollySpeechSynthesis` or something of that nature. 
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java: ########## @@ -0,0 +1,68 @@ +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.JobStatus; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor; +import org.apache.nifi.processors.aws.ml.polly.PollyFetcher; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"}) +@CapabilityDescription("Automatically extract printed text, handwriting, and data from any document") Review Comment: Same comments as above on CapabilityDescription ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java: ########## @@ -0,0 +1,68 @@ +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.JobStatus; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor; +import org.apache.nifi.processors.aws.ml.polly.PollyFetcher; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"}) +@CapabilityDescription("Automatically extract printed text, handwriting, and data from any document") +@SeeAlso({TextractProcessor.class}) +public class TextractFetcher extends AwsMLFetcherProcessor<AmazonTextractClient> { Review Comment: Same comments above regarding processor name ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java: ########## @@ -0,0 +1,148 @@ +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.AmazonWebServiceResult; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import 
org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult> extends AbstractAWSCredentialsProviderProcessor<SERVICE> { + protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId"; + public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder() + .name("json-payload") + .displayName("JSON Payload") + .description("JSON Payload that represent an AWS ML Request. See more details in AWS API documentation.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + JSON_PAYLOAD, + MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + PROXY_CONFIGURATION_SERVICE, + PROXY_HOST, + PROXY_HOST_PORT, + PROXY_USERNAME, + PROXY_PASSWORD)); + private final ObjectMapper mapper = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + RESPONSE response; + try { + response = sendRequest(buildRequest(session, context, flowFile), context); + } catch (Exception e) { + session.transfer(flowFile, REL_FAILURE); + 
getLogger().error("Exception was thrown during sending AWS ML request.", e); + return; + } + + try { + writeToFlowFile(session, flowFile, response); + } catch (Exception e) { + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Exception was thrown during writing aws response to flow file.", e); + return; + } + + try { + postProcessFlowFile(context, session, flowFile, response); + } catch (Exception e) { + session.transfer(flowFile, REL_FAILURE); + getLogger().error("Exception was thrown during AWS ML post processing.", e); + return; + } Review Comment: We can avoid separating into these different blocks. We will handle the processing in the same way, regardless of where the Exception was thrown. The only difference here is the Exception text. But the differing text of the log message is not really meaningful to the end user, and for developers diagnosing the problem, this is clear via the stack trace. ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java: ########## @@ -0,0 +1,148 @@ +package org.apache.nifi.processors.aws.ml; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.AmazonWebServiceResult; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import 
org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, REQUEST extends AmazonWebServiceRequest, RESPONSE extends AmazonWebServiceResult> extends AbstractAWSCredentialsProviderProcessor<SERVICE> { + protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId"; + public static final PropertyDescriptor JSON_PAYLOAD = new PropertyDescriptor.Builder() + .name("json-payload") + .displayName("JSON Payload") + .description("JSON Payload that represent an AWS ML Request. See more details in AWS API documentation.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE = + new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE) + .required(true) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + JSON_PAYLOAD, + MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + PROXY_CONFIGURATION_SERVICE, + PROXY_HOST, + PROXY_HOST_PORT, + PROXY_USERNAME, + PROXY_PASSWORD)); + private final ObjectMapper mapper = JsonMapper.builder() + .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true) + .build(); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + RESPONSE response; + try { + response = sendRequest(buildRequest(session, context, flowFile), context); + } catch (Exception e) { + 
session.transfer(flowFile, REL_FAILURE); + getLogger().error("Exception was thrown during sending AWS ML request.", e); Review Comment: We should avoid verbiage such as "Exception was thrown" and instead indicate "Failed to send request to AWS Service" ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java: ########## @@ -0,0 +1,63 @@ +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult; +import com.amazonaws.services.polly.model.TaskStatus; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Turn text into lifelike speech using deep learning. Checking Polly synthesis job's status.") +public class PollyFetcher extends AwsMLFetcherProcessor<AmazonPollyClient> { Review Comment: Processor names should always be in the form of <Verb> <Noun>. So perhaps `GetAwsPollyStatus`? 
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java: ########## @@ -0,0 +1,38 @@ +package org.apache.nifi.processors.aws.ml.polly; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.polly.AmazonPollyClient; +import com.amazonaws.services.polly.AmazonPollyClientBuilder; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest; +import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.ml.AwsMlProcessor; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"}) +@CapabilityDescription("Turn text into lifelike speech using deep learning.") Review Comment: The Capability Description should describe what this processor does, rather than describing what Amazon Polly does. This processor is not responsible for turning text into lifelike speech. Rather, it's responsible for starting an Amazon Polly job. Would probably make sense to also mention that this Processor starts the job, and it's expected to be followed up by the `GetPollyStatus` or whatever we call it, in order to make it clear how this should be used. 
########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java: ########## @@ -0,0 +1,68 @@ +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.JobStatus; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor; +import org.apache.nifi.processors.aws.ml.polly.PollyFetcher; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"}) +@CapabilityDescription("Automatically extract printed text, handwriting, and data from any document") +@SeeAlso({TextractProcessor.class}) +public class TextractFetcher extends AwsMLFetcherProcessor<AmazonTextractClient> { + public static final String DOCUMENT_ANALYSIS = "Document Analysis"; + public static final String DOCUMENT_TEXT_DETECTION = "Document Text Detection"; + public static final String EXPENSE_ANALYSIS = "Expense Analysis"; + private static final Map<String, AwsTextractTaskAware> UTIL_MAPPING = + ImmutableMap.of(DOCUMENT_ANALYSIS, new TextractDocumentAnalysisResultUtil(), + DOCUMENT_TEXT_DETECTION, new TextractDocumentTextDetectionUtil(), + EXPENSE_ANALYSIS, new TextractDocumentExpenseAnalysisUtil()); + + @Override + protected AmazonTextractClient createClient(ProcessContext context, 
AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTextractClient) AmazonTextractClient.builder().build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + String typeOfTextract = flowFile.getAttribute(TYPE.getName()); Review Comment: This should not be an attribute. It should be a property of the processor. This property should support expression language. So if a user wants to use an attribute, they are welcome to. But there's no need to require a specific attribute name for this. ########## nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java: ########## @@ -0,0 +1,68 @@ +package org.apache.nifi.processors.aws.ml.textract; + +import static org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.textract.AmazonTextractClient; +import com.amazonaws.services.textract.model.JobStatus; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor; +import org.apache.nifi.processors.aws.ml.polly.PollyFetcher; + +@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"}) +@CapabilityDescription("Automatically extract printed text, handwriting, and data from any document") +@SeeAlso({TextractProcessor.class}) +public 
class TextractFetcher extends AwsMLFetcherProcessor<AmazonTextractClient> { + public static final String DOCUMENT_ANALYSIS = "Document Analysis"; + public static final String DOCUMENT_TEXT_DETECTION = "Document Text Detection"; + public static final String EXPENSE_ANALYSIS = "Expense Analysis"; + private static final Map<String, AwsTextractTaskAware> UTIL_MAPPING = + ImmutableMap.of(DOCUMENT_ANALYSIS, new TextractDocumentAnalysisResultUtil(), + DOCUMENT_TEXT_DETECTION, new TextractDocumentTextDetectionUtil(), + EXPENSE_ANALYSIS, new TextractDocumentExpenseAnalysisUtil()); + + @Override + protected AmazonTextractClient createClient(ProcessContext context, AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return (AmazonTextractClient) AmazonTextractClient.builder().build(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + String typeOfTextract = flowFile.getAttribute(TYPE.getName()); + AwsTextractTaskAware util = UTIL_MAPPING.get(typeOfTextract); Review Comment: I think this approach of having an interface with 3 different implementations, and then using a mapping is overkill. It adds significant complexity when we could just do something here as simple as: ``` final JobStatus jobStatus; switch (typeOfTextract) { case DOCUMENT_ANALYSIS: jobStatus = JobStatus.fromValue(client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awstaskId)).getJobStatus()); break; } ``` And the like. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org