Repository: nifi Updated Branches: refs/heads/0.x 2276c6ac1 -> 4f72e3491
NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException (0.x) * Updated StandardRecordWriter to account for the encoding behavior of java.io.DataOutputStream.writeUTF() and to truncate string values so that their modified UTF-8 representation never exceeds DataOutputStream's 65,535-byte (64K) UTF format limit. * Added a test to confirm handling of large UTF strings. Signed-off-by: Mike Moser <mose...@apache.org> This closes #1470. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4f72e349 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4f72e349 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4f72e349 Branch: refs/heads/0.x Commit: 4f72e3491f2372c8c45afb96a765c1f5cdd2f07d Parents: 2276c6a Author: Joe Skora <jsk...@apache.org> Authored: Thu Feb 2 19:11:05 2017 +0000 Committer: Mike Moser <mose...@apache.org> Committed: Fri Feb 3 17:10:30 2017 -0500 ---------------------------------------------------------------------- .../nifi/provenance/StandardRecordWriter.java | 62 ++++++++++++++++---- .../TestStandardRecordReaderWriter.java | 44 ++++++++++++++ 2 files changed, 96 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4f72e349/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index a5c121a..f015cc8 100644 --- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.UTFDataFormatException; import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StandardRecordWriter implements RecordWriter { + + public static final int MAX_ALLOWED_UTF_LENGTH = 65_535; + private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); private final File file; @@ -83,7 +87,7 @@ public class StandardRecordWriter implements RecordWriter { lastBlockOffset = rawOutStream.getBytesWritten(); resetWriteStream(firstEventId); - out.writeUTF(PersistentProvenanceRepository.class.getName()); + writeUTFLimited(out, PersistentProvenanceRepository.class.getName()); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } catch (final IOException ioe) { @@ -161,7 +165,7 @@ public class StandardRecordWriter implements RecordWriter { } out.writeLong(recordIdentifier); - out.writeUTF(record.getEventType().name()); + writeUTFLimited(out, record.getEventType().name()); out.writeLong(record.getEventTime()); out.writeLong(record.getFlowFileEntryDate()); out.writeLong(record.getEventDuration()); @@ -192,9 +196,9 @@ public class StandardRecordWriter implements RecordWriter { // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. 
if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) { out.writeBoolean(true); - out.writeUTF(record.getContentClaimContainer()); - out.writeUTF(record.getContentClaimSection()); - out.writeUTF(record.getContentClaimIdentifier()); + writeUTFLimited(out, record.getContentClaimContainer()); + writeUTFLimited(out, record.getContentClaimSection()); + writeUTFLimited(out, record.getContentClaimIdentifier()); if (record.getContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -208,9 +212,9 @@ public class StandardRecordWriter implements RecordWriter { // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) { out.writeBoolean(true); - out.writeUTF(record.getPreviousContentClaimContainer()); - out.writeUTF(record.getPreviousContentClaimSection()); - out.writeUTF(record.getPreviousContentClaimIdentifier()); + writeUTFLimited(out, record.getPreviousContentClaimContainer()); + writeUTFLimited(out, record.getPreviousContentClaimSection()); + writeUTFLimited(out, record.getPreviousContentClaimIdentifier()); if (record.getPreviousContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -256,7 +260,7 @@ public class StandardRecordWriter implements RecordWriter { } protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException { - out.writeUTF(uuid); + writeUTFLimited(out, uuid); } protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException { @@ -275,7 +279,7 @@ public class StandardRecordWriter implements RecordWriter { out.writeBoolean(false); } else { out.writeBoolean(true); - out.writeUTF(toWrite); + writeUTFLimited(out, toWrite); } } @@ -400,4 +404,42 @@ public class StandardRecordWriter 
implements RecordWriter { public boolean isDirty() { return dirtyFlag.get(); } + + private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException { + try { + out.writeUTF(utfString); + } catch (UTFDataFormatException e) { + final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH)); + logger.warn("Truncating provenance record value! Attempted to write {} chars that encode to a UTF byte length greater than " + + "supported maximum ({}), truncating to {} chars.", + utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length()); + if (logger.isDebugEnabled()) { + logger.debug("String value was:\n{}", utfString); + } + out.writeUTF(truncated); + } + } + + static int getCharsInUTFLength(final String str, final int utfLimit) { + // see java.io.DataOutputStream.writeUTF() + int strlen = str.length(); + int utflen = 0; + int c; + + /* count modified UTF-8 bytes per char as writeUTF() does; never end the prefix on a high surrogate */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + if (utflen > utfLimit) { + return (i > 0 && Character.isHighSurrogate(str.charAt(i - 1))) ? i - 1 : i; + } + } + return strlen; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4f72e349/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index e11502a..60a2518 100644 --- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -20,7 +20,10 @@ import static org.apache.nifi.provenance.TestUtil.createFlowFile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -28,6 +31,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.provenance.toc.StandardTocReader; import org.apache.nifi.provenance.toc.StandardTocWriter; import org.apache.nifi.provenance.toc.TocReader; @@ -186,4 +190,44 @@ public class TestStandardRecordReaderWriter { FileUtils.deleteFile(journalFile.getParentFile(), true); } + + @Test + public void testWriteUtfLargerThan64k() throws IOException { + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("filename", "1.txt"); + attributes.put("uuid", UUID.randomUUID().toString()); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + final String seventyK = StringUtils.repeat("X", 70000); + assertTrue(seventyK.length() > 65535); + assertTrue(seventyK.getBytes("UTF-8").length > 65535); + 
builder.setDetails(seventyK); + final ProvenanceEventRecord record = builder.build(); + assertEquals(StandardRecordWriter.MAX_ALLOWED_UTF_LENGTH, StandardRecordWriter.getCharsInUTFLength(seventyK, StandardRecordWriter.MAX_ALLOWED_UTF_LENGTH)); + try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(headerOut)) { + out.writeUTF(PersistentProvenanceRepository.class.getName()); + out.writeInt(9); + } + + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testWriteUtfLargerThan64k.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream(); + final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 0)) { + writer.writeHeader(1L); + recordOut.reset(); + writer.writeRecord(record, 1L); // must truncate the 70K details value instead of throwing UTFDataFormatException + } + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + }