Re: Odd flink behaviour
Thanks. I thought the purpose of the method below was to supply that information?

    @Override
    public boolean reachedEnd() throws IOException {
        logger.info("Reached " + reached);
        return reached;
    }

In reply to Fabian Hueske, Wed, Aug 2, 2017 at 1:43 AM.
Re: Odd flink behaviour
FileInputFormat cannot know about the reached variable that you added in your class, so there is no way it could reset it to false.
An alternative implementation that does not override open() would be to change the reachedEnd() method to check whether the stream is still at offset 0.

In reply to Mohit Anchlia, 2017-08-01 20:22 GMT+02:00.
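The offset-based alternative can be sketched in plain Java without Flink. In the model below, the hypothetical `SimpleFileInputFormat` plays the role of FileInputFormat by opening a fresh stream at offset 0 for every split, and `OffsetCheckingFormat` treats the split as finished once that stream has moved. Assumptions: each file is read as one split starting at offset 0, and `nextRecord()` consumes the inherited stream (in Flink, presumably the protected `stream` field). The posted `nextRecord()` reads the file through `Files.readAllBytes` instead, which would leave the stream at offset 0 and never end.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model; all class and method names are hypothetical, not Flink APIs.
public class OffsetCheckDemo {

    // Stand-in for the input stream the base class opens per split.
    static class PositionedStream {
        private final byte[] data;
        private int pos = 0;

        PositionedStream(byte[] data) {
            this.data = data;
        }

        int getPos() {
            return pos;
        }

        int length() {
            return data.length;
        }

        // Reads all remaining bytes and moves the position to the end.
        byte[] readRemaining() {
            byte[] rest = Arrays.copyOfRange(data, pos, data.length);
            pos = data.length;
            return rest;
        }
    }

    // Base class: every open() creates a fresh stream that starts at offset 0.
    abstract static class SimpleFileInputFormat {
        private final Map<String, byte[]> files; // in-memory "file system" for the demo
        protected PositionedStream stream;

        SimpleFileInputFormat(Map<String, byte[]> files) {
            this.files = files;
        }

        public void open(String path) {
            stream = new PositionedStream(files.get(path));
        }

        public abstract boolean reachedEnd();

        public abstract String nextRecord();
    }

    // No flag and no open() override: done once the stream has left offset 0.
    static class OffsetCheckingFormat extends SimpleFileInputFormat {
        OffsetCheckingFormat(Map<String, byte[]> files) {
            super(files);
        }

        @Override
        public boolean reachedEnd() {
            // Extra guard added in this sketch: an empty file never moves the
            // position, so without it the read loop would never terminate.
            return stream.getPos() != 0 || stream.length() == 0;
        }

        @Override
        public String nextRecord() {
            return new String(stream.readRemaining(), StandardCharsets.UTF_8);
        }
    }

    // Assumed driver loop: open each split, then poll until reachedEnd().
    static List<String> readAll(SimpleFileInputFormat format, List<String> paths) {
        List<String> records = new ArrayList<>();
        for (String path : paths) {
            format.open(path);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, byte[]> files = new HashMap<>();
        files.put("a.pdf", "first".getBytes(StandardCharsets.UTF_8));
        files.put("b.pdf", "second".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(new OffsetCheckingFormat(files), Arrays.asList("a.pdf", "b.pdf")));
        // prints [first, second]
    }
}
```

Because the base class's open() already resets the stream position, the subclass gets a per-split reset for free; the trade-off is that the check is tied to offset 0, so it presumes one split per file starting at the beginning.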
Re: Odd flink behaviour
Thanks, that worked. However, what I don't understand is: wouldn't the open() call that I am inheriting already have this logic built in? I am inheriting from FileInputFormat.

In reply to Fabian Hueske, Tue, Aug 1, 2017 at 1:42 AM.
Re: Odd flink behaviour
An InputFormat processes multiple InputSplits, and open() is called for each InputSplit.
If you don't reset reached to false in open(), each parallel instance will only read a single (i.e., its first) InputSplit and skip all others.

I'd override open as follows:

    @Override
    public void open(FileInputSplit fileSplit) throws IOException {
        super.open(fileSplit); // let FileInputFormat open the stream for this split
        reached = false;       // start every split with a fresh flag
    }

Cheers, Fabian

In reply to Mohit Anchlia, 2017-08-01 8:08 GMT+02:00.
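The per-split lifecycle can be reproduced with a small plain-Java model. None of this is Flink code: `SimpleInputFormat`, `StaleFlagFormat`, `ResettingFormat` and `readAll` are hypothetical names, and the driver loop (open, then `reachedEnd()`/`nextRecord()` until done, then close, once per split) is an assumed simplification of how a source task consumes its splits. With the flag never reset, only the first split yields a record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the InputFormat split lifecycle; names are hypothetical.
public class SplitLifecycleDemo {

    // Stand-in for FileInputFormat: open() only manages the base class's own state.
    abstract static class SimpleInputFormat {
        protected String currentSplit;

        public void open(String split) {
            this.currentSplit = split; // knows nothing about fields declared in subclasses
        }

        public abstract boolean reachedEnd();

        public abstract String nextRecord();

        public void close() {
            this.currentSplit = null;
        }
    }

    // Same logic as the posted PDFFileInputFormat: 'reached' is set but never reset.
    static class StaleFlagFormat extends SimpleInputFormat {
        protected boolean reached = false;

        @Override
        public boolean reachedEnd() {
            return reached;
        }

        @Override
        public String nextRecord() {
            reached = true; // one record (the whole file) per split
            return "content of " + currentSplit;
        }
    }

    // The suggested fix: delegate to super.open(split), then reset the flag.
    static class ResettingFormat extends StaleFlagFormat {
        @Override
        public void open(String split) {
            super.open(split);
            reached = false;
        }
    }

    // Assumed driver loop: open, poll reachedEnd()/nextRecord(), close, for every split.
    static List<String> readAll(SimpleInputFormat format, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
            format.close();
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        System.out.println(readAll(new StaleFlagFormat(), splits));
        // prints [content of a.pdf]
        System.out.println(readAll(new ResettingFormat(), splits));
        // prints [content of a.pdf, content of b.pdf, content of c.pdf]
    }
}
```

Calling `super.open(split)` first keeps whatever setup the base class does (in Flink, opening the stream for the split) and layers the subclass-specific reset on top; reachedEnd() itself only reports the flag, it never resets it.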
Re: Odd flink behaviour
I didn't override open. I am using the open() I inherited from FileInputFormat. Am I supposed to override open specifically?

In reply to Fabian Hueske, Mon, Jul 31, 2017 at 9:49 PM.
Re: Odd flink behaviour
Do you set reached to false in open()?

In reply to Mohit Anchlia, 01.08.2017, 2:44 AM.
Re: Odd flink behaviour
And here is the InputFormat code:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PDFFileInputFormat extends FileInputFormat<String> {

        private static final long serialVersionUID = -4137283038479003711L;
        private static final Logger logger =
                LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
        private boolean reached = false;

        @Override
        public boolean reachedEnd() throws IOException {
            logger.info("called reached " + reached);
            return reached;
        }

        @Override
        public String nextRecord(String reuse) throws IOException {
            logger.info("This is where you parse PDF");
            // Reads the whole file behind the current split into one record.
            String content = new String(
                    Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
            logger.info("Content " + content);
            reached = true;
            return content;
        }
    }

On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia wrote:
> I have a very simple program that just reads all the files in the path.
> However, Flink is not working as expected.
>
> Every time I execute this job I only see Flink reading 2 files, even though
> there are more in that directory. On closer look it appears that it might
> be related to:
>
> [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).
>
> My question is, isn't Flink supposed to iterate over the directory after
> those 2 slots become free again? I am assuming this problem is caused
> because there are only 2 slots.
>
> Code:
>
>     PDFFileInputFormat format = new PDFFileInputFormat();
>     format.setFilePath(args[0]);
>     format.setNestedFileEnumeration(true);
>     logger.info("Number of splits " + format.getNumSplits());
>
>     // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>
>     env.createInput(format, TypeInformation.of(String.class)).print();
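The 2-slot observation is consistent with the stale flag rather than with slots running out. Assuming each parallel source instance holds its own copy of the InputFormat and keeps requesting new splits until none are left (slots are not released and re-acquired per file), a flag that is never reset means each instance emits only its first split: with 2 slots, 2 files. The single-threaded simulation below is a hypothetical sketch of that; `OneRecordPerSplitFormat`, `run`, the `resetInOpen` switch and the round-robin assignment are illustrative only, not Flink's actual split assignment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Deterministic simulation: each "slot" owns one format instance and keeps
// pulling splits from a shared queue until the queue is empty.
public class SlotSimulation {

    // Same one-record-per-split logic as PDFFileInputFormat.
    static class OneRecordPerSplitFormat {
        private final boolean resetInOpen; // hypothetical switch to compare both variants
        private boolean reached = false;
        private String split;

        OneRecordPerSplitFormat(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        void open(String split) {
            this.split = split;
            if (resetInOpen) {
                reached = false;
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true;
            return split;
        }
    }

    static List<String> run(int slots, List<String> splits, boolean resetInOpen) {
        Deque<String> pending = new ArrayDeque<>(splits);
        List<OneRecordPerSplitFormat> readers = new ArrayList<>();
        for (int i = 0; i < slots; i++) {
            readers.add(new OneRecordPerSplitFormat(resetInOpen));
        }
        List<String> output = new ArrayList<>();
        // Round-robin stands in for whichever reader asks for the next split first.
        while (!pending.isEmpty()) {
            for (OneRecordPerSplitFormat reader : readers) {
                String split = pending.poll(); // null once the queue is empty
                if (split == null) {
                    break;
                }
                reader.open(split);
                while (!reader.reachedEnd()) {
                    output.add(reader.nextRecord());
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("1.pdf", "2.pdf", "3.pdf", "4.pdf", "5.pdf");
        System.out.println(run(2, files, false)); // prints [1.pdf, 2.pdf]
        System.out.println(run(2, files, true));  // prints [1.pdf, 2.pdf, 3.pdf, 4.pdf, 5.pdf]
    }
}
```

Real split assignment is not strictly round-robin, but the outcome without the reset is the same as long as every reader receives at least one split: one record per reader, not one per file.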