Hi Mario,

Pub/Sub lets you store record metadata in "attributes". You can write the event timestamp into an attribute of your choice and then pass the name of that attribute to the Read transform via `withTimestampAttribute` [1]. PubsubIO will then assign timestamps based on when the event actually occurred. Otherwise, PubsubIO uses the time at which the message was written to Pub/Sub, which causes the issue you described.
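
As a rough illustration of the publishing side, here is a plain-Java sketch (no Pub/Sub client involved) that derives the attribute value from the event time. The attribute name `event_time` and the class and method names are hypothetical, and it assumes the payload carries the time as an ISO-8601 string like the ones in the exception message. As far as I know, `withTimestampAttribute` accepts either milliseconds since the Unix epoch or an RFC 3339 string; the sketch uses milliseconds.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

final class TimestampAttributeSketch {
    // Hypothetical attribute name; the same string would be passed to withTimestampAttribute.
    static final String TIMESTAMP_ATTRIBUTE = "event_time";

    /** Builds the attribute map for one message from the event time found in its payload. */
    static Map<String, String> attributesFor(String eventTime) {
        // Assumes an ISO-8601 instant such as "2023-05-25T16:40:00.015Z".
        Instant instant = Instant.parse(eventTime);
        Map<String, String> attributes = new HashMap<>();
        // Store the event time as milliseconds since the Unix epoch.
        attributes.put(TIMESTAMP_ATTRIBUTE, Long.toString(instant.toEpochMilli()));
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(attributesFor("2023-05-25T16:40:00.015Z"));
    }
}
```

The publisher would attach this map to each message, and the pipeline would then read with `PubsubIO.readStrings().fromTopic(topic).withTimestampAttribute("event_time")`.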

An alternative approach is to "fix" the timestamps after PubsubIO has assigned them. This is more involved: it would (ideally) require a stateful DoFn that buffers input events and sets up a looping timer that periodically flushes the buffered events with their actual timestamps. The timer would have to use `withOutputTimestamp` [2] to specify the timestamp that is used for output when the timer fires. This would have to be the minimum timestamp of all buffered elements. Moreover, this approach might require an estimate of how far the actual timestamp of an event lies from the time it is written to Pub/Sub, because otherwise you would probably produce many late events. Although this approach is not super complicated, it essentially emulates the logic of watermark computation, and I would try to avoid it.
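
To make the bookkeeping concrete, here is a plain-Java model of the buffer only. It is not a Beam DoFn and uses no Beam state or timer APIs; all class and method names are hypothetical. It shows just the part that matters for the timer: the output timestamp has to be the minimum over everything currently buffered.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class MinTimestampBufferSketch {
    /** One buffered event together with the timestamp parsed from its payload. */
    static final class Event {
        final String payload;
        final Instant actualTimestamp;

        Event(String payload, Instant actualTimestamp) {
            this.payload = payload;
            this.actualTimestamp = actualTimestamp;
        }
    }

    private final List<Event> buffer = new ArrayList<>();

    /** Buffers an event instead of emitting it right away. */
    void add(String payload, Instant actualTimestamp) {
        buffer.add(new Event(payload, actualTimestamp));
    }

    /** The timestamp the flush timer would have to use: the minimum over all buffered events. */
    Optional<Instant> minTimestamp() {
        return buffer.stream()
                .map(e -> e.actualTimestamp)
                .min(Comparator.naturalOrder());
    }

    /** Returns the buffered events and clears the buffer, as a periodic flush would. */
    List<Event> flush() {
        List<Event> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        MinTimestampBufferSketch sketch = new MinTimestampBufferSketch();
        sketch.add("a", Instant.parse("2023-05-25T16:40:00.039Z"));
        sketch.add("b", Instant.parse("2023-05-25T16:40:00.015Z"));
        System.out.println(sketch.minTimestamp().get()); // the earlier of the two timestamps
        System.out.println(sketch.flush().size());       // 2
    }
}
```

In a real pipeline the buffer would live in Beam state and the minimum would be passed to the timer via `withOutputTimestamp`; the model above leaves all of that out.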

The last option is to suppress the exception by overriding getAllowedTimestampSkew() to return a huge Duration, but that only removes the exception, not the underlying problem in the pipeline. The result would be elements that can be arbitrarily late with respect to the watermark and thus might be silently dropped under some conditions.
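
For reference, the condition behind the exception can be modelled in a few lines of plain Java, using the two timestamps from the exception message in the original mail. This is only a model of the lower-bound check as the message describes it, not Beam's code; the class and method names are hypothetical and the upper bound is left out.

```java
import java.time.Duration;
import java.time.Instant;

final class AllowedSkewSketch {
    /** True when the output timestamp is no earlier than the input timestamp minus the allowed skew. */
    static boolean isAccepted(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2023-05-25T16:40:00.039Z");  // timestamp assigned on read
        Instant output = Instant.parse("2023-05-25T16:40:00.015Z"); // timestamp parsed from the payload

        // The event time is 24 ms behind the assigned timestamp.
        System.out.println(Duration.between(output, input).toMillis());        // 24
        // With the default skew of zero the output is rejected.
        System.out.println(isAccepted(input, output, Duration.ZERO));          // false
        // A skew of at least 24 ms would let it through.
        System.out.println(isAccepted(input, output, Duration.ofMillis(24)));  // true
    }
}
```

A larger skew makes the check pass, but the emitted element still carries a timestamp behind the one the watermark was computed from, which is why it can end up late.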

Best,
 Jan

[1] https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
[2] https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/sdk/state/Timer.html#withOutputTimestamp-org.joda.time.Instant-

On 5/26/23 12:08, Mário Costa via user wrote:
Hi,

I need to process messages/events from Google Pub/Sub. Each message is sent as a JSON payload and contains a JSON attribute, say "time", with the timestamp value of the event.

I need to group the events into 5 minute windows and write them to files, one file per window.

After I extract the timestamp and set it in the pipeline I get an exception message:

java.lang.IllegalArgumentException: Cannot output with timestamp 2023-05-25T16:40:00.015Z. Output timestamps must be no earlier than the timestamp of the current input or timer (2023-05-25T16:40:00.039Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.

Is there a way to solve this problem?

How can I override the timestamp of the event without running into this issue?

Here is an example of the pipeline code:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PubsubTimestampExample {
    public static void main(String[] args) {
        // Create the pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // Define the Pub/Sub topic
        String topic = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>";

        // Read the messages from Pub/Sub (no timestamp attribute is configured here)
        PCollection<String> messages = pipeline
                .apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic(topic));

        // Process the messages and set the timestamp
        PCollection<String> processedMessages = messages
                .apply("SetTimestamp", ParDo.of(new SetTimestampFn()));

        // Print the processed messages
        processedMessages.apply("PrintMessages", ParDo.of(new PrintMessagesFn()));

        // Run the pipeline
        pipeline.run();
    }

    public static class SetTimestampFn extends DoFn<String, String> {
        private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = c.element();
            String[] parts = message.split(",");  // Assuming message format: "payload,timestamp"
            String payload = parts[0];
            String timestampString = parts[1];

            // Extract and parse the timestamp from the payload
            Instant timestamp = Instant.parse(timestampString, TIMESTAMP_FORMATTER);

            // Set the timestamp for the element
            c.outputWithTimestamp(payload, timestamp);
        }
    }

    public static class PrintMessagesFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println(c.element());
        }
    }
}
--
Mario Costa
Data Analytics Senior Software Developer

