[ https://issues.apache.org/jira/browse/APEXMALHAR-1897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207624#comment-15207624 ]
ASF GitHub Bot commented on APEXMALHAR-1897:
--------------------------------------------
Github user chandnisingh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r57094062
--- Diff: library/src/main/java/com/datatorrent/lib/state/managed/ManagedTimeStateImpl.java ---
@@ -51,63 +51,57 @@ public void put(long bucketId, long time, Slice key, Slice value)
}
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Slice getSync(long bucketId, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
-
- synchronized (bucket) {
- return bucket.get(key, -1, Bucket.ReadSource.ALL);
- }
+ return getValueFromBucketSync(bucketId, -1, key);
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Slice getSync(long bucketId, long time, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return BucketedState.EXPIRED;
}
+ return getValueFromBucketSync(bucketId, timeBucket, key);
+ }
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private Slice getValueFromBucketSync(long bucketId, long timeBucket, Slice key)
+ {
+ int bucketIdx = prepareBucket(bucketId);
Bucket bucket = buckets[bucketIdx];
synchronized (bucket) {
return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
}
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Future<Slice> getAsync(long bucketId, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
- synchronized (bucket) {
- Slice cachedVal = buckets[bucketIdx].get(key, -1, Bucket.ReadSource.MEMORY);
- if (cachedVal != null) {
- return Futures.immediateFuture(cachedVal);
- }
- return readerService.submit(new KeyFetchTask(bucket, key, -1, throwable));
- }
+ return getValueFromBucketAsync(bucketId, -1, key);
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Future<Slice> getAsync(long bucketId, long time, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return Futures.immediateFuture(BucketedState.EXPIRED);
}
+ return getValueFromBucketAsync(bucketId, timeBucket, key);
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, Slice key)
--- End diff --
I moved them to AbstractManagedStateImpl. However, I can't make them static
because they need to access non-static members and methods: the ```buckets```
array and ```prepareBucket()```.
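
To illustrate the point about static, here is a minimal sketch of a shared helper that reads an instance-level ```buckets``` array and calls an instance-level ```prepareBucket()```. It is not the actual AbstractManagedStateImpl code: the class name ManagedStateSketch, the simplified Bucket class, the use of String in place of Slice, the modulo-based index, and the omission of time buckets are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the managed state classes in the PR.
class ManagedStateSketch {

  // Simplified bucket: an in-memory map only (no time buckets, no file reads).
  static final class Bucket {
    private final Map<String, String> data = new HashMap<>();

    String get(String key) {
      return data.get(key);
    }

    void put(String key, String value) {
      data.put(key, value);
    }
  }

  // Instance state: every ManagedStateSketch object owns its own bucket array.
  private final Bucket[] buckets;

  ManagedStateSketch(int numBuckets) {
    this.buckets = new Bucket[numBuckets];
  }

  // Instance method: lazily creates the bucket and returns its array index.
  // The modulo mapping is an assumption of this sketch.
  private int prepareBucket(long bucketId) {
    int idx = (int) Math.floorMod(bucketId, (long) buckets.length);
    if (buckets[idx] == null) {
      buckets[idx] = new Bucket();
    }
    return idx;
  }

  void put(long bucketId, String key, String value) {
    Bucket bucket = buckets[prepareBucket(bucketId)];
    synchronized (bucket) {
      bucket.put(key, value);
    }
  }

  // Shared helper. Declaring it static would not compile, because a static
  // method has no `this` through which to reach buckets or prepareBucket().
  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
  private String getValueFromBucketSync(long bucketId, String key) {
    int bucketIdx = prepareBucket(bucketId);
    Bucket bucket = buckets[bucketIdx];
    synchronized (bucket) {
      return bucket.get(key);
    }
  }

  String getSync(long bucketId, String key) {
    return getValueFromBucketSync(bucketId, key);
  }
}
```

In this sketch, a static version of the helper would have to take the state object (or the bucket array and a bucket-preparing callback) as extra parameters, which is why keeping it as an instance method is the simpler choice.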
> Create ManagedState
> -------------------
>
> Key: APEXMALHAR-1897
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1897
> Project: Apache Apex Malhar
> Issue Type: Sub-task
> Reporter: Chandni Singh
> Assignee: Chandni Singh
> Fix For: 3.4.0
>
>
> ManagedState is described in the document below:
> https://docs.google.com/document/d/1gRWN9ufKSZSZD0N-pthlhpC9TZ8KwJ6hJlAX6nxl5f8/edit#heading=h.z87ti1fwyt0t
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)