Note additionally that by using side inputs, you can introduce an ordering
dependency: an earlier PTransform produces a PCollection, and the "after" step
consumes that PCollection as a side input. For example, in your code you
might write something like

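PCollection<String> transformedData = data.apply(new TransformFileData());

transformedData.apply(TextIO.Write...);
fileNames.apply(new DeleteInputFiles(transformedData));

class DeleteInputFiles
    extends PTransform<PCollection<String>, PCollection<String>> {
  // The PCollection that must be fully produced before any file is deleted.
  private final PCollection<String> happensBefore;

  DeleteInputFiles(PCollection<String> happensBefore) {
    this.happensBefore = happensBefore;
  }

  @Override
  public PCollection<String> apply(PCollection<String> input) {
    // Reduce happensBefore to a single value; the ParDo below cannot run
    // until this view is ready. deleteFilesFn is the DoFn that deletes files.
    PCollectionView<Long> signal =
        happensBefore.apply(Count.<String>globally().asSingletonView());
    return input.apply(ParDo.of(deleteFilesFn).withSideInputs(signal));
  }
}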

Because a ParDo that reads a side input does not execute until that side
input is ready, this delays the execution of DeleteInputFiles until the
transformedData PCollection has been fully produced. Note that this waits on
transformedData itself, not on the TextIO.Write that consumes it.
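The ordering argument can be checked outside Beam with a small plain-Java model. This is an analogy, not Beam's API: CompletableFuture stands in for the runner's scheduling, and the names run, signal and log are hypothetical. The delete step depends only on a single value derived from transformedData, just as the ParDo depends on the singleton view, so it cannot start until transformedData is complete.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java model of the side-input barrier; not Beam code.
public class SideInputBarrier {

  /** Runs the modeled pipeline and returns the order in which the steps ran. */
  static List<String> run(List<String> fileNames, List<String> data) {
    List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Stands in for TransformFileData: produces transformedData asynchronously.
    CompletableFuture<List<String>> transformedData =
        CompletableFuture.supplyAsync(() -> {
          List<String> out = new ArrayList<>();
          for (String d : data) {
            out.add(d.toUpperCase());
          }
          log.add("transformed " + out.size());
          return out;
        });

    // Stands in for Count.globally().asSingletonView(): a single value that
    // only exists once transformedData is complete.
    CompletableFuture<Long> signal =
        transformedData.thenApply(list -> (long) list.size());

    // Stands in for DeleteInputFiles: it consumes the signal like a side
    // input, so it cannot start before the signal is ready.
    CompletableFuture<Void> deleted =
        signal.thenAccept(count -> {
          log.add("signal " + count);
          for (String name : fileNames) {
            log.add("delete " + name);
          }
        });

    deleted.join();
    return new ArrayList<>(log);
  }

  public static void main(String[] args) {
    // Prints [transformed 3, signal 3, delete a.txt, delete b.txt]
    System.out.println(run(List.of("a.txt", "b.txt"), List.of("x", "y", "z")));
  }
}
```

As in the pipeline above, the model orders deletion after transformedData is produced; nothing in it orders deletion after a separate write of that data.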

On Wed, Jun 15, 2016 at 8:59 AM, Thomas Groh <[email protected]> wrote:

> The Beam sink API doesn't currently support "after this completes, then
> execute this transform". We would like to provide support for this type of
> Pipeline in the future, however.
>
> With regard to reusing elements, you don't need the Const transform. A
> PCollection is an immutable collection of elements: PTransforms do not
> mutate its contents but produce a new PCollection of transformed elements.
> You can therefore reuse the PCollection you are passing to Const directly.
> Your code above could be written as
>
> PCollection<String> fileNames = data.apply("GetFileNames", Keys.create());
>
> PDone writeOp = data.apply(new TransformFileData())
>     .apply(TextIO.Write.named("WriteTransformedData")
>         .to("myfile"));
>
> fileNames.apply(new DeleteInputFiles());
>
> However, because you can't currently enforce an ordering between writing
> the elements and deleting the input files, the runner may delete the files
> whenever it chooses to execute that transform, which could be at any point
> before or after those files would otherwise be read.
>
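Thomas's point about reuse can be illustrated with a plain-Java analogy rather than Beam code: Map.Entry stands in for KV, and the hypothetical keys and transform methods stand in for Keys.create() and TransformFileData. Both read the same immutable input and return new lists, so no Const wrapper is needed to hand the input to a second consumer.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogy for reusing an immutable collection; not Beam code.
public class ReuseImmutable {

  // Analogue of Keys.create(): extracts the key (file name) of each entry.
  static List<String> keys(List<Map.Entry<String, String>> data) {
    return data.stream()
        .map(Map.Entry::getKey)
        .collect(Collectors.toUnmodifiableList());
  }

  // Analogue of TransformFileData: derives a new list, leaving data untouched.
  static List<String> transform(List<Map.Entry<String, String>> data) {
    return data.stream()
        .map(e -> e.getValue().toUpperCase())
        .collect(Collectors.toUnmodifiableList());
  }

  public static void main(String[] args) {
    // List.of and Map.entry produce immutable values.
    List<Map.Entry<String, String>> data =
        List.of(Map.entry("a.txt", "hello"), Map.entry("b.txt", "world"));

    // Both consumers read the same input directly.
    List<String> fileNames = keys(data);
    List<String> transformed = transform(data);

    System.out.println(fileNames);   // [a.txt, b.txt]
    System.out.println(transformed); // [HELLO, WORLD]
    System.out.println(data);        // [a.txt=hello, b.txt=world]
  }
}
```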
> On Wed, Jun 15, 2016 at 4:33 AM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
>> Hi Frank,
>>
>> If you want to write a static PCollection, you can use a PTransform (or,
>> even better, create a kind of ConstIO).
>>
>> On the other hand, you can change/override the PCollection using:
>>
>> .apply(ParDo.of(new DoFn() {
>>   public void processElement(ProcessContext c) {
>>     c.output(...);
>>   }
>> }));
>>
>> Regards
>> JB
>>
>>
>> On 06/15/2016 01:26 PM, Frank Wilson wrote:
>>
>>> How can I follow up a write with another operation that uses another
>>> PCollection from somewhere else in the pipeline?
>>>
>>> I came up with this 'Const' transform that returns a fixed PCollection.
>>> Is there anything similar to it in the SDK? I couldn't find anything
>>> like it.
>>>
>>> Here's my (not very well tested) code.
>>>
>>> class Const<T> extends PTransform<PBegin, PCollection<T>> {
>>>      private final PCollection<T> sourceFiles;
>>>
>>>      public Const(PCollection<T> sourceFiles) {
>>>          this.sourceFiles = sourceFiles;
>>>      }
>>>
>>>      @Override
>>>      public PCollection<T> apply(PBegin input) {
>>>          return sourceFiles;
>>>      }
>>> }
>>>
>>> // usage
>>>
>>> PCollection<KV<String, FileData>> data = inputFileDataByFileName();
>>>
>>> PCollection<String> fileNames = data.apply("GetFileNames", Keys.create());
>>>
>>> PDone writeOp = data.apply(new TransformFileData())
>>>      .apply(TextIO.Write.named("WriteTransformedData")
>>>          .to("myfile"));
>>>
>>> writeOp.getPipeline().apply(new Const<>(fileNames))
>>>      .apply(new DeleteInputFiles());
>>>
>>> Some other tests that I ran (using Create.of() instead of Const)
>>> indicated that if writeOp fails, the following operation,
>>> DeleteInputFiles, will not be run. Is this true in general?
>>>
>>> Thanks,
>>>
>>>
>>> Frank
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>
