shanthoosh commented on a change in pull request #940: SAMZA-2121: Add
checkpoint offset field to IncomingMessageEnvelope
URL: https://github.com/apache/samza/pull/940#discussion_r263628931
##########
File path:
samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
##########
@@ -86,7 +101,29 @@ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, Stri
*/
public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset,
Object key, Object message, int size, long eventTime, long arrivalTime) {
- this(systemStreamPartition, offset, key, message, size);
+ this(systemStreamPartition, offset, offset, key, message, size, eventTime, arrivalTime);
+ }
+
+ /**
+ * Constructs a new IncomingMessageEnvelope from specified components
+ * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
+ * from which the stream came, and the partition of the stream from which the message was received.
+ * @param offset The offset in the partition that the message was received from.
+ * @param checkpointOffset offset that can be checkpointed when this {@link IncomingMessageEnvelope} is processed
+ * @param key A deserialized key received from the partition offset.
+ * @param message A deserialized message received from the partition offset.
+ * @param size size of the message and key in bytes.
+ * @param eventTime the timestamp (in epochMillis) of when this event happened
+ * @param arrivalTime the timestamp (in epochMillis) of when this event arrived to (i.e., was picked-up by) Samza
+ */
+ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, String checkpointOffset,
+ Object key, Object message, int size, long eventTime, long arrivalTime) {
+ this.systemStreamPartition = systemStreamPartition;
+ this.offset = offset;
+ this.checkpointOffset = checkpointOffset;
Review comment:
1. Can you please share the rationale behind adding the checkpoint offset to
the `IncomingMessageEnvelope` public API?
2. Can you please shed some light on the other options considered here? Is
it not feasible to do this without changing the public API?
IMHO, it would be better not to clutter the public API with internal
details that will be hard to remove later on. Anyone who reads this in the
future would wonder why there are two different offsets in the
`IncomingMessageEnvelope` contract. Is it possible to maintain the
mapping (the association we want for safe checkpointing) internally within
the Samza framework and not expose it to Samza users?
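To make the alternative concrete, here is a rough sketch of keeping the association inside the framework. Everything in it is hypothetical: `CheckpointOffsetTracker`, `register`, and `resolveAndRemove` are illustrative names that do not exist in Samza, and a plain `String` stands in for `SystemStreamPartition` so the snippet compiles on its own. The fallback to the message's own offset mirrors what the existing constructor in this diff does when it passes `offset` for both arguments.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical framework-internal helper; not an existing Samza class.
final class CheckpointOffsetTracker {
  // Key: (partition, message offset). Value: offset that is safe to checkpoint.
  // A String partition id stands in for SystemStreamPartition (assumption).
  private final Map<Map.Entry<String, String>, String> checkpointOffsets =
      new ConcurrentHashMap<>();

  // Called by the consumer side when a message's safe-checkpoint offset
  // differs from its own offset.
  void register(String partition, String offset, String checkpointOffset) {
    checkpointOffsets.put(Map.entry(partition, offset), checkpointOffset);
  }

  // Called by the framework once the message has been processed.
  // Removes the entry so the map does not grow without bound, and falls
  // back to the message offset when nothing was registered.
  String resolveAndRemove(String partition, String offset) {
    String checkpointOffset = checkpointOffsets.remove(Map.entry(partition, offset));
    return checkpointOffset != null ? checkpointOffset : offset;
  }
}
```

With something along these lines, the envelope would keep a single `offset` field, and only the checkpointing path would consult the tracker. Whether this is workable depends on where the checkpoint offset is known and how envelopes are handed between components, which the PR author is better placed to judge.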