FileInputFormat cannot know about the reached variable that you added in
your class. So there is no way it could reset it to false.
An alternative implementation without overriding open() could be to change
the reachedEnd method to check if the stream is still at offset 0.

2017-08-01 20:22 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:

> Thanks that worked. However, what I don't understand is wouldn't the open
> call that I am inheriting have this logic already inbuilt? I am inheriting
> FileInputFormat.
>
> On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> An InputFormat processes multiple InputSplits. open() is called for each
>> InputSplit.
>> If you don't reset reached to false in open() you will only read a single
>> (i.e., the first) InputSplit and skip all others.
>>
>> I'd override open as follows:
>>
>> public void open(FileInputSplit fileSplit) throws IOException {
>>   super.open();
>>   reached = false;
>> }
>>
>> Cheers, Fabian
>>
>>
>> 2017-08-01 8:08 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>
>>> I didn't override open. I am using open that got inherited from
>>> FileInputFormat . Am I supposed to specifically override open?
>>>
>>> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Do you set reached to false in open()?
>>>>
>>>>
>>>> Am 01.08.2017 2:44 vorm. schrieb "Mohit Anchlia" <
>>>> mohitanch...@gmail.com>:
>>>>
>>>> And here is the inputformat code:
>>>>
>>>> public class PDFFileInputFormat extends FileInputFormat<String> {
>>>>  /**
>>>>   *
>>>>   */
>>>>  private static final long serialVersionUID = -4137283038479003711L;
>>>>  private static final Logger logger = LoggerFactory
>>>>    .getLogger(PDFInputFormat.class.getName());
>>>>  private boolean reached = false;
>>>>  @Override
>>>>  public boolean reachedEnd() throws IOException {
>>>>   logger.info("called reached " + reached);
>>>>   // TODO Auto-generated method stub
>>>>   return reached;
>>>>  }
>>>>  @Override
>>>>  public String nextRecord(String reuse) throws IOException {
>>>>   logger.info("This is where you parse PDF");
>>>>   String content = new String(
>>>>     Files.readAllBytes(Paths.get(this.currentSplit.getPath()
>>>> .getPath())));
>>>>   logger.info("Content " + content);
>>>>   reached = true;
>>>>   return content;
>>>>  }
>>>> }
>>>>
>>>> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanch...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have a very simple program that just reads all the files in the
>>>>> path. However, flink is not working as expected.
>>>>>
>>>>> Everytime I execute this job I only see flink reading 2 files, even
>>>>> though there are more in that directory. On closer look it appears that it
>>>>> might be related to:
>>>>>
>>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>>>>> task slot(s).
>>>>>
>>>>> My question is, isn't flink supposed to iterate over the directory
>>>>> after those 2 slots become free again? I am assuming this problem is 
>>>>> caused
>>>>> because there are only 2 slots.
>>>>>
>>>>>
>>>>> Code ---
>>>>>
>>>>>   PDFFileInputFormat format = new PDFFileInputFormat();
>>>>>   format.setFilePath(args[0]);
>>>>>   format.setNestedFileEnumeration(true);
>>>>>   logger.info("Number of splits " + format.getNumSplits());
>>>>>
>>>>>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toSt
>>>>> ring());
>>>>>
>>>>>   env.createInput(format, TypeInformation.of(StringValue
>>>>> .class)).print();
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to