Hi, I want to know is it possible to use PipedInutStream and PipedOutputStream in Flink for reading text data from a file? For example extending a RichSourceFunction for it and readata like this:
DataStream<String> raw = env.addSource(new PipedSource(file_path)); Actually i tried to implement a class for it but as PipedInputStream and PipedOutputStream should be on seperate Threads, I have no idea how to implement that. Here is my template class public static class PipedFile extends RichSourceFunction<String> { PipedOutputStream outputPipe = new PipedOutputStream(); PipedInputStream inputPipe = new PipedInputStream(); FileInputStream fis; PipedFile(String s) throws IOException { outputPipe.connect(inputPipe); fis = new FileInputStream("data_source.csv"); } @Override public void run(SourceContext sourceContext) throws Exception { int length; byte[] buffer = new byte[1024]; while ((length = fis.read(buffer, 0, 1024)) != -1) { outputPipe.write(buffer, 0, length); } } @Override public void cancel() { try { outputPipe.close(); inputPipe.close(); fis.close(); } catch (IOException e) { e.printStackTrace(); } } }