Hi Mario,
Pub/Sub lets you store record metadata in "attributes". You can write
the timestamp into an attribute of your choice and then pass the name of
this attribute to the Read transform [1]. This causes PubsubIO to
assign timestamps based on the real event time, i.e. when the event occurred.
Otherwise, PubsubIO uses the publish time, i.e. when the message was written to
Pub/Sub, which causes the issue you described.
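For the attribute approach, the publisher has to copy the event time from the
JSON payload into a message attribute, and the pipeline then reads with
PubsubIO.readStrings().fromTopic(topic).withTimestampAttribute("ts"). Below is
a minimal sketch of the publisher-side conversion, assuming the "time" field
holds a UTC RFC 3339 string such as 2023-05-25T16:40:00.015Z. The attribute
name "ts" and the class name are hypothetical. The value is written as
milliseconds since the Unix epoch, which as far as I know is one of the
formats withTimestampAttribute accepts (RFC 3339 being the other) [1].
Attaching the map to the outgoing message depends on the Pub/Sub client you
publish with and is not shown.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

final class EventTimeAttributes {
    // Hypothetical attribute name; it must match the name passed to
    // PubsubIO.Read#withTimestampAttribute on the pipeline side.
    static final String TIMESTAMP_ATTRIBUTE = "ts";

    // Builds the attribute map for one message from the event's "time" value,
    // assumed to be a UTC instant such as "2023-05-25T16:40:00.015Z".
    static Map<String, String> attributesFor(String eventTime) {
        long epochMillis = Instant.parse(eventTime).toEpochMilli();
        Map<String, String> attributes = new HashMap<>();
        // Milliseconds since the Unix epoch, written as a decimal string.
        attributes.put(TIMESTAMP_ATTRIBUTE, Long.toString(epochMillis));
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(attributesFor("2023-05-25T16:40:00.015Z"));
    }
}
```

With the same attribute name used on both sides, elements already carry the
event time when they leave the Read transform, so a step like SetTimestampFn
is no longer needed.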
An alternative approach is to somehow "fix" the timestamps after PubsubIO
has assigned them. This is more involved: it would (ideally) require a
stateful DoFn that buffers input events and sets up a looping timer that
periodically flushes the buffered events with their actual timestamps. The
timer would have to use `withOutputTimestamp` [2] to specify the
timestamp that will be used for output when the timer fires. Obviously,
this would have to be the minimal timestamp of all buffered elements.
Moreover, this approach might require an estimate of how far the actual
timestamp of an event can be from the time it is written to Pub/Sub,
because otherwise you would probably produce many late events.
Though this approach is not super complicated, it essentially emulates the
logic of watermark computation, and I would try to avoid it.
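The bookkeeping behind that stateful DoFn can be illustrated in plain Java.
This is only a model of the idea, not Beam's state and timer API, and all
names in it are hypothetical: buffered elements keep the event time parsed
from their payload, outputTimestamp() returns the minimal buffered event time
(the value one would pass to withOutputTimestamp [2] when re-arming the
looping timer), and flush() emits everything in event-time order, as the
timer callback would.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class EventTimeBuffer {
    // One buffered element together with the event time parsed from its payload.
    static final class Timestamped {
        final String value;
        final Instant eventTime;

        Timestamped(String value, Instant eventTime) {
            this.value = value;
            this.eventTime = eventTime;
        }
    }

    private final List<Timestamped> buffered = new ArrayList<>();

    // Called for every input element (the @ProcessElement side in a real DoFn).
    void add(String value, Instant eventTime) {
        buffered.add(new Timestamped(value, eventTime));
    }

    // Minimal event time among buffered elements: the candidate output
    // timestamp for the next timer firing. Empty when nothing is buffered.
    Optional<Instant> outputTimestamp() {
        return buffered.stream().map(t -> t.eventTime).min(Comparator.naturalOrder());
    }

    // Emits all buffered elements in event-time order and clears the buffer,
    // as the looping timer's callback would.
    List<Timestamped> flush() {
        List<Timestamped> out = new ArrayList<>(buffered);
        out.sort(Comparator.comparing((Timestamped t) -> t.eventTime));
        buffered.clear();
        return out;
    }
}
```

Because every flushed element's event time is at least the minimum, emitting
them at that output timestamp never goes backwards relative to the hold the
timer set.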
The last option is to suppress the exception by overriding
getAllowedTimestampSkew() (returning a huge Duration), but that only
fixes the exception, not the underlying problem it signals in the pipeline.
The result would be elements that can be arbitrarily late relative to the
watermark and might therefore be silently dropped under some conditions.
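Both halves of that trade-off can be checked with a small plain-Java model.
outputAllowed() encodes the rule quoted in the exception (the output
timestamp must not be earlier than the input timestamp minus the allowed
skew), and isDroppable() assumes Beam's usual late-data rule for
epoch-aligned 5-minute fixed windows: an element is dropped once its window's
maximum timestamp (window end minus 1 ms) plus the allowed lateness is before
the watermark. The class and method names are hypothetical, and the watermark
is passed in by hand.

```java
import java.time.Duration;
import java.time.Instant;

final class SkewAndLateness {
    static final Duration WINDOW_SIZE = Duration.ofMinutes(5);

    // Rule from the exception message: an output timestamp may not be earlier
    // than the input timestamp minus the allowed skew.
    static boolean outputAllowed(Instant inputTimestamp, Instant outputTimestamp, Duration allowedSkew) {
        return !outputTimestamp.isBefore(inputTimestamp.minus(allowedSkew));
    }

    // Start of the epoch-aligned fixed window that contains eventTime.
    static Instant windowStart(Instant eventTime) {
        long size = WINDOW_SIZE.toMillis();
        long millis = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size));
    }

    // True if the element's window has expired: its maximum timestamp
    // (end minus 1 ms) plus the allowed lateness is before the watermark.
    static boolean isDroppable(Instant eventTime, Instant watermark, Duration allowedLateness) {
        Instant maxTimestamp = windowStart(eventTime).plus(WINDOW_SIZE).minusMillis(1);
        return maxTimestamp.plus(allowedLateness).isBefore(watermark);
    }
}
```

With the timestamps from the exception (input 16:40:00.039Z, output
16:40:00.015Z), outputAllowed() is false for a skew of 0 ms and true for any
skew of at least 24 ms. A larger skew does nothing for isDroppable(), though:
once the watermark has reached 16:45:00Z and no lateness is allowed, an
element stamped 16:40:00.015Z is dropped regardless.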
Best,
Jan
[1] https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
[2] https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/sdk/state/Timer.html#withOutputTimestamp-org.joda.time.Instant-
On 5/26/23 12:08, Mário Costa via user wrote:
Hi,
I need to process messages/events from Google Pub/Sub. Each message is
sent as a JSON payload and contains a JSON attribute, say "time", with
the timestamp value of the event.
I need to group the events into 5 minute windows and write them to
files, one file per window.
After I extract the timestamp and set it in the pipeline, I get an
exception:
java.lang.IllegalArgumentException: Cannot output with timestamp
2023-05-25T16:40:00.015Z. Output timestamps must be no earlier than
the timestamp of the current input or timer (2023-05-25T16:40:00.039Z)
minus the allowed skew (0 milliseconds) and no later than
294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the allowed skew.
Is there a way to solve this problem?
How can I override the timestamp of the event without running into this issue?
Here is an example of the pipeline code:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PubsubTimestampExample {
    public static void main(String[] args) {
        // Create the pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);
        // Define the Pub/Sub topic
        String topic = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>";
        // Read the messages from Pub/Sub. No timestamp attribute is configured,
        // so each element is timestamped with the message's publish time.
        PCollection<String> messages = pipeline
            .apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic(topic));
        // Process the messages and set the timestamp
        PCollection<String> processedMessages = messages
            .apply("SetTimestamp", ParDo.of(new SetTimestampFn()));
        // Print the processed messages
        processedMessages.apply("PrintMessages", ParDo.of(new PrintMessagesFn()));
        // Run the pipeline
        pipeline.run();
    }

    public static class SetTimestampFn extends DoFn<String, String> {
        private static final DateTimeFormatter TIMESTAMP_FORMATTER =
            DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = c.element();
            // Assuming message format: "payload,timestamp"
            String[] parts = message.split(",");
            String payload = parts[0];
            String timestampString = parts[1];
            // Extract and parse the timestamp from the payload
            Instant timestamp = Instant.parse(timestampString, TIMESTAMP_FORMATTER);
            // Set the timestamp for the element. This throws the exception above
            // whenever the parsed timestamp is earlier than the element's current
            // (publish-time) timestamp, because the default allowed skew is 0.
            c.outputWithTimestamp(payload, timestamp);
        }
    }

    public static class PrintMessagesFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println(c.element());
        }
    }
}
--
Mario Costa
Data Analytics Senior Software Developer