This is an automated email from the ASF dual-hosted git repository.
ymdavis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 6d1bcc2 NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession (#3913)
6d1bcc2 is described below
commit 6d1bcc22e78129b2669c2e41af0c6b246e6780a9
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Dec 17 11:42:24 2019 -0500
NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession (#3913)
* NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession
* Fixed logic for no records, added unit test
* Fixed PutElasticsearchHttpRecord and PutHive3Streaming, same bug as JoltTransformRecord
* Added null checks
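
The common thread in all four fixes: a ProcessSession may not transfer or remove a FlowFile while an InputStream obtained from session.read(flowFile) is still open. A minimal before/after sketch of the pattern, assuming a typical onTrigger body (process() and the relationship handling here are illustrative placeholders, not code from this commit):

    // Anti-pattern: the FlowFile is transferred while its content stream
    // is still open, so committing the session fails.
    try (final InputStream in = session.read(flowFile)) {
        process(in);
        session.transfer(flowFile, REL_SUCCESS); // stream still open here
    }

    // Corrected ordering: let try-with-resources close the stream first,
    // then transfer once no stream on the FlowFile remains open.
    try (final InputStream in = session.read(flowFile)) {
        process(in);
    } catch (final IOException e) {
        session.transfer(flowFile, REL_FAILURE);
        return;
    }
    session.transfer(flowFile, REL_SUCCESS);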
---
.../org/apache/nifi/util/MockProcessSession.java | 2 +
.../apache/nifi/util/TestMockProcessSession.java | 6 +-
.../elasticsearch/PutElasticsearchHttpRecord.java | 20 +++---
.../nifi/processors/hive/PutHive3Streaming.java | 7 +-
.../jolt/record/JoltTransformRecord.java | 80 ++++++++++++----------
.../jolt/record/TestJoltTransformRecord.java | 18 +++++
6 files changed, 80 insertions(+), 53 deletions(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 78ff117..7bf9a03 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -589,6 +589,7 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile mock = validateState(flowFile);
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
+ incrementReadCount(flowFile);
final InputStream errorHandlingStream = new InputStream() {
@Override
public int read() throws IOException {
@@ -602,6 +603,7 @@ public class MockProcessSession implements ProcessSession {
@Override
public void close() throws IOException {
+ decrementReadCount(flowFile);
openInputStreams.remove(mock);
bais.close();
}
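
The two added calls bracket the lifetime of the stream handed out by read(): the count rises when the stream is created and falls again when it is closed, letting the mock reject FlowFiles with open streams the way the real framework does. A sketch of the bookkeeping these calls suggest, assuming a simple per-FlowFile counter map (the field and method bodies below are hypothetical, not the actual MockProcessSession internals):

    // Hypothetical counter keyed by FlowFile id; illustrative only.
    private final Map<Long, Integer> readCounts = new HashMap<>();

    private void incrementReadCount(final FlowFile flowFile) {
        readCounts.merge(flowFile.getId(), 1, Integer::sum);
    }

    private void decrementReadCount(final FlowFile flowFile) {
        // Returning null from the remapping function removes the entry.
        readCounts.computeIfPresent(flowFile.getId(),
                (id, count) -> count == 1 ? null : count - 1);
    }

With a non-zero count, methods such as transfer() or commit() can throw IllegalStateException, which is exactly what the updated TestMockProcessSession below now expects.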
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index 295e330..6ba99c7 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -51,13 +51,11 @@ public class TestMockProcessSession {
assertEquals("hello, world", new String(buffer));
- session.remove(flowFile);
-
try {
session.commit();
Assert.fail("Was able to commit session without closing InputStream");
- } catch (final FlowFileHandlingException ffhe) {
- System.out.println(ffhe.toString());
+ } catch (final FlowFileHandlingException | IllegalStateException e) {
+ System.out.println(e.toString());
}
}
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 5879865..f1c45a0 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -576,17 +576,6 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
i++;
}
}
-
- session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));
-
- // Normal behavior is to output with record.count. In order to not break backwards compatibility, set both here.
- session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
- session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
-
- session.transfer(successFlowFile, REL_SUCCESS);
- session.transfer(failedFlowFile, REL_FAILURE);
- session.remove(inputFlowFile);
-
} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
// We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
@@ -597,7 +586,16 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
if (failedFlowFile != null) {
session.remove(failedFlowFile);
}
+ return;
}
+ session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));
+
+ // Normal behavior is to output with record.count. In order to not break backwards compatibility, set both here.
+ session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
+ session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
+ session.transfer(successFlowFile, REL_SUCCESS);
+ session.transfer(failedFlowFile, REL_FAILURE);
+ session.remove(inputFlowFile);
}
}
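
PutElasticsearchHttpRecord gets the same stream-lifetime fix: the attribute updates and transfers move below the try block so they run only after the failure handling completed without error, and the cleanup path now returns instead of falling through. Condensed shape of the result (handleIndividualFailures() is a stand-in for the response-processing code above; the exception list is abbreviated):

    try {
        handleIndividualFailures(); // may throw while re-reading records
    } catch (final IOException e) {
        if (successFlowFile != null) {
            session.remove(successFlowFile);
        }
        if (failedFlowFile != null) {
            session.remove(failedFlowFile);
        }
        return; // skip the transfers below; the FlowFiles were removed
    }
    // Reached only when failure handling succeeded.
    session.transfer(successFlowFile, REL_SUCCESS);
    session.transfer(failedFlowFile, REL_FAILURE);
    session.remove(inputFlowFile);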
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 5558c79..db3e5a9 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -384,10 +384,10 @@ public class PutHive3Streaming extends AbstractProcessor {
StreamingConnection hiveStreamingConnection = null;
- try (final InputStream in = session.read(flowFile)) {
+ try {
final RecordReader reader;
- try {
+ try(final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
@@ -409,7 +409,6 @@ public class PutHive3Streaming extends AbstractProcessor {
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
- session.transfer(flowFile, REL_SUCCESS);
} catch (TransactionError te) {
if (rollbackOnFailure) {
throw new ProcessException(te.getLocalizedMessage(), te);
@@ -426,8 +425,10 @@ public class PutHive3Streaming extends AbstractProcessor {
rrfe
);
session.transfer(flowFile, REL_FAILURE);
+ return;
}
}
+ session.transfer(flowFile, REL_SUCCESS);
} catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
if (rollbackOnFailure) {
if (hiveStreamingConnection != null) {
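
PutHive3Streaming inverts the nesting for the same reason: session.read(flowFile) moves into an inner try-with-resources so the stream is guaranteed closed before any transfer, the reader-creation failure path transfers to REL_FAILURE and returns (as in the hunk above), and the success transfer moves after the inner block. Condensed shape (createReader() and writeRecordsToHive() are stand-ins for the code above):

    try {
        try (final InputStream in = session.read(flowFile)) {
            final RecordReader reader = createReader(flowFile, in);
            writeRecordsToHive(reader);
        } // 'in' is closed here, before any transfer
        session.transfer(flowFile, REL_SUCCESS);
    } catch (final IOException e) {
        // retry/rollback handling as before
    }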
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
index 59629b6..013ca38 100644
--- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
@@ -59,6 +59,7 @@ import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import java.io.FilenameFilter;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -296,42 +297,44 @@ public class JoltTransformRecord extends AbstractProcessor {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema;
+ FlowFile transformed = null;
+
try (final InputStream in = session.read(original);
final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema());
- FlowFile transformed = session.create(original);
final Map<String, String> attributes = new HashMap<>();
final WriteResult writeResult;
+ transformed = session.create(original);
+
+ // We want to transform the first record before creating the Record Writer. We do this because the Record will likely end up with a different structure
+ // and therefore a difference Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the
+ // Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the
+ // schema determined by the Record Reader.
+ final Record firstRecord = reader.nextRecord();
+ if (firstRecord == null) {
+ try (final OutputStream out = session.write(transformed);
+ final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) {
- try {
- // We want to transform the first record before creating the Record Writer. We do this because the Record will likely end up with a different structure
- // and therefore a difference Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the
- // Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the
- // schema determined by the Record Reader.
- final Record firstRecord = reader.nextRecord();
- if (firstRecord == null) {
- try (final OutputStream out = session.write(transformed);
- final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) {
-
- writer.beginRecordSet();
- writeResult = writer.finishRecordSet();
-
- attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- }
+ writer.beginRecordSet();
+ writeResult = writer.finishRecordSet();
- transformed = session.putAllAttributes(transformed, attributes);
- session.transfer(transformed, REL_SUCCESS);
- session.transfer(original, REL_ORIGINAL);
- logger.info("{} had no Records to transform", new
Object[]{original});
- return;
+ attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
}
+ transformed = session.putAllAttributes(transformed, attributes);
+ logger.info("{} had no Records to transform", new
Object[]{original});
+ } else {
+
final JoltTransform transform = getTransform(context, original);
final Record transformedFirstRecord = transform(firstRecord, transform);
+ if (transformedFirstRecord == null) {
+ throw new ProcessException("Error transforming the first
record");
+ }
+
final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema());
// TODO: Is it possible that two Records with the same input schema could have different schemas after transformation?
@@ -353,27 +356,34 @@ public class JoltTransformRecord extends AbstractProcessor {
writeResult = writer.finishRecordSet();
+ try {
+ writer.close();
+ } catch (final IOException ioe) {
+ getLogger().warn("Failed to close Writer for {}", new
Object[]{transformed});
+ }
+
attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
}
- } catch (Exception e) {
- logger.error("Unable to write transformed records {} due to
{}", new Object[]{original, e.toString(), e});
- session.remove(transformed);
- session.transfer(original, REL_FAILURE);
- return;
- }
- final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
- transformed = session.putAllAttributes(transformed, attributes);
- session.transfer(transformed, REL_SUCCESS);
- session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(original, REL_ORIGINAL);
- logger.debug("Transformed {}", new Object[]{original});
+ final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
+ transformed = session.putAllAttributes(transformed, attributes);
+ session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ logger.debug("Transformed {}", new Object[]{original});
+ }
} catch (final Exception ex) {
logger.error("Unable to transform {} due to {}", new
Object[]{original, ex.toString(), ex});
session.transfer(original, REL_FAILURE);
+ if (transformed != null) {
+ session.remove(transformed);
+ }
+ return;
+ }
+ if (transformed != null) {
+ session.transfer(transformed, REL_SUCCESS);
}
+ session.transfer(original, REL_ORIGINAL);
}
private Record transform(final Record record, final JoltTransform transform) {
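
Pieced together from the hunks above, the rewritten onTrigger creates the transformed FlowFile lazily inside the try, removes it on any failure, and performs both transfers only after the reader's try-with-resources has closed the input stream (record writing elided):

    FlowFile transformed = null;
    try (final InputStream in = session.read(original);
         final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
        transformed = session.create(original);
        // ... transform records and write them to 'transformed' ...
    } catch (final Exception ex) {
        session.transfer(original, REL_FAILURE);
        if (transformed != null) {
            session.remove(transformed); // never leave an orphaned child FlowFile
        }
        return;
    }
    // Streams are closed once the try exits, so transferring is now legal.
    if (transformed != null) {
        session.transfer(transformed, REL_SUCCESS);
    }
    session.transfer(original, REL_ORIGINAL);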
diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
index e7b43b5..1c5365f 100644
--- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
+++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java
@@ -165,6 +165,24 @@ public class TestJoltTransformRecord {
}
@Test
+ public void testNoRecords() throws IOException {
+ generateTestData(0, null);
+ final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+ runner.setProperty(writer, "Pretty Print JSON", "true");
+ runner.enableControllerService(writer);
+ final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
+ runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
+ runner.enqueue("{}");
+ runner.run();
+ runner.assertQueueEmpty();
+ runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0);
+ runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
+ runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
+ }
+
+ @Test
public void testInvalidFlowFileContent() throws IOException {
generateTestData(1, null);