[ 
https://issues.apache.org/jira/browse/FLINK-2503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321757#comment-17321757
 ] 

Flink Jira Bot commented on FLINK-2503:
---------------------------------------

This issue and all of its Sub-Tasks have not been updated for 180 days. So, it 
has been labeled "stale-minor". If you are still affected by this bug or are 
still interested in this issue, please give an update and remove the label. In 
7 days the issue will be closed automatically.

> Inconsistencies in FileInputFormat hierarchy
> --------------------------------------------
>
>                 Key: FLINK-2503
>                 URL: https://issues.apache.org/jira/browse/FLINK-2503
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 0.10.0
>            Reporter: Flavio Pompermaier
>            Priority: Minor
>              Labels: stale-minor
>
> From a thread on the user mailing list ("Invalid argument reading a file
> containing a Kryo object").
> I think there are some inconsistencies in the InputFormat hierarchy.
> BinaryInputFormat and TypeSerializerInputFormat should inherit the behaviour
> of FileInputFormat (i.e. respect the unsplittable and enumerateNestedFiles
> flags), but they do not take those flags into account.
> Moreover, TypeSerializerInputFormat contains a "// TODO: fix this shit"
> comment that should probably be removed or resolved :)
> Also, having to keep testForUnsplittable and decorateInputStream aligned is
> somewhat dangerous.
> And maybe the visibility of getBlockIndexForPosition should be changed to
> protected?
> My goal was to implement a TypeSerializerInputFormat<RowBundle>, but to
> achieve that I had to write a lot of overrides. Am I doing something wrong,
> or could these input formats be improved? This is my input format code
> (note: everything after the comment "Copied from FileInputFormat (overriding
> TypeSerializerInputFormat)" is copied and pasted from FileInputFormat, so MY
> code ends there):
> {code:java}
> import java.io.IOException;
> import java.util.List;
>
> import org.apache.flink.api.java.io.TypeSerializerInputFormat;
> import org.apache.flink.api.java.typeutils.GenericTypeInfo;
> import org.apache.flink.core.fs.BlockLocation;
> import org.apache.flink.core.fs.FSDataInputStream;
> import org.apache.flink.core.fs.FileInputSplit;
> import org.apache.flink.core.fs.FileStatus;
> import org.apache.flink.core.fs.Path;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /** Reads exactly one serialized RowBundle from each (unsplittable) file. */
> public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {
>     private static final long serialVersionUID = 1L;
>     private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);
>
>     /** The fraction that the last split may be larger than the others. */
>     private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
>
>     /** Set once the single object of the current split has been read. */
>     private boolean objectRead;
>
>     public RowBundleInputFormat() {
>         super(new GenericTypeInfo<>(RowBundle.class));
>         unsplittable = true; // treat every file as a single split
>     }
>
>     @Override
>     protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
>             FileInputSplit fileSplit) throws Throwable {
>         return inputStream; // no decompression or other wrapping
>     }
>
>     @Override
>     protected boolean testForUnsplittable(FileStatus pathFile) {
>         return true; // keep consistent with decorateInputStream
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         objectRead = false; // reset for every new split
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return this.objectRead;
>     }
>
>     @Override
>     public RowBundle nextRecord(RowBundle reuse) throws IOException {
>         RowBundle yourObject = super.nextRecord(reuse);
>         this.objectRead = true; // read only one object
>         return yourObject;
>     }
>
>     // -------------------------------------------------------------------
>     // Copied from FileInputFormat (overriding TypeSerializerInputFormat)
>     // -------------------------------------------------------------------
>
>     @Override
>     public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {...}
>
>     private long addNestedFiles(Path path, List<FileStatus> files, long length,
>             boolean logExcludedFiles) throws IOException {...}
>
>     private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
>             long halfSplitSize, int startIndex) { ... }
> }
> {code}
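
The one-object-per-file pattern in RowBundleInputFormat (reset a flag in open, set it in nextRecord, report it from reachedEnd) can be illustrated without Flink. This is a minimal plain-Java sketch, assuming Java serialization in place of Flink's TypeSerializer and in-memory byte arrays in place of file splits; SingleObjectReaderSketch, SingleObjectReader and readAll are hypothetical names that only mimic the open/reachedEnd/nextRecord contract.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: Java serialization replaces Flink's TypeSerializer,
// and byte arrays replace file splits.
public class SingleObjectReaderSketch {

    /** Reads exactly one object per split, then reports end-of-input. */
    static final class SingleObjectReader<T> {
        private ObjectInputStream in;
        private boolean objectRead;

        void open(byte[] split) throws IOException {
            in = new ObjectInputStream(new ByteArrayInputStream(split));
            objectRead = false; // reset per split, like RowBundleInputFormat.open
        }

        boolean reachedEnd() {
            return objectRead;
        }

        @SuppressWarnings("unchecked")
        T nextRecord() throws IOException, ClassNotFoundException {
            T value = (T) in.readObject();
            objectRead = true; // only one object per split
            return value;
        }
    }

    /** Serializes one object into a byte array that plays the role of a file. */
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Driver loop: open each split, then pull records until reachedEnd is true. */
    static <T> List<T> readAll(SingleObjectReader<T> reader, List<byte[]> splits) throws Exception {
        List<T> result = new ArrayList<>();
        for (byte[] split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                result.add(reader.nextRecord());
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> files = List.of(serialize("bundle-1"), serialize("bundle-2"));
        List<String> records = readAll(new SingleObjectReader<>(), files);
        System.out.println(records); // [bundle-1, bundle-2]
    }
}
```

The loop in readAll roughly mirrors how a caller drives an input format: open a split, then call nextRecord until reachedEnd returns true. Because the flag is reset in open, the same reader instance can be reused across splits.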

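The risk of keeping testForUnsplittable and decorateInputStream aligned is that one hook decides whether a file may be split while the other decides how its stream is wrapped, so overriding only one of them can leave the two disagreeing. One way to avoid that is to derive both answers from a single lookup. The sketch below is plain Java and assumes that "decoration" means decompression chosen by file extension; AlignedDecorationSketch, Decompressor, isUnsplittable and decorate are hypothetical names, not Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class AlignedDecorationSketch {

    /** Hypothetical hook that wraps a raw file stream, e.g. with a decompressor. */
    interface Decompressor {
        InputStream wrap(InputStream raw) throws IOException;
    }

    // Single source of truth: file extension -> decompressor (hypothetical mapping).
    private static final Map<String, Decompressor> DECOMPRESSORS =
            Map.of("gz", GZIPInputStream::new);

    /** Returns the decompressor for a path, or null if the file is stored uncompressed. */
    static Decompressor lookup(String path) {
        int dot = path.lastIndexOf('.');
        return dot < 0 ? null : DECOMPRESSORS.get(path.substring(dot + 1));
    }

    /** A compressed stream can only be decoded from its first byte, so it must not be split. */
    static boolean isUnsplittable(String path) {
        return lookup(path) != null;
    }

    /** Wraps the stream with exactly the decompressor that made the file unsplittable. */
    static InputStream decorate(String path, InputStream raw) throws IOException {
        Decompressor decompressor = lookup(path);
        return decompressor == null ? raw : decompressor.wrap(raw);
    }

    /** Helper that produces gzip-compressed bytes for the demo. */
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String path = "bundle.gz";
        try (InputStream in = decorate(path, new ByteArrayInputStream(gzip("hello")))) {
            String text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            System.out.println(isUnsplittable(path) + " " + text); // true hello
        }
        System.out.println(isUnsplittable("bundle.bin")); // false
    }
}
```

With this shape a subclass overrides the single lookup rather than two independent hooks, so the split decision and the stream wrapping cannot drift apart.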


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
