[
https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249282#comment-15249282
]
ASF GitHub Bot commented on APEXMALHAR-1965:
--------------------------------------------
Github user tweise commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60352191
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.apex.malhar.lib.utils.IOUtils;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.local.LocalFs;
+import org.apache.hadoop.fs.local.RawLocalFs;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.netlet.util.Slice;
+
+public class FileSystemWAL implements
WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
+{
+
+ @NotNull
+ private String filePath;
+
+ //max length of the file
+ @Min(0)
+ private long maxLength;
+
+ @NotNull
+ private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new
FileSystemWALReader(this);
+
+ @NotNull
+ private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new
FileSystemWALWriter(this);
+
+ //part => tmp file path;
+ private final ConcurrentSkipListMap<Integer, String> tempPartFiles = new
ConcurrentSkipListMap<>();
+
+ private long lastCheckpointedWindow = Stateless.WINDOW_ID;
+
+ @Override
+ public void setup()
+ {
+ try {
+ FileContext fileContext = FileContextUtils.getFileContext(filePath);
+ if (maxLength == 0) {
+ maxLength =
fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
+ }
+ fileSystemWALWriter.open(fileContext);
+ fileSystemWALReader.open(fileContext);
+ } catch (IOException e) {
+ throw new RuntimeException("during setup", e);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long window)
+ {
+ try {
+ lastCheckpointedWindow = window;
+ fileSystemWALWriter.flush();
+ } catch (IOException e) {
+ throw new RuntimeException("during before cp", e);
+ }
+ }
+
+ @Override
+ public void committed(long window)
+ {
+ try {
+ fileSystemWALWriter.finalizeFiles(window);
+ } catch (IOException e) {
+ throw new RuntimeException("during committed", e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ fileSystemWALReader.close();
+ fileSystemWALWriter.close();
+ } catch (IOException e) {
+ throw new RuntimeException("during teardown", e);
+ }
+ }
+
+ protected long getLastCheckpointedWindow()
+ {
+ return lastCheckpointedWindow;
+ }
+
+ protected String getPartFilePath(int partNumber)
+ {
+ return filePath + "_" + partNumber;
+ }
+
+ @Override
+ public FileSystemWALReader getReader()
+ {
+ return fileSystemWALReader;
+ }
+
+ /**
+ * Sets the File System WAL Reader. This can be used to override the
default wal reader.
+ *
+ * @param fileSystemWALReader wal reader.
+ */
+ public void setFileSystemWALReader(@NotNull FileSystemWALReader
fileSystemWALReader)
+ {
+ this.fileSystemWALReader =
Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
+ }
+
+ @Override
+ public FileSystemWALWriter getWriter()
+ {
+ return fileSystemWALWriter;
+ }
+
+ /**
+ * Sets the File System WAL Writer. This can be used to override the
default wal writer.
+ *
+ * @param fileSystemWALWriter wal writer.
+ */
+ public void setFileSystemWALWriter(@NotNull FileSystemWALWriter
fileSystemWALWriter)
+ {
+ this.fileSystemWALWriter =
Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
+ }
+
+ /**
+ * @return WAL file path
+ */
+ public String getFilePath()
+ {
+ return filePath;
+ }
+
+ /**
+ * Sets the WAL file path.
+ *
+ * @param filePath WAL file path
+ */
+ public void setFilePath(@NotNull String filePath)
+ {
+ this.filePath = Preconditions.checkNotNull(filePath, "filePath");
+ }
+
+ /**
+ * @return max length of a WAL part file.
+ */
+ public long getMaxLength()
+ {
+ return maxLength;
+ }
+
+ /**
+ * Sets the maximum length of a WAL part file.
+ *
+ * @param maxLength max length of the WAL part file
+ */
+ public void setMaxLength(long maxLength)
+ {
+ this.maxLength = maxLength;
+ }
+
+ public static class FileSystemWALPointer implements
Comparable<FileSystemWALPointer>
+ {
+ private final int partNum;
+ private long offset;
+
+ private FileSystemWALPointer()
+ {
+ //for kryo
+ partNum = -1;
+ }
+
+ public FileSystemWALPointer(long offset)
+ {
+ this(0, offset);
+ }
+
+ public FileSystemWALPointer(int partNum, long offset)
+ {
+ this.partNum = partNum;
+ this.offset = offset;
+ }
+
+ @Override
+ public int compareTo(@NotNull FileSystemWALPointer o)
+ {
+ if (this.partNum < o.partNum) {
+ return -1;
+ }
+ if (this.partNum > o.partNum) {
+ return 1;
+ }
+ if (this.offset < o.offset) {
+ return -1;
+ }
+ if (this.offset > o.offset) {
+ return 1;
+ }
+ return 0;
+ }
+
+ public int getPartNum()
+ {
+ return partNum;
+ }
+
+ public long getOffset()
+ {
+ return offset;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset="
+ offset + '}';
+ }
+ }
+
+ /**
+ * A FileSystem Wal Reader
+ */
+ public static class FileSystemWALReader implements
WAL.WALReader<FileSystemWALPointer>
+ {
+ private FileSystemWALPointer currentPointer = new
FileSystemWALPointer(0, 0);
+
+ private transient DataInputStream inputStream;
+ private transient Path currentOpenPath;
+
+ private final FileSystemWAL fileSystemWAL;
+ private transient FileContext fileContext;
+
+ private FileSystemWALReader()
+ {
+ //for kryo
+ fileSystemWAL = null;
+ }
+
+ public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
+ {
+ this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal,
"wal");
+ }
+
+ protected void open(@NotNull FileContext fileContext) throws
IOException
+ {
+ this.fileContext = Preconditions.checkNotNull(fileContext,
"fileContext");
+ }
+
+ protected void close() throws IOException
+ {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
+ }
+
+ @Override
+ public void seek(FileSystemWALPointer pointer) throws IOException
+ {
+ if (inputStream != null) {
+ close();
+ }
+ inputStream = getInputStream(pointer);
+ Preconditions.checkNotNull(inputStream, "invalid pointer " +
pointer);
+ currentPointer = pointer;
+ }
+
+ /**
+ * Move to the next WAL segment.
+ *
+ * @return true if the next part file exists and is opened; false
otherwise.
+ * @throws IOException
+ */
+ private boolean nextSegment() throws IOException
+ {
+ if (inputStream != null) {
+ close();
+ }
+
+ currentPointer = new FileSystemWALPointer(currentPointer.partNum +
1, 0);
+ inputStream = getInputStream(currentPointer);
+
+ return inputStream != null;
+ }
+
+ private DataInputStream getInputStream(FileSystemWALPointer
walPointer) throws IOException
+ {
+ Preconditions.checkArgument(inputStream == null, "input stream not
null");
+ Path pathToReadFrom;
+ String tmpPath =
fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
+ if (tmpPath != null) {
+ pathToReadFrom = new Path(tmpPath);
+ } else {
+ pathToReadFrom = new
Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
+ }
+
+ LOG.debug("path to read {} and pointer {}", pathToReadFrom,
walPointer);
+ if (fileContext.util().exists(pathToReadFrom)) {
+ DataInputStream stream = fileContext.open(pathToReadFrom);
+ if (walPointer.offset > 0) {
+ stream.skip(walPointer.offset);
+ }
+ currentOpenPath = pathToReadFrom;
+ return stream;
+ }
+ return null;
+ }
+
+ @Override
+ public Slice next() throws IOException
+ {
+ do {
+ if (inputStream == null) {
+ inputStream = getInputStream(currentPointer);
+ }
+
+ if (inputStream != null &&
!fileContext.util().exists(currentOpenPath)) {
+ //if the tmp path was finalized the path may not exist any more
+ close();
+ inputStream = getInputStream(currentPointer);
+ }
+
+ if (inputStream != null && currentPointer.offset <
fileContext.getFileStatus(currentOpenPath).getLen()) {
+ int len = inputStream.readInt();
+ Preconditions.checkState(len >= 0, "negative length");
+
+ byte[] data = new byte[len];
+ inputStream.readFully(data);
+
+ currentPointer.offset += data.length + 4;
+ return new Slice(data);
+ }
+ } while (nextSegment());
+
+ return null;
+ }
+ }
+
+ /**
+ * A FileSystem WAL Writer.
+ */
+ public static class FileSystemWALWriter implements WAL.WALWriter
+ {
+ private FileSystemWALPointer currentPointer = new
FileSystemWALPointer(0, 0);
+ private transient DataOutputStream outputStream;
+
+ //windowId => Latest part which can be finalized.
+ private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
+
+ private final FileSystemWAL fileSystemWAL;
+ private transient FileContext fileContext;
+
/**
 * No-arg constructor for Kryo; the owning WAL is left null.
 */
private FileSystemWALWriter()
{
  this.fileSystemWAL = null;
}
+
/**
 * Creates a writer for the given WAL.
 *
 * @param fileSystemWal owning WAL; must not be null
 * @throws NullPointerException if {@code fileSystemWal} is null
 */
public FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal)
{
  Preconditions.checkNotNull(fileSystemWal, "wal");
  this.fileSystemWAL = fileSystemWal;
}
+
+ protected void open(@NotNull FileContext fileContext) throws
IOException
+ {
+ this.fileContext = Preconditions.checkNotNull(fileContext, "file
context");
+ recover();
+ }
+
+ private void recover() throws IOException
+ {
+ LOG.debug("current point", currentPointer);
+ String tmpFilePath =
fileSystemWAL.tempPartFiles.get(currentPointer.getPartNum());
+ if (tmpFilePath != null) {
+
+ Path tmpPath = new Path(tmpFilePath);
+ if (fileContext.util().exists(tmpPath)) {
+ LOG.debug("tmp path exists {}", tmpPath);
+
+ outputStream = getOutputStream(new
FileSystemWALPointer(currentPointer.partNum, 0));
+ DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
+
+ IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset,
outputStream);
+
+ outputStream.flush();
+ //remove old tmp
+ inputStreamOldTmp.close();
+ LOG.debug("delete tmp {}", tmpPath);
+ fileContext.delete(tmpPath, true);
+ }
+ }
+
+ //find all valid path names
+ Set<String> validPathNames = new HashSet<>();
+ for (Map.Entry<Integer, String> entry :
fileSystemWAL.tempPartFiles.entrySet()) {
+ if (entry.getKey() <= currentPointer.partNum) {
+ validPathNames.add(new Path(entry.getValue()).getName());
+ }
+ }
+ LOG.debug("valid names {}", validPathNames);
+
+ //there can be a failure just between the flush and the actual
checkpoint which can leave some stray tmp files
+ //which aren't accounted by tmp files map
+ Path walPath = new Path(fileSystemWAL.filePath);
+ Path parentWAL = walPath.getParent();
+ if (parentWAL != null && fileContext.util().exists(parentWAL)) {
+ RemoteIterator<FileStatus> remoteIterator =
fileContext.listStatus(parentWAL);
+ while (remoteIterator.hasNext()) {
+ FileStatus status = remoteIterator.next();
+ String fileName = status.getPath().getName();
+ if (fileName.startsWith(walPath.getName()) &&
fileName.endsWith(TMP_EXTENSION) &&
+ !validPathNames.contains(fileName)) {
+ LOG.debug("delete stray tmp {}", status.getPath());
+ fileContext.delete(status.getPath(), true);
+ }
+
+ }
+ }
+
+ }
+
+ protected void close() throws IOException
+ {
+ if (outputStream != null) {
+ outputStream.close();
+ outputStream = null;
+ LOG.debug("closed {}", currentPointer.partNum);
+ }
+ }
+
+ @Override
+ public int append(Slice entry) throws IOException
+ {
+ if (outputStream == null) {
+ outputStream = getOutputStream(currentPointer);
+ }
+
+ int entryLength = entry.length + 4;
+
+ // rotate if needed
+ if (shouldRotate(entryLength)) {
+ rotate(true);
+ }
+
+ outputStream.writeInt(entry.length);
+ outputStream.write(entry.toByteArray());
--- End diff --
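Should use `write(byte[] b, int off, int len)` with the slice's buffer, offset and length instead of `write(entry.toByteArray())`, which copies the bytes first.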
> Create a WAL in Malhar
> ----------------------
>
> Key: APEXMALHAR-1965
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Chandni Singh
> Assignee: Tushar Gosavi
>
> In Malhar we have an IdempotentStorageManager, which we use as a write-ahead
> log (WAL). In some other places we have created different flavors of a
> write-ahead log.
> We need to find the overlap between all these flavors and create a common
> write-ahead log for use in Apex Core and Apex Malhar.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)