Repository: nifi Updated Branches: refs/heads/master 58ce52d5d -> c49933f03
NIFI-3948: This closes #1834. Added flush() method to RecordWriter and call it when writing a single record to OutputStream for PublishKafkaRecord. Also removed no-longer-used class WriteAvroResult Signed-off-by: joewitt <joew...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c49933f0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c49933f0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c49933f0 Branch: refs/heads/master Commit: c49933f03df295760e1b6764e38dddbc9b2d31e6 Parents: 58ce52d Author: Mark Payne <marka...@hotmail.com> Authored: Fri May 19 20:52:26 2017 -0400 Committer: joewitt <joew...@apache.org> Committed: Fri May 19 23:05:04 2017 -0400 ---------------------------------------------------------------------- .../serialization/AbstractRecordSetWriter.java | 5 ++ .../apache/nifi/serialization/RecordWriter.java | 7 +++ .../serialization/record/MockRecordWriter.java | 5 ++ .../processors/kafka/pubsub/PublisherLease.java | 1 + .../kafka/pubsub/util/MockRecordWriter.java | 6 ++ .../groovy/test_record_writer_inline.groovy | 4 ++ .../processors/standard/TestQueryRecord.java | 6 ++ .../org/apache/nifi/avro/WriteAvroResult.java | 63 -------------------- .../avro/WriteAvroResultWithExternalSchema.java | 9 ++- .../nifi/avro/WriteAvroResultWithSchema.java | 5 ++ .../org/apache/nifi/csv/WriteCSVResult.java | 5 ++ .../org/apache/nifi/json/WriteJsonResult.java | 7 +++ .../apache/nifi/text/FreeFormTextWriter.java | 7 +-- 13 files changed, 61 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java index 6bf574f..6ce9138 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java @@ -40,6 +40,11 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter { } @Override + public void flush() throws IOException { + out.flush(); + } + + @Override public WriteResult write(final RecordSet recordSet) throws IOException { beginRecordSet(); Record record; http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java index 720953c..6c21a39 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java @@ -37,4 +37,11 @@ public interface RecordWriter extends Closeable { * the mime.type attribute. */ String getMimeType(); + + /** + * Flushes any buffered data to the underlying storage mechanism + * + * @throws IOException if unable to write to the underlying storage mechanism + */ + void flush() throws IOException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java index 1d6aafe..891bbe3 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -66,6 +66,11 @@ public class MockRecordWriter extends AbstractControllerService implements Recor private boolean headerWritten = false; @Override + public void flush() throws IOException { + out.flush(); + } + + @Override public WriteResult write(final RecordSet rs) throws IOException { if (header != null && !headerWritten) { out.write(header.getBytes()); http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 4b3a3ae..66641df 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -113,6 +113,7 @@ public class PublisherLease implements Closeable { recordCount++; baos.reset(); writer.write(record); + writer.flush(); final byte[] messageContent = baos.toByteArray(); final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index 27df57b..1549626 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -60,6 +60,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { return new RecordSetWriter() { + + @Override + public void flush() throws IOException { + out.flush(); + } + @Override public WriteResult write(final RecordSet rs) throws IOException { out.write(header.getBytes()); http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy index b0daaca..c961171 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -93,6 +93,10 @@ class GroovyRecordSetWriter implements RecordSetWriter { @Override public void close() throws IOException { } + + @Override + public void flush() throws IOException { + } } class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index c00eb4b..c6035f2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -271,6 +271,12 @@ public class TestQueryRecord { @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { return new RecordSetWriter() { + + @Override + public void flush() throws IOException { + out.flush(); + } + @Override public WriteResult write(final RecordSet rs) throws IOException { final int colCount = rs.getSchema().getFieldCount(); http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java deleted file mode 100644 index 799d3ee..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.avro; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.nifi.serialization.RecordSetWriter; -import org.apache.nifi.serialization.WriteResult; -import org.apache.nifi.serialization.record.Record; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collections; - -public abstract class WriteAvroResult implements RecordSetWriter { - private final Schema schema; - private final OutputStream out; - - public WriteAvroResult(final Schema schema, final OutputStream out) { - this.schema = schema; - this.out = out; - } - - protected Schema getSchema() { - return schema; - } - - @Override - public WriteResult write(final Record record) throws IOException { - final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); - - final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); - try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { - dataFileWriter.create(schema, out); - dataFileWriter.append(rec); - } - - return WriteResult.of(1, Collections.emptyMap()); - } - - @Override - public String getMimeType() { - return "application/avro-binary"; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java index c1f000b..25d494e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java @@ -61,8 +61,7 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { @Override protected Map<String, String> onFinishRecordSet() throws IOException { - encoder.flush(); - buffered.flush(); + flush(); return schemaAccessWriter.getAttributes(recordSchema); } @@ -74,6 +73,12 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { } @Override + public void flush() throws IOException { + encoder.flush(); + buffered.flush(); + } + + @Override public String getMimeType() { return "application/avro-binary"; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java index dd15118..ae2f109 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java @@ -49,6 +49,11 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter { } @Override + public void flush() throws IOException { + dataFileWriter.flush(); + } + + @Override public Map<String, String> writeRecord(final Record record) throws IOException { final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); dataFileWriter.append(rec); http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java index f8998f9..34a51ba 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java @@ -89,6 +89,11 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet } @Override + public void flush() throws IOException { + printer.flush(); + } + + @Override public Map<String, String> writeRecord(final Record record) throws IOException { int i = 0; for (final RecordField recordField : recordSchema.getFields()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java index a41412f..8acaa04 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -96,6 +96,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe } @Override + public void flush() throws IOException { + if (generator != null) { + generator.flush(); + } + } + + @Override public Map<String, String> writeRecord(final Record record) throws IOException { writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); return schemaAccess.getAttributes(recordSchema); http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java index 7012504..f22f592 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java @@ -17,6 +17,7 @@ package org.apache.nifi.text; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -37,13 +38,11 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor private static final byte NEW_LINE = (byte) '\n'; private final PropertyValue propertyValue; private final Charset charset; - private final OutputStream out; public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out) { - super(out); + super(new BufferedOutputStream(out)); this.propertyValue = textPropertyValue; this.charset = characterSet; - this.out = out; } private List<String> getColumnNames(final RecordSchema schema) { @@ -60,7 +59,7 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor @Override public Map<String, String> writeRecord(final Record record) throws IOException { - write(record, out, getColumnNames(record.getSchema())); + write(record, getOutputStream(), getColumnNames(record.getSchema())); return Collections.emptyMap(); }