Hi,
I need to process messages (events) from Google Pub/Sub. Each message is sent
as a JSON payload containing a field, say "time", that holds the timestamp of
the event.
I need to group the events into 5-minute windows and write them to files,
one file per window.
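To make the grouping requirement concrete, here is a minimal JDK-only sketch of how fixed, non-overlapping 5-minute windows assign an event: each event falls into [start, start + 5 min), where start is its event time rounded down to a multiple of 5 minutes since the epoch. This assumes epoch-aligned windows with no offset (the default alignment of Beam's FixedWindows); the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowSketch {
    // Window size from the requirement above (hypothetical constant name)
    static final Duration WINDOW_SIZE = Duration.ofMinutes(5);

    /** Inclusive start of the epoch-aligned fixed window containing eventTime. */
    public static Instant windowStart(Instant eventTime) {
        long sizeMillis = WINDOW_SIZE.toMillis();
        long t = eventTime.toEpochMilli();
        // floorMod also rounds pre-1970 timestamps down into the correct window
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMillis));
    }

    /** Exclusive end of the window, i.e. start + 5 minutes. */
    public static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plus(WINDOW_SIZE);
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2023-05-25T16:40:00.015Z");
        System.out.println("[" + windowStart(event) + ", " + windowEnd(event) + ")");
    }
}
```

Every event whose window start is the same would end up in the same output file.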
After I extract the timestamp and set it as the element timestamp in the
pipeline, I get the following exception:
java.lang.IllegalArgumentException: Cannot output with timestamp
2023-05-25T16:40:00.015Z. Output timestamps must be no earlier than the
timestamp of the current input or timer (2023-05-25T16:40:00.039Z) minus
the allowed skew (0 milliseconds) and no later than
294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc
for details on changing the allowed skew.
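The lower-bound rule in this message can be reproduced without Beam: an element whose input timestamp is T may only be output with a timestamp t such that t is not earlier than T minus the allowed skew. With the values above, t is 24 ms earlier than T, so with a skew of 0 ms the call is rejected. When PubsubIO is read without a timestamp attribute, T is the Pub/Sub publish time, so any event whose "time" value precedes its publish time trips this check. A minimal JDK-only sketch; the class and method names are hypothetical, and the upper bound (Beam's maximum timestamp) is ignored.

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheckSketch {
    /**
     * Mirrors the lower-bound check described in the exception: the output
     * timestamp must not be earlier than the input timestamp minus the skew.
     */
    public static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // Input timestamp from the exception (assumed to be the publish time)
        Instant input = Instant.parse("2023-05-25T16:40:00.039Z");
        // Event time parsed from the payload
        Instant output = Instant.parse("2023-05-25T16:40:00.015Z");
        System.out.println("gap ms: " + Duration.between(output, input).toMillis());
        System.out.println("skew 0 ms allowed: " + isAllowed(input, output, Duration.ZERO));
        System.out.println("skew 24 ms allowed: " + isAllowed(input, output, Duration.ofMillis(24)));
    }
}
```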
Is there a way to solve this problem?
How can I override the timestamp of the event without hitting this issue?
Here is an example of the pipeline code:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PubsubTimestampExample {

    public static void main(String[] args) {
        // Create the pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // Define the Pub/Sub topic
        String topic = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>";

        // Read the messages from Pub/Sub. No timestamp attribute is configured,
        // so each element's timestamp is its Pub/Sub publish time.
        PCollection<String> messages = pipeline
            .apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic(topic));

        // Process the messages and set the event timestamp
        PCollection<String> processedMessages = messages
            .apply("SetTimestamp", ParDo.of(new SetTimestampFn()));

        // Print the processed messages
        processedMessages.apply("PrintMessages", ParDo.of(new PrintMessagesFn()));

        // Run the pipeline
        pipeline.run();
    }

    public static class SetTimestampFn extends DoFn<String, String> {
        private static final DateTimeFormatter TIMESTAMP_FORMATTER =
            DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = c.element();
            // Assuming message format: "payload,timestamp"
            String[] parts = message.split(",");
            String payload = parts[0];
            String timestampString = parts[1];
            // Extract and parse the timestamp from the payload
            Instant timestamp = Instant.parse(timestampString, TIMESTAMP_FORMATTER);
            // Set the timestamp for the element; this call throws the exception
            // above when the event time is earlier than the input timestamp
            c.outputWithTimestamp(payload, timestamp);
        }
    }

    public static class PrintMessagesFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println(c.element());
        }
    }
}
--
Mario Costa
Data Analytics Senior Software Developer