[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411570#comment-15411570 ]
ASF GitHub Bot commented on APEXMALHAR-2100: -------------------------------------------- Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73844379 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java --- @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator. + * + * <b>Properties:</b><br> + * <b>isKeyContainsMultiValue</b>: Specifies whether the key has multiple value or not. <br> + * <b>timeBucket</b>: Specifies the lenght of the time bucket. + * + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V> +{ + private transient StreamCodec streamCodec = null; + private boolean isKeyContainsMultiValue = false; + private long timeBucket; + @NotNull + private ManagedTimeStateImpl store; + + public ManagedTimeStateMultiValue() + { + if (streamCodec == null) { + streamCodec = new KryoSerializableStreamCodec(); + } + } + + public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isKeyContainsMultiValue) + { + this(); + this.store = Preconditions.checkNotNull(store); + this.isKeyContainsMultiValue = isKeyContainsMultiValue; + } + + /** + * Return the list of values from the store + * @param k given key + * @return list of values + */ + @Override + public List<V> get(@Nullable K k) + { + List<V> value = null; + Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k)); + if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) { + return null; + } + if (isKeyContainsMultiValue) { + return (List<V>)streamCodec.fromByteArray(valueSlice); + } + value = new ArrayList<>(); + value.add((V)streamCodec.fromByteArray(valueSlice)); + return value; + } + + /** + * Returns the Future form the store. + * @param k given key + * @return + */ + public CompositeFuture getAsync(@Nullable K k) + { + return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k))); + } + + @Override + public Set<K> keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Multiset<K> keys() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<V> values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<Map.Entry<K, V>> entries() + { + throw new UnsupportedOperationException(); + } + + @Override + public List<V> removeAll(@Nullable Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + + } + + @Override + public int size() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsKey(@Nullable Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsValue(@Nullable Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsEntry(@Nullable Object o, @Nullable Object o1) + { + throw new UnsupportedOperationException(); + } + + /** + * Inserts the (k,v) into the store. + * @param k key + * @param v value + * @return true if the given (k,v) is successfully inserted into the store otherwise false. + */ + @Override + public boolean put(@Nullable K k, @Nullable V v) + { + if (isKeyContainsMultiValue) { + Slice keySlice = streamCodec.toByteArray(k); + int bucketId = getBucketId(k); + Slice valueSlice = store.getSync(bucketId, keySlice); + List<V> listOb; + if (valueSlice == null || valueSlice.length == 0) { + listOb = new ArrayList<>(); + } else { + listOb = (List<V>)streamCodec.fromByteArray(valueSlice); + } + listOb.add(v); + return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(listOb)); + } + return insertInStore(getBucketId(k), timeBucket, streamCodec.toByteArray(k),streamCodec.toByteArray(v)); + } + + /** + * Inserts the (k,v) into the store using the specified timebucket. + * @param k key + * @param v value + * @param timeBucket timebucket + * @return true if the given (k,v) is successfully inserted into the store otherwise false. + */ + public boolean put(@Nullable K k, @Nullable V v, long timeBucket) + { + if (isKeyContainsMultiValue) { + Slice keySlice = streamCodec.toByteArray(k); + int bucketId = getBucketId(k); + Slice valueSlice = store.getSync(bucketId, keySlice); + List<V> listOb; + if (valueSlice == null || valueSlice.length == 0) { + listOb = new ArrayList<>(); + } else { + listOb = (List<V>)streamCodec.fromByteArray(valueSlice); + } + listOb.add(v); + return insertInStore(bucketId, timeBucket, keySlice, streamCodec.toByteArray(listOb)); + } + return insertInStore(getBucketId(k), timeBucket, streamCodec.toByteArray(k),streamCodec.toByteArray(v)); + } + + /** + * Insert (keySlice,valueSlice) into the store using bucketId and timeBucket. + * @param bucketId bucket Id + * @param timeBucket time bucket + * @param keySlice key slice + * @param valueSlice value slice + * @return true if the given (keySlice,valueSlice) is successfully inserted into the + * store otherwise false. + */ + public boolean insertInStore(long bucketId, long timeBucket, Slice keySlice, Slice valueSlice) + { + long timeBucketId = store.getTimeBucketAssigner().getTimeBucketFor(timeBucket); + if (timeBucketId != -1) { + store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice); + return true; + } + return false; + } + + @Override + public boolean remove(@Nullable Object o, @Nullable Object o1) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean putAll(@Nullable K k, Iterable<? extends V> iterable) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean putAll(Multimap<? extends K, ? extends V> multimap) + { + throw new UnsupportedOperationException(); + } + + @Override + public List<V> replaceValues(K k, Iterable<? extends V> iterable) + { + throw new UnsupportedOperationException(); + } + + @Override + public Map<K, Collection<V>> asMap() + { + throw new UnsupportedOperationException(); + } + + public int getBucketId(K k) + { + return k.hashCode() % store.getNumBuckets(); + } + + + public boolean isKeyContainsMultiValue() + { + return isKeyContainsMultiValue; + } + + public void setKeyContainsMultiValue(boolean keyContainsMultiValue) + { + isKeyContainsMultiValue = keyContainsMultiValue; + } + + public long getTimeBucket() + { + return timeBucket; + } + + public void setTimeBucket(long timeBucket) + { + this.timeBucket = timeBucket; + } + + public StreamCodec getStreamCodec() + { + return streamCodec; + } + + public void setStreamCodec(StreamCodec streamCodec) + { + this.streamCodec = streamCodec; + } + + public class CompositeFuture implements Future<List> + { + public Future<Slice> slice; + + public CompositeFuture(Future<Slice> slice) + { + this.slice = slice; + } + + @Override + public boolean cancel(boolean b) + { + return slice.cancel(b); + } + + @Override + public boolean isCancelled() + { + return slice.isCancelled(); + } + + @Override + public boolean isDone() + { + return slice.isDone(); + } + + /** + * Converts the single element into the list. + * @return --- End diff -- Done > Development of Inner Join Operator using Spillable Datastructures > ----------------------------------------------------------------- > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Chaitanya > Assignee: Chaitanya > -- This message was sent by Atlassian JIRA (v6.3.4#6332)