Do you set reached to false in open()?

Am 01.08.2017 2:44 vorm. schrieb "Mohit Anchlia" <mohitanch...@gmail.com>:

And here is the inputformat code:

public class PDFFileInputFormat extends FileInputFormat<String> {
 /**
  *
  */
 private static final long serialVersionUID = -4137283038479003711L;
 private static final Logger logger = LoggerFactory
   .getLogger(PDFInputFormat.class.getName());
 private boolean reached = false;
 @Override
 public boolean reachedEnd() throws IOException {
  logger.info("called reached " + reached);
  // TODO Auto-generated method stub
  return reached;
 }
 @Override
 public String nextRecord(String reuse) throws IOException {
  logger.info("This is where you parse PDF");
  String content = new String(
    Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
  logger.info("Content " + content);
  reached = true;
  return content;
 }
}

On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanch...@gmail.com>
wrote:

> I have a very simple program that just reads all the files in the path.
> However, flink is not working as expected.
>
> Everytime I execute this job I only see flink reading 2 files, even though
> there are more in that directory. On closer look it appears that it might
> be related to:
>
> [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task
> slot(s).
>
> My question is, isn't flink supposed to iterate over the directory after
> those 2 slots become free again? I am assuming this problem is caused
> because there are only 2 slots.
>
>
> Code ---
>
>   PDFFileInputFormat format = new PDFFileInputFormat();
>   format.setFilePath(args[0]);
>   format.setNestedFileEnumeration(true);
>   logger.info("Number of splits " + format.getNumSplits());
>
>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>
>   env.createInput(format, TypeInformation.of(StringValue.class)).print();
>

Reply via email to