How can I follow up a write with another operation that uses another
PCollection from somewhere else in the pipeline?

I came up with this 'Const' transform that returns a fixed PCollection. Is
there anything similar to it in the SDK? I couldn't find anything like it.

Here's my (not very well tested) code.

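// Ignores its PBegin input and hands back a PCollection that was built
// elsewhere in the same pipeline.
class Const<T> extends PTransform<PBegin, PCollection<T>> {
    private final PCollection<T> sourceFiles;

    public Const(PCollection<T> sourceFiles) {
        this.sourceFiles = sourceFiles;
    }

    @Override
    public PCollection<T> apply(PBegin input) {
        // The input is unused; the output is the PCollection given to the constructor.
        return sourceFiles;
    }
}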

// usage

PCollection<KV<String, FileData>> data = inputFileDataByFileName();

PCollection<String> fileNames = data.apply("GetFileNames", Keys.create());

PDone writeOp = data.apply(new TransformFileData())
    .apply(TextIO.Write.named("WriteTransformedData")
        .to("myfile"));

writeOp.getPipeline()
    .apply(new Const<>(fileNames))
    .apply(new DeleteInputFiles());

Some other tests that I ran (using Create.of() instead of Const) indicated
that if the writeOp fails, the following operation, DeleteInputFiles, will not
be run. Is this true in general?
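For comparison, the ordering being asked about can be modelled outside the SDK. The sketch below is plain Java using java.nio.file, not Beam or Dataflow API, and the class and method names (WriteThenDelete, run) are hypothetical. It assumes the inputs and output are local files: the inputs are deleted only after the write returns normally, and a failed write leaves them in place. Whether a pipeline runner gives DeleteInputFiles the same guarantee is exactly the open question above, so this only pins down the intended behaviour.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical plain-Java model (not Beam): "write the output, then delete
// the inputs only if the write succeeded".
final class WriteThenDelete {

    private WriteThenDelete() {}

    /**
     * Writes transformedLines to output, then deletes every input file.
     * Returns true if the inputs were deleted, false if the write failed
     * and the inputs were left untouched.
     */
    static boolean run(List<Path> inputs, List<String> transformedLines, Path output) {
        try {
            // Step 1: the write. Any IOException stops us before step 2.
            Files.write(output, transformedLines, StandardCharsets.UTF_8);
        } catch (IOException writeFailure) {
            // Write failed: keep the inputs so the job can be retried.
            return false;
        }
        // Step 2: reached only after step 1 returned normally.
        for (Path input : inputs) {
            try {
                Files.deleteIfExists(input);
            } catch (IOException deleteFailure) {
                throw new UncheckedIOException(deleteFailure);
            }
        }
        return true;
    }
}
```

The deletes sit after the write in straight-line code, so a write failure can never be followed by a delete; the cost is that a delete failure after a successful write surfaces as an exception rather than being retried.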

Thanks,

Frank
