This is an automated email from the ASF dual-hosted git repository.

ymdavis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d1bcc2  NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession (#3913)
6d1bcc2 is described below

commit 6d1bcc22e78129b2669c2e41af0c6b246e6780a9
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Dec 17 11:42:24 2019 -0500

    NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession (#3913)
    
    * NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession
    
    * Fixed logic for no records, added unit test
    
    * Fixed PutElasticsearchHttpRecord and PutHive3Streaming, same bug as JoltTransformRecord
    
    * Added null checks
---
 .../org/apache/nifi/util/MockProcessSession.java   |  2 +
 .../apache/nifi/util/TestMockProcessSession.java   |  6 +-
 .../elasticsearch/PutElasticsearchHttpRecord.java  | 20 +++---
 .../nifi/processors/hive/PutHive3Streaming.java    |  7 +-
 .../jolt/record/JoltTransformRecord.java           | 80 ++++++++++++----------
 .../jolt/record/TestJoltTransformRecord.java       | 18 +++++
 6 files changed, 80 insertions(+), 53 deletions(-)

diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 78ff117..7bf9a03 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -589,6 +589,7 @@ public class MockProcessSession implements ProcessSession {
         final MockFlowFile mock = validateState(flowFile);
 
         final ByteArrayInputStream bais = new 
ByteArrayInputStream(mock.getData());
+        incrementReadCount(flowFile);
         final InputStream errorHandlingStream = new InputStream() {
             @Override
             public int read() throws IOException {
@@ -602,6 +603,7 @@ public class MockProcessSession implements ProcessSession {
 
             @Override
             public void close() throws IOException {
+                decrementReadCount(flowFile);
                 openInputStreams.remove(mock);
                 bais.close();
             }
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index 295e330..6ba99c7 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -51,13 +51,11 @@ public class TestMockProcessSession {
 
         assertEquals("hello, world", new String(buffer));
 
-        session.remove(flowFile);
-
         try {
             session.commit();
             Assert.fail("Was able to commit session without closing 
InputStream");
-        } catch (final FlowFileHandlingException ffhe) {
-            System.out.println(ffhe.toString());
+        } catch (final FlowFileHandlingException | IllegalStateException e) {
+            System.out.println(e.toString());
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 5879865..f1c45a0 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -576,17 +576,6 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                         i++;
                     }
                 }
-
-                session.putAttribute(successFlowFile, "record.count", 
Integer.toString(recordCount - failures.size()));
-
-                // Normal behavior is to output with record.count. In order to 
not break backwards compatibility, set both here.
-                session.putAttribute(failedFlowFile, "record.count", 
Integer.toString(failures.size()));
-                session.putAttribute(failedFlowFile, "failure.count", 
Integer.toString(failures.size()));
-
-                session.transfer(successFlowFile, REL_SUCCESS);
-                session.transfer(failedFlowFile, REL_FAILURE);
-                session.remove(inputFlowFile);
-
             } catch (final IOException | SchemaNotFoundException | 
MalformedRecordException e) {
                 // We failed while handling individual failures. Not much else 
we can do other than log, and route the whole thing to failure.
                 getLogger().error("Failed to process {} during individual 
record failure handling; route whole FF to failure", new Object[] {flowFile, 
e});
@@ -597,7 +586,16 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                 if (failedFlowFile != null) {
                     session.remove(failedFlowFile);
                 }
+                return;
             }
+            session.putAttribute(successFlowFile, "record.count", 
Integer.toString(recordCount - failures.size()));
+
+            // Normal behavior is to output with record.count. In order to not 
break backwards compatibility, set both here.
+            session.putAttribute(failedFlowFile, "record.count", 
Integer.toString(failures.size()));
+            session.putAttribute(failedFlowFile, "failure.count", 
Integer.toString(failures.size()));
+            session.transfer(successFlowFile, REL_SUCCESS);
+            session.transfer(failedFlowFile, REL_FAILURE);
+            session.remove(inputFlowFile);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 5558c79..db3e5a9 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -384,10 +384,10 @@ public class PutHive3Streaming extends AbstractProcessor {
 
         StreamingConnection hiveStreamingConnection = null;
 
-        try (final InputStream in = session.read(flowFile)) {
+        try {
             final RecordReader reader;
 
-            try {
+            try(final InputStream in = session.read(flowFile)) {
                 // if we fail to create the RecordReader then we want to route 
to failure, so we need to
                 // handle this separately from the other IOExceptions which 
normally route to retry
                 try {
@@ -409,7 +409,6 @@ public class PutHive3Streaming extends AbstractProcessor {
                 updateAttributes.put(ATTR_OUTPUT_TABLES, 
options.getQualifiedTableName());
                 flowFile = session.putAllAttributes(flowFile, 
updateAttributes);
                 session.getProvenanceReporter().send(flowFile, 
hiveStreamingConnection.getMetastoreUri());
-                session.transfer(flowFile, REL_SUCCESS);
             } catch (TransactionError te) {
                 if (rollbackOnFailure) {
                     throw new ProcessException(te.getLocalizedMessage(), te);
@@ -426,8 +425,10 @@ public class PutHive3Streaming extends AbstractProcessor {
                             rrfe
                     );
                     session.transfer(flowFile, REL_FAILURE);
+                    return;
                 }
             }
+            session.transfer(flowFile, REL_SUCCESS);
         } catch (InvalidTable | SerializationError | StreamingIOFailure | 
IOException e) {
             if (rollbackOnFailure) {
                 if (hiveStreamingConnection != null) {
diff --git 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
index 59629b6..013ca38 100644
--- 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
+++ 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
@@ -59,6 +59,7 @@ import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.FilenameFilter;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -296,42 +297,44 @@ public class JoltTransformRecord extends 
AbstractProcessor {
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
         final RecordSchema schema;
+        FlowFile transformed = null;
+
         try (final InputStream in = session.read(original);
              final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
             schema = writerFactory.getSchema(original.getAttributes(), 
reader.getSchema());
 
-            FlowFile transformed = session.create(original);
             final Map<String, String> attributes = new HashMap<>();
             final WriteResult writeResult;
+            transformed = session.create(original);
+
+            // We want to transform the first record before creating the 
Record Writer. We do this because the Record will likely end up with a 
different structure
+            // and therefore a difference Schema after being transformed. As a 
result, we want to transform the Record and then provide the transformed schema 
to the
+            // Record Writer so that if the Record Writer chooses to inherit 
the Record Schema from the Record itself, it will inherit the transformed 
schema, not the
+            // schema determined by the Record Reader.
+            final Record firstRecord = reader.nextRecord();
+            if (firstRecord == null) {
+                try (final OutputStream out = session.write(transformed);
+                     final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, out, transformed)) {
 
-            try {
-                // We want to transform the first record before creating the 
Record Writer. We do this because the Record will likely end up with a 
different structure
-                // and therefore a difference Schema after being transformed. 
As a result, we want to transform the Record and then provide the transformed 
schema to the
-                // Record Writer so that if the Record Writer chooses to 
inherit the Record Schema from the Record itself, it will inherit the 
transformed schema, not the
-                // schema determined by the Record Reader.
-                final Record firstRecord = reader.nextRecord();
-                if (firstRecord == null) {
-                    try (final OutputStream out = session.write(transformed);
-                         final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, out, transformed)) {
-
-                        writer.beginRecordSet();
-                        writeResult = writer.finishRecordSet();
-
-                        attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-                        attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
-                        attributes.putAll(writeResult.getAttributes());
-                    }
+                    writer.beginRecordSet();
+                    writeResult = writer.finishRecordSet();
 
-                    transformed = session.putAllAttributes(transformed, 
attributes);
-                    session.transfer(transformed, REL_SUCCESS);
-                    session.transfer(original, REL_ORIGINAL);
-                    logger.info("{} had no Records to transform", new 
Object[]{original});
-                    return;
+                    attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                    attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                    attributes.putAll(writeResult.getAttributes());
                 }
 
+                transformed = session.putAllAttributes(transformed, 
attributes);
+                logger.info("{} had no Records to transform", new 
Object[]{original});
+            } else {
+
                 final JoltTransform transform = getTransform(context, 
original);
                 final Record transformedFirstRecord = transform(firstRecord, 
transform);
 
+                if (transformedFirstRecord == null) {
+                    throw new ProcessException("Error transforming the first 
record");
+                }
+
                 final RecordSchema writeSchema = 
writerFactory.getSchema(original.getAttributes(), 
transformedFirstRecord.getSchema());
 
                 // TODO: Is it possible that two Records with the same input 
schema could have different schemas after transformation?
@@ -353,27 +356,34 @@ public class JoltTransformRecord extends 
AbstractProcessor {
 
                     writeResult = writer.finishRecordSet();
 
+                    try {
+                        writer.close();
+                    } catch (final IOException ioe) {
+                        getLogger().warn("Failed to close Writer for {}", new 
Object[]{transformed});
+                    }
+
                     attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
                     attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
                     attributes.putAll(writeResult.getAttributes());
                 }
-            } catch (Exception e) {
-                logger.error("Unable to write transformed records {} due to 
{}", new Object[]{original, e.toString(), e});
-                session.remove(transformed);
-                session.transfer(original, REL_FAILURE);
-                return;
-            }
 
-            final String transformType = 
context.getProperty(JOLT_TRANSFORM).getValue();
-            transformed = session.putAllAttributes(transformed, attributes);
-            session.transfer(transformed, REL_SUCCESS);
-            session.getProvenanceReporter().modifyContent(transformed, 
"Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(original, REL_ORIGINAL);
-            logger.debug("Transformed {}", new Object[]{original});
+                final String transformType = 
context.getProperty(JOLT_TRANSFORM).getValue();
+                transformed = session.putAllAttributes(transformed, 
attributes);
+                session.getProvenanceReporter().modifyContent(transformed, 
"Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                logger.debug("Transformed {}", new Object[]{original});
+            }
         } catch (final Exception ex) {
             logger.error("Unable to transform {} due to {}", new 
Object[]{original, ex.toString(), ex});
             session.transfer(original, REL_FAILURE);
+            if (transformed != null) {
+                session.remove(transformed);
+            }
+            return;
+        }
+        if (transformed != null) {
+            session.transfer(transformed, REL_SUCCESS);
         }
+        session.transfer(original, REL_ORIGINAL);
     }
 
     private Record transform(final Record record, final JoltTransform 
transform) {
diff --git 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
index e7b43b5..1c5365f 100644
--- 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
+++ 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
@@ -165,6 +165,24 @@ public class TestJoltTransformRecord {
     }
 
     @Test
+    public void testNoRecords() throws IOException {
+        generateTestData(0, null);
+        final String outputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, 
outputSchemaText);
+        runner.setProperty(writer, "Pretty Print JSON", "true");
+        runner.enableControllerService(writer);
+        final String spec = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+        runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+        runner.enqueue("{}");
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+    }
+
+    @Test
     public void testInvalidFlowFileContent() throws IOException {
         generateTestData(1, null);
 

Reply via email to