[ 
https://issues.apache.org/jira/browse/BEAM-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142200#comment-16142200
 ] 

Xu Mingmin commented on BEAM-2806:
----------------------------------

Sure, this is the code we use in the SQL join:
{code}
    // No command-line arguments are parsed here; the runner is set explicitly.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[]{}).as(PipelineOptions.class);
    options.setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);
    // Driving input (drvRow below) and lookup input.
    PCollection<BeamRecord> streamingPCollection = sojEventTable.buildIOReader(p);
    PCollection<BeamRecord> lkpPCollection = lkpTable.buildIOReader(p);

    // Key the lookup rows by their first column and expose them as a multimap side input.
    final PCollectionView<Map<Integer, Iterable<BeamRecord>>> lkpAsView = lkpPCollection
        .apply(WithKeys.<Integer, BeamRecord>of(new SerializableFunction<BeamRecord, Integer>() {
          @Override
          public Integer apply(BeamRecord input) {
            return input.getInteger(0);
          }
        }))
        .apply(View.<Integer, BeamRecord>asMultimap());

    // For each driving row, look up PAGE_ID in the side input and print the row on a match.
    PCollection<Void> ret = streamingPCollection.apply(
        ParDo.of(new DoFn<BeamRecord, Void>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            BeamRecord drvRow = context.element();
            Map<Integer, Iterable<BeamRecord>> key2Rows = context.sideInput(lkpAsView);
            int pageId = drvRow.getInteger("PAGE_ID");
            if (key2Rows.get(pageId) != null) {
              System.out.println("Record Pass: " + drvRow);
            }
          }
        }).withSideInputs(lkpAsView));

    p.run().waitUntilFinish();
{code}
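
For anyone who wants to check the expected result without a runner, the lookup logic above can be sketched in plain Java. This is only an illustration of the semantics, not Beam code: the class name, the method names and the use of int[] as a stand-in for BeamRecord are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupJoinSketch {

    /**
     * Groups lookup rows by their first column. This mirrors what
     * WithKeys + View.asMultimap produce in the pipeline, with int[]
     * standing in for BeamRecord (an assumption made for this sketch).
     */
    static Map<Integer, List<int[]>> buildMultimap(List<int[]> lookupRows) {
        Map<Integer, List<int[]>> view = new HashMap<>();
        for (int[] row : lookupRows) {
            view.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        return view;
    }

    /** Returns, in input order, the page ids that have at least one lookup row. */
    static List<Integer> matchingPageIds(List<Integer> driverPageIds,
                                         Map<Integer, List<int[]>> view) {
        List<Integer> passed = new ArrayList<>();
        for (Integer pageId : driverPageIds) {
            // Same check as in the DoFn: a missing key returns null.
            if (view.get(pageId) != null) {
                passed.add(pageId);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        Map<Integer, List<int[]>> view = buildMultimap(Arrays.asList(
            new int[]{1, 10}, new int[]{1, 11}, new int[]{2, 20}));
        // Page id 3 has no lookup row, so it is dropped.
        System.out.println(matchingPageIds(Arrays.asList(1, 3, 2), view));
    }
}
```

With the sample data in main, only page ids 1 and 2 pass, which is the behaviour we expect from the pipeline once the side input is supported.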


> support View.CreatePCollectionView in FlinkRunner
> -------------------------------------------------
>
>                 Key: BEAM-2806
>                 URL: https://issues.apache.org/jira/browse/BEAM-2806
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Xu Mingmin
>            Assignee: Aljoscha Krettek
>
> Beam version: 2.2.0-SNAPSHOT
> Here's the code:
> {code}
> PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows
>         .apply(View.<BeamRecord, BeamRecord>asMultimap());
> {code}
> And the exception when running with {{FlinkRunner}}:
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: The transform View.CreatePCollectionView is currently not supported.
>       at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:586)
>       at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:268)
>       at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:202)
>       at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
>       at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>       at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:69)
>       at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:104)
>       at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
