GitHub user amberarrow commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r57086936
--- Diff:
library/src/main/java/com/datatorrent/lib/state/managed/ManagedTimeStateImpl.java
---
@@ -51,63 +51,57 @@ public void put(long bucketId, long time, Slice key, Slice value)
}
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Slice getSync(long bucketId, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
-
- synchronized (bucket) {
- return bucket.get(key, -1, Bucket.ReadSource.ALL);
- }
+ return getValueFromBucketSync(bucketId, -1, key);
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Slice getSync(long bucketId, long time, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return BucketedState.EXPIRED;
}
+ return getValueFromBucketSync(bucketId, timeBucket, key);
+ }
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private Slice getValueFromBucketSync(long bucketId, long timeBucket, Slice key)
+ {
+ int bucketIdx = prepareBucket(bucketId);
Bucket bucket = buckets[bucketIdx];
synchronized (bucket) {
return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
}
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Future<Slice> getAsync(long bucketId, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
- synchronized (bucket) {
- Slice cachedVal = buckets[bucketIdx].get(key, -1, Bucket.ReadSource.MEMORY);
- if (cachedVal != null) {
- return Futures.immediateFuture(cachedVal);
- }
- return readerService.submit(new KeyFetchTask(bucket, key, -1, throwable));
- }
+ return getValueFromBucketAsync(bucketId, -1, key);
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public Future<Slice> getAsync(long bucketId, long time, Slice key)
{
- int bucketIdx = prepareBucket(bucketId);
- Bucket bucket = buckets[bucketIdx];
long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
if (timeBucket == -1) {
//time is expired so no point in looking further.
return Futures.immediateFuture(BucketedState.EXPIRED);
}
+ return getValueFromBucketAsync(bucketId, timeBucket, key);
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, Slice key)
--- End diff --
If the new functions getValueFromBucketAsync() and
getValueFromBucketSync() were moved to the base class and made static, could
they also be used by the other two implementation classes?
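
To make the suggestion concrete, here is a rough sketch of the sync helper hoisted into a shared base class. Everything in it is a simplified stand-in and an assumption rather than the actual Malhar code: String replaces Slice, the Bucket class is a toy map, and the names BucketLookupSketch, AbstractStateSketch and TimeStateSketch are hypothetical. The sketch uses a protected instance method instead of a static one, because the helper depends on the buckets array and prepareBucket(); a static version would need that state passed in as parameters.

```java
import java.util.HashMap;
import java.util.Map;

public class BucketLookupSketch {

  /** Toy stand-in for Bucket: stores one value per (timeBucket, key) pair. */
  static class Bucket {
    private final Map<String, String> data = new HashMap<>();

    void put(String key, long timeBucket, String value) {
      data.put(timeBucket + ":" + key, value);
    }

    String get(String key, long timeBucket) {
      return data.get(timeBucket + ":" + key);
    }
  }

  /** Hypothetical base class that owns the buckets and the shared lookup. */
  abstract static class AbstractStateSketch {
    protected final Bucket[] buckets;

    AbstractStateSketch(int numBuckets) {
      buckets = new Bucket[numBuckets];
    }

    /** Lazily creates the bucket for bucketId and returns its index. */
    protected int prepareBucket(long bucketId) {
      int idx = (int) Math.floorMod(bucketId, (long) buckets.length);
      synchronized (buckets) {
        if (buckets[idx] == null) {
          buckets[idx] = new Bucket();
        }
      }
      return idx;
    }

    /** Shared by all subclasses; mirrors the private helper in the diff. */
    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    protected String getValueFromBucketSync(long bucketId, long timeBucket, String key) {
      Bucket bucket = buckets[prepareBucket(bucketId)];
      synchronized (bucket) {
        return bucket.get(key, timeBucket);
      }
    }
  }

  /** One of several possible subclasses; each reuses the inherited helper. */
  static class TimeStateSketch extends AbstractStateSketch {
    TimeStateSketch(int numBuckets) {
      super(numBuckets);
    }

    void put(long bucketId, long timeBucket, String key, String value) {
      Bucket bucket = buckets[prepareBucket(bucketId)];
      synchronized (bucket) {
        bucket.put(key, timeBucket, value);
      }
    }

    String getSync(long bucketId, long timeBucket, String key) {
      return getValueFromBucketSync(bucketId, timeBucket, key);
    }
  }

  public static void main(String[] args) {
    TimeStateSketch state = new TimeStateSketch(4);
    state.put(1L, 7L, "k", "v");
    System.out.println(state.getSync(1L, 7L, "k"));
  }
}
```

The async variant could be hoisted the same way, provided the reader service and error holder it uses are also reachable from the base class.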
---