[ 
https://issues.apache.org/jira/browse/BEAM-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot updated BEAM-1948:
-----------------------------------
    Description: 
null pointer exception is due to an {{Aggregator}} not being present in 
{{aggregatorSteps}} (maybe because not present in the DAG).
We can reproduce the null pointer exception with a simple pipeline with an 
{{Aggregator}} and a {{State}} like this one:

{code}
    IdentityDoFn identityDoFn = new IdentityDoFn();
    p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), 
KV.of("key", "element3")))
        .apply(ParDo.of(identityDoFn));
    PipelineResult pipelineResult = p.run();
    pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();



  private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
    private final Aggregator<Long, Long> counter = createAggregator("counter", 
Sum.ofLongs());
    private static final String STATE_ID = "state";
    @StateId(STATE_ID)
    private static final StateSpec<Object, ValueState<String>> stateSpec =
        StateSpecs.value(StringUtf8Coder.of());

    @ProcessElement
    public void processElement(ProcessContext context, @StateId(STATE_ID) 
ValueState<String> state){
      state.write("state content");
      counter.addValue(1L);
      context.output(context.element().getValue());
    }

    public Aggregator<Long, Long> getCounter() {
      return counter;
    }
  }
{code}

  was:
Running query3 of nexmark 
(https://github.com/iemejia/beam/blob/BEAM-160-nexmark/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java)
 in streaming mode (UnboundedSource) on Direct runner generates a null pointer 
exception  in {code} DirectRunner.DirectPipelineResult.getAggregatorValues() 
{code}

In
{code} if (steps.contains(transform.getTransform())) {code}
{code} steps == null  {code}

to reproduce it :
run {code} NexmarkDirectDriver.main() {code}
with options
{code}
--query=3 --streaming=true --numEventGenerators=4 --manageResources=false 
--monitorJobs=true --enforceEncodability=false --enforceImmutability=false
{code}
see the repo in link above


> Null pointer exception in 
> DirectRunner.DirectPipelineResult.getAggregatorValues()
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-1948
>                 URL: https://issues.apache.org/jira/browse/BEAM-1948
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Etienne Chauchot
>            Assignee: Etienne Chauchot
>            Priority: Minor
>
> null pointer exception is due to an {{Aggregator}} not being present in 
> {{aggregatorSteps}} (maybe because not present in the DAG).
> We can reproduce the null pointer exception with a simple pipeline with an 
> {{Aggregator}} and a {{State}} like this one:
> {code}
>     IdentityDoFn identityDoFn = new IdentityDoFn();
>     p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), 
> KV.of("key", "element3")))
>         .apply(ParDo.of(identityDoFn));
>     PipelineResult pipelineResult = p.run();
>     pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
>   private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
>     private final Aggregator<Long, Long> counter = 
> createAggregator("counter", Sum.ofLongs());
>     private static final String STATE_ID = "state";
>     @StateId(STATE_ID)
>     private static final StateSpec<Object, ValueState<String>> stateSpec =
>         StateSpecs.value(StringUtf8Coder.of());
>     @ProcessElement
>     public void processElement(ProcessContext context, @StateId(STATE_ID) 
> ValueState<String> state){
>       state.write("state content");
>       counter.addValue(1L);
>       context.output(context.element().getValue());
>     }
>     public Aggregator<Long, Long> getCounter() {
>       return counter;
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to