[
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416592#comment-15416592
]
ASF GitHub Bot commented on APEXMALHAR-2063:
--------------------------------------------
Github user ilooner commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/322#discussion_r74370558
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java ---
@@ -51,291 +66,607 @@
public class FSWindowDataManager implements WindowDataManager
{
private static final String DEF_RECOVERY_PATH = "idempotentState";
-
- protected transient FSStorageAgent storageAgent;
+ private static final String WAL_FILE_NAME = "wal";
/**
- * Recovery path relative to app path where state is saved.
+ * Recovery filePath relative to app filePath where state is saved.
*/
@NotNull
- private String recoveryPath;
+ private String recoveryPath = DEF_RECOVERY_PATH;
private boolean isRecoveryPathRelativeToAppPath = true;
/**
- * largest window for which there is recovery data across all physical operator instances.
+ * This is not null only for one physical instance.<br/>
+ * It consists of operator ids which have been deleted but have some state that can be replayed.
+ * Only one of the instances would be handling (modifying) the files that belong to this state. <br/>
+ * The value is assigned during partitioning.
*/
- protected transient long largestRecoveryWindow;
+ private Set<Integer> deletedOperators;
+
+ private boolean repartitioned;
/**
- * This is not null only for one physical instance.<br/>
- * It consists of operator ids which have been deleted but have some state that can be replayed.
- * Only one of the instances would be handling (modifying) the files that belong to this state.
+ * Used when it is not necessary to replay every streaming/app window.
+ * Used by {@link IncrementalCheckpointManager}
*/
- protected Set<Integer> deletedOperators;
+ private boolean relyOnCheckpoints;
/**
- * Sorted mapping from window id to all the operators that have state to replay for that window.
+ * largest window for which there is recovery data across all physical operator instances.
*/
- protected final transient TreeMultimap<Long, Integer> replayState;
+ private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
+
+ private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
- protected transient FileSystem fs;
- protected transient Path appPath;
+ //operator id -> wals (sorted)
+ private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
+
+ private transient String statePath;
+ private transient int operatorId;
+
+ private final transient Kryo kryo = new Kryo();
+
+ private transient FileContext fileContext;
public FSWindowDataManager()
{
- replayState = TreeMultimap.create();
- largestRecoveryWindow = Stateless.WINDOW_ID;
- recoveryPath = DEF_RECOVERY_PATH;
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
--- End diff --
for my learning :) why is this needed?
> Integrate WAL to FS WindowDataManager
> -------------------------------------
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Chandni Singh
> Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying
> the tuples of every completed application window after a failure. For this it
> saves meta-data in one file per window. Having many small files on HDFS
> causes issues, as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead, FS Window Data Manager can use the WAL to write data and maintain
> a mapping of how much data was flushed to the WAL in each window.
> To use FileSystemWAL for replaying the data of a finished window, a few
> changes are made to FileSystemWAL, for the following reasons:
> 1. WindowDataManager needs to replay the data of every finished window. This
> window may not be checkpointed.
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery,
> so this poses a problem.
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2. FileSystemWAL writes to temporary files. The mapping of temp files to
> the actual file is part of its state, which is checkpointed. Since
> WindowDataManager replays data of a window not yet checkpointed, it needs to
> know the actual temporary file the data is being persisted to.
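
The idea in the description of writing every window's data to one log and keeping a per-window mapping of how much was flushed can be sketched roughly as below. This is only an illustration under stated assumptions: `WindowWalIndex` and its methods are hypothetical names, the log is an in-memory buffer standing in for a WAL file, and none of this is the actual FSWindowDataManager or FileSystemWAL code.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: one append-only log plus a window id -> end offset index,
 * instead of one small file per window. Not the Malhar implementation.
 */
public class WindowWalIndex
{
  // In-memory stand-in for a single append-only WAL file (assumption for the sketch).
  private final ByteArrayOutputStream wal = new ByteArrayOutputStream();

  // window id -> offset in the log just past the last byte written for that window
  private final TreeMap<Long, Integer> endOffsets = new TreeMap<>();

  /** Appends the data of a window; windows must be saved in increasing order. */
  public void save(long windowId, byte[] data)
  {
    if (!endOffsets.isEmpty() && windowId <= endOffsets.lastKey()) {
      throw new IllegalArgumentException("window " + windowId + " is not newer than " + endOffsets.lastKey());
    }
    wal.write(data, 0, data.length);
    endOffsets.put(windowId, wal.size());
  }

  /** Returns the bytes saved for a window, or null if nothing was saved for it. */
  public byte[] retrieve(long windowId)
  {
    Integer end = endOffsets.get(windowId);
    if (end == null) {
      return null;
    }
    // The window's data starts where the previous window's data ended.
    Map.Entry<Long, Integer> previous = endOffsets.lowerEntry(windowId);
    int start = previous == null ? 0 : previous.getValue();
    return Arrays.copyOfRange(wal.toByteArray(), start, end);
  }

  /** Largest window with saved data, or -1 if the log is empty. */
  public long getLargestCompletedWindow()
  {
    return endOffsets.isEmpty() ? -1L : endOffsets.lastKey();
  }
}
```

In this sketch a replay of a finished window only needs the index entry for that window and the one before it, which is why the index has to be available for windows that are not yet checkpointed.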