[ https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253202#comment-15253202 ]
ASF GitHub Bot commented on APEXMALHAR-1965:
--------------------------------------------
Github user chandnisingh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60683826
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
@@ -0,0 +1,688 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.apex.malhar.lib.utils.IOUtils;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.local.LocalFs;
+import org.apache.hadoop.fs.local.RawLocalFs;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A file-based WAL implementation.
+ * <p/>
+ * Note:<br/>
+ * The FileSystem writer and reader operations should not be interleaved, because intermingling them will cause
+ * problems. Typically the WAL reader is used only during recovery.<br/>
+ *
+ * Also, this implementation is not thread-safe: the FileSystem WAL writer and reader operations should be performed
+ * in the operator's thread.
+ */
+public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
+{
+
+  @NotNull
+  private String filePath;
+
+  // max length of the file
+  @Min(0)
+  private long maxLength;
+
+  private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
+
+  @NotNull
+  private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
+
+  @NotNull
+  private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
+
+  // part => tmp file path
+  private final Map<Integer, String> tempPartFiles = new TreeMap<>();
+
+  private long lastCheckpointedWindow = Stateless.WINDOW_ID;
+
+  @Override
+  public void setup()
+  {
+    try {
+      FileContext fileContext = FileContextUtils.getFileContext(filePath);
+      if (maxLength == 0) {
+        maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
+      }
+      fileSystemWALWriter.open(fileContext);
+      fileSystemWALReader.open(fileContext);
+    } catch (IOException e) {
+      throw new RuntimeException("during setup", e);
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long window)
+  {
+    try {
+      lastCheckpointedWindow = window;
+      fileSystemWALWriter.flush();
+    } catch (IOException e) {
+      throw new RuntimeException("during before cp", e);
+    }
+  }
+
+  @Override
+  public void committed(long window)
--- End diff --
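The javadoc and fields above describe a WAL that is written as a sequence of parts capped at maxLength and read back only during recovery. The truncated diff does not show the on-disk entry format, so the following is a minimal in-memory sketch under stated assumptions: each entry is framed as a 4-byte length followed by its bytes (an assumption, suggested only by the DataInputStream/DataOutputStream imports), and the class name WalSketch and its methods are hypothetical, not part of the FileSystemWAL API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory sketch of a part-based WAL (not the FileSystemWAL API).
 * Assumption: each entry is stored as a 4-byte length followed by its bytes, and
 * the writer rolls over to a new part once the current part would exceed maxLength.
 */
final class WalSketch
{
  private final long maxLength;
  private final List<byte[]> finishedParts = new ArrayList<>();
  private ByteArrayOutputStream currentBytes = new ByteArrayOutputStream();
  private DataOutputStream current = new DataOutputStream(currentBytes);

  WalSketch(long maxLength)
  {
    this.maxLength = maxLength;
  }

  /** Appends one length-prefixed entry, starting a new part if this one would overflow. */
  public void append(byte[] entry) throws IOException
  {
    long entrySize = 4L + entry.length;
    if (current.size() > 0 && current.size() + entrySize > maxLength) {
      rollOver();
    }
    current.writeInt(entry.length);
    current.write(entry);
  }

  /** Seals the current part and opens a fresh one. */
  private void rollOver() throws IOException
  {
    current.flush();
    finishedParts.add(currentBytes.toByteArray());
    currentBytes = new ByteArrayOutputStream();
    current = new DataOutputStream(currentBytes);
  }

  /** Number of parts, including the one currently being written. */
  public int partCount()
  {
    return finishedParts.size() + 1;
  }

  /** Recovery-style read: replays every entry from every part, in write order. */
  public List<byte[]> readAll() throws IOException
  {
    current.flush();
    List<byte[]> parts = new ArrayList<>(finishedParts);
    parts.add(currentBytes.toByteArray());
    List<byte[]> entries = new ArrayList<>();
    for (byte[] part : parts) {
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(part))) {
        while (in.available() > 0) {
          byte[] entry = new byte[in.readInt()];
          in.readFully(entry);
          entries.add(entry);
        }
      }
    }
    return entries;
  }
}
```

With maxLength = 16, appending "hello" (9 bytes framed) and then "world" forces a second part, while a following "!" (5 bytes framed) still fits in that second part. Reading is kept separate from appending, in line with the javadoc's note that reads are a recovery-time operation.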
OK, I will.
This is the HDFS issue with the rename + overwrite option that I was referring to:
https://issues.apache.org/jira/browse/HDFS-6757
The advice there was to avoid using rename with the overwrite option on a file that is still under construction.
Should I document all of this here as well?
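In Hadoop terms, the option under discussion is FileContext.rename(src, dst, Options.Rename.OVERWRITE). One way to follow the advice is to close the temporary part file before renaming it, and to delete any existing target explicitly instead of passing the overwrite option. The sketch below illustrates that ordering on the local filesystem with java.nio.file as a stand-in for Hadoop's FileContext; it is an analog under that assumption, not the FileSystemWAL code, and the class PartFinalizer and method writeAndFinalize are hypothetical.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/**
 * Hypothetical local-filesystem analog (java.nio.file, not Hadoop FileContext) of
 * finalizing a WAL part: finish and close the temporary file first, then move it
 * to its final name without an overwrite option.
 */
final class PartFinalizer
{
  private PartFinalizer()
  {
  }

  static Path writeAndFinalize(Path tmpFile, Path finalFile, List<String> lines) throws IOException
  {
    // try-with-resources guarantees the temporary file is closed before the move,
    // so it is never renamed while still being written.
    try (Writer out = Files.newBufferedWriter(tmpFile, StandardCharsets.UTF_8)) {
      for (String line : lines) {
        out.write(line);
        out.write('\n');
      }
    }
    // Remove any stale target explicitly rather than relying on an overwrite flag.
    // Note: delete + move is not atomic; a crash in between leaves only tmpFile.
    Files.deleteIfExists(finalFile);
    // No StandardCopyOption.REPLACE_EXISTING: if the target reappeared in the
    // meantime, the move fails (typically with FileAlreadyExistsException).
    return Files.move(tmpFile, finalFile);
  }
}
```

The trade-off is that the explicit delete and the move are two separate steps, so recovery has to tolerate finding only the temporary file; that is the price of not renaming over an existing target in one call.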
> Create a WAL in Malhar
> ----------------------
>
> Key: APEXMALHAR-1965
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Chandni Singh
> Assignee: Chandni Singh
>
> In Malhar we have an IdempotentStorageManager, which we use as a Write Ahead
> Logger. There are also some other places where we have created different
> flavors of a Write Ahead Logger.
> We need to find the overlap between all these flavors and create a common Write
> Ahead Logger for use in Apex Core and Apex Malhar.