prateekm commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1143790143
##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
private static final String SST_FILE_SUFFIX = ".sst";
private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
+ // Unlike checkpoint or offset files, the side input offset file can have multiple writers / readers,
+ // since it is written during SideInputTask commit and copied to the store checkpoint directory
+ // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+ // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.
Review Comment:
That's right. It's needed because there is no other coordination or synchronization
between the two run loops. It is used during commit whenever either loop reads, writes,
or copies the side input offsets file.
To avoid deadlocks, the lock is only held within a single method invocation, and
those methods do not call each other. On second thought, I'll use a semaphore
instead of a reentrant lock to document and enforce this.
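A minimal sketch of the semaphore approach described above, for illustration only. The class `SideInputOffsetFileGuard`, its methods, and the plain-string file contents are hypothetical and not taken from the PR. It only shows one process-wide, single-permit `java.util.concurrent.Semaphore` guarding read, write, and copy, with each acquire/release confined to one method.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.Semaphore;

// Hypothetical helper (not the class added in the PR): every read, write, or copy of the
// side input offsets file goes through one process-wide permit.
final class SideInputOffsetFileGuard {
  // A single permit shared by both run loops. Unlike ReentrantLock, a Semaphore has no
  // owning thread, so a guarded method that called another guarded method would block on
  // itself instead of silently re-entering. Acquiring and releasing within one method
  // turns "these methods never call each other" into a rule that fails loudly if broken.
  private static final Semaphore OFFSET_FILE_PERMIT = new Semaphore(1);

  private SideInputOffsetFileGuard() {}

  // Assumed caller: SideInputTask commit, on the side input run loop.
  static void writeOffsets(Path offsetFile, String serializedOffsets) throws IOException {
    OFFSET_FILE_PERMIT.acquireUninterruptibly();
    try {
      // Files.write creates the file, or truncates an existing one, before writing.
      Files.write(offsetFile, serializedOffsets.getBytes(StandardCharsets.UTF_8));
    } finally {
      OFFSET_FILE_PERMIT.release();
    }
  }

  // Assumed caller: either run loop, whenever it needs the last written offsets.
  static String readOffsets(Path offsetFile) throws IOException {
    OFFSET_FILE_PERMIT.acquireUninterruptibly();
    try {
      return new String(Files.readAllBytes(offsetFile), StandardCharsets.UTF_8);
    } finally {
      OFFSET_FILE_PERMIT.release();
    }
  }

  // Assumed caller: the TaskInstance commit path on the main run loop, copying the
  // file into a store checkpoint directory.
  static void copyOffsets(Path offsetFile, Path checkpointDir) throws IOException {
    OFFSET_FILE_PERMIT.acquireUninterruptibly();
    try {
      Files.copy(offsetFile, checkpointDir.resolve(offsetFile.getFileName()),
          StandardCopyOption.REPLACE_EXISTING);
    } finally {
      OFFSET_FILE_PERMIT.release();
    }
  }
}
```

In this sketch, as long as every access goes through the guard, a copy on the main run loop cannot observe a file that the side input run loop is halfway through rewriting. The trade-off is that the guarded methods must stay leaf methods: nesting one inside another would hang the calling thread on its own permit rather than re-enter as a `ReentrantLock` would.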
##########
samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java:
##########
@@ -44,16 +45,19 @@ public class SideInputTask implements RunLoopTask {
private final Set<SystemStreamPartition> taskSSPs;
private final TaskSideInputHandler taskSideInputHandler;
private final TaskInstanceMetrics metrics;
+ private final long commitMs;
public SideInputTask(
TaskName taskName,
Set<SystemStreamPartition> taskSSPs,
TaskSideInputHandler taskSideInputHandler,
- TaskInstanceMetrics metrics) {
+ TaskInstanceMetrics metrics,
+ long commitMs) {
this.taskName = taskName;
this.taskSSPs = taskSSPs;
this.taskSideInputHandler = taskSideInputHandler;
this.metrics = metrics;
+ this.commitMs = commitMs;
Review Comment:
Will do, thanks.
--
This is an automated message from the Apache Git Service.