[jira] [Commented] (FLINK-2503) Inconsistencies in FileInputFormat hierarchy
[ https://issues.apache.org/jira/browse/FLINK-2503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329855#comment-17329855 ]

Flink Jira Bot commented on FLINK-2503:
---

This issue has been labeled "stale-minor" for 7 days. It is closed now. If you are still affected by this or would like to raise the priority of this ticket, please re-open it, remove the label "auto-closed", and raise the ticket priority accordingly.

> Inconsistencies in FileInputFormat hierarchy
>
>                 Key: FLINK-2503
>                 URL: https://issues.apache.org/jira/browse/FLINK-2503
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 0.10.0
>            Reporter: Flavio Pompermaier
>            Priority: Minor
>              Labels: stale-minor
>
> From a thread in the user mailing list (Invalid argument reading a file
> containing a Kryo object).
> I think that there are some inconsistencies in the hierarchy of InputFormats.
> The BinaryOutputFormat/TypeSerializerInputFormat should inherit the
> behaviour of the FileInputFormat (that is, respect unsplittable and
> enumerateNestedFiles), but they don't take those flags into account.
> Moreover, in the TypeSerializerInputFormat there's a "// TODO: fix this shit"
> that should probably be removed or fixed :)
> Also, having to keep testForUnsplittable and decorateInputStream aligned is
> somewhat dangerous.
> And maybe the visibility of getBlockIndexForPosition should be changed to
> protected?
> My need was to implement a TypeSerializerInputFormat, but to
> achieve that I had to write a lot of overrides. Am I doing something wrong, or
> do those input formats need improvement?
> This is my input format code (note: everything after the comment
> "Copied from FileInputFormat (overriding TypeSerializerInputFormat)"
> is copied and pasted from FileInputFormat, so MY code ends there):
> {code:java}
> public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {
>     private static final long serialVersionUID = 1L;
>     private static final Logger LOG =
>             LoggerFactory.getLogger(RowBundleInputFormat.class);
>
>     /** The fraction that the last split may be larger than the others. */
>     private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
>
>     /** Set once the single object stored in the file has been read. */
>     private boolean objectRead;
>
>     public RowBundleInputFormat() {
>         super(new GenericTypeInfo<>(RowBundle.class));
>         unsplittable = true;
>     }
>
>     @Override
>     protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
>             FileInputSplit fileSplit) throws Throwable {
>         return inputStream;
>     }
>
>     @Override
>     protected boolean testForUnsplittable(FileStatus pathFile) {
>         return true;
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         objectRead = false;
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return this.objectRead;
>     }
>
>     @Override
>     public RowBundle nextRecord(RowBundle reuse) throws IOException {
>         RowBundle yourObject = super.nextRecord(reuse);
>         this.objectRead = true; // read only one object
>         return yourObject;
>     }
>
>     // ---
>     // Copied from FileInputFormat (overriding TypeSerializerInputFormat)
>     // ---
>     @Override
>     public FileInputSplit[] createInputSplits(int minNumSplits)
>             throws IOException {...}
>
>     private long addNestedFiles(Path path, List<FileStatus> files, long length,
>             boolean logExcludedFiles) throws IOException {...}
>
>     private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
>             long halfSplitSize, int startIndex) { ... }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
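
The request in the description (a subclass should inherit split handling that respects the unsplittable flag, instead of copying createInputSplits) could be sketched roughly as below. This is a simplified model under stated assumptions, not Flink's code: `SplitSketch`, `Split`, `BaseFormat`, and `OneObjectFormat` are hypothetical names, and the splitting logic is reduced to fixed-size blocks of a single file.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the hierarchy; these are NOT Flink's classes. */
public class SplitSketch {

    /** A byte range of one file. */
    static final class Split {
        final String path;
        final long start;
        final long length;

        Split(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    /** Stand-in for a file-based format that owns the split logic. */
    static class BaseFormat {
        protected boolean unsplittable = false;

        /** Hook a subclass may override (mirrors the idea of testForUnsplittable). */
        protected boolean testForUnsplittable(String path) {
            return unsplittable;
        }

        /** Creates splits in one place for every subclass, honoring the flag. */
        public List<Split> createInputSplits(String path, long fileLength, long blockSize) {
            List<Split> splits = new ArrayList<>();
            if (testForUnsplittable(path)) {
                // An unsplittable file becomes exactly one split covering the whole file.
                splits.add(new Split(path, 0, fileLength));
                return splits;
            }
            // Otherwise cut the file into block-sized ranges; the last one may be shorter.
            for (long pos = 0; pos < fileLength; pos += blockSize) {
                splits.add(new Split(path, pos, Math.min(blockSize, fileLength - pos)));
            }
            return splits;
        }
    }

    /** Stand-in for a binary format that only sets the flag and inherits the rest. */
    static class OneObjectFormat extends BaseFormat {
        OneObjectFormat() {
            unsplittable = true;
        }
    }

    public static void main(String[] args) {
        // 250 bytes in blocks of 100 bytes: ranges start at 0, 100, and 200.
        System.out.println(new BaseFormat().createInputSplits("data.bin", 250, 100).size()); // prints 3
        // The flag set in the constructor is enough; no overrides are needed.
        System.out.println(new OneObjectFormat().createInputSplits("data.bin", 250, 100).size()); // prints 1
    }
}
```

In this sketch the subclass needs no copied code because the base class consults the flag itself; whether the real hierarchy can be arranged this way is exactly what the issue asks.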
[jira] [Commented] (FLINK-2503) Inconsistencies in FileInputFormat hierarchy
[ https://issues.apache.org/jira/browse/FLINK-2503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321757#comment-17321757 ]

Flink Jira Bot commented on FLINK-2503:
---

This issue and all of its Sub-Tasks have not been updated for 180 days. So, it has been labeled "stale-minor". If you are still affected by this bug or are still interested in this issue, please give an update and remove the label. In 7 days the issue will be closed automatically.