[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416592#comment-15416592
 ] 

ASF GitHub Bot commented on APEXMALHAR-2063:
--------------------------------------------

Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/322#discussion_r74370558
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java ---
    @@ -51,291 +66,607 @@
     public class FSWindowDataManager implements WindowDataManager
     {
       private static final String DEF_RECOVERY_PATH = "idempotentState";
    -
    -  protected transient FSStorageAgent storageAgent;
    +  private static final String WAL_FILE_NAME = "wal";
     
       /**
    -   * Recovery path relative to app path where state is saved.
    +   * Recovery filePath relative to app filePath where state is saved.
        */
       @NotNull
    -  private String recoveryPath;
    +  private String recoveryPath = DEF_RECOVERY_PATH;
     
       private boolean isRecoveryPathRelativeToAppPath = true;
     
       /**
    -   * largest window for which there is recovery data across all physical operator instances.
    +   * This is not null only for one physical instance.<br/>
    +   * It consists of operator ids which have been deleted but have some state that can be replayed.
    +   * Only one of the instances would be handling (modifying) the files that belong to this state. <br/>
    +   * The value is assigned during partitioning.
        */
    -  protected transient long largestRecoveryWindow;
    +  private Set<Integer> deletedOperators;
    +
    +  private boolean repartitioned;
     
       /**
    -   * This is not null only for one physical instance.<br/>
    -   * It consists of operator ids which have been deleted but have some state that can be replayed.
    -   * Only one of the instances would be handling (modifying) the files that belong to this state.
    +   * Used when it is not necessary to replay every streaming/app window.
    +   * Used by {@link IncrementalCheckpointManager}
        */
    -  protected Set<Integer> deletedOperators;
    +  private boolean relyOnCheckpoints;
     
       /**
    -   * Sorted mapping from window id to all the operators that have state to replay for that window.
    +   * largest window for which there is recovery data across all physical operator instances.
        */
    -  protected final transient TreeMultimap<Long, Integer> replayState;
    +  private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
    +
    +  private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
     
    -  protected transient FileSystem fs;
    -  protected transient Path appPath;
    +  //operator id -> wals (sorted)
    +  private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
    +
    +  private transient String statePath;
    +  private transient int operatorId;
    +
    +  private final transient Kryo kryo = new Kryo();
    +
    +  private transient FileContext fileContext;
     
       public FSWindowDataManager()
       {
    -    replayState = TreeMultimap.create();
    -    largestRecoveryWindow = Stateless.WINDOW_ID;
    -    recoveryPath = DEF_RECOVERY_PATH;
    +    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
    --- End diff --
    
    for my learning :) why is this needed?


> Integrate WAL to FS WindowDataManager
> -------------------------------------
>
>                 Key: APEXMALHAR-2063
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>
> FS Window Data Manager saves the metadata needed to replay the tuples of 
> every completed application window after a failure. For this, it saves 
> metadata in one file per window. Having many small files on HDFS 
> causes issues, as highlighted here:
> http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
> Instead, FS Window Data Manager can use the WAL to write data and maintain 
> a mapping of how much data was flushed to the WAL in each window. 
> To use FileSystemWAL for replaying the data of a finished window, a few 
> changes are made to FileSystemWAL, for the following reasons:
> 1. WindowDataManager needs to replay the data of every finished window, and 
> such a window may not have been checkpointed. 
> FileSystemWAL truncates the WAL file to the checkpointed point after recovery, 
> which poses a problem. 
> WindowDataManager should be able to control recovery of FileSystemWAL.
> 2. FileSystemWAL writes to temporary files. The mapping of temporary files to 
> the actual file is part of its state, which is checkpointed. Since 
> WindowDataManager replays data of a window not yet checkpointed, it needs to 
> know the actual temporary file the data is being persisted to.
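
The idea in the description above (one append-only log plus a mapping of how much data each window flushed) can be sketched roughly as follows. This is only an illustration under stated assumptions: the class WindowOffsetLog and its methods are hypothetical names, an in-memory buffer stands in for the WAL file on HDFS, and none of this is the actual FSWindowDataManager or FileSystemWAL code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not the FSWindowDataManager or FileSystemWAL API.
class WindowOffsetLog {
    // Stand-in for the single WAL file (assumption: a real version appends to a file on HDFS).
    private final ByteArrayOutputStream wal = new ByteArrayOutputStream();
    // window id -> offset in the log just past the last byte written for that window
    private final TreeMap<Long, Integer> windowEndOffsets = new TreeMap<>();

    // Appends one window's data to the log and records where that window ends.
    void save(long windowId, byte[] artifact) {
        if (!windowEndOffsets.isEmpty() && windowId <= windowEndOffsets.lastKey()) {
            throw new IllegalArgumentException("windows must be saved in increasing order");
        }
        wal.write(artifact, 0, artifact.length);
        windowEndOffsets.put(windowId, wal.size());
    }

    // Returns the bytes written for the given window, or null if nothing was saved for it.
    byte[] retrieve(long windowId) {
        Integer end = windowEndOffsets.get(windowId);
        if (end == null) {
            return null;
        }
        // A window starts where the previous saved window ended (or at 0 for the first one).
        Map.Entry<Long, Integer> previous = windowEndOffsets.lowerEntry(windowId);
        int start = (previous == null) ? 0 : previous.getValue();
        return Arrays.copyOfRange(wal.toByteArray(), start, end);
    }

    public static void main(String[] args) {
        WindowOffsetLog log = new WindowOffsetLog();
        log.save(1L, "a,b".getBytes(StandardCharsets.UTF_8));
        log.save(2L, "c".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(log.retrieve(1L), StandardCharsets.UTF_8));
        System.out.println(new String(log.retrieve(2L), StandardCharsets.UTF_8));
    }
}
```

With this layout, replaying a finished window needs only the offset mapping and the log itself, so many small per-window files collapse into one larger file. The sketch leaves out the two points listed above: controlling truncation on recovery and locating the temporary file that holds data not yet checkpointed.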



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
