[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207624#comment-15207624
 ] 

ASF GitHub Bot commented on APEXMALHAR-1897:
--------------------------------------------

Github user chandnisingh commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r57094062
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/state/managed/ManagedTimeStateImpl.java
 ---
    @@ -51,63 +51,57 @@ public void put(long bucketId, long time, Slice key, 
Slice value)
         }
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Slice getSync(long bucketId, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
    -    Bucket bucket = buckets[bucketIdx];
    -
    -    synchronized (bucket) {
    -      return bucket.get(key, -1, Bucket.ReadSource.ALL);
    -    }
    +    return getValueFromBucketSync(bucketId, -1, key);
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Slice getSync(long bucketId, long time, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
         long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
         if (timeBucket == -1) {
           //time is expired so no point in looking further.
           return BucketedState.EXPIRED;
         }
    +    return getValueFromBucketSync(bucketId, timeBucket, key);
    +  }
     
    +  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    +  private Slice getValueFromBucketSync(long bucketId, long timeBucket, 
Slice key)
    +  {
    +    int bucketIdx = prepareBucket(bucketId);
         Bucket bucket = buckets[bucketIdx];
         synchronized (bucket) {
           return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
         }
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Future<Slice> getAsync(long bucketId, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
    -    Bucket bucket = buckets[bucketIdx];
    -    synchronized (bucket) {
    -      Slice cachedVal = buckets[bucketIdx].get(key, -1, 
Bucket.ReadSource.MEMORY);
    -      if (cachedVal != null) {
    -        return Futures.immediateFuture(cachedVal);
    -      }
    -      return readerService.submit(new KeyFetchTask(bucket, key, -1, 
throwable));
    -    }
    +    return getValueFromBucketAsync(bucketId, -1, key);
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Future<Slice> getAsync(long bucketId, long time, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
    -    Bucket bucket = buckets[bucketIdx];
         long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
         if (timeBucket == -1) {
           //time is expired so no point in looking further.
           return Futures.immediateFuture(BucketedState.EXPIRED);
         }
    +    return getValueFromBucketAsync(bucketId, timeBucket, key);
    +  }
    +
    +  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    +  private Future<Slice> getValueFromBucketAsync(long bucketId, long 
timeBucket, Slice key)
    --- End diff --
    
    I moved it to the AbstractManagedStateImpl however can't make it static 
because they need to access some non-static members and methods - ```buckets``` 
array and ```prepareBucket()```


> Create ManagedState
> -------------------
>
>                 Key: APEXMALHAR-1897
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1897
>             Project: Apache Apex Malhar
>          Issue Type: Sub-task
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>             Fix For: 3.4.0
>
>
> ManagedState is described in the document below:
> https://docs.google.com/document/d/1gRWN9ufKSZSZD0N-pthlhpC9TZ8KwJ6hJlAX6nxl5f8/edit#heading=h.z87ti1fwyt0t



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to