Hrish created BEAM-4520:
---------------------------

             Summary: No messages delivered after a while with PubsubIO
                 Key: BEAM-4520
                 URL: https://issues.apache.org/jira/browse/BEAM-4520
             Project: Beam
          Issue Type: Bug
          Components: io-java-gcp, runner-flink
    Affects Versions: 2.4.0
            Reporter: Hrish
            Assignee: Chamikara Jayalath


I am running the following Beam pipeline code locally, with the FlinkRunner. 
PubsubIO is used to read messages from a topic. I have a separate thread that 
publishes messages to the topic at regular intervals (every 30 seconds) and 
also sets the "ts" attribute which is used later to derive the event time.
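
For reference, the attributes described above could be built roughly as in the sketch below. The class and method names are hypothetical and not taken from the reporter's publisher code, and the sketch assumes that "ts" carries the event time as milliseconds since the Unix epoch, written as a decimal string. It uses only the JDK and does not call the Pub/Sub client library.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not from the original report): builds the attribute
// map that a publisher could attach to each Pub/Sub message.
class MessageAttributes {

    // Assumption: "ts" is the event time in milliseconds since the Unix epoch,
    // encoded as a decimal string; "key" is the grouping key used downstream.
    static Map<String, String> build(String key, Instant eventTime) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("key", key);
        attributes.put("ts", Long.toString(eventTime.toEpochMilli()));
        return attributes;
    }

    public static void main(String[] args) {
        // Prints the attributes for a message whose event time is "now".
        System.out.println(build("sensor-1", Instant.now()));
    }
}
```

Keeping the attribute names in one place makes it harder for the publisher and the pipeline ("ts" in withTimestampAttribute, "key" in the DoFn) to drift apart.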

Custom transform to convert to KV pair -
{code:java}
private static class PubSubMessageGrouper
        extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage element = c.element();
        // Assumes every message carries a "key" attribute set by the publisher.
        KV<String, PubsubMessage> kv = KV.of(element.getAttribute("key"), element);
        c.output(kv);
    }
}
{code}
Note that "key" is a message attribute set earlier by the publisher thread. 
The intent is to group the messages downstream by this key.

Pipeline code -
{code:java}
PCollection<PubsubMessage> pubsubColl = p
        .apply(PubsubIO.readMessagesWithAttributes()
            .withTimestampAttribute("ts")
            .fromTopic("projects/" + projectName + "/topics/beamtest")
        );


PCollection<KV<String, PubsubMessage>> idfied =
        pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));

PCollection<KV<String, PubsubMessage>> windowed = idfied
        .apply(Window.<KV<String, PubsubMessage>>into(
                FixedWindows.of(Duration.standardSeconds(15)))
            .triggering(
                Repeatedly.forever(
                    AfterWatermark.pastEndOfWindow()
                )
            )
            .withAllowedLateness(Duration.standardSeconds(15))
            .discardingFiredPanes());

PCollection<KV<String, Iterable<PubsubMessage>>> grouped =
        windowed.apply(GroupByKey.create());

grouped.apply(ParDo.of(new KVPrinter()));
{code}
The transforms are not chained, for ease of reading. The KVPrinter transform at 
the end just prints the messages received from the GroupByKey; it will be 
replaced by actual code once I get this running. When I run this, the trigger 
does not fire for quite some time (a couple of minutes or longer). When it 
finally fires, some of the messages never reach the final step, no matter how 
long I keep the pipeline running. The Pubsub statistics in my GCP/Stackdriver 
dashboard show a backlog of undelivered messages.

Is this due to the internal watermark that PubsubIO uses? My intention here is 
to make sure that all messages are processed in the groupby, including late 
ones within the allowed lateness window.
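
To make the windowing settings above concrete, here is a simplified, JDK-only model of 15-second fixed windows with 15 seconds of allowed lateness. It is only an illustration, not Beam's implementation: the class and method names are made up, windows are assumed to be aligned to the Unix epoch, and boundary details are glossed over. It shows that in this model both the on-time firing and the dropping of late data depend on where the watermark is, not on wall-clock time.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of fixed windows and allowed lateness (illustration only,
// not Beam's actual implementation; all names here are hypothetical).
class FixedWindowModel {
    static final Duration WINDOW_SIZE = Duration.ofSeconds(15);
    static final Duration ALLOWED_LATENESS = Duration.ofSeconds(15);

    // Start of the fixed window containing eventTime, aligned to the epoch.
    static Instant windowStart(Instant eventTime) {
        long size = WINDOW_SIZE.toMillis();
        long millis = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size));
    }

    // Exclusive end of the fixed window containing eventTime.
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plus(WINDOW_SIZE);
    }

    // In this model the on-time pane can fire only once the watermark
    // has reached the end of the element's window.
    static boolean canFireOnTime(Instant eventTime, Instant watermark) {
        return !watermark.isBefore(windowEnd(eventTime));
    }

    // In this model an element is dropped as too late once the watermark
    // has moved past the end of its window plus the allowed lateness.
    static boolean isDroppedAsLate(Instant eventTime, Instant watermark) {
        return watermark.isAfter(windowEnd(eventTime).plus(ALLOWED_LATENESS));
    }
}
```

Under this model, a watermark that lags far behind delays every firing, and a watermark that runs ahead of the "ts" values causes elements to be discarded, so the watermark reported by the source is a reasonable place to start investigating.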



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
