This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 5da77e8e74 NIFI-11636: Do not buffer Parquet content into memory unnecessarily
5da77e8e74 is described below

commit 5da77e8e74343f04ebe7da34ba47cbc3cbe0d4fc
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Jun 2 13:21:15 2023 -0400

    NIFI-11636: Do not buffer Parquet content into memory unnecessarily

    NIFI-11636: Change default log level from parquet internal reader to WARN as it logs excessively at INFO level

    Signed-off-by: Matt Burgess <mattyb...@apache.org>
---
 .../nifi-resources/src/main/resources/conf/logback.xml            | 2 +-
 .../java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java | 5 ++---
 .../org/apache/nifi/parquet/stream/NifiSeekableInputStream.java   | 8 ++++----
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index 4a1e57e1f5..ae2733f880 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -119,7 +119,7 @@
     <logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
     <logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
     <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
-
+    <logger name="org.apache.parquet.hadoop.InternalParquetRecordReader" level="WARN" />
 
     <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
     <logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
index c4ac722c7b..98e8cf3f75 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
@@ -20,7 +20,6 @@ import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.SeekableInputStream;
 
-import java.io.IOException;
 import java.io.InputStream;
 
 public class NifiParquetInputFile implements InputFile {
@@ -42,12 +41,12 @@ public class NifiParquetInputFile implements InputFile {
     }
 
     @Override
-    public long getLength() throws IOException {
+    public long getLength() {
         return length;
     }
 
     @Override
-    public SeekableInputStream newStream() throws IOException {
+    public SeekableInputStream newStream() {
         return new NifiSeekableInputStream(input);
     }
 }
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
index cd6b820536..89d2cc0c32 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
@@ -29,11 +29,11 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
     public NifiSeekableInputStream(final ByteCountingInputStream input) {
         super(input);
         this.input = input;
-        this.input.mark(Integer.MAX_VALUE);
+        this.input.mark(8192);
     }
 
     @Override
-    public long getPos() throws IOException {
+    public long getPos() {
         return input.getBytesConsumed();
     }
 
@@ -47,7 +47,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
         if (newPos < currentPos) {
             // seeking backwards so first reset back to beginning of the stream then seek
             input.reset();
-            input.mark(Integer.MAX_VALUE);
+            input.mark(8192);
         }
 
         // must call getPos() again in case reset was called above
@@ -65,7 +65,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
     }
 
     @Override
-    public synchronized void reset() throws IOException {
+    public synchronized void reset() {
         throw new UnsupportedOperationException("Mark/reset is not supported");
     }
 }