Do you set reached to false in open()?

On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <mohitanch...@gmail.com> wrote:
And here is the inputformat code:

public class PDFFileInputFormat extends FileInputFormat<String> {
    /**
     *
     */
    private static final long serialVersionUID = -4137283038479003711L;
    private static final Logger logger = LoggerFactory
            .getLogger(PDFInputFormat.class.getName());
    private boolean reached = false;

    @Override
    public boolean reachedEnd() throws IOException {
        logger.info("called reached " + reached);
        // TODO Auto-generated method stub
        return reached;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        logger.info("This is where you parse PDF");
        String content = new String(
                Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
        logger.info("Content " + content);
        reached = true;
        return content;
    }
}

On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanch...@gmail.com> wrote:

> I have a very simple program that just reads all the files in the path.
> However, Flink is not working as expected.
>
> Every time I execute this job I only see Flink reading 2 files, even though
> there are more in that directory. On closer look it appears that it might
> be related to:
>
> [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task
> slot(s).
>
> My question is, isn't Flink supposed to iterate over the directory after
> those 2 slots become free again? I am assuming this problem is caused
> because there are only 2 slots.
>
>
> Code ---
>
> PDFFileInputFormat format = new PDFFileInputFormat();
> format.setFilePath(args[0]);
> format.setNestedFileEnumeration(true);
> logger.info("Number of splits " + format.getNumSplits());
>
> // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>
> env.createInput(format, TypeInformation.of(StringValue.class)).print();
>
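To illustrate the question at the top of this thread, here is a minimal, Flink-free sketch of why a `reached` flag that is never reset can stop a reused format instance after its first split. The class `ResettableFormatSketch`, its `resetInOpen` switch, and the `readAll` loop are hypothetical stand-ins written for this example. They only mimic the open / reachedEnd / nextRecord cycle and are not Flink's actual classes. In the real `PDFFileInputFormat`, the equivalent change would presumably be to override `open(FileInputSplit split)`, call `super.open(split)`, and set `reached = false` there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a reusable input format; NOT Flink's actual API. */
class ResettableFormatSketch {

    // Set once the single record of the current split has been emitted.
    private boolean reached = false;
    private String currentSplit;
    // Assumption for illustration: toggles between the buggy and the fixed behaviour.
    private final boolean resetInOpen;

    ResettableFormatSketch(boolean resetInOpen) {
        this.resetInOpen = resetInOpen;
    }

    /** Called once per split; the same instance handles several splits in turn. */
    void open(String split) {
        this.currentSplit = split;
        if (resetInOpen) {
            reached = false; // the fix: clear the flag for every new split
        }
    }

    boolean reachedEnd() {
        return reached;
    }

    /** Emits exactly one record per split, like the PDF format in the thread. */
    String nextRecord() {
        reached = true;
        return "content of " + currentSplit;
    }

    /** Mimics a source task: open each split, then read until reachedEnd(). */
    static List<String> readAll(ResettableFormatSketch format, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        // Flag never reset: only the first split yields a record.
        System.out.println(readAll(new ResettableFormatSketch(false), splits).size()); // 1
        // Flag reset in open(): every split yields its record.
        System.out.println(readAll(new ResettableFormatSketch(true), splits).size()); // 3
    }
}
```

If each of the 2 task slots runs one such instance, each would read exactly one file and then report end-of-input for every later split, which would match the "only 2 files" symptom described above.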