Github user amberarrow commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56738629
  
    --- Diff: library/src/main/java/com/datatorrent/lib/state/managed/ManagedTimeUnifiedStateImpl.java ---
    @@ -0,0 +1,238 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.state.managed;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Queues;
    +import com.google.common.util.concurrent.Futures;
    +
    +import com.datatorrent.lib.fileaccess.FileAccess;
    +import com.datatorrent.lib.state.BucketedState;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * In this implementation of {@link ManagedState} the buckets in memory 
are time-buckets.
    + * <p/>
    + *
    + * <b>Difference from {@link ManagedTimeStateImpl}</b>: <br/>
    + * <ol>
    + * <li>The main buckets in {@link ManagedTimeStateImpl} are unique adhoc 
long ids which the user provides with the
    + * key. In this implementation the main buckets are time buckets. The user 
provides just the time and the time bucket is
    + * derived from it.
    + * </li>
    + * <br/>
    + *
    + * <li>In regards to the bucket data on disk, in {@link 
ManagedTimeStateImpl} the buckets are persisted on disk
    + * with each bucket data further grouped into time-buckets: 
{base_path}/{bucketId}/{time-bucket id}. <br/>
    + * In this implementation operator id is used as bucketId (on disk) and 
there is just one time-bucket under a
    + * particular operator id:
    + * {base_path}/{operator id}/{time bucket id}.
    + * </li>
    + * <br/>
    + *
    + * <li>In {@link ManagedTimeStateImpl} a bucket belongs to just one 
partition. Multiple partitions cannot write to
    + * the same bucket. <br/>
    + * In this implementation multiple partitions can be working with the same 
time-bucket (since time-bucket is derived
    + * from time). This works because on the disk the time-bucket data is 
segregated under each operator id.
    + * </li>
    + * <br/>
    + *
    + * <li>While {@link ManagedTimeStateImpl} can support dynamic partitioning 
by pre-allocating buckets this will not
    + * be able to support dynamic partitioning efficiently.
    + * </li>
    +
    + * </ol>
    + */
    +public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl 
implements BucketedState
    +{
    +  private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = 
Queues.newLinkedBlockingQueue();
    +
    +  public ManagedTimeUnifiedStateImpl()
    +  {
    +    bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
    +  }
    +
    +  @Override
    +  public int getNumBuckets()
    +  {
    +    return timeBucketAssigner.getNumBuckets();
    +  }
    +
    +  @Override
    +  public void put(long time, Slice key, Slice value)
    +  {
    +    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    +    if (timeBucket == -1) {
    +      //time is expired so return null.
    +      return;
    +    }
    +    int bucketIdx = prepareBucket(timeBucket);
    +
    +    buckets[bucketIdx].put(key, timeBucket, value);
    +
    +  }
    +
    +  @Override
    +  public Slice getSync(long time, Slice key)
    +  {
    +    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    +    if (timeBucket == -1) {
    +      //time is expired so return null.
    +      return null;
    +    }
    +    int bucketIdx = prepareBucket(timeBucket);
    +    return buckets[bucketIdx].get(key, timeBucket, Bucket.ReadSource.ALL);
    +  }
    +
    +  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    +  @Override
    +  public Future<Slice> getAsync(long time, Slice key)
    +  {
    +    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    +    if (timeBucket == -1) {
    +      //time is expired so return null.
    +      return null;
    +    }
    +    int bucketIdx = prepareBucket(timeBucket);
    +    Bucket bucket = buckets[bucketIdx];
    +    synchronized (bucket) {
    +      Slice cachedVal = buckets[bucketIdx].get(key, timeBucket, 
Bucket.ReadSource.MEMORY);
    +      if (cachedVal != null) {
    +        return Futures.immediateFuture(cachedVal);
    +      }
    +      return readerService.submit(new KeyFetchTask(bucket, key, 
timeBucket, throwable));
    +    }
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    +    super.endWindow();
    +    Long purgedTimeBucket;
    +
    +    //tear down all the purged time buckets
    +    while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) {
    +      int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
    +      if (buckets[purgedTimeBucketIdx] != null && 
buckets[purgedTimeBucketIdx].getBucketId() == purgedTimeBucket) {
    +        buckets[purgedTimeBucketIdx].teardown();
    +        buckets[purgedTimeBucketIdx] = null;
    +      }
    +    }
    +  }
    +
    +  @Override
    +  protected void handleBucketConflict(int bucketIdx, long newBucketId)
    +  {
    +    Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < 
newBucketId, "new time bucket should have a value"
    +        + " greater than the old time bucket");
    +    //Time buckets are purged periodically so here a bucket conflict is 
expected and so we just ignore conflicts.
    +    buckets[bucketIdx].teardown();
    +    buckets[bucketIdx] = newBucket(newBucketId);
    +    buckets[bucketIdx].setup(this);
    +  }
    +
    +  @Override
    +  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
    +  {
    +    purgedTimeBuckets.add(timeBucket);
    +    super.purgeTimeBucketsLessThanEqualTo(timeBucket);
    +  }
    +
    +  /**
    +   * This uses operator id instead of bucket id as the name of parent 
folder of time-buckets. This is because
    +   * multiple partitions may work on same time-buckets.
    +   */
    +  public static class TimeUnifiedBucketsFileSystem extends 
BucketsFileSystem
    --- End diff --
    
    Can this class be private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to