[hotfix] [avro] Simplify the FSDataInputStreamWrapper The FSDataInputStreamWrapper comes from a time when Flink's FsDataInputStream was not position aware. Not that it is, the FSDataInputStreamWrapper is not required to track its own position, but can simply delegate these calls to the FsDataInputStream.
This also adds missing @Override tags. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25dcdea0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25dcdea0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25dcdea0 Branch: refs/heads/master Commit: 25dcdea056a341142d4b85e942aa80e9f82879ad Parents: 4cd7a80 Author: Stephan Ewen <[email protected]> Authored: Thu Nov 2 19:51:06 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Nov 3 16:40:35 2017 +0100 ---------------------------------------------------------------------- .../avro/utils/FSDataInputStreamWrapper.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/25dcdea0/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java index c00fecb..5d412e2 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java @@ -31,36 +31,36 @@ import java.io.IOException; * <p>The wrapper keeps track of the position in the data stream. */ public class FSDataInputStreamWrapper implements Closeable, SeekableInput { + private final FSDataInputStream stream; - private long pos; - private long len; + private final long len; public FSDataInputStreamWrapper(FSDataInputStream stream, long len) { this.stream = stream; - this.pos = 0; this.len = len; } + @Override public long length() throws IOException { return this.len; } + @Override public int read(byte[] b, int off, int len) throws IOException { - int read; - read = stream.read(b, off, len); - pos += read; - return read; + return stream.read(b, off, len); } + @Override public void seek(long p) throws IOException { stream.seek(p); - pos = p; } + @Override public long tell() throws IOException { - return pos; + return stream.getPos(); } + @Override public void close() throws IOException { stream.close(); }
