mynameborat commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1142837174
##########
samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java:
##########
@@ -44,16 +45,19 @@ public class SideInputTask implements RunLoopTask {
private final Set<SystemStreamPartition> taskSSPs;
private final TaskSideInputHandler taskSideInputHandler;
private final TaskInstanceMetrics metrics;
+ private final long commitMs;
public SideInputTask(
TaskName taskName,
Set<SystemStreamPartition> taskSSPs,
TaskSideInputHandler taskSideInputHandler,
- TaskInstanceMetrics metrics) {
+ TaskInstanceMetrics metrics,
+ long commitMs) {
this.taskName = taskName;
this.taskSSPs = taskSSPs;
this.taskSideInputHandler = taskSideInputHandler;
this.metrics = metrics;
+ this.commitMs = commitMs;
Review Comment:
Reminder to log a warning when `commitMs <= 0`.
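A minimal sketch of the kind of check this reminder asks for. The helper class, its method name, and the use of `java.util.logging` are hypothetical and chosen only to keep the example self-contained. The reading that a non-positive `commitMs` means periodic commits are disabled is also an assumption, not something this PR confirms.

```java
import java.util.logging.Logger;

// Hypothetical helper for illustration; not part of the Samza code base.
final class CommitIntervalCheck {
  private static final Logger LOG = Logger.getLogger(CommitIntervalCheck.class.getName());

  private CommitIntervalCheck() { }

  /**
   * Returns true when commitMs is positive. Otherwise logs a warning and returns false.
   * Assumption: a non-positive interval means periodic commits are disabled.
   */
  static boolean isPeriodicCommitEnabled(String taskName, long commitMs) {
    if (commitMs <= 0) {
      LOG.warning(String.format(
          "commitMs is %d for side input task %s; assuming periodic commits are disabled.",
          commitMs, taskName));
      return false;
    }
    return true;
  }
}
```

The constructor could call such a check once, so the warning appears at startup rather than on every run loop iteration.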
##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
private static final String SST_FILE_SUFFIX = ".sst";
private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
+ // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+ // since they are written during SideInputTask commit and copied to store checkpoint directory
+ // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+ // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.
Review Comment:
Why do we need a process-wide lock? We don't have container-level stores, and
this util is scoped to the task level, right?
##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
private static final String SST_FILE_SUFFIX = ".sst";
private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
+ // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+ // since they are written during SideInputTask commit and copied to store checkpoint directory
+ // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+ // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.
Review Comment:
Never mind. I see now that this lock is shared by the two run loops on the
code path that uploads to the blob store. Is that right?
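For readers following the thread, here is a rough sketch of the pattern the code comment describes: one static lock guarding both the write of a side input offset file and its copy into a checkpoint directory, so that two threads (for example, two run loops) cannot interleave those operations. The class, method, and field names are hypothetical and are not taken from `StorageManagerUtil`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical illustration; names do not come from StorageManagerUtil.
final class SideInputOffsetFileAccess {
  // Static, so a single lock is shared by every thread in the JVM (process-wide).
  private static final Object OFFSET_FILE_LOCK = new Object();

  private SideInputOffsetFileAccess() { }

  /** Writer side: assumed to run on the side input run loop during its commit. */
  static void writeOffsets(Path offsetFile, String serializedOffsets) throws IOException {
    synchronized (OFFSET_FILE_LOCK) {
      Files.write(offsetFile, serializedOffsets.getBytes(StandardCharsets.UTF_8));
    }
  }

  /** Copier side: assumed to run on the regular task run loop during its commit. */
  static void copyOffsets(Path offsetFile, Path checkpointDir) throws IOException {
    synchronized (OFFSET_FILE_LOCK) {
      Files.copy(offsetFile, checkpointDir.resolve(offsetFile.getFileName()),
          StandardCopyOption.REPLACE_EXISTING);
    }
  }
}
```

A single static lock serializes these operations across all tasks in the process. A lock per task or per file would reduce contention but needs extra bookkeeping, which may be why the simpler option was chosen here.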
##########
samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java:
##########
@@ -339,7 +339,7 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
// Put non persisted stores
nonPersistedStores.forEach(storageEngines::put);
// Create persisted stores
- storeNames.forEach(storeName -> {
+ storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
Review Comment:
Is this part of the bug fix, or is it more about enforcing this flow only for
persisted stores?
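To make the behavioral difference concrete, here is a small standalone sketch of what the added filter does: any name already present in the non-persisted map is skipped before the per-store work runs. The class and method names are hypothetical. Whether `storeNames` can actually contain non-persisted store names in the real code path is not established by this hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the filter in the hunk above.
final class PersistedStoreFilter {
  private PersistedStoreFilter() { }

  /** Returns the store names that would still reach the forEach body after filtering. */
  static List<String> storesToCreate(Set<String> storeNames, Map<String, ?> nonPersistedStores) {
    List<String> result = new ArrayList<>();
    storeNames.stream()
        .filter(s -> !nonPersistedStores.containsKey(s))
        .forEach(result::add);
    return result;
  }
}
```

Without the filter, every name in `storeNames` would reach the body, including any that were already registered as non-persisted stores.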