Github user ilooner commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/322#discussion_r74370558
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
---
@@ -51,291 +66,607 @@
public class FSWindowDataManager implements WindowDataManager
{
private static final String DEF_RECOVERY_PATH = "idempotentState";
-
- protected transient FSStorageAgent storageAgent;
+ private static final String WAL_FILE_NAME = "wal";
/**
- * Recovery path relative to app path where state is saved.
+ * Recovery filePath relative to app filePath where state is saved.
*/
@NotNull
- private String recoveryPath;
+ private String recoveryPath = DEF_RECOVERY_PATH;
private boolean isRecoveryPathRelativeToAppPath = true;
/**
- * largest window for which there is recovery data across all physical
operator instances.
+ * This is not null only for one physical instance.<br/>
+ * It consists of operator ids which have been deleted but have some
state that can be replayed.
+ * Only one of the instances would be handling (modifying) the files
that belong to this state. <br/>
+ * The value is assigned during partitioning.
*/
- protected transient long largestRecoveryWindow;
+ private Set<Integer> deletedOperators;
+
+ private boolean repartitioned;
/**
- * This is not null only for one physical instance.<br/>
- * It consists of operator ids which have been deleted but have some
state that can be replayed.
- * Only one of the instances would be handling (modifying) the files
that belong to this state.
+ * Used when it is not necessary to replay every streaming/app window.
+ * Used by {@link IncrementalCheckpointManager}
*/
- protected Set<Integer> deletedOperators;
+ private boolean relyOnCheckpoints;
/**
- * Sorted mapping from window id to all the operators that have state to
replay for that window.
+ * largest window for which there is recovery data across all physical
operator instances.
*/
- protected final transient TreeMultimap<Long, Integer> replayState;
+ private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
+
+ private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
- protected transient FileSystem fs;
- protected transient Path appPath;
+ //operator id -> wals (sorted)
+ private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals =
new HashMap<>();
+
+ private transient String statePath;
+ private transient int operatorId;
+
+ private final transient Kryo kryo = new Kryo();
+
+ private transient FileContext fileContext;
public FSWindowDataManager()
{
- replayState = TreeMultimap.create();
- largestRecoveryWindow = Stateless.WINDOW_ID;
- recoveryPath = DEF_RECOVERY_PATH;
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
--- End diff --
For my own learning :) why is it necessary to set Kryo's class loader to the thread's context class loader here?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---