[
https://issues.apache.org/jira/browse/APEXMALHAR-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203524#comment-15203524
]
ASF GitHub Bot commented on APEXMALHAR-1897:
--------------------------------------------
Github user amberarrow commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56771890
--- Diff:
library/src/main/java/com/datatorrent/lib/state/managed/BucketsFileSystem.java
---
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.state.managed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeBasedTable;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Persists bucket data on disk and maintains meta information about the
buckets.
+ * <p/>
+ *
+ * Each bucket has a meta-data file whose format is:<br/>
+ * <ol>
+ * <li>total number of time-buckets (int)</li>
+ * <li>For each time bucket
+ * <ol>
+ * <li>time bucket key (long)</li>
+ * <li>size of data (sum of bytes) (long)</li>
+ * <li>last transferred window id (long)</li>
+ * <li>length of the first key in the time-bucket file (int)</li>
+ * <li>first key in the time-bucket file (byte[])</li>
+ * </ol>
+ * </li>
+ * </ol>
+ * <p/>
+ * Meta data information is updated by {@link
IncrementalCheckpointManager}. Any updates are restricted to the package.
+ */
+public class BucketsFileSystem implements ManagedStateComponent
+{
+ public static final String META_FILE_NAME = "_META";
+
+ private final transient TreeBasedTable<Long, Long,
MutableTimeBucketMeta> timeBucketsMeta = TreeBasedTable.create();
+
+ //Check-pointed set of all buckets this instance has written to.
+ protected final Set<Long> bucketNamesOnFS = new
ConcurrentSkipListSet<>();
+
+ protected transient ManagedStateContext managedStateContext;
+
+ @Override
+ public void setup(@NotNull ManagedStateContext managedStateContext)
+ {
+ this.managedStateContext =
Preconditions.checkNotNull(managedStateContext, "managed state context");
+ }
+
+ protected FileAccess.FileWriter getWriter(long bucketId, String
fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().getWriter(bucketId,
fileName);
+ }
+
+ protected FileAccess.FileReader getReader(long bucketId, String
fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().getReader(bucketId,
fileName);
+ }
+
+ protected void rename(long bucketId, String fromName, String toName)
throws IOException
+ {
+ managedStateContext.getFileAccess().rename(bucketId, fromName, toName);
+ }
+
+ protected DataOutputStream getOutputStream(long bucketId, String
fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().getOutputStream(bucketId,
fileName);
+ }
+
+ protected DataInputStream getInputStream(long bucketId, String fileName)
throws IOException
+ {
+ return managedStateContext.getFileAccess().getInputStream(bucketId,
fileName);
+ }
+
+ protected boolean exists(long bucketId, String fileName) throws
IOException
+ {
+ return managedStateContext.getFileAccess().exists(bucketId, fileName);
+ }
+
+ protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId)
throws IOException
+ {
+ return managedStateContext.getFileAccess().listFiles(bucketId);
+ }
+
+ protected void delete(long bucketId, String fileName) throws IOException
+ {
+ managedStateContext.getFileAccess().delete(bucketId, fileName);
+ }
+
+ protected void deleteBucket(long bucketId) throws IOException
+ {
+ managedStateContext.getFileAccess().deleteBucket(bucketId);
+ }
+
+ /**
+ * Saves data to a bucket. The data consists of key/values of all
time-buckets of a particular bucket.
+ *
+ * @param windowId window id
+ * @param bucketId bucket id
+ * @param data data of all time-buckets
+ * @throws IOException
+ */
+ protected void writeBucketData(long windowId, long bucketId, Map<Slice,
Bucket.BucketedValue> data) throws IOException
+ {
+ Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys =
TreeBasedTable.create(Ordering.<Long>natural(),
+ managedStateContext.getKeyComparator());
+
+ for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) {
+ long timeBucketId = entry.getValue().getTimeBucket();
+ timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue());
+ }
+
+ for (long timeBucket : timeBucketedKeys.rowKeySet()) {
+ BucketsFileSystem.MutableTimeBucketMeta tbm =
getOrCreateTimeBucketMeta(bucketId, timeBucket);
+ addBucketName(bucketId);
+
+ long dataSize = 0;
+ Slice firstKey = null;
+
+ FileAccess.FileWriter fileWriter;
+ String tmpFileName = getTmpFileName();
+ if (tbm.getLastTransferredWindowId() == -1) {
+ //A new time bucket so we append all the key/values to the new file
+ fileWriter = getWriter(bucketId, tmpFileName);
+
+ for (Map.Entry<Slice, Bucket.BucketedValue> entry :
timeBucketedKeys.row(timeBucket).entrySet()) {
+ Slice key = entry.getKey();
+ Slice value = entry.getValue().getValue();
+
+ dataSize += key.length;
+ dataSize += value.length;
+
+ fileWriter.append(key.toByteArray(), value.toByteArray());
+ if (firstKey == null) {
+ firstKey = key;
+ }
+ }
+ } else {
+ //the time bucket existed so we need to read the file and then
re-write it
+ TreeMap<Slice, Slice> fileData = new
TreeMap<>(managedStateContext.getKeyComparator());
+ FileAccess.FileReader fileReader = getReader(bucketId,
getFileName(timeBucket));
+ fileReader.readFully(fileData);
+ fileReader.close();
+
+ for (Map.Entry<Slice, Bucket.BucketedValue> entry :
timeBucketedKeys.row(timeBucket).entrySet()) {
+ fileData.put(entry.getKey(), entry.getValue().getValue());
+ }
+
+ fileWriter = getWriter(bucketId, tmpFileName);
+ for (Map.Entry<Slice, Slice> entry : fileData.entrySet()) {
+ Slice key = entry.getKey();
+ Slice value = entry.getValue();
+
+ dataSize += key.length;
+ dataSize += value.length;
+
+ fileWriter.append(key.toByteArray(), value.toByteArray());
+ if (firstKey == null) {
+ firstKey = key;
+ }
+ }
+ }
+ fileWriter.close();
+ rename(bucketId, tmpFileName, getFileName(timeBucket));
+ tbm.updateTimeBucketMeta(windowId, dataSize, firstKey);
+ }
+
+ updateBucketMetaFile(bucketId);
+ }
+
+ /**
+ * Retrieves the time bucket meta of a particular time-bucket. If the
time bucket doesn't exist then a new one
+ * is created.
+ *
+ * @param bucketId bucket id
+ * @param timeBucketId time bucket id
+ * @return time bucket meta of the time bucket
+ * @throws IOException
+ */
+ @NotNull
+ MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long
timeBucketId) throws IOException
+ {
+ synchronized (timeBucketsMeta) {
+ MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId,
timeBucketId);
+ if (tbm == null) {
+ tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
+ timeBucketsMeta.put(bucketId, timeBucketId, tbm);
+ }
+ return tbm;
+ }
+ }
+
+ protected void addBucketName(long bucketId)
+ {
+ bucketNamesOnFS.add(bucketId);
+ }
+
+ /**
+ * Returns the time bucket meta of a particular time-bucket which is
immutable.
+ *
+ * @param bucketId bucket id
+ * @param timeBucketId time bucket id
+ * @return immutable time bucket meta
+ * @throws IOException
+ */
+ @Nullable
+ public ImmutableTimeBucketMeta getTimeBucketMeta(long bucketId, long
timeBucketId) throws IOException
+ {
+ synchronized (timeBucketsMeta) {
+ MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId,
timeBucketId);
--- End diff --
The synchronized block could end here, reducing how long the lock on timeBucketsMeta is held.
> Create ManagedState
> -------------------
>
> Key: APEXMALHAR-1897
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1897
> Project: Apache Apex Malhar
> Issue Type: Sub-task
> Reporter: Chandni Singh
> Assignee: Chandni Singh
> Fix For: 3.4.0
>
>
> ManagedState is described in the document below:
> https://docs.google.com/document/d/1gRWN9ufKSZSZD0N-pthlhpC9TZ8KwJ6hJlAX6nxl5f8/edit#heading=h.z87ti1fwyt0t
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)