[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411570#comment-15411570
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:
--------------------------------------------

Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73844379
  
    --- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
    @@ -0,0 +1,363 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.managed;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.StreamCodec;
    +import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Concrete implementation of SpillableByteArrayListMultimap which is 
needed for join operator.
    + *
    + * <b>Properties:</b><br>
    + * <b>isKeyContainsMultiValue</b>: Specifies whether the key has multiple 
value or not. <br>
    + * <b>timeBucket</b>: Specifies the lenght of the time bucket.
    + *
    + */
    +@org.apache.hadoop.classification.InterfaceStability.Evolving
    +public class ManagedTimeStateMultiValue<K,V> implements 
Spillable.SpillableByteArrayListMultimap<K,V>
    +{
    +  private transient StreamCodec streamCodec = null;
    +  private boolean isKeyContainsMultiValue = false;
    +  private long timeBucket;
    +  @NotNull
    +  private ManagedTimeStateImpl store;
    +
    +  public ManagedTimeStateMultiValue()
    +  {
    +    if (streamCodec == null) {
    +      streamCodec = new KryoSerializableStreamCodec();
    +    }
    +  }
    +
    +  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, 
boolean isKeyContainsMultiValue)
    +  {
    +    this();
    +    this.store = Preconditions.checkNotNull(store);
    +    this.isKeyContainsMultiValue = isKeyContainsMultiValue;
    +  }
    +
    +  /**
    +   * Return the list of values from the store
    +   * @param k given key
    +   * @return list of values
    +   */
    +  @Override
    +  public List<V> get(@Nullable K k)
    +  {
    +    List<V> value = null;
    +    Slice valueSlice = store.getSync(getBucketId(k), 
streamCodec.toByteArray(k));
    +    if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer 
== null) {
    +      return null;
    +    }
    +    if (isKeyContainsMultiValue) {
    +      return (List<V>)streamCodec.fromByteArray(valueSlice);
    +    }
    +    value = new ArrayList<>();
    +    value.add((V)streamCodec.fromByteArray(valueSlice));
    +    return  value;
    +  }
    +
    +  /**
    +   * Returns the Future form the store.
    +   * @param k given key
    +   * @return
    +   */
    +  public CompositeFuture getAsync(@Nullable K k)
    +  {
    +    return new CompositeFuture(store.getAsync(getBucketId(k), 
streamCodec.toByteArray(k)));
    +  }
    +
    +  @Override
    +  public Set<K> keySet()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Multiset<K> keys()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Collection<V> values()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Collection<Map.Entry<K, V>> entries()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public List<V> removeAll(@Nullable Object o)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void clear()
    +  {
    +
    +  }
    +
    +  @Override
    +  public int size()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean isEmpty()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean containsKey(@Nullable Object o)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean containsValue(@Nullable Object o)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean containsEntry(@Nullable Object o, @Nullable Object o1)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Inserts the (k,v) into the store.
    +   * @param k key
    +   * @param v value
    +   * @return true if the given (k,v) is successfully inserted into the 
store otherwise false.
    +   */
    +  @Override
    +  public boolean put(@Nullable K k, @Nullable V v)
    +  {
    +    if (isKeyContainsMultiValue) {
    +      Slice keySlice = streamCodec.toByteArray(k);
    +      int bucketId = getBucketId(k);
    +      Slice valueSlice = store.getSync(bucketId, keySlice);
    +      List<V> listOb;
    +      if (valueSlice == null || valueSlice.length == 0) {
    +        listOb = new ArrayList<>();
    +      } else {
    +        listOb = (List<V>)streamCodec.fromByteArray(valueSlice);
    +      }
    +      listOb.add(v);
    +      return insertInStore(bucketId, timeBucket, keySlice, 
streamCodec.toByteArray(listOb));
    +    }
    +    return insertInStore(getBucketId(k), timeBucket, 
streamCodec.toByteArray(k),streamCodec.toByteArray(v));
    +  }
    +
    +  /**
    +   * Inserts the (k,v) into the store using the specified timebucket.
    +   * @param k key
    +   * @param v value
    +   * @param timeBucket timebucket
    +   * @return true if the given (k,v) is successfully inserted into the 
store otherwise false.
    +   */
    +  public boolean put(@Nullable K k, @Nullable V v, long timeBucket)
    +  {
    +    if (isKeyContainsMultiValue) {
    +      Slice keySlice = streamCodec.toByteArray(k);
    +      int bucketId = getBucketId(k);
    +      Slice valueSlice = store.getSync(bucketId, keySlice);
    +      List<V> listOb;
    +      if (valueSlice == null || valueSlice.length == 0) {
    +        listOb = new ArrayList<>();
    +      } else {
    +        listOb = (List<V>)streamCodec.fromByteArray(valueSlice);
    +      }
    +      listOb.add(v);
    +      return insertInStore(bucketId, timeBucket, keySlice, 
streamCodec.toByteArray(listOb));
    +    }
    +    return insertInStore(getBucketId(k), timeBucket, 
streamCodec.toByteArray(k),streamCodec.toByteArray(v));
    +  }
    +
    +  /**
    +   * Insert (keySlice,valueSlice) into the store using bucketId and 
timeBucket.
    +   * @param bucketId bucket Id
    +   * @param timeBucket time bucket
    +   * @param keySlice key slice
    +   * @param valueSlice value slice
    +   * @return true if the given (keySlice,valueSlice) is successfully 
inserted into the
    +   *         store otherwise false.
    +   */
    +  public boolean insertInStore(long bucketId, long timeBucket, Slice 
keySlice, Slice valueSlice)
    +  {
    +    long timeBucketId = 
store.getTimeBucketAssigner().getTimeBucketFor(timeBucket);
    +    if (timeBucketId != -1) {
    +      store.putInBucket(bucketId, timeBucketId, keySlice, valueSlice);
    +      return true;
    +    }
    +    return false;
    +  }
    +
    +  @Override
    +  public boolean remove(@Nullable Object o, @Nullable Object o1)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean putAll(@Nullable K k, Iterable<? extends V> iterable)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public List<V> replaceValues(K k, Iterable<? extends V> iterable)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Map<K, Collection<V>> asMap()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  public int getBucketId(K k)
    +  {
    +    return k.hashCode() % store.getNumBuckets();
    +  }
    +
    +
    +  public boolean isKeyContainsMultiValue()
    +  {
    +    return isKeyContainsMultiValue;
    +  }
    +
    +  public void setKeyContainsMultiValue(boolean keyContainsMultiValue)
    +  {
    +    isKeyContainsMultiValue = keyContainsMultiValue;
    +  }
    +
    +  public long getTimeBucket()
    +  {
    +    return timeBucket;
    +  }
    +
    +  public void setTimeBucket(long timeBucket)
    +  {
    +    this.timeBucket = timeBucket;
    +  }
    +
    +  public StreamCodec getStreamCodec()
    +  {
    +    return streamCodec;
    +  }
    +
    +  public void setStreamCodec(StreamCodec streamCodec)
    +  {
    +    this.streamCodec = streamCodec;
    +  }
    +
    +  public class CompositeFuture implements Future<List>
    +  {
    +    public Future<Slice> slice;
    +
    +    public CompositeFuture(Future<Slice> slice)
    +    {
    +      this.slice = slice;
    +    }
    +
    +    @Override
    +    public boolean cancel(boolean b)
    +    {
    +      return slice.cancel(b);
    +    }
    +
    +    @Override
    +    public boolean isCancelled()
    +    {
    +      return slice.isCancelled();
    +    }
    +
    +    @Override
    +    public boolean isDone()
    +    {
    +      return slice.isDone();
    +    }
    +
    +    /**
    +     * Converts the single element into the list.
    +     * @return
    --- End diff --
    
    Done


> Development of Inner Join Operator using Spillable Datastructures
> -----------------------------------------------------------------
>
>                 Key: APEXMALHAR-2100
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to