On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik <[email protected]> wrote:
> What do side inputs look like?
>
A user first passes the side-input PCollections into the external
transform along with the ordinary input PCollections, and then defines the
PCollectionViews inside the external transform, something like:
    PCollectionTuple pTuple =
        PCollectionTuple.of("main1", main1)
            .and("main2", main2)
            .and("side", side)
            .apply(External.of(...).withMultiOutputs());
    public static class TestTransform
        extends PTransform<PCollectionTuple, PCollectionTuple> {
      @Override
      public PCollectionTuple expand(PCollectionTuple input) {
        // Turn the "side" input into a singleton view inside the transform.
        PCollectionView<String> sideView =
            input.<String>get("side").apply(View.asSingleton());
        PCollection<String> main =
            PCollectionList.<String>of(input.get("main1"))
                .and(input.get("main2"))
                .apply(Flatten.pCollections())
                .apply(
                    ParDo.of(
                            new DoFn<String, String>() {
                              @ProcessElement
                              public void processElement(
                                  @Element String x,
                                  OutputReceiver<String> out,
                                  DoFn<String, String>.ProcessContext c) {
                                out.output(x + c.sideInput(sideView));
                              }
                            })
                        .withSideInputs(sideView));
        // The snippet stops before the return; the "output" tag name is an assumption.
        return PCollectionTuple.of("output", main);
      }
    }

> On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee <[email protected]> wrote:
>
>> I wanted to know if anybody has any comments on the external transform API
>> for the Java SDK.
>>
>> `External.of()` creates an external transform for the Java SDK. Depending on
>> the input and output types, two additional methods are provided:
>> `withMultiOutputs()`, which specifies that the output is a PCollectionTuple,
>> and `withOutputType()`, which specifies the type of the output element. Some
>> examples are:
>>
>>     PCollection<String> col =
>>         testPipeline
>>             .apply(Create.of("1", "2", "3"))
>>             .apply(External.of(...));
>>
>> This works without additional methods since 1) the input and output types
>> of the external transform can be inferred and 2) the output is a single
>> PCollection.
>>
>
> How does the type/coder get inferred at runtime (doesn't Java's type
> erasure get rid of this information)?
>
>
>>     PCollectionTuple pTuple =
>>         testPipeline
>>             .apply(Create.of(1, 2, 3, 4, 5, 6))
>>             .apply(
>>                 External.of(...).withMultiOutputs());
>>
>> This requires `withMultiOutputs()` since the output is a
>> PCollectionTuple.
>>
>
> Shouldn't this require a mapping from "output" name to coder/type variable
> to be specified as an argument to withMultiOutputs?
>
>
>>     PCollection<String> pCol =
>>         testPipeline
>>             .apply(Create.of("1", "2", "2", "3", "3", "3"))
>>             .apply(
>>                 External.of(...)
>>                     .<KV<String, Long>>withOutputType())
>>             .apply(
>>                 "toString",
>>                 MapElements.into(TypeDescriptors.strings()).via(
>>                     x -> String.format("%s->%s", x.getKey(), x.getValue())));
>>
>> This requires `withOutputType()` since the output element type cannot be
>> inferred from method chaining. I think some users may find it awkward to
>> call a method with only a type parameter and empty parentheses. Without
>> `withOutputType()`, the output element type will be java.lang.Object,
>> which could still be forcibly cast to KV.
>>
>
> How does the output type get preserved in this case (since Java's type
> erasure would remove <KV<String, Long>> after compilation, and coder
> inference in my opinion should be broken and/or choose something generic
> like serializable)?
>
The expansion service is responsible for using cross-language compatible
coders in the expanded transforms it returns, and these are the coders used
at runtime. The type information supplied through the additional methods here
is only for compile-time type safety of external transforms.
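
To make the compile-time-only point concrete, here is a minimal plain-Java
sketch with no Beam dependency. Everything in it is hypothetical and for
illustration only: `Untyped` stands in for the result of an external
transform, `fakeExpansionResult()` stands in for elements decoded by the
coders the expansion service chose, and `Map.Entry` stands in for KV. The
`withOutputType()` method here only mirrors the shape of the proposed API and
is not the actual implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TypeWitnessSketch {

  /** Hypothetical stand-in for an external transform result with unknown element type. */
  static final class Untyped {
    private final List<Object> elements;

    Untyped(List<Object> elements) {
      this.elements = elements;
    }

    /**
     * Same shape as the proposed withOutputType(): the type argument changes
     * only the static type seen by the compiler. No Class object or coder is
     * captured, so nothing about T exists at runtime.
     */
    @SuppressWarnings("unchecked")
    <T> List<T> withOutputType() {
      return (List<T>) (List<?>) elements; // unchecked cast, erased at runtime
    }
  }

  /** Hypothetical stand-in for elements produced by the expanded transform. */
  static List<Object> fakeExpansionResult() {
    List<Object> out = new ArrayList<>();
    out.add(new AbstractMap.SimpleImmutableEntry<>("a", 1L));
    out.add(new AbstractMap.SimpleImmutableEntry<>("b", 2L));
    return out;
  }

  /** With the correct type witness, downstream code is checked by the compiler. */
  static List<String> describe(Untyped result) {
    List<Map.Entry<String, Long>> typed = result.<Map.Entry<String, Long>>withOutputType();
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Long> kv : typed) {
      out.add(String.format("%s->%s", kv.getKey(), kv.getValue()));
    }
    return out;
  }

  /** With a wrong witness, nothing fails until an element is actually used. */
  static boolean wrongWitnessFailsLate(Untyped result) {
    List<Integer> wrong = result.<Integer>withOutputType(); // no error here
    try {
      Integer first = wrong.get(0); // compiler-inserted cast fails here
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Untyped result = new Untyped(fakeExpansionResult());
    System.out.println(describe(result));
    System.out.println(wrongWitnessFailsLate(result));
  }
}
```

In this sketch a wrong type argument is not caught when `withOutputType()` is
called, only when an element is read. That is the same trade-off as casting
from java.lang.Object, just written once at the call site.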
>
>
>> Thanks,
>> Heejong
>>
>