Any motivation for using Infinispan instead of Hazelcast?

Tim

Sent from my iPhone

> On May 21, 2014, at 6:14 PM, [email protected] wrote:
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
> new file mode 100644
> index 0000000..46d4eca
> --- /dev/null
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
> @@ -0,0 +1,66 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.cache.infinispan;
> +
> +import org.infinispan.marshall.core.MarshalledEntry;
> +import org.infinispan.persistence.spi.ExternalStore;
> +import org.infinispan.persistence.spi.InitializationContext;
> +
> +/**
> + * Stores the cached objects in ZooKeeper.  Objects are stored in /start/cache_name/key_name = data.
> + * @param <K> cache key type
> + * @param <V> cache value type
> + */
> +public class ZookeeperCacheStore<K, V> implements ExternalStore<K, V>{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperCacheStore.class);
> +
> +  private String cacheName;
> +
> +  @Override
> +  public void init(InitializationContext ctx) {
> +    ctx.getConfiguration();
> +
> +  }
> +
> +  @Override
> +  public MarshalledEntry<K, V> load(K key) {
> +    return null;
> +  }
> +
> +  @Override
> +  public boolean contains(K key) {
> +    return false;
> +  }
> +
> +  @Override
> +  public void start() {
> +  }
> +
> +  @Override
> +  public void stop() {
> +  }
> +
> +  @Override
> +  public void write(MarshalledEntry<K, V> entry) {
> +  }
> +
> +  @Override
> +  public boolean delete(K key) {
> +    return false;
> +  }
> +}
> 
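A side note on ZookeeperCacheStore: every method above is still a stub. For
anyone wondering how the "/start/cache_name/key_name = data" layout from the
javadoc might eventually map onto the Infinispan persistence SPI, here is a
rough sketch using the raw ZooKeeper client. The class name, field wiring, and
path layout are my own assumptions, not part of this patch:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.PersistenceException;

public class ZookeeperStoreSketch<K, V> {
  private final ZooKeeper zk;              // assumed to be opened in start()
  private final InitializationContext ctx; // assumed to be kept from init()
  private final String root;               // e.g. "/start/" + cacheName

  public ZookeeperStoreSketch(ZooKeeper zk, InitializationContext ctx, String root) {
    this.zk = zk;
    this.ctx = ctx;
    this.root = root;
  }

  public MarshalledEntry<K, V> load(K key) {
    try {
      // One znode per key: /start/<cache>/<key> holds the marshalled value.
      byte[] data = zk.getData(root + "/" + key, false, null);
      Object value = ctx.getMarshaller().objectFromByteBuffer(data);
      return ctx.getMarshalledEntryFactory().newMarshalledEntry(key, value, null);
    } catch (KeeperException.NoNodeException e) {
      return null;                         // absent key -> no cached entry
    } catch (Exception e) {
      throw new PersistenceException(e);
    }
  }

  public void write(MarshalledEntry<K, V> entry) {
    try {
      String path = root + "/" + entry.getKey();
      byte[] data = ctx.getMarshaller().objectToByteBuffer(entry.getValue());
      if (zk.exists(path, false) == null) {
        zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      } else {
        zk.setData(path, data, -1);        // -1 = ignore znode version
      }
    } catch (Exception e) {
      throw new PersistenceException(e);
    }
  }
}

contains() and delete() would follow the same exists()/delete() pattern.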
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
> new file mode 100644
> index 0000000..e66cc90
> --- /dev/null
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
> @@ -0,0 +1,309 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.cache.local;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.lang.reflect.InvocationTargetException;
> +import java.util.Collection;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Map.Entry;
> +import java.util.concurrent.ConcurrentMap;
> +import java.util.concurrent.TimeUnit;
> +import java.util.concurrent.atomic.AtomicLong;
> +
> +import org.apache.drill.common.config.DrillConfig;
> +import org.apache.drill.common.util.DataInputInputStream;
> +import org.apache.drill.common.util.DataOutputOutputStream;
> +import org.apache.drill.exec.cache.Counter;
> +import org.apache.drill.exec.cache.DistributedCache;
> +import org.apache.drill.exec.cache.DistributedMap;
> +import org.apache.drill.exec.cache.DistributedMultiMap;
> +import org.apache.drill.exec.cache.DrillSerializable;
> +import org.apache.drill.exec.cache.JacksonSerializable;
> +import org.apache.drill.exec.exception.DrillbitStartupException;
> +import org.apache.drill.exec.memory.BufferAllocator;
> +import org.apache.drill.exec.memory.TopLevelAllocator;
> +import org.apache.drill.exec.proto.BitControl.PlanFragment;
> +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
> +
> +import com.fasterxml.jackson.databind.ObjectMapper;
> +import com.google.common.collect.ArrayListMultimap;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Maps;
> +import com.google.common.io.ByteArrayDataInput;
> +import com.google.common.io.ByteArrayDataOutput;
> +import com.google.common.io.ByteStreams;
> +
> +public class LocalCache implements DistributedCache {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
> +
> +  private volatile Map<FragmentHandle, PlanFragment> handles;
> +  private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps;
> +  private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
> +  private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
> +  private volatile ConcurrentMap<String, Counter> counters;
> +  private static final BufferAllocator allocator = new TopLevelAllocator();
> +
> +  private static final ObjectMapper mapper = DrillConfig.create().getMapper();
> +
> +  @Override
> +  public void close() throws IOException {
> +    handles = null;
> +  }
> +
> +  @Override
> +  public void run() throws DrillbitStartupException {
> +    handles = Maps.newConcurrentMap();
> +    maps = Maps.newConcurrentMap();
> +    multiMaps = Maps.newConcurrentMap();
> +    counters = Maps.newConcurrentMap();
> +    namedMaps = Maps.newConcurrentMap();
> +  }
> +
> +  @Override
> +  public PlanFragment getFragment(FragmentHandle handle) {
> +//    logger.debug("looking for fragment with handle: {}", handle);
> +    return handles.get(handle);
> +  }
> +
> +  @Override
> +  public void storeFragment(PlanFragment fragment) {
> +//    logger.debug("Storing fragment: {}", fragment);
> +    handles.put(fragment.getHandle(), fragment);
> +  }
> +
> +  @Override
> +  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
> +    DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz);
> +    if (mmap == null) {
> +      multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz));
> +      return (DistributedMultiMap<V>) multiMaps.get(clazz);
> +    } else {
> +      return mmap;
> +    }
> +  }
> +
> +  @Override
> +  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
> +    DistributedMap m = maps.get(clazz);
> +    if (m == null) {
> +      maps.putIfAbsent(clazz, new LocalDistributedMapImpl<V>(clazz));
> +      return (DistributedMap<V>) maps.get(clazz);
> +    } else {
> +      return m;
> +    }
> +  }
> +
> +
> +  @Override
> +  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
> +    DistributedMap m = namedMaps.get(name);
> +    if (m == null) {
> +      namedMaps.putIfAbsent(name, new LocalDistributedMapImpl<V>(clazz));
> +      return (DistributedMap<V>) namedMaps.get(name);
> +    } else {
> +      return m;
> +    }
> +  }
> +
> +  @Override
> +  public Counter getCounter(String name) {
> +    Counter c = counters.get(name);
> +    if (c == null) {
> +      counters.putIfAbsent(name, new LocalCounterImpl());
> +      return counters.get(name);
> +    } else {
> +      return c;
> +    }
> +  }
> +
> +  public static ByteArrayDataOutput serialize(DrillSerializable obj) {
> +    if(obj instanceof JacksonSerializable){
> +      try{
> +        ByteArrayDataOutput out = ByteStreams.newDataOutput();
> +        out.write(mapper.writeValueAsBytes(obj));
> +        return out;
> +      }catch(Exception e){
> +        throw new RuntimeException(e);
> +      }
> +    }
> +
> +    ByteArrayDataOutput out = ByteStreams.newDataOutput();
> +    OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
> +    try {
> +      obj.writeToStream(outputStream);
> +    } catch (IOException e) {
> +      throw new RuntimeException(e);
> +    }
> +    try {
> +      outputStream.flush();
> +    } catch (IOException e) {
> +      throw new RuntimeException(e);
> +    }
> +    return out;
> +  }
> +
> +  public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
> +    if(JacksonSerializable.class.isAssignableFrom(clazz)){
> +      try{
> +        return (V) mapper.readValue(bytes, clazz);
> +      }catch(Exception e){
> +        throw new RuntimeException(e);
> +      }
> +    }
> +
> +    ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
> +    InputStream inputStream = DataInputInputStream.constructInputStream(in);
> +    try {
> +      V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
> +      obj.readFromStream(inputStream);
> +      return obj;
> +    } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
> +      throw new RuntimeException(e);
> +    }
> +  }
> +
> +  public static class LocalDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
> +    private ArrayListMultimap<String, ByteArrayDataOutput> mmap;
> +    private Class<V> clazz;
> +
> +    public LocalDistributedMultiMapImpl(Class<V> clazz) {
> +      mmap = ArrayListMultimap.create();
> +      this.clazz = clazz;
> +    }
> +
> +    @Override
> +    public Collection<V> get(String key) {
> +      List<V> list = Lists.newArrayList();
> +      for (ByteArrayDataOutput o : mmap.get(key)) {
> +        list.add(deserialize(o.toByteArray(), this.clazz));
> +      }
> +      return list;
> +    }
> +
> +    @Override
> +    public void put(String key, DrillSerializable value) {
> +      mmap.put(key, serialize(value));
> +    }
> +  }
> +
> +  public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
> +    protected ConcurrentMap<String, ByteArrayDataOutput> m;
> +    protected Class<V> clazz;
> +
> +    public LocalDistributedMapImpl(Class<V> clazz) {
> +      m = Maps.newConcurrentMap();
> +      this.clazz = clazz;
> +    }
> +
> +    @Override
> +    public V get(String key) {
> +      ByteArrayDataOutput b = m.get(key);
> +      if (b == null) return null;
> +      return (V) deserialize(b.toByteArray(), this.clazz);
> +    }
> +
> +    @Override
> +    public void put(String key, V value) {
> +      m.put(key, serialize(value));
> +    }
> +
> +    @Override
> +    public void putIfAbsent(String key, V value) {
> +      m.putIfAbsent(key, serialize(value));
> +    }
> +
> +    @Override
> +    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
> +      m.putIfAbsent(key, serialize(value));
> +      logger.warn("Expiration not implemented in local map cache");
> +    }
> +
> +    private class DeserializingTransformer implements Iterator<Map.Entry<String, V>> {
> +      private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner;
> +
> +      public DeserializingTransformer(Iterator<Entry<String, ByteArrayDataOutput>> inner) {
> +        super();
> +        this.inner = inner;
> +      }
> +
> +      @Override
> +      public boolean hasNext() {
> +        return inner.hasNext();
> +      }
> +
> +      @Override
> +      public Entry<String, V> next() {
> +        return newEntry(inner.next());
> +      }
> +
> +      @Override
> +      public void remove() {
> +        throw new UnsupportedOperationException();
> +      }
> +
> +      public Entry<String, V> newEntry(final Entry<String, ByteArrayDataOutput> input) {
> +        return new Map.Entry<String, V>(){
> +
> +          @Override
> +          public String getKey() {
> +            return input.getKey();
> +          }
> +
> +          @Override
> +          public V getValue() {
> +            return deserialize(input.getValue().toByteArray(), clazz);
> +          }
> +
> +          @Override
> +          public V setValue(V value) {
> +            throw new UnsupportedOperationException();
> +          }
> +
> +        };
> +      }
> +
> +    }
> +    @Override
> +    public Iterator<Entry<String, V>> iterator() {
> +      return new DeserializingTransformer(m.entrySet().iterator());
> +    }
> +  }
> +
> +  public static class LocalCounterImpl implements Counter {
> +    private AtomicLong al = new AtomicLong();
> +
> +    @Override
> +    public long get() {
> +      return al.get();
> +    }
> +
> +    @Override
> +    public long incrementAndGet() {
> +      return al.incrementAndGet();
> +    }
> +
> +    @Override
> +    public long decrementAndGet() {
> +      return al.decrementAndGet();
> +    }
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
> index f105363..a0c439e 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
> @@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit;
> 
> import org.apache.drill.common.expression.ErrorCollector;
> import org.apache.drill.common.expression.ErrorCollectorImpl;
> -import org.apache.drill.common.expression.ExpressionPosition;
> import org.apache.drill.common.expression.FieldReference;
> import org.apache.drill.common.expression.LogicalExpression;
> import org.apache.drill.common.expression.SchemaPath;
> import org.apache.drill.common.logical.data.Order.Ordering;
> import org.apache.drill.common.types.TypeProtos;
> import org.apache.drill.common.types.Types;
> +import org.apache.drill.exec.cache.CachedVectorContainer;
> import org.apache.drill.exec.cache.Counter;
> import org.apache.drill.exec.cache.DistributedCache;
> import org.apache.drill.exec.cache.DistributedMap;
> @@ -115,9 +115,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
>   private int recordCount;
> 
>   private final IntVector partitionKeyVector;
> -  private final DistributedMap<VectorAccessibleSerializable> tableMap;
> +  private final DistributedMap<CachedVectorContainer> tableMap;
>   private final Counter minorFragmentSampleCount;
> -  private final DistributedMultiMap<VectorAccessibleSerializable> mmap;
> +  private final DistributedMultiMap<CachedVectorContainer> mmap;
>   private final String mapKey;
>   private List<VectorContainer> sampledIncomingBatches;
> 
> @@ -131,8 +131,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
>     this.completionFactor = pop.getCompletionFactor();
> 
>     DistributedCache cache = context.getDrillbitContext().getCache();
> -    this.mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
> -    this.tableMap = cache.getMap(VectorAccessibleSerializable.class);
> +    this.mmap = cache.getMultiMap(CachedVectorContainer.class);
> +    this.tableMap = cache.getMap(CachedVectorContainer.class);
>     Preconditions.checkNotNull(tableMap);
> 
>     this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
> @@ -220,7 +220,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
>     // into a serializable wrapper object, and then add to distributed map
> 
>     WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
> -    VectorAccessibleSerializable sampleToSave = new VectorAccessibleSerializable(batch, oContext.getAllocator());
> +    CachedVectorContainer sampleToSave = new CachedVectorContainer(batch, context.getAllocator());
> 
>     mmap.put(mapKey, sampleToSave);
>     this.sampledIncomingBatches = builder.getHeldRecordBatches();
> @@ -251,7 +251,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
>         return false;
>       }
> 
> -      VectorAccessibleSerializable finalTable = null;
> +      CachedVectorContainer finalTable = null;
> 
>       long val = minorFragmentSampleCount.incrementAndGet();
>       logger.debug("Incremented mfsc, got {}", val);
> @@ -301,8 +301,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
> 
>     // Get all samples from distributed map
> 
> -    SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
> -    for (VectorAccessibleSerializable w : mmap.get(mapKey)) {
> +    SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
> +    for (CachedVectorContainer w : mmap.get(mapKey)) {
>       containerBuilder.add(w.get());
>     }
>     VectorContainer allSamplesContainer = new VectorContainer();
> @@ -346,7 +346,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
>     }
>     candidatePartitionTable.setRecordCount(copier.getOutputRecords());
>     WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
> -    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
> +    CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator());
>     tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
> 
>     candidatePartitionTable.clear();
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
> index 7297dc3..0e3181d 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
> @@ -22,7 +22,7 @@ import java.io.Closeable;
> import org.apache.drill.common.config.DrillConfig;
> import org.apache.drill.exec.ExecConstants;
> import org.apache.drill.exec.cache.DistributedCache;
> -import org.apache.drill.exec.cache.HazelCache;
> +import org.apache.drill.exec.cache.hazel.HazelCache;
> import org.apache.drill.exec.coord.ClusterCoordinator;
> import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
> import org.apache.drill.exec.coord.ZKClusterCoordinator;
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
> index c0b82bd..2078107 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
> @@ -21,17 +21,17 @@ import java.io.Closeable;
> import java.io.IOException;
> 
> import org.apache.drill.exec.cache.DistributedCache;
> -import org.apache.drill.exec.cache.LocalCache;
> +import org.apache.drill.exec.cache.local.LocalCache;
> import org.apache.drill.exec.coord.ClusterCoordinator;
> import org.apache.drill.exec.coord.LocalClusterCoordinator;
> import org.apache.drill.exec.exception.DrillbitStartupException;
> 
> public class RemoteServiceSet implements Closeable{
>   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
> -  
> +
>   private final DistributedCache cache;
>   private final ClusterCoordinator coordinator;
> -  
> +
>   public RemoteServiceSet(DistributedCache cache, ClusterCoordinator coordinator) {
>     super();
>     this.cache = cache;
> @@ -46,16 +46,21 @@ public class RemoteServiceSet implements Closeable{
>   public ClusterCoordinator getCoordinator() {
>     return coordinator;
>   }
> -  
> -  
> +
> +
>   @Override
>   public void close() throws IOException {
> +    try{
>     cache.close();
> +    }catch(Exception e){
> +      if(e instanceof IOException) throw (IOException) e;
> +      throw new IOException("Failure while closing cache", e);
> +    }
>     coordinator.close();
>   }
> 
>   public static RemoteServiceSet getLocalServiceSet(){
>     return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator());
>   }
> -  
> +
> }
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> index 99e712b..20722d9 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> @@ -29,7 +29,7 @@ import org.apache.drill.common.config.DrillConfig;
> import org.apache.drill.common.util.TestTools;
> import org.apache.drill.exec.ExecTest;
> import org.apache.drill.exec.cache.DistributedCache;
> -import org.apache.drill.exec.cache.LocalCache;
> +import org.apache.drill.exec.cache.local.LocalCache;
> import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
> import org.apache.drill.exec.memory.TopLevelAllocator;
> import org.apache.drill.exec.ops.QueryContext;
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java
> new file mode 100644
> index 0000000..13322f1
> --- /dev/null
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java
> @@ -0,0 +1,94 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.cache;
> +
> +import java.io.DataInput;
> +import java.io.DataOutput;
> +import java.io.IOException;
> +import java.util.List;
> +
> +import org.infinispan.Cache;
> +import org.infinispan.configuration.cache.CacheMode;
> +import org.infinispan.configuration.cache.Configuration;
> +import org.infinispan.configuration.cache.ConfigurationBuilder;
> +import org.infinispan.configuration.global.GlobalConfiguration;
> +import org.infinispan.configuration.global.GlobalConfigurationBuilder;
> +import org.infinispan.manager.DefaultCacheManager;
> +import org.infinispan.manager.EmbeddedCacheManager;
> +
> +import com.google.common.collect.Lists;
> +
> +public class ISpan {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ISpan.class);
> +
> +
> +  public static void main(String[] args) throws Exception{
> +    GlobalConfiguration gc = new GlobalConfigurationBuilder().transport().defaultTransport().build();
> +    Configuration c = new ConfigurationBuilder() //
> +    .clustering().cacheMode(CacheMode.DIST_ASYNC) //
> +    .storeAsBinary()
> +    .build();
> +    EmbeddedCacheManager ecm = new DefaultCacheManager(gc, c);
> +
> +    Cache<String, List<XT>> cache = ecm.getCache();
> +    List<XT> items = Lists.newArrayList();
> +    items.add(new XT(1));
> +    items.add(new XT(2));
> +
> +    cache.put("items", items);
> +    for(XT x : cache.get("items")){
> +      System.out.println(x.i);
> +    }
> +
> +
> +  }
> +
> +  private static class XT extends AbstractDataSerializable{
> +
> +    int i =0;
> +
> +
> +    public XT(int i) {
> +      super();
> +      this.i = i;
> +    }
> +
> +    @Override
> +    public void read(DataInput input) throws IOException {
> +      i = input.readInt();
> +    }
> +
> +    @Override
> +    public void write(DataOutput output) throws IOException {
> +      output.writeInt(i);
> +    }
> +
> +    @Override
> +    public String toString() {
> +      return "XT [i=" + i + "]";
> +    }
> +
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
> index a3d39a3..7686614 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
> @@ -17,7 +17,7 @@
>  */
> package org.apache.drill.exec.cache;
> 
> -import com.beust.jcommander.internal.Lists;
> +import java.util.List;
> 
> import org.apache.drill.common.config.DrillConfig;
> import org.apache.drill.common.expression.ExpressionPosition;
> @@ -25,67 +25,105 @@ import org.apache.drill.common.expression.SchemaPath;
> import org.apache.drill.common.types.TypeProtos;
> import org.apache.drill.common.types.Types;
> import org.apache.drill.exec.ExecTest;
> +import org.apache.drill.exec.cache.hazel.HazelCache;
> +import org.apache.drill.exec.cache.infinispan.ICache;
> import org.apache.drill.exec.expr.TypeHelper;
> -import org.apache.drill.exec.record.*;
> +import org.apache.drill.exec.memory.TopLevelAllocator;
> +import org.apache.drill.exec.record.MaterializedField;
> +import org.apache.drill.exec.record.VectorAccessible;
> +import org.apache.drill.exec.record.VectorContainer;
> +import org.apache.drill.exec.record.VectorWrapper;
> +import org.apache.drill.exec.record.WritableBatch;
> import org.apache.drill.exec.server.Drillbit;
> import org.apache.drill.exec.server.DrillbitContext;
> import org.apache.drill.exec.server.RemoteServiceSet;
> -import org.apache.drill.exec.vector.*;
> +import org.apache.drill.exec.vector.AllocationHelper;
> +import org.apache.drill.exec.vector.IntVector;
> +import org.apache.drill.exec.vector.ValueVector;
> +import org.apache.drill.exec.vector.VarBinaryVector;
> import org.junit.Test;
> 
> -import java.util.List;
> +import com.beust.jcommander.internal.Lists;
> 
> -public class TestVectorCache  extends ExecTest{
> +public class TestVectorCache extends ExecTest{
> 
> -  @Test
> -  public void testVectorCache() throws Exception {
> +  private void testCache(DrillConfig config, DistributedCache dcache) throws Exception {
>     List<ValueVector> vectorList = Lists.newArrayList();
>     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
> -    DrillConfig config = DrillConfig.create();
> -    Drillbit bit = new Drillbit(config, serviceSet);
> -    bit.run();
> -    DrillbitContext context = bit.getContext();
> -    HazelCache cache = new HazelCache(config, context.getAllocator());
> -    cache.run();
> 
> -    MaterializedField intField = MaterializedField.create(SchemaPath.getSimplePath("int"), Types.required(TypeProtos.MinorType.INT));
> -    IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator());
> -    MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY));
> -    VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
> -    AllocationHelper.allocate(intVector, 4, 4);
> -    AllocationHelper.allocate(binVector, 4, 5);
> -    vectorList.add(intVector);
> -    vectorList.add(binVector);
> -
> -    intVector.getMutator().setSafe(0, 0); binVector.getMutator().setSafe(0, "ZERO".getBytes());
> -    intVector.getMutator().setSafe(1, 1); binVector.getMutator().setSafe(1, "ONE".getBytes());
> -    intVector.getMutator().setSafe(2, 2); binVector.getMutator().setSafe(2, "TWO".getBytes());
> -    intVector.getMutator().setSafe(3, 3); binVector.getMutator().setSafe(3, "THREE".getBytes());
> -    intVector.getMutator().setValueCount(4);
> -    binVector.getMutator().setValueCount(4);
> -
> -    VectorContainer container = new VectorContainer();
> -    container.addCollection(vectorList);
> -    container.setRecordCount(4);
> -    WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
> -    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
> -
> -    DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
> -    mmap.put("vectors", wrap);
> -    VectorAccessibleSerializable newWrap = (VectorAccessibleSerializable)mmap.get("vectors").iterator().next();
> -
> -    VectorAccessible newContainer = newWrap.get();
> -    for (VectorWrapper w : newContainer) {
> -      ValueVector vv = w.getValueVector();
> -      int values = vv.getAccessor().getValueCount();
> -      for (int i = 0; i < values; i++) {
> -        Object o = vv.getAccessor().getObject(i);
> -        if (o instanceof byte[]) {
> -          System.out.println(new String((byte[])o));
> -        } else {
> -          System.out.println(o);
> +    try (Drillbit bit = new Drillbit(config, serviceSet); DistributedCache cache = dcache) {
> +      bit.run();
> +      cache.run();
> +
> +      DrillbitContext context = bit.getContext();
> +
> +
> +      MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN),
> +          Types.required(TypeProtos.MinorType.INT));
> +      IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator());
> +      MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN),
> +          Types.required(TypeProtos.MinorType.VARBINARY));
> +      VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator());
> +      AllocationHelper.allocate(intVector, 4, 4);
> +      AllocationHelper.allocate(binVector, 4, 5);
> +      vectorList.add(intVector);
> +      vectorList.add(binVector);
> +
> +      intVector.getMutator().set(0, 0);
> +      binVector.getMutator().set(0, "ZERO".getBytes());
> +      intVector.getMutator().set(1, 1);
> +      binVector.getMutator().set(1, "ONE".getBytes());
> +      intVector.getMutator().set(2, 2);
> +      binVector.getMutator().set(2, "TWO".getBytes());
> +      intVector.getMutator().set(3, 3);
> +      binVector.getMutator().set(3, "THREE".getBytes());
> +      intVector.getMutator().setValueCount(4);
> +      binVector.getMutator().setValueCount(4);
> +
> +      VectorContainer container = new VectorContainer();
> +      container.addCollection(vectorList);
> +      container.setRecordCount(4);
> +      WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
> +      CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getAllocator());
> +
> +      DistributedMultiMap<CachedVectorContainer> mmap = cache.getMultiMap(CachedVectorContainer.class);
> +      mmap.put("vectors", wrap);
> +
> +      CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next();
> +
> +      VectorAccessible newContainer = newWrap.get();
> +      for (VectorWrapper<?> w : newContainer) {
> +        ValueVector vv = w.getValueVector();
> +        int values = vv.getAccessor().getValueCount();
> +        for (int i = 0; i < values; i++) {
> +          Object o = vv.getAccessor().getObject(i);
> +          if (o instanceof byte[]) {
> +            System.out.println(new String((byte[]) o));
> +          } else {
> +            System.out.println(o);
> +          }
>         }
>       }
> +
> +      newWrap.clear();
>     }
> +
> +  }
> +
> +  @Test
> +  public void testHazelVectorCache() throws Exception {
> +    DrillConfig c = DrillConfig.create();
> +    HazelCache cache = new HazelCache(c, new TopLevelAllocator());
> +    // testCache() calls cache.run() and closes the cache via
> +    // try-with-resources, so no extra run()/close() here.
> +    testCache(c, cache);
> +  }
> +
> +  @Test
> +  public void testICache() throws Exception {
> +    DrillConfig c = DrillConfig.create();
> +    ICache cache = new ICache(c, new TopLevelAllocator());
> +    testCache(c, cache);
> +
>   }
> }
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
> index 63bc0a9..a5dbfe5 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
> @@ -23,7 +23,7 @@ import net.hydromatic.optiq.tools.Frameworks;
> 
> import org.apache.drill.common.config.DrillConfig;
> import org.apache.drill.exec.ExecTest;
> -import org.apache.drill.exec.cache.LocalCache;
> +import org.apache.drill.exec.cache.local.LocalCache;
> import org.apache.drill.exec.memory.TopLevelAllocator;
> import org.apache.drill.exec.rpc.user.UserSession;
> import org.apache.drill.exec.server.DrillbitContext;
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
> index acb5929..3ccb96b 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
> @@ -18,12 +18,13 @@
> package org.apache.drill.exec.store.ischema;
> 
> 
> -import static org.mockito.Mockito.*;
> +import static org.mockito.Mockito.mock;
> +import static org.mockito.Mockito.when;
> import net.hydromatic.optiq.SchemaPlus;
> import net.hydromatic.optiq.tools.Frameworks;
> 
> import org.apache.drill.common.config.DrillConfig;
> -import org.apache.drill.exec.cache.LocalCache;
> +import org.apache.drill.exec.cache.local.LocalCache;
> import org.apache.drill.exec.memory.TopLevelAllocator;
> import org.apache.drill.exec.rpc.user.UserSession;
> import org.apache.drill.exec.server.DrillbitContext;
> @@ -40,7 +41,7 @@ public class OrphanSchema {
>    * @return root node of the created schema.
>    */
>   public static SchemaPlus create() throws Exception {
> -    
> +
>     final DrillConfig c = DrillConfig.create();
> 
>     // Mock up a context which will allow us to create a schema.
> @@ -51,7 +52,7 @@ public class OrphanSchema {
>     when(bitContext.getCache()).thenReturn(new LocalCache());
> 
>     bitContext.getCache().run();
> -    
> +
>     // Using the mock context, get the orphan schema.
>     StoragePluginRegistry r = new StoragePluginRegistry(bitContext);
>     r.init();
> 
