[
https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248401#comment-15248401
]
ASF GitHub Bot commented on APEXMALHAR-1965:
--------------------------------------------
Github user chandnisingh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60288368
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.apex.malhar.lib.utils.IOUtils;
+import org.apache.apex.malhar.lib.utils.Serde;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.annotation.Stateless;
+
+public class FileSystemWAL<T> implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
+{
+ @NotNull
+ private Serde<T, byte[]> serde;
+
+ @NotNull
+ private String filePath;
+
+ //max length of the file
+ @Min(0)
+ private long maxLength;
+
+ @NotNull
+ private FileSystemWAL.FileSystemWALReader<T> fileSystemWALReader = new FileSystemWALReader<>(this);
+
+ @NotNull
+ private FileSystemWAL.FileSystemWALWriter<T> fileSystemWALWriter = new FileSystemWALWriter<>(this);
+
+ private long lastCheckpointedWindow = Stateless.WINDOW_ID;
+
+ @Override
+ public void setup()
+ {
+ try {
+ FileContext fileContext = FileContextUtils.getFileContext(filePath);
+ if (maxLength == 0) {
+ maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
+ }
+ fileSystemWALReader.open(fileContext);
+ fileSystemWALWriter.open(fileContext);
+
+ } catch (IOException e) {
+ throw new RuntimeException("during setup", e);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long window)
+ {
+ try {
+ lastCheckpointedWindow = window;
+ fileSystemWALWriter.flush();
+ } catch (IOException e) {
+ throw new RuntimeException("during before cp", e);
+ }
+ }
+
+ @Override
+ public void committed(long window)
+ {
+ try {
+ fileSystemWALWriter.finalizeFiles(window);
+ } catch (IOException e) {
+ throw new RuntimeException("during committed", e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ fileSystemWALReader.close();
+ fileSystemWALWriter.close();
+ } catch (IOException e) {
+ throw new RuntimeException("during teardown", e);
+ }
+ }
+
+ protected long getLastCheckpointedWindow()
+ {
+ return lastCheckpointedWindow;
+ }
+
+ protected String getPartFilePath(int partNumber)
+ {
+ return filePath + "_" + partNumber;
+ }
+
+ @Override
+ public FileSystemWALReader<T> getReader()
+ {
+ return fileSystemWALReader;
+ }
+
+ /**
+ * Sets the File System WAL Reader. This can be used to override the default wal reader.
+ *
+ * @param fileSystemWALReader wal reader.
+ */
+ public void setFileSystemWALReader(@NotNull FileSystemWALReader<T> fileSystemWALReader)
+ {
+ this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
+ }
+
+ @Override
+ public FileSystemWALWriter<T> getWriter()
+ {
+ return fileSystemWALWriter;
+ }
+
+ /**
+ * Sets the File System WAL Writer. This can be used to override the default wal writer.
+ * @param fileSystemWALWriter wal writer.
+ */
+ public void setFileSystemWALWriter(@NotNull FileSystemWALWriter<T> fileSystemWALWriter)
+ {
+ this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
+ }
+
+ /**
+ * @return WAL Entry serde
+ */
+ public Serde<T, byte[]> getSerde()
+ {
+ return serde;
+ }
+
+ /**
+ * Sets the serde which is used for wal entry serialization and de-serialization
+ *
+ * @param serde serializer/deserializer
+ */
+ public void setSerde(@NotNull Serde<T, byte[]> serde)
+ {
+ this.serde = Preconditions.checkNotNull(serde, "serde");
+ }
+
+ /**
+ * @return WAL file path
+ */
+ public String getFilePath()
+ {
+ return filePath;
+ }
+
+ /**
+ * Sets the WAL file path.
+ *
+ * @param filePath WAL file path
+ */
+ public void setFilePath(@NotNull String filePath)
+ {
+ this.filePath = Preconditions.checkNotNull(filePath, "filePath");
+ }
+
+ /**
+ * @return max length of a WAL part file.
+ */
+ public long getMaxLength()
+ {
+ return maxLength;
+ }
+
+ /**
+ * Sets the maximum length of a WAL part file.
+ *
+ * @param maxLength max length of the WAL part file
+ */
+ public void setMaxLength(long maxLength)
+ {
+ this.maxLength = maxLength;
+ }
+
+ public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
+ {
+ private final int partNum;
+ private long offset;
+
+ private FileSystemWALPointer()
+ {
+ //for kryo
+ partNum = -1;
+ }
+
+ public FileSystemWALPointer(long offset)
+ {
+ this(0, offset);
+ }
+
+ public FileSystemWALPointer(int partNum, long offset)
+ {
+ this.partNum = partNum;
+ this.offset = offset;
+ }
+
+ @Override
+ public int compareTo(@NotNull FileSystemWALPointer o)
+ {
+ if (this.partNum < o.partNum) {
+ return -1;
+ }
+ if (this.partNum > o.partNum) {
+ return 1;
+ }
+ if (this.offset < o.offset) {
+ return -1;
+ }
+ if (this.offset > o.offset) {
+ return 1;
+ }
+ return 0;
+ }
+
+ public int getPartNum()
+ {
+ return partNum;
+ }
+
+ public long getOffset()
+ {
+ return offset;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}';
+ }
+ }
+
+ /**
+ * A FileSystem WAL Reader.
+ * @param <T> type of tuple.
+ */
+ public static class FileSystemWALReader<T> implements WAL.WALReader<T, FileSystemWALPointer>
+ {
+ private T entry;
+ private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
+
+ private transient DataInputStream inputStream;
+ private transient Path currentOpenPath;
+
+ private final FileSystemWAL<T> fileSystemWAL;
+ private transient FileContext fileContext;
+
+ private FileSystemWALReader()
+ {
+ //for kryo
+ fileSystemWAL = null;
+ }
+
+ public FileSystemWALReader(@NotNull FileSystemWAL<T> fileSystemWal)
+ {
+ this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
+ }
+
+ protected void open(@NotNull FileContext fileContext) throws IOException
+ {
+ this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
+ //initialize the input stream
+ inputStream = getInputStream(currentPointer);
+ }
+
+ protected void close() throws IOException
+ {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
+ }
+
+ @Override
+ public void seek(FileSystemWALPointer pointer) throws IOException
+ {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ inputStream = getInputStream(pointer);
+ Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer);
+ currentPointer = pointer;
+ }
+
+ /**
+ * Move to the next WAL segment.
+ *
+ * @return true if the next part file exists and is opened; false otherwise.
+ * @throws IOException
+ */
+ private boolean nextSegment() throws IOException
+ {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
+
+ currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
+ inputStream = getInputStream(currentPointer);
+
+ return inputStream != null;
+ }
+
+ private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
+ {
+ Path walPartPath = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
+ if (fileContext.util().exists(walPartPath)) {
+ DataInputStream stream = fileContext.open(walPartPath);
+ if (walPointer.offset > 0) {
+ stream.skip(walPointer.offset);
+ }
+ currentOpenPath = walPartPath;
+ return stream;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean advance() throws IOException
+ {
+ do {
+ if (inputStream == null) {
+ inputStream = getInputStream(currentPointer);
+ }
+
+ if (inputStream != null &&
+ currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
+ int len = inputStream.readInt();
+ Preconditions.checkState(len >= 0, "negative length");
+
+ byte[] data = new byte[len];
+ inputStream.readFully(data);
+
+ entry = fileSystemWAL.serde.deserialize(data);
+ currentPointer.offset += data.length + 4;
+ return true;
+ }
+ } while (nextSegment());
+
+ entry = null;
+ return false;
+ }
+
+ @Override
+ public T get()
+ {
+ return entry;
+ }
+ }
+
+ /**
+ * A FileSystem WAL Writer.
+ * @param <T> type of tuple
+ */
+ public static class FileSystemWALWriter<T> implements WAL.WALWriter<T>
+ {
+ private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
+ private transient DataOutputStream outputStream;
+
+ //windowId => Latest part which can be finalized.
+ private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
+
+ //part => tmp file path;
+ private final Map<Integer, String> tmpFiles = new TreeMap<>();
+
+ private final FileSystemWAL<T> fileSystemWAL;
+ private transient FileContext fileContext;
+
+ private FileSystemWALWriter()
+ {
+ //for kryo
+ fileSystemWAL = null;
+ }
+
+ public FileSystemWALWriter(@NotNull FileSystemWAL<T> fileSystemWal)
+ {
+ this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
+ }
+
+ protected void open(@NotNull FileContext fileContext) throws IOException
+ {
+ this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
+ recover();
+ if (outputStream == null) {
+ outputStream = getOutputStream(currentPointer);
+ }
+ }
+
+ private void recover() throws IOException
+ {
+ LOG.debug("current pointer {}", currentPointer);
+ String tmpFilePath = tmpFiles.get(currentPointer.getPartNum());
+ if (tmpFilePath != null) {
+
+ Path tmpPath = new Path(tmpFilePath);
+ if (fileContext.util().exists(tmpPath)) {
+ LOG.debug("tmp path exists {}", tmpPath);
+
+ outputStream = getOutputStream(currentPointer);
+ DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
+
+ IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
+
+ outputStream.flush();
+ //remove old tmp
+ inputStreamOldTmp.close();
+ LOG.debug("delete tmp {}", tmpPath);
+ fileContext.delete(tmpPath, true);
+ }
+ }
+
+ //find all valid path names
+ Set<String> validPathNames = new HashSet<>();
+ for (Map.Entry<Integer, String> entry : tmpFiles.entrySet()) {
+ if (entry.getKey() <= currentPointer.partNum) {
+ validPathNames.add(new Path(entry.getValue()).getName());
+ }
+ }
+ LOG.debug("valid names {}", validPathNames);
+
+ //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
+ //which aren't accounted for by the tmp files map
+ Path walPath = new Path(fileSystemWAL.filePath);
+ Path parentWAL = walPath.getParent();
+ if (parentWAL != null && fileContext.util().exists(parentWAL)) {
+ RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
+ while (remoteIterator.hasNext()) {
+ FileStatus status = remoteIterator.next();
+ String fileName = status.getPath().getName();
+ if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
+ !validPathNames.contains(fileName)) {
+ LOG.debug("delete stray tmp {}", status.getPath());
+ fileContext.delete(status.getPath(), true);
+ }
+
+ }
+ }
+
+ }
+
+ protected void close() throws IOException
+ {
+ if (outputStream != null) {
+ flush();
+ outputStream.close();
+ outputStream = null;
+ LOG.debug("closed {}", currentPointer.partNum);
+ }
+ }
+
+ @Override
+ public int append(T entry) throws IOException
+ {
+ byte[] slice = fileSystemWAL.serde.serialize(entry);
+ int entryLength = slice.length + 4;
+
+ // rotate if needed
+ if (shouldRotate(entryLength)) {
+ rotate(true);
+ }
+
+ outputStream.writeInt(slice.length);
+ outputStream.write(slice);
+ currentPointer.offset += entryLength;
+
+ if (currentPointer.offset == fileSystemWAL.maxLength) {
+ //if the file is completed then we can rotate it. do not have to wait for the next entry
+ rotate(false);
+ }
+
+ return entryLength;
+ }
+
+ protected void flush() throws IOException
+ {
+ if (outputStream != null) {
+ outputStream.flush();
+ if (outputStream instanceof FSDataOutputStream) {
+ ((FSDataOutputStream)outputStream).hflush();
--- End diff --
We do not need both ```outputStream.flush()``` and ```outputStream.hflush()```.
I will change this to
```
protected void flush() throws IOException
{
if (outputStream != null) {
if (outputStream instanceof Syncable) {
Syncable syncableOutputStream = (Syncable)outputStream;
syncableOutputStream.hflush();
syncableOutputStream.hsync();
} else {
outputStream.flush();
}
}
}
```
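For reference, a rough usage sketch of this WAL as it stands in the diff above (the ```StringSerde``` class and the path here are hypothetical, used only for illustration, and IOException handling is omitted):
```
// Hypothetical Serde<String, byte[]> implementation, for illustration only.
FileSystemWAL<String> wal = new FileSystemWAL<>();
wal.setFilePath("/user/apex/wal/myWal");  // part files become myWal_0, myWal_1, ... (see getPartFilePath)
wal.setSerde(new StringSerde());
wal.setup();

// write side
FileSystemWAL.FileSystemWALWriter<String> writer = wal.getWriter();
writer.append("entry-1");
writer.append("entry-2");
wal.beforeCheckpoint(1L);  // flushes the current part file; normally driven by the operator lifecycle

// read side
FileSystemWAL.FileSystemWALReader<String> reader = wal.getReader();
while (reader.advance()) {
  String entry = reader.get();
  // process the recovered entry
}

wal.teardown();
```
The flush semantics discussed above matter exactly at the ```beforeCheckpoint``` call: entries appended before the checkpoint are only as durable as that flush, hence hflush/hsync on Syncable streams.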
> Create a WAL in Malhar
> ----------------------
>
> Key: APEXMALHAR-1965
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Chandni Singh
> Assignee: Tushar Gosavi
>
> In Malhar we have an IdempotentStorageManager which we use like a Write Ahead
> Logger. There have been some other places where we have created a different
> flavor of Write Ahead Logger.
> We need to find overlap between all these flavors and create a common Write
> Ahead Logger for use in Apex core and Apex malhar.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)