Luke Cwik created BEAM-7567:
-------------------------------

             Summary: StateSpec should contain the state id
                 Key: BEAM-7567
                 URL: https://issues.apache.org/jira/browse/BEAM-7567
             Project: Beam
          Issue Type: Improvement
          Components: sdk-java-core
            Reporter: Luke Cwik
             Fix For: 3.0.0


In the Java SDK we currently ask users to define a state spec as:
{code:java}
new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  @StateId("index")
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current + 1);
  }
}
{code}

The suggestion is to move the state id from the @StateId field annotation into
the StateSpec itself, so the code would look like:
{code:java}
new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value("index", VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current + 1);
  }
}
{code}

We should also remove the coder from the StateSpec hashCode and equals methods.
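
As a rough illustration of that last point, here is a minimal sketch of id-only equality. IdentifiedSpec is a hypothetical stand-in written for this example and is not Beam's StateSpec; the coder is modelled as an opaque type parameter, and comparing on the id alone is an assumption (a real implementation would presumably also compare the kind of state).

```java
import java.util.Objects;

/** Hypothetical stand-in for a state spec that carries its own id; not Beam's StateSpec. */
final class IdentifiedSpec<CoderT> {
  private final String id;    // state id, e.g. "index"
  private final CoderT coder; // placeholder for a coder; ignored by equals/hashCode

  IdentifiedSpec(String id, CoderT coder) {
    this.id = Objects.requireNonNull(id, "id");
    this.coder = coder;
  }

  String id() {
    return id;
  }

  CoderT coder() {
    return coder;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof IdentifiedSpec)) {
      return false;
    }
    // Assumption for this sketch: only the id takes part in equality.
    return id.equals(((IdentifiedSpec<?>) other).id);
  }

  @Override
  public int hashCode() {
    // Kept consistent with equals: derived from the id only.
    return id.hashCode();
  }
}
```

With this shape, two specs that share the id "index" compare equal even when their coders differ, while specs with different ids do not.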



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
