Hi, I created a custom input format. Idea behind this is to read all binary files from a directory and use each file as it's own split. Each split is read as one whole record. When I run it in flink I don't get any error but I am not seeing any output from .print. Am I missing something?
---- *public* *class* *PDFFileInputFormat* *extends* RichInputFormat<StringValue, InputSplit> { *private* *static* *final* Logger *logger* = LoggerFactory.*getLogger* (PDFFileInputFormat.*class*.getName()); PDFFileInputSplit current = *null*; *public* *static* *void* main(String... args) *throws* Exception { PDFFileInputFormat pdfReader = *new* PDFFileInputFormat("c:\\proj\\test"); InputSplit[] splits = pdfReader.createInputSplits(1); pdfReader.open(splits[0]); pdfReader.nextRecord(*null*); *final* ExecutionEnvironment env = ExecutionEnvironment. *getExecutionEnvironment*(); env.fromElements(1, 2, 3) // returns the squared i .print(); PDFFileInputFormat format = *new* PDFFileInputFormat("c:\\proj\\test"); InputFormatSourceFunction<StringValue> *reader* = *new* InputFormatSourceFunction<>(format, TypeInformation.*of*(StringValue.*class*)); env.createInput(format,TypeInformation.*of*(StringValue.*class*)).print(); } String path = *null*; *public* PDFFileInputFormat(String path) { *this*.path = path; } *public* *void* configure(Configuration parameters) { // *TODO* Auto-generated method stub } *public* BaseStatistics getStatistics(BaseStatistics cachedStatistics) *throws* IOException { // *TODO* Auto-generated method stub *return* cachedStatistics; } *public* InputSplit[] createInputSplits(*int* minNumSplits) *throws* IOException { *final* List<PDFFileInputSplit> splits = *new* ArrayList<PDFFileInputSplit>(); Files.*list*(Paths.*get*(path)).forEach(f -> { PDFFileInputSplit split = *new* PDFFileInputSplit(splits.size(), f); splits.add(split); }); PDFFileInputSplit[] inputSplitArray = *new* PDFFileInputSplit[splits .size()]; *return* splits.toArray(inputSplitArray); } *public* InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { *logger*.info("Assigner"); // *TODO* Auto-generated method stub *return* *new* DefaultInputSplitAssigner(inputSplits); } *public* *void* open(InputSplit split) *throws* IOException { *this*.current = (PDFFileInputSplit) split; } *public* *boolean* reachedEnd() *throws* IOException { // *TODO* Auto-generated method stub *return* *true*; } *public* StringValue nextRecord(StringValue reuse) *throws* IOException { String content = *new* String(Files.*readAllBytes*(*this*.current .getFile())); *logger*.info("Content " + content); *return* *new* StringValue(content); } *public* *void* close() *throws* IOException { // *TODO* Auto-generated method stub } } --- Thanks, Mohit