Hi,

I would like to know whether it is possible to use PipedInputStream and
PipedOutputStream in Flink to read text data from a file, for example by
extending RichSourceFunction and reading the data like this:

DataStream<String> raw = env.addSource(new PipedSource(file_path));

I tried to implement a class for this, but since PipedInputStream and
PipedOutputStream should be used on separate threads, I have no idea how to
implement that.

Here is my template class:

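import java.io.FileInputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public static class PipedFile extends RichSourceFunction<String> {
    PipedOutputStream outputPipe = new PipedOutputStream();
    PipedInputStream inputPipe = new PipedInputStream();
    FileInputStream fis;

    PipedFile(String filePath) throws IOException {
        outputPipe.connect(inputPipe);
        // Open the file passed in by the caller rather than a hard-coded name.
        fis = new FileInputStream(filePath);
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        int length;
        byte[] buffer = new byte[1024];
        // Copy the file into the pipe. Nothing reads from inputPipe yet, so
        // write() blocks once the pipe's buffer is full, and no record is ever
        // emitted through sourceContext. This is the part I am stuck on.
        while ((length = fis.read(buffer, 0, buffer.length)) != -1) {
            outputPipe.write(buffer, 0, length);
        }
    }

    @Override
    public void cancel() {
        // Release all three streams when the job is cancelled.
        try {
            outputPipe.close();
            inputPipe.close();
            fis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}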
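
For reference, the separate-thread arrangement I mean looks roughly like
this in plain Java, outside Flink. It is only a sketch: PipedFileDemo and
readLinesThroughPipe are made-up names, and UTF-8 text input is assumed. One
thread copies the file into the PipedOutputStream while the calling thread
reads lines from the connected PipedInputStream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of Flink.
class PipedFileDemo {

    // Copies the file into the pipe on a separate writer thread while the
    // calling thread reads lines from the other end of the pipe.
    static List<String> readLinesThroughPipe(Path file)
            throws IOException, InterruptedException {
        PipedOutputStream outputPipe = new PipedOutputStream();
        PipedInputStream inputPipe = new PipedInputStream(outputPipe);

        Thread writer = new Thread(() -> {
            // The pipe is listed first so it is closed even if the file cannot
            // be opened; closing it signals end-of-stream to the reader.
            try (PipedOutputStream out = outputPipe;
                 InputStream in = Files.newInputStream(file)) {
                byte[] buffer = new byte[1024];
                int length;
                while ((length = in.read(buffer)) != -1) {
                    out.write(buffer, 0, length);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "pipe-writer");
        writer.start();

        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(inputPipe, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Assumption: inside a Flink source, this is where each line
                // would be handed to sourceContext.collect(line).
                lines.add(line);
            }
        }
        writer.join();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Small self-contained run against a temporary file.
        Path tmp = Files.createTempFile("piped-demo", ".csv");
        Files.write(tmp, Arrays.asList("a,1", "b,2"));
        readLinesThroughPipe(tmp).forEach(System.out::println);
        Files.delete(tmp);
    }
}
```

What I do not know is how to fit this into run() and cancel() of a
RichSourceFunction, in particular where the writer thread should be started
and stopped.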
