Hi everyone,

I'm trying to write a simple pipeline to experiment with both stateful
processing and session windows.

I have an event stream where each event has a timestamp and a session key.
I want to group the events by session and enrich each of them using state
shared across the whole session. In this experiment I'm simply replacing
each event with an incrementing per-session counter.

So, let's say I have a source that emits one event per second and my
stream is [a, a, b, a, a, c, a, a, b, c, c, c, a, a] (I'm writing only
the session key, since the value is irrelevant to the issue I'm
experiencing).

I want the following output: [<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, ...]
(actually the order is not important)
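To pin down the expected semantics, here is a plain-Java sketch with no Beam involved: one counter per session key, read with a default of 0, emitted, then incremented. The names `ExpectedCounters` and `enrich` are made up for illustration, and the sketch ignores timestamps and windows entirely, so it only describes the intended output, not how the pipeline should compute it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Beam) of the desired output: each event becomes
// <key, n>, where n is the number of earlier events with the same key.
final class ExpectedCounters {

  static List<Map.Entry<String, Long>> enrich(List<String> keys) {
    // One counter per session key; a missing entry means "no events seen yet".
    Map<String, Long> counters = new HashMap<>();
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    for (String key : keys) {
      long current = counters.getOrDefault(key, 0L);
      out.add(new SimpleEntry<>(key, current));
      counters.put(key, current + 1);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> stream =
        List.of("a", "a", "b", "a", "a", "c", "a", "a", "b", "c", "c", "c", "a", "a");
    for (Map.Entry<String, Long> e : enrich(stream)) {
      System.out.println(e.getKey() + ", " + e.getValue());
    }
  }
}
```

On the example stream this yields <a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, <c, 0>, ... with each key counting independently, and the last event gives <a, 7>.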

Unfortunately my code doesn't work as I expected, and I can't figure out
why (to be honest, I haven't found many resources on the topic). What I
actually get is something like:

a, 0
a, 1
b, 0
a, 0    <-- ???
a, 2    <-- ???
c, 0
...

This makes me wonder whether I have really understood how state relates
to the key-window pair, or whether I have simply misunderstood how
windowing/triggering works.
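As far as I understand it, Beam scopes a stateful DoFn's state per key and window, so the same key in two different windows gets two independent ValueState cells. The toy model below is plain Java, not Beam: `KeyWindowStateModel` and the window labels "w1"/"w2" are hypothetical, and it only mimics StatefulCount's read/default/write logic with state keyed by the (key, window) pair. It shows one way an "a, 0" could reappear after "a, 1", namely if a later "a" event were processed in a different window; it illustrates the hypothesis and is not a diagnosis of the pipeline.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (plain Java, not Beam) of a counter whose state is scoped to a
// (key, window) pair, mirroring the read/default/write logic in StatefulCount.
final class KeyWindowStateModel {

  // Window label -> (key -> stored counter). Labels are hypothetical strings.
  private final Map<String, Map<String, Integer>> storage = new HashMap<>();

  int process(String key, String window) {
    Map<String, Integer> cellsForWindow = storage.computeIfAbsent(window, w -> new HashMap<>());
    Integer val = cellsForWindow.get(key);  // like storage.read(): null if never written
    int current = (val == null) ? 0 : val;  // default to 0
    cellsForWindow.put(key, current + 1);   // like storage.write(current + 1)
    return current;                         // the value that would be output
  }

  public static void main(String[] args) {
    KeyWindowStateModel model = new KeyWindowStateModel();
    // Assumption for illustration: the fourth event lands in window "w2".
    System.out.println("a, " + model.process("a", "w1"));
    System.out.println("a, " + model.process("a", "w1"));
    System.out.println("b, " + model.process("b", "w1"));
    System.out.println("a, " + model.process("a", "w2"));
    System.out.println("a, " + model.process("a", "w1"));
  }
}
```

Running `main` prints a, 0 / a, 1 / b, 0 / a, 0 / a, 2, the same shape as the output above, because the "w2" cell starts from its own empty state.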

My pipeline looks something like this:

p.apply(TextIO.read().from("input.json"))
 .apply(MapElements.via(new ParseTableRowJson()))
 .apply(new AugmentEvents())
 .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
   @ProcessElement
   public void processElement(ProcessContext c) {
     // Log each <session key, counter> pair.
     LOG.info(c.element().getKey() + ": " + c.element().getValue());
   }
 }));

...

static class AugmentEvents
    extends PTransform<PCollection<TableRow>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<TableRow> input) {
    return input
      .apply(ParDo.of(new ExtractSessionIdAndTimestamp()))
      .apply(new ComputeSessions());
  }
}


static class ComputeSessions
    extends PTransform<PCollection<KV<String, TableRow>>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<KV<String, TableRow>> events) {
    return events
      .apply(Window.<KV<String, TableRow>>into(
              Sessions.withGapDuration(Duration.standardMinutes(10)))
          .triggering(AfterPane.elementCountAtLeast(1))
          .discardingFiredPanes()
          .withAllowedLateness(Duration.standardMinutes(10)))
      .apply(ParDo.of(new StatefulCount()));
  }
}

static class StatefulCount extends DoFn<KV<String, TableRow>, KV<String, Long>> {
  @StateId("storage")
  private final StateSpec<ValueState<Integer>> storageSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(ProcessContext context, BoundedWindow window,
      @StateId("storage") ValueState<Integer> storage) {
    // Read the counter for this key; it is null until the first write.
    Integer val = storage.read();
    if (val == null) {
      val = 0;
    }
    int current = val;
    // Emit the current count, then store the incremented value.
    context.output(KV.of(context.element().getKey(), (long) current));
    storage.write(current + 1);
  }
}
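For the windowing side, this is how I read session windows for a single key: each event at time t gets a proto-window [t, t + gap), and overlapping proto-windows are merged into one session. The sketch is plain Java, not Beam's `Sessions` implementation; `SessionModel` is a hypothetical name, and treating windows that merely touch at an endpoint as separate is an assumption. With the 10-minute gap used above and one event per second, every event for a given key should fall into a single session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (not Beam's implementation) of session windows for one key:
// each timestamp t starts a proto-window [t, t + gap), and overlapping
// proto-windows are merged. Touching endpoints are assumed NOT to merge.
final class SessionModel {

  // Returns sessions as {start, end} pairs in milliseconds, end exclusive.
  static List<long[]> sessions(long[] timestampsMillis, long gapMillis) {
    long[] ts = timestampsMillis.clone();
    Arrays.sort(ts);
    List<long[]> result = new ArrayList<>();
    for (long t : ts) {
      long end = t + gapMillis;
      if (!result.isEmpty() && t < result.get(result.size() - 1)[1]) {
        // Proto-window overlaps the current session: extend that session.
        long[] last = result.get(result.size() - 1);
        last[1] = Math.max(last[1], end);
      } else {
        // No overlap: start a new session.
        result.add(new long[] {t, end});
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long gap = 10 * 60 * 1000L; // Duration.standardMinutes(10) in milliseconds
    // Timestamps of the "a" events, assuming one event per second starting at 0.
    long[] aEvents = {0, 1000, 3000, 4000, 6000, 7000, 12000, 13000};
    for (long[] s : sessions(aEvents, gap)) {
      System.out.println("session [" + s[0] + ", " + s[1] + ")");
    }
  }
}
```

Under these assumptions the "a" events collapse into one session, [0, 613000); two events 11 minutes apart would instead produce two sessions.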

Maurizio
