[
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416630#comment-15416630
]
ASF GitHub Bot commented on APEXMALHAR-2063:
--------------------------------------------
Github user ilooner commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/322#discussion_r74372710
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
---
@@ -51,291 +66,607 @@
public class FSWindowDataManager implements WindowDataManager
{
private static final String DEF_RECOVERY_PATH = "idempotentState";
-
- protected transient FSStorageAgent storageAgent;
+ private static final String WAL_FILE_NAME = "wal";
/**
- * Recovery path relative to app path where state is saved.
+ * Recovery filePath relative to app filePath where state is saved.
*/
@NotNull
- private String recoveryPath;
+ private String recoveryPath = DEF_RECOVERY_PATH;
private boolean isRecoveryPathRelativeToAppPath = true;
/**
- * largest window for which there is recovery data across all physical
operator instances.
+ * This is not null only for one physical instance.<br/>
+ * It consists of operator ids which have been deleted but have some
state that can be replayed.
+ * Only one of the instances would be handling (modifying) the files
that belong to this state. <br/>
+ * The value is assigned during partitioning.
*/
- protected transient long largestRecoveryWindow;
+ private Set<Integer> deletedOperators;
+
+ private boolean repartitioned;
/**
- * This is not null only for one physical instance.<br/>
- * It consists of operator ids which have been deleted but have some
state that can be replayed.
- * Only one of the instances would be handling (modifying) the files
that belong to this state.
+ * Used when it is not necessary to replay every streaming/app window.
+ * Used by {@link IncrementalCheckpointManager}
*/
- protected Set<Integer> deletedOperators;
+ private boolean relyOnCheckpoints;
/**
- * Sorted mapping from window id to all the operators that have state to
replay for that window.
+ * largest window for which there is recovery data across all physical
operator instances.
*/
- protected final transient TreeMultimap<Long, Integer> replayState;
+ private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
+
+ private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
- protected transient FileSystem fs;
- protected transient Path appPath;
+ //operator id -> wals (sorted)
+ private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals =
new HashMap<>();
+
+ private transient String statePath;
+ private transient int operatorId;
+
+ private final transient Kryo kryo = new Kryo();
+
+ private transient FileContext fileContext;
public FSWindowDataManager()
{
- replayState = TreeMultimap.create();
- largestRecoveryWindow = Stateless.WINDOW_ID;
- recoveryPath = DEF_RECOVERY_PATH;
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
@Override
public void setup(Context.OperatorContext context)
{
- Configuration configuration = new Configuration();
+ operatorId = context.getId();
+
if (isRecoveryPathRelativeToAppPath) {
- appPath = new Path(context.getValue(DAG.APPLICATION_PATH) +
Path.SEPARATOR + recoveryPath);
+ statePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR
+ recoveryPath;
} else {
- appPath = new Path(recoveryPath);
+ statePath = recoveryPath;
}
try {
- storageAgent = new FSStorageAgent(appPath.toString(), configuration);
+ fileContext = FileContextUtils.getFileContext(statePath);
+
setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
- fs = FileSystem.newInstance(appPath.toUri(), configuration);
+ private void setupWals(long activationWindow) throws IOException
+ {
+ findFiles(wal, operatorId);
+ configureWal(wal, operatorId, !relyOnCheckpoints);
+
+ if (repartitioned) {
+ createReadOnlyWals();
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry :
readOnlyWals.entrySet()) {
+ findFiles(entry.getValue(), entry.getKey());
+ configureWal(entry.getValue(), entry.getKey(), true);
+ }
+ }
- if (fs.exists(appPath)) {
- FileStatus[] fileStatuses = fs.listStatus(appPath);
+ //find largest recovery window
+ if (!relyOnCheckpoints) {
+ long recoveryWindow = findLargestRecoveryWindow(wal, null);
+ //committed will not delete temp files so it is possible that when
reading from files, a smaller window
+ //than the activation window is found.
+ if (recoveryWindow > activationWindow) {
+ largestRecoveryWindow = recoveryWindow;
+ }
+ if (wal.getReader().getCurrentPointer() != null) {
+
wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy());
+ }
+ } else {
+ wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer();
+ largestRecoveryWindow = wal.getLastCheckpointedWindow();
+ }
- for (FileStatus operatorDirStatus : fileStatuses) {
- int operatorId =
Integer.parseInt(operatorDirStatus.getPath().getName());
+ if (repartitioned && largestRecoveryWindow > Stateless.WINDOW_ID) {
+ //find the min of max window ids: a downstream will not finish a
window until all the upstream have finished it.
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry :
readOnlyWals.entrySet()) {
- for (FileStatus status :
fs.listStatus(operatorDirStatus.getPath())) {
- String fileName = status.getPath().getName();
- if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
- continue;
- }
- long windowId = Long.parseLong(fileName, 16);
- replayState.put(windowId, operatorId);
- if (windowId > largestRecoveryWindow) {
- largestRecoveryWindow = windowId;
- }
+ long recoveryWindow = Stateless.WINDOW_ID;
+ if (!relyOnCheckpoints) {
+ long window = findLargestRecoveryWindow(entry.getValue(), null);
+ if (window > activationWindow) {
+ recoveryWindow = window;
}
+ } else {
+ recoveryWindow = findLargestRecoveryWindow(entry.getValue(),
activationWindow);
+ }
+
+ if (recoveryWindow < largestRecoveryWindow) {
+ largestRecoveryWindow = recoveryWindow;
}
}
- } catch (IOException e) {
- throw new RuntimeException(e);
}
+
+ //reset readers
+ wal.getReader().seek(wal.walStartPointer);
+ for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+ wal.getReader().seek(wal.walStartPointer);
+ }
+
+ wal.setup();
+ for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+ wal.setup();
+ }
+
}
- @Override
- public void save(Object object, int operatorId, long windowId) throws
IOException
+ protected void createReadOnlyWals() throws IOException
{
- storageAgent.save(object, operatorId, windowId);
+ RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new
Path(statePath));
+ while (operatorsIter.hasNext()) {
+ FileStatus status = operatorsIter.next();
+ int operatorId = Integer.parseInt(status.getPath().getName());
+
+ if (operatorId != this.operatorId) {
+ //create read-only wal for other partitions
+ FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
+ readOnlyWals.put(operatorId, wal);
+ }
+ }
}
- @Override
- public Object load(int operatorId, long windowId) throws IOException
+ private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean
updateWalState) throws IOException
{
- Set<Integer> operators = replayState.get(windowId);
- if (operators == null || !operators.contains(operatorId)) {
- return null;
+ String operatorDir = statePath + Path.SEPARATOR + operatorId;
+ wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME);
+ wal.fileContext = fileContext;
+
+ if (updateWalState) {
+ if (!wal.fileDescriptors.isEmpty()) {
+ SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet();
+
+ wal.walStartPointer = new
FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0);
+
+ FSWindowReplayWAL.FileDescriptor last =
wal.fileDescriptors.get(sortedParts.last()).last();
+ if (last.isTmp) {
+ wal.tempPartFiles.put(last.part, last.filePath.toString());
+ }
+ }
}
- return storageAgent.load(operatorId, windowId);
}
- @Override
- public void delete(int operatorId, long windowId) throws IOException
+ private void findFiles(FSWindowReplayWAL wal, int operatorId) throws
IOException
{
- storageAgent.delete(operatorId, windowId);
+ String operatorDir = statePath + Path.SEPARATOR + operatorId;
+ Path operatorPath = new Path(operatorDir);
+ if (fileContext.util().exists(operatorPath)) {
+ RemoteIterator<FileStatus> walFilesIter =
fileContext.listStatus(operatorPath);
+
+ while (walFilesIter.hasNext()) {
+ FileStatus fileStatus = walFilesIter.next();
+ FSWindowReplayWAL.FileDescriptor descriptor =
FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath());
+ wal.fileDescriptors.put(descriptor.part, descriptor);
+ }
+ }
}
- @Override
- public Map<Integer, Object> load(long windowId) throws IOException
+ private long findLargestRecoveryWindow(FSWindowReplayWAL wal, Long
ceilingWindow) throws IOException
{
- Set<Integer> operators = replayState.get(windowId);
- if (operators == null) {
- return null;
- }
- Map<Integer, Object> data = Maps.newHashMap();
- for (int operatorId : operators) {
- data.put(operatorId, load(operatorId, windowId));
+ if (!wal.fileDescriptors.isEmpty()) {
+ FileSystemWAL.FileSystemWALReader reader = wal.getReader();
+
+ //to find the largest window, we only need to look at the last file.
+ NavigableSet<Integer> descendingParts = new
TreeSet<>(wal.fileDescriptors.keySet()).descendingSet();
+ for (int part : descendingParts) {
+ FSWindowReplayWAL.FileDescriptor last =
wal.fileDescriptors.get(part).last();
+ reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0));
+
+ long endOffset = -1;
+
+ long lastWindow = Stateless.WINDOW_ID;
+ Slice slice = readNext(reader);
+
+ while (slice != null) {
+ boolean skipComplete = skipNext(reader);
+ if (!skipComplete) {
+ //artifact not saved so this window was not finished.
+ break;
+ }
+ Slice windowSlice = slice;
+ long offset = reader.getCurrentPointer().getOffset();
+ slice = readNext(reader); //either null, deleted or next window
+
+ if (slice == null || !slice.equals(DELETED)) {
+ //delete entry not found so window was not deleted
+ long window = Longs.fromByteArray(windowSlice.toByteArray());
+
+ if (ceilingWindow != null && window > ceilingWindow) {
+ break;
+ }
+ endOffset = offset;
+ lastWindow = window;
--- End diff --
I'm not sure I understand correctly how delete works, but should lastWindow
be rolled back to the previous window when we encounter DELETED?
> Integrate WAL to FS WindowDataManager
> -------------------------------------
>
> Key: APEXMALHAR-2063
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Chandni Singh
> Assignee: Chandni Singh
>
> FS Window Data Manager is used to save meta-data that helps in replaying the
> tuples of every completed application window after a failure. For this, it saves
> meta-data in one file per window. Having many small files on HDFS
> causes issues, as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead, FS Window Data Manager can use the WAL to write data and maintain
> a mapping of how much data was flushed to the WAL in each window.
> In order to use FileSystemWAL for replaying data of a finished window, a few
> changes are made to FileSystemWAL, for the following reasons:
> 1. WindowDataManager needs to replay data of every finished window. This
> window may not be checkpointed.
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery,
> which poses a problem.
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2. FileSystemWAL writes to temporary files. The mapping of temporary files to
> actual files is part of its state, which is checkpointed. Since
> WindowDataManager replays data of a window that is not yet checkpointed, it
> needs to know the actual temporary file the data is being persisted to.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)