[jira] [Commented] (BEAM-4520) No messages delivered after a while with PubsubIO

2018-06-08 Thread Hrish (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505853#comment-16505853
 ] 

Hrish commented on BEAM-4520:
-

It looks like setting an option on the Flink runner makes this work. Thanks to 
[https://stackoverflow.com/questions/44003584/acknowledge-google-pub-sub-message-on-apache-beam#comment75086787_44003584]

However, this is missing from the documentation, so please treat this ticket as 
one to update the docs. The [Flink Runner 
docs|https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner]
 mention the option but say nothing about how it affects Pub/Sub acks.
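
For context, the relevant option appears to be the Flink runner's checkpointing interval: PubsubIO on the Flink runner only finalizes its checkpoint marks (and hence acks messages) when a Flink checkpoint completes. This is a sketch assuming {{FlinkPipelineOptions}} and an illustrative 10-second interval, not a confirmed excerpt from the reporter's setup:
{code:java}
// Sketch: enable Flink checkpointing so the unbounded source's
// CheckpointMark gets finalized and Pub/Sub messages are acked.
// The option name is from the Flink runner docs; the value is illustrative.
FlinkPipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setCheckpointingInterval(10_000L); // milliseconds

Pipeline p = Pipeline.create(options);
{code}
The equivalent command-line flag would be {{--checkpointingInterval=10000}}.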

> No messages delivered after a while with PubsubIO
> -
>
> Key: BEAM-4520
> URL: https://issues.apache.org/jira/browse/BEAM-4520
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp, runner-flink
>Affects Versions: 2.4.0
>Reporter: Hrish
>Assignee: Chamikara Jayalath
>Priority: Major
>
> I am running the following Beam pipeline code locally, with the FlinkRunner. 
> PubsubIO is used to read messages from a topic. I have a separate thread that 
> publishes messages to the topic at regular intervals (every 30 seconds) and 
> also sets the "ts" attribute which is used later to derive the event time.
> Custom transform to convert to KV pair -
> {code:java}
> private static class PubSubMessageGrouper extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         PubsubMessage element = c.element();
>         KV<String, PubsubMessage> kv = KV.of(element.getAttribute("key"), element);
>         c.output(kv);
>     }
> }
> {code}
> Note that "key" is a key set in the message attributes earlier in the 
> publisher thread. The intent is to group the messages downstream by this key.
> Pipeline code -
> {code:java}
> PCollection<PubsubMessage> pubsubColl = p
>     .apply(PubsubIO.readMessagesWithAttributes()
>         .withTimestampAttribute("ts")
>         .fromTopic("projects/" + projectName + "/topics/beamtest")
>     );
>
> PCollection<KV<String, PubsubMessage>> idfied =
>     pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));
>
> PCollection<KV<String, PubsubMessage>> windowed = idfied
>     .apply(Window.<KV<String, PubsubMessage>>into(FixedWindows.of(Duration.standardSeconds(15)))
>         .triggering(
>             Repeatedly.forever(
>                 AfterWatermark.pastEndOfWindow()
>             )
>         )
>         .withAllowedLateness(Duration.standardSeconds(15))
>         .discardingFiredPanes());
>
> PCollection<KV<String, Iterable<PubsubMessage>>> grouped =
>     windowed.apply(GroupByKey.create());
>
> grouped.apply(ParDo.of(new KVPrinter()));
> {code}
> The transforms are not chained, for ease of reading. The KVPrinter transform 
> at the end just prints the messages coming out of the GroupByKey; it will be 
> replaced by actual code once I get this running. When I run this, the trigger 
> does not fire for quite some time (a couple of minutes or longer). When it 
> finally fires, I see that some of the messages are never received (in the 
> final step), no matter how long I keep it running. The Pub/Sub statistics in 
> my GCP/Stackdriver dashboard show that there is a backlog of undelivered 
> messages.
> Is this due to the internal watermark that PubsubIO uses? My intention here 
> is to make sure that all messages are processed in the groupby, including 
> late ones within the allowed lateness window.
> Note that if I remove the GroupByKey, and just print the messages after the 
> windowing, I can see all the messages.
> 
> An update: I switched the .fromTopic to a .fromSubscription, with a 
> pre-created subscription, and things have started working - I can see 
> messages being delivered now. The only difference I can see is that the 
> subscription created automatically by PubsubIO has an ack deadline of 60 
> seconds, whereas I created mine with 600 seconds (the max allowed by 
> Pub/Sub).
> However, there is another issue now - messages don't seem to be getting 
> acked. I keep getting redeliveries of old messages, and the Stackdriver 
> metrics show that num_undelivered_messages (the unacked message count) keeps 
> increasing. The documentation says that messages will be acked as soon as a 
> GroupByKey or another ParDo happens, and that is happening, since I am seeing 
> the windowed GroupByKey output, but the acks don't seem to follow.
>  
>  
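
Regarding the ack-deadline difference noted in the quoted update: a subscription with the maximum deadline can be pre-created before pointing the pipeline at it with .fromSubscription. A sketch with placeholder project/topic/subscription names:
{code:bash}
# Sketch: pre-create the subscription with the maximum ack deadline (600s)
# instead of letting PubsubIO auto-create one with the 60s default.
# "my-project", "beamtest", and "beamtest-sub" are placeholders.
gcloud pubsub subscriptions create beamtest-sub \
    --topic=beamtest \
    --project=my-project \
    --ack-deadline=600
{code}
The pipeline would then read from "projects/my-project/subscriptions/beamtest-sub".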



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4520) No messages delivered after a while with PubsubIO

2018-06-07 Thread Hrish (JIRA)
Hrish created BEAM-4520:
---

 Summary: No messages delivered after a while with PubsubIO
 Key: BEAM-4520
 URL: https://issues.apache.org/jira/browse/BEAM-4520
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp, runner-flink
Affects Versions: 2.4.0
Reporter: Hrish
Assignee: Chamikara Jayalath


I am running the following Beam pipeline code locally, with the FlinkRunner. 
PubsubIO is used to read messages from a topic. I have a separate thread that 
publishes messages to the topic at regular intervals (every 30 seconds) and 
also sets the "ts" attribute which is used later to derive the event time.

Custom transform to convert to KV pair -
{code:java}
private static class PubSubMessageGrouper extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage element = c.element();
        KV<String, PubsubMessage> kv = KV.of(element.getAttribute("key"), element);
        c.output(kv);
    }
}
{code}
Note that "key" is a key set in the message attributes earlier in the publisher 
thread. The intent is to group the messages downstream by this key.
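
The publisher thread itself is not shown in the report. A minimal sketch of what it might look like with the Google Cloud Pub/Sub Java client, setting both the "ts" and "key" attributes; all names and values here are illustrative, not taken from the reporter's code:
{code:java}
// Hypothetical publisher sketch: publishes a message with a "ts"
// attribute (millis since epoch, which PubsubIO's timestamp attribute
// accepts) and a "key" attribute for downstream grouping.
TopicName topic = TopicName.of(projectName, "beamtest");
Publisher publisher = Publisher.newBuilder(topic).build();

PubsubMessage msg = PubsubMessage.newBuilder()
    .setData(ByteString.copyFromUtf8("payload"))
    .putAttributes("ts", String.valueOf(System.currentTimeMillis()))
    .putAttributes("key", "some-key")
    .build();

publisher.publish(msg); // returns an ApiFuture<String> with the message id
{code}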

Pipeline code -
{code:java}
PCollection<PubsubMessage> pubsubColl = p
    .apply(PubsubIO.readMessagesWithAttributes()
        .withTimestampAttribute("ts")
        .fromTopic("projects/" + projectName + "/topics/beamtest")
    );

PCollection<KV<String, PubsubMessage>> idfied =
    pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));

PCollection<KV<String, PubsubMessage>> windowed = idfied
    .apply(Window.<KV<String, PubsubMessage>>into(FixedWindows.of(Duration.standardSeconds(15)))
        .triggering(
            Repeatedly.forever(
                AfterWatermark.pastEndOfWindow()
            )
        )
        .withAllowedLateness(Duration.standardSeconds(15))
        .discardingFiredPanes());

PCollection<KV<String, Iterable<PubsubMessage>>> grouped =
    windowed.apply(GroupByKey.create());

grouped.apply(ParDo.of(new KVPrinter()));
{code}
The transforms are not chained, for ease of reading. The KVPrinter transform at 
the end just prints the messages coming out of the GroupByKey; it will be 
replaced by actual code once I get this running. When I run this, the trigger 
does not fire for quite some time (a couple of minutes or longer). When it 
finally fires, I see that some of the messages are never received (in the final 
step), no matter how long I keep it running. The Pub/Sub statistics in my 
GCP/Stackdriver dashboard show that there is a backlog of undelivered messages.

Is this due to the internal watermark that PubsubIO uses? My intention here is 
to make sure that all messages are processed in the groupby, including late 
ones within the allowed lateness window.


