markap14 commented on code in PR #6589:
URL: https://github.com/apache/nifi/pull/6589#discussion_r1007004979


##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+abstract public class AwsMLFetcherProcessor<SERVICE extends 
AmazonWebServiceClient>
+        extends AbstractAWSCredentialsProviderProcessor<SERVICE>  {
+    public static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile 
will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final Relationship REL_IN_PROGRESS = new 
Relationship.Builder()
+            .name("in progress")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Job successfully finished. FlowFile will be routed 
to this relation.")
+            .build();
+    public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+            .name("partial success")
+            .description("Retrieving results failed for some reason, but the 
issue is likely to resolve on its own, such as Provisioned Throughput Exceeded 
or a Throttling failure. " +
+                    "It is generally expected to retry this relationship.")
+            .build();

Review Comment:
   I'm not sure that "Partial Success" makes sense here. If something was 
partially successful it wouldn't really make sense to retry. Perhaps 
"throttled" would make sense for the Relationship name.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+abstract public class AwsMLFetcherProcessor<SERVICE extends 
AmazonWebServiceClient>
+        extends AbstractAWSCredentialsProviderProcessor<SERVICE>  {
+    public static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile 
will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final Relationship REL_IN_PROGRESS = new 
Relationship.Builder()
+            .name("in progress")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Job successfully finished. FlowFile will be routed 
to this relation.")
+            .build();
+    public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+            .name("partial success")
+            .description("Retrieving results failed for some reason, but the 
issue is likely to resolve on its own, such as Provisioned Throughput Exceeded 
or a Throttling failure. " +
+                    "It is generally expected to retry this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The job failed, the original FlowFile will be routed 
to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    protected final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+
+    public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    private static final Set<Relationship> relationships = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SUCCESS,
+            REL_IN_PROGRESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected SERVICE createClient(ProcessContext context, AWSCredentials 
credentials, ClientConfiguration config) {
+        throw new UnsupportedOperationException("Tried to create client in a 
deprecated way.");
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(ResponseMetadata.class, new 
AwsResponseMetadataDeserializer());
+        SimpleModule module2 = new SimpleModule();
+        module.addDeserializer(SdkHttpMetadata.class, new 
SdkHttpMetadataDeserializer());
+        mapper.registerModule(module);
+        mapper.registerModule(module2);
+    }
+
+
+    protected void writeToFlowFile(ProcessSession session, FlowFile flowFile, 
Object response) {
+        session.write(flowFile, out -> {
+            try (BufferedWriter bufferedWriter = new BufferedWriter(new 
OutputStreamWriter(out, StandardCharsets.UTF_8))) {
+                bufferedWriter.write(mapper.writeValueAsString(response));
+                bufferedWriter.newLine();

Review Comment:
   Why add the newline?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMLFetcherProcessor.java:
##########
@@ -0,0 +1,117 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.http.SdkHttpMetadata;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+abstract public class AwsMLFetcherProcessor<SERVICE extends 
AmazonWebServiceClient>
+        extends AbstractAWSCredentialsProviderProcessor<SERVICE>  {
+    public static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final String AWS_TASK_OUTPUT_LOCATION = "outputLocation";
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Upon successful completion, the original FlowFile 
will be routed to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    public static final Relationship REL_IN_PROGRESS = new 
Relationship.Builder()
+            .name("in progress")
+            .description("The job is currently still being processed")
+            .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Job successfully finished. FlowFile will be routed 
to this relation.")
+            .build();
+    public final Relationship REL_PARTIAL_SUCCESS = new Relationship.Builder()
+            .name("partial success")
+            .description("Retrieving results failed for some reason, but the 
issue is likely to resolve on its own, such as Provisioned Throughput Exceeded 
or a Throttling failure. " +
+                    "It is generally expected to retry this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The job failed, the original FlowFile will be routed 
to this relationship.")
+            .autoTerminateDefault(true)
+            .build();
+    protected final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+
+    public static final String FAILURE_REASON_ATTRIBUTE = "failure.reason";
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    private static final Set<Relationship> relationships = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_ORIGINAL,
+            REL_SUCCESS,
+            REL_IN_PROGRESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected SERVICE createClient(ProcessContext context, AWSCredentials 
credentials, ClientConfiguration config) {
+        throw new UnsupportedOperationException("Tried to create client in a 
deprecated way.");
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(ResponseMetadata.class, new 
AwsResponseMetadataDeserializer());
+        SimpleModule module2 = new SimpleModule();

Review Comment:
   We should avoid naming conventions such as `module` and `module2` - perhaps 
this should be `awsResponseModule` and `sdkHttpModule` or something of that 
nature?
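
   For example (the names are only a suggestion; renaming would also make it harder to miss that the second `addDeserializer` call currently targets `module` rather than `module2`):
   ```
   SimpleModule awsResponseModule = new SimpleModule();
   awsResponseModule.addDeserializer(ResponseMetadata.class, new AwsResponseMetadataDeserializer());
   SimpleModule sdkHttpModule = new SimpleModule();
   sdkHttpModule.addDeserializer(SdkHttpMetadata.class, new SdkHttpMetadataDeserializer());
   mapper.registerModule(awsResponseModule);
   mapper.registerModule(sdkHttpModule);
   ```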



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning. 
Checking Polly synthesis job's status.")

Review Comment:
   This should be more descriptive of what the processor is actually doing. 
"Retrieves the current status of an Amazon Polly job" perhaps?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);
+            return;
+        }
+
+        try {
+            writeToFlowFile(session, flowFile, response);

Review Comment:
   We should not overwrite the contents of the FlowFile with the response. 
There will be use cases where we want the original content of the FlowFile. 
Instead, we should create a new child FlowFile and write the response to that. 
Then route the original one to an 'original' relationship. This way, the flow 
developer can access both the original content and the response.
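
   Roughly along these lines (a sketch only, reusing the existing `writeToFlowFile` helper; `REL_ORIGINAL` is assumed to be added to this processor per the suggestion above):
   ```
   FlowFile childFlowFile = session.create(flowFile);
   writeToFlowFile(session, childFlowFile, response);
   session.transfer(childFlowFile, REL_SUCCESS);
   session.transfer(flowFile, REL_ORIGINAL);
   ```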



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning. 
Checking Polly synthesis job's status.")
+public class PollyFetcher extends AwsMLFetcherProcessor<AmazonPollyClient> {
+
+    @Override
+    protected AmazonPollyClient createClient(ProcessContext context, 
AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonPollyClient) 
AmazonPollyClientBuilder.standard().withCredentials(credentialsProvider).build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        GetSpeechSynthesisTaskResult speechSynthesisTask = 
getSynthesisTask(flowFile);

Review Comment:
   Given the above comments about the "Partially Successful" relationship, I 
would expect to have a relationship for exceeding request limits. We should 
also catch any other general exception that could be thrown from the client and 
route to 'failure'. 
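
   For example, something along these lines (a rough sketch; `REL_THROTTLED` is the hypothetical relationship suggested earlier, and the retryable check is illustrative):
   ```
   final GetSpeechSynthesisTaskResult speechSynthesisTask;
   try {
       speechSynthesisTask = getSynthesisTask(flowFile);
   } catch (AmazonServiceException e) {
       getLogger().error("Failed to get Polly synthesis task status", e);
       // Throttling / request-limit errors are retryable; everything else is a hard failure
       session.transfer(flowFile, e.isRetryable() ? REL_THROTTLED : REL_FAILURE);
       return;
   } catch (Exception e) {
       getLogger().error("Failed to get Polly synthesis task status", e);
       session.transfer(flowFile, REL_FAILURE);
       return;
   }
   ```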



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMlProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning.")
+@SeeAlso({PollyFetcher.class})

Review Comment:
   The fetcher should also have a `@SeeAlso` that mentions this processor as 
well.
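   i.e. something like `@SeeAlso({PollyProcessor.class})` on the fetcher (using whatever the start processor ends up being named).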



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMlProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning.")
+@SeeAlso({PollyFetcher.class})
+public class PollyProcessor extends AwsMlProcessor<AmazonPollyClient, 
StartSpeechSynthesisTaskRequest, StartSpeechSynthesisTaskResult> {

Review Comment:
   We never use the word 'Processor' in a processor's name. We need to follow 
the convention of <Verb> <Noun>. So we should probably use `StartAwsPollyJob` or 
`StartPollySpeechSynthesis` or something of that nature.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")

Review Comment:
   Same comments as above on CapabilityDescription



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")
+@SeeAlso({TextractProcessor.class})
+public class TextractFetcher extends 
AwsMLFetcherProcessor<AmazonTextractClient> {

Review Comment:
   Same comments as above regarding the processor name.



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);
+            return;
+        }
+
+        try {
+            writeToFlowFile(session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during writing aws 
response to flow file.", e);
+            return;
+        }
+
+        try {
+            postProcessFlowFile(context, session, flowFile, response);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during AWS ML post 
processing.", e);
+            return;
+        }

Review Comment:
   We can avoid separating into these different blocks. We will handle the 
processing in the same way, regardless of where the Exception was thrown. 
The only difference here is the Exception text. But the differing text of the 
log message is not really meaningful to the end user, and for developers 
diagnosing the problem, this is clear via the stack trace.
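
   In other words, something like this (a sketch; it assumes successful results are routed to `REL_SUCCESS`):
   ```
   try {
       final RESPONSE response = sendRequest(buildRequest(session, context, flowFile), context);
       writeToFlowFile(session, flowFile, response);
       postProcessFlowFile(context, session, flowFile, response);
       session.transfer(flowFile, REL_SUCCESS);
   } catch (final Exception e) {
       getLogger().error("Failed to process " + flowFile + " with the AWS ML service", e);
       session.transfer(flowFile, REL_FAILURE);
   }
   ```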



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/AwsMlProcessor.java:
##########
@@ -0,0 +1,148 @@
+package org.apache.nifi.processors.aws.ml;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.AmazonWebServiceResult;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+public abstract class AwsMlProcessor<SERVICE extends AmazonWebServiceClient, 
REQUEST extends AmazonWebServiceRequest, RESPONSE extends 
AmazonWebServiceResult> extends 
AbstractAWSCredentialsProviderProcessor<SERVICE> {
+    protected static final String AWS_TASK_ID_PROPERTY = "awsTaskId";
+    public static final PropertyDescriptor JSON_PAYLOAD = new 
PropertyDescriptor.Builder()
+            .name("json-payload")
+            .displayName("JSON Payload")
+            .description("JSON Payload that represent an AWS ML Request. See 
more details in AWS API documentation.")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+    public static final PropertyDescriptor 
MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE =
+            new 
PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE)
+                    .required(true)
+                    .build();
+    private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            JSON_PAYLOAD,
+            MANDATORY_AWS_CREDENTIALS_PROVIDER_SERVICE,
+            TIMEOUT,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            PROXY_CONFIGURATION_SERVICE,
+            PROXY_HOST,
+            PROXY_HOST_PORT,
+            PROXY_USERNAME,
+            PROXY_PASSWORD));
+    private final ObjectMapper mapper = JsonMapper.builder()
+            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        RESPONSE response;
+        try {
+            response = sendRequest(buildRequest(session, context, flowFile), 
context);
+        } catch (Exception e) {
+            session.transfer(flowFile, REL_FAILURE);
+            getLogger().error("Exception was thrown during sending AWS ML 
request.", e);

Review Comment:
   We should avoid verbiage such as "Exception was thrown" and instead indicate 
something like "Failed to send request to AWS Service".



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyFetcher.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.GetSpeechSynthesisTaskResult;
+import com.amazonaws.services.polly.model.TaskStatus;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning. 
Checking Polly synthesis job's status.")
+public class PollyFetcher extends AwsMLFetcherProcessor<AmazonPollyClient> {

Review Comment:
   Processor names should always be in the form of <Verb> <Noun>. So perhaps 
`GetAwsPollyStatus`?



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/polly/PollyProcessor.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.nifi.processors.aws.ml.polly;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.polly.AmazonPollyClient;
+import com.amazonaws.services.polly.AmazonPollyClientBuilder;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskRequest;
+import com.amazonaws.services.polly.model.StartSpeechSynthesisTaskResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.ml.AwsMlProcessor;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Polly"})
+@CapabilityDescription("Turn text into lifelike speech using deep learning.")

Review Comment:
   The Capability Description should describe what this processor does, rather 
than describing what Amazon Polly does. This processor is not responsible for 
turning text into lifelike speech. Rather, it's responsible for starting an 
Amazon Polly job. Would probably make sense to also mention that this Processor 
starts the job, and it's expected to be followed up by the `GetPollyStatus` or 
whatever we call it, in order to make it clear how this should be used.
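
   For example, something like (the `GetAwsPollyStatus` name is just the suggestion from the naming comment above):
   ```
   @CapabilityDescription("Starts an Amazon Polly speech synthesis job. The job runs asynchronously; the corresponding "
           + "status processor (e.g. GetAwsPollyStatus) should be used to check for completion and obtain the output location.")
   ```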



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")
+@SeeAlso({TextractProcessor.class})
+public class TextractFetcher extends 
AwsMLFetcherProcessor<AmazonTextractClient> {
+    public static final String DOCUMENT_ANALYSIS = "Document Analysis";
+    public static final String DOCUMENT_TEXT_DETECTION = "Document Text 
Detection";
+    public static final String EXPENSE_ANALYSIS = "Expense Analysis";
+    private static final Map<String, AwsTextractTaskAware> UTIL_MAPPING =
+            ImmutableMap.of(DOCUMENT_ANALYSIS, new 
TextractDocumentAnalysisResultUtil(),
+                    DOCUMENT_TEXT_DETECTION, new 
TextractDocumentTextDetectionUtil(),
+                    EXPENSE_ANALYSIS, new 
TextractDocumentExpenseAnalysisUtil());
+
+    @Override
+    protected AmazonTextractClient createClient(ProcessContext context, 
AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTextractClient) AmazonTextractClient.builder().build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        String typeOfTextract = flowFile.getAttribute(TYPE.getName());

Review Comment:
   This should not be an attribute. It should be a property of the processor. 
This property should support expression language. So if a user wants to use an 
attribute, they are welcome to. But there's no need to require a specific 
attribute name for this.
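
   A rough sketch of such a property (the property name and description are placeholders; the valid values reference the constants already defined in this class):
   ```
   public static final PropertyDescriptor TEXTRACT_TYPE = new PropertyDescriptor.Builder()
           .name("textract-type")
           .displayName("Textract Type")
           .description("The type of Textract task whose status should be checked: "
                   + DOCUMENT_ANALYSIS + ", " + DOCUMENT_TEXT_DETECTION + " or " + EXPENSE_ANALYSIS + ".")
           .required(true)
           .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
           .build();

   // in onTrigger:
   String typeOfTextract = context.getProperty(TEXTRACT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
   ```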



##########
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/ml/textract/TextractFetcher.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.nifi.processors.aws.ml.textract;
+
+import static 
org.apache.nifi.processors.aws.ml.textract.TextractProcessor.TYPE;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.textract.AmazonTextractClient;
+import com.amazonaws.services.textract.model.JobStatus;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.aws.ml.AwsMLFetcherProcessor;
+import org.apache.nifi.processors.aws.ml.polly.PollyFetcher;
+
+@Tags({"Amazon", "AWS", "ML", "Machine Learning", "Textract"})
+@CapabilityDescription("Automatically extract printed text, handwriting, and 
data from any document")
+@SeeAlso({TextractProcessor.class})
+public class TextractFetcher extends 
AwsMLFetcherProcessor<AmazonTextractClient> {
+    public static final String DOCUMENT_ANALYSIS = "Document Analysis";
+    public static final String DOCUMENT_TEXT_DETECTION = "Document Text 
Detection";
+    public static final String EXPENSE_ANALYSIS = "Expense Analysis";
+    private static final Map<String, AwsTextractTaskAware> UTIL_MAPPING =
+            ImmutableMap.of(DOCUMENT_ANALYSIS, new 
TextractDocumentAnalysisResultUtil(),
+                    DOCUMENT_TEXT_DETECTION, new 
TextractDocumentTextDetectionUtil(),
+                    EXPENSE_ANALYSIS, new 
TextractDocumentExpenseAnalysisUtil());
+
+    @Override
+    protected AmazonTextractClient createClient(ProcessContext context, 
AWSCredentialsProvider credentialsProvider, ClientConfiguration config) {
+        return (AmazonTextractClient) AmazonTextractClient.builder().build();
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        String typeOfTextract = flowFile.getAttribute(TYPE.getName());
+        AwsTextractTaskAware util = UTIL_MAPPING.get(typeOfTextract);

Review Comment:
   I think this approach of having an interface with 3 different 
implementations, and then using a mapping is overkill. It adds significant 
complexity when we could just do something here as simple as:
   ```
   final JobStatus jobStatus;
   switch (typeOfTextract) {
       case DOCUMENT_ANALYSIS:
           jobStatus = JobStatus.fromValue(client.getDocumentAnalysis(new GetDocumentAnalysisRequest().withJobId(awstaskId)).getJobStatus());
           break;
   }
   ```
   And the like. 
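
   For completeness, a fuller version of that sketch covering all three types might look like the following (the `Get*Request` classes and the `awsTaskId` variable are assumptions based on the AWS SDK and the surrounding code):
   ```
   final JobStatus jobStatus;
   switch (typeOfTextract) {
       case DOCUMENT_ANALYSIS:
           jobStatus = JobStatus.fromValue(client.getDocumentAnalysis(
                   new GetDocumentAnalysisRequest().withJobId(awsTaskId)).getJobStatus());
           break;
       case DOCUMENT_TEXT_DETECTION:
           jobStatus = JobStatus.fromValue(client.getDocumentTextDetection(
                   new GetDocumentTextDetectionRequest().withJobId(awsTaskId)).getJobStatus());
           break;
       case EXPENSE_ANALYSIS:
           jobStatus = JobStatus.fromValue(client.getExpenseAnalysis(
                   new GetExpenseAnalysisRequest().withJobId(awsTaskId)).getJobStatus());
           break;
       default:
           throw new ProcessException("Unsupported Textract type: " + typeOfTextract);
   }
   ```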



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

