I’m trying to stream data to a file on an S3 compatible system (MINIO):

DataStream<Row> resultStream = tEnv.toAppendStream(log_counts,
Types.ROW(Types.STRING, Types.STRING, Types.LONG));

final StreamingFileSink<Row> sink =
        StreamingFileSink.forRowFormat(
                new Path("s3://argo-artifacts/"), new
SimpleStringEncoder<Row>("UTF-8"))
                .withBucketAssigner(new KeyBucketAssigner())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

resultStream.addSink(sink);

... No file is generated. However this method successfully writes a file to
the bucket:

resultStream.writeAsText("s3://argo-artifacts/output.txt");

Here are my config settings:

    state.backend: filesystem
    state.checkpoints.dir: s3://flink/checkpoints
    state.savepoints.dir: s3://flink/savepoints
    s3.endpoint: http://10.43.42.255:9000
    # s3.endpoint: http://10.43.70.109:9000
    s3.path-style-access: true
    s3.path.style.access: true
    s3.access-key: qCEwcLzhi7xfhl5R6sXLn93a5brgRtBs
    s3.secret-key: fJWZFviCxWUrfjQoXZ4UAjN4YviQXQOz

-- 
Robert Cullen
240-475-4490

Reply via email to