Serialization/Deserialization fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/710eeeec Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/710eeeec Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/710eeeec Branch: refs/heads/master Commit: 710eeeec24c7f820fb9298d259d8c1a09253e11d Parents: 24e3af2 Author: Jacques Nadeau <[email protected]> Authored: Wed May 21 18:14:26 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Fri May 23 10:02:53 2014 -0700 ---------------------------------------------------------------------- .../cache/LoopedAbstractDrillSerializable.java | 2 +- .../drill/exec/cache/ProtoSerializable.java | 29 ++- .../drill/exec/cache/infinispan/ICache.java | 3 +- .../infinispan/JacksonAdvancedExternalizer.java | 8 +- .../ProtobufAdvancedExternalizer.java | 2 +- .../apache/drill/exec/client/DrillClient.java | 13 +- .../drill/exec/server/RemoteServiceSet.java | 8 +- .../java/org/apache/drill/BaseTestQuery.java | 14 +- .../exec/cache/TestCacheSerialization.java | 193 +++++++++++++++++++ .../drill/exec/cache/TestVectorCache.java | 129 ------------- 10 files changed, 260 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java index 1de030d..d2a7458 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java @@ -45,7 +45,7 @@ abstract class LoopedAbstractDrillSerializable implements DrillSerializable { ByteArrayOutputStream baos = new ByteArrayOutputStream(); writeToStream(baos); byte[] ba = baos.toByteArray(); - out.write(ba.length); + out.writeInt(ba.length); out.write(ba); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java index 1538a85..f48aae1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java @@ -45,12 +45,37 @@ public abstract class ProtoSerializable<V extends Message> extends AbstractStrea @Override public void readFromStream(InputStream input) throws IOException { - obj = protoParser.parseFrom(input); + obj = protoParser.parseDelimitedFrom(input); } @Override public void writeToStream(OutputStream output) throws IOException { - obj.writeTo(output); + obj.writeDelimitedTo(output); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((obj == null) ? 0 : obj.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ProtoSerializable other = (ProtoSerializable) obj; + if (this.obj == null) { + if (other.obj != null) + return false; + } else if (!this.obj.equals(other.obj)) + return false; + return true; } public static class PlanFragmentSerializable extends ProtoSerializable<PlanFragment>{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java index 92ce08d..f56f19a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java @@ -81,8 +81,9 @@ public class ICache implements DistributedCache{ Configuration c = new ConfigurationBuilder() // .clustering() // + .cacheMode(CacheMode.DIST_ASYNC) // - .storeAsBinary() // + .storeAsBinary().enable() // .build(); this.manager = new DefaultCacheManager(gc, c); JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java index 81f4877..63a6d62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java @@ -46,12 +46,16 @@ public class JacksonAdvancedExternalizer<T> implements AdvancedExternalizer<T> @Override public T readObject(ObjectInput in) throws IOException, ClassNotFoundException { - return (T) mapper.readValue(DataInputInputStream.constructInputStream(in), clazz); + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + return (T) mapper.readValue(bytes, clazz); } @Override public void writeObject(ObjectOutput out, T object) throws IOException { - out.write(mapper.writeValueAsBytes(object)); + byte[] bytes = mapper.writeValueAsBytes(object); + out.writeInt(bytes.length); + out.write(bytes); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java index 821443a..df97a01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java @@ -46,7 +46,7 @@ public class ProtobufAdvancedExternalizer<T extends Message> implements Advanced @Override public T readObject(ObjectInput in) throws IOException, ClassNotFoundException { - return parser.parseDelimitedFrom(DataInputInputStream.constructInputStream(in)); + return parser.parseFrom(DataInputInputStream.constructInputStream(in)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 92097e7..4755d32 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -34,6 +34,7 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ZKClusterCoordinator; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -68,10 +69,11 @@ public class DrillClient implements Closeable, ConnectionThrottle{ private UserProperties props = null; private volatile ClusterCoordinator clusterCoordinator; private volatile boolean connected = false; - private final TopLevelAllocator allocator = new TopLevelAllocator(Long.MAX_VALUE); + private final BufferAllocator allocator; private int reconnectTimes; private int reconnectDelay; private final boolean ownsZkConnection; + private final boolean ownsAllocator; public DrillClient() { this(DrillConfig.create()); @@ -86,7 +88,13 @@ public class DrillClient implements Closeable, ConnectionThrottle{ } public DrillClient(DrillConfig config, ClusterCoordinator coordinator){ + this(config, coordinator, null); + } + + public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator){ this.ownsZkConnection = coordinator == null; + this.ownsAllocator = allocator == null; + this.allocator = allocator == null ? new TopLevelAllocator(Long.MAX_VALUE) : allocator; this.config = config; this.clusterCoordinator = coordinator; this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); @@ -180,7 +188,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ } } - public TopLevelAllocator getAllocator() { + public BufferAllocator getAllocator() { return allocator; } @@ -189,6 +197,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ */ public void close(){ if(this.client != null) this.client.close(); + if(this.ownsAllocator && allocator != null) allocator.close(); if(ownsZkConnection){ try { this.clusterCoordinator.close(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java index 2078107..d64efa4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java @@ -20,11 +20,13 @@ package org.apache.drill.exec.server; import java.io.Closeable; import java.io.IOException; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.cache.DistributedCache; +import org.apache.drill.exec.cache.infinispan.ICache; import org.apache.drill.exec.cache.local.LocalCache; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.LocalClusterCoordinator; -import org.apache.drill.exec.exception.DrillbitStartupException; +import org.apache.drill.exec.memory.BufferAllocator; public class RemoteServiceSet implements Closeable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class); @@ -63,4 +65,8 @@ public class RemoteServiceSet implements Closeable{ return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator()); } + public static RemoteServiceSet getServiceSetWithFullCache(DrillConfig config, BufferAllocator allocator) throws Exception{ + ICache c = new ICache(config, allocator); + return new RemoteServiceSet(c, new LocalClusterCoordinator()); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index fcd2a3b..e7bc87d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.client.PrintingResultsListener; import org.apache.drill.exec.client.QuerySubmitter; import org.apache.drill.exec.client.QuerySubmitter.Format; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.rpc.RpcException; @@ -52,6 +53,8 @@ import com.google.common.io.Resources; public class BaseTestQuery extends ExecTest{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class); + private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache"; + public final TestRule resetWatcher = new TestWatcher() { @Override protected void failed(Throwable e, Description description) { @@ -68,6 +71,7 @@ public class BaseTestQuery extends ExecTest{ protected static RemoteServiceSet serviceSet; protected static DrillConfig config; protected static QuerySubmitter submitter = new QuerySubmitter(); + protected static BufferAllocator allocator; static void resetClientAndBit() throws Exception{ closeClient(); @@ -77,7 +81,12 @@ public class BaseTestQuery extends ExecTest{ @BeforeClass public static void openClient() throws Exception{ config = DrillConfig.create(); - serviceSet = RemoteServiceSet.getLocalServiceSet(); + allocator = new TopLevelAllocator(config); + if(config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)){ + serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator); + }else{ + serviceSet = RemoteServiceSet.getLocalServiceSet(); + } bit = new Drillbit(config, serviceSet); bit.run(); client = new DrillClient(config, serviceSet.getCoordinator()); @@ -85,7 +94,7 @@ public class BaseTestQuery extends ExecTest{ } protected BufferAllocator getAllocator(){ - return client.getAllocator(); + return allocator; } @AfterClass @@ -93,6 +102,7 @@ public class BaseTestQuery extends ExecTest{ if(client != null) client.close(); if(bit != null) bit.close(); if(serviceSet != null) serviceSet.close(); + if(allocator != null) allocator.close(); } protected void runSQL(String sql) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java new file mode 100644 index 0000000..6375d66 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache; + +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecTest; +import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable; +import org.apache.drill.exec.cache.infinispan.ICache; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.planner.logical.StoragePlugins; +import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.server.options.OptionValue; +import org.apache.drill.exec.server.options.OptionValue.OptionType; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.vector.AllocationHelper; +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VarBinaryVector; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.hive12.common.collect.Maps; + +public class TestCacheSerialization extends ExecTest { + + private static DistributedCache ICACHE; + private static BufferAllocator ALLOCATOR; + private static final DrillConfig CONFIG = DrillConfig.create(); + + @Test + public void testProtobufSerialization() { + DistributedMap<FragmentHandleSerializable> map = ICACHE.getMap(FragmentHandleSerializable.class); + FragmentHandle h = FragmentHandle.newBuilder().setMajorFragmentId(1).setMinorFragmentId(1).setQueryId(QueryId.newBuilder().setPart1(74).setPart2(66).build()).build(); + FragmentHandleSerializable s = new FragmentHandleSerializable(h); + map.put("1", s); + for(int i =0; i < 2; i++){ + FragmentHandleSerializable s2 = map.get("1"); + Assert.assertEquals(s.getObject(), s2.getObject()); + } + } + +// @Test +// public void testProtobufExternalizer(){ +// final FragmentStatus fs = FragmentStatus.newBuilder().setHandle(FragmentHandle.newBuilder().setMajorFragmentId(1).setMajorFragmentId(35)).build(); +// DistributedMap<OptionValue> map = ICACHE.getNamedMap(FragmentStatus.class); +// map.put("1", v); +// for(int i = 0; i < 5; i++){ +// OptionValue v2 = map.get("1"); +// Assert.assertEquals(v, v2); +// } +// } + + @Test + public void testJackSerializable(){ + OptionValue v = OptionValue.createBoolean(OptionType.SESSION, "my test option", true); + DistributedMap<OptionValue> map = ICACHE.getNamedMap("sys.options", OptionValue.class); + map.put("1", v); + for(int i = 0; i < 5; i++){ + OptionValue v2 = map.get("1"); + Assert.assertEquals(v, v2); + } + } + + @Test + public void testCustomJsonSerialization(){ + Map<String, StoragePluginConfig> configs = Maps.newHashMap(); + configs.put("hello", new FileSystemConfig()); + StoragePlugins p = new StoragePlugins(configs); + + DistributedMap<StoragePlugins> map = ICACHE.getMap(StoragePlugins.class); + map.put("1", p); + for(int i =0; i < 2; i++){ + StoragePlugins p2 = map.get("1"); + Assert.assertEquals(p, p2); + } + } + + @Test + public void testVectorCache() throws Exception { + List<ValueVector> vectorList = Lists.newArrayList(); + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), + Types.required(TypeProtos.MinorType.INT)); + IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, ALLOCATOR); + MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), + Types.required(TypeProtos.MinorType.VARBINARY)); + VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, ALLOCATOR); + AllocationHelper.allocate(intVector, 4, 4); + AllocationHelper.allocate(binVector, 4, 5); + vectorList.add(intVector); + vectorList.add(binVector); + + intVector.getMutator().setSafe(0, 0); + binVector.getMutator().setSafe(0, "ZERO".getBytes()); + intVector.getMutator().setSafe(1, 1); + binVector.getMutator().setSafe(1, "ONE".getBytes()); + intVector.getMutator().setSafe(2, 2); + binVector.getMutator().setSafe(2, "TWO".getBytes()); + intVector.getMutator().setSafe(3, 3); + binVector.getMutator().setSafe(3, "THREE".getBytes()); + intVector.getMutator().setValueCount(4); + binVector.getMutator().setValueCount(4); + + VectorContainer container = new VectorContainer(); + container.addCollection(vectorList); + container.setRecordCount(4); + WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false); + CachedVectorContainer wrap = new CachedVectorContainer(batch, ALLOCATOR); + + DistributedMultiMap<CachedVectorContainer> mmap = ICACHE.getMultiMap(CachedVectorContainer.class); + mmap.put("vectors", wrap); + + for(int x =0; x < 2; x++){ + CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next(); + + VectorAccessible newContainer = newWrap.get(); + for (VectorWrapper<?> w : newContainer) { + ValueVector vv = w.getValueVector(); + int values = vv.getAccessor().getValueCount(); + for (int i = 0; i < values; i++) { + Object o = vv.getAccessor().getObject(i); + if (o instanceof byte[]) { + System.out.println(new String((byte[]) o)); + } else { + System.out.println(o); + } + } + } + + newWrap.clear(); + } + } + + // @Test + // public void testHazelVectorCache() throws Exception { + // DrillConfig c = DrillConfig.create(); + // HazelCache cache = new HazelCache(c, new TopLevelAllocator()); + // cache.run(); + // testCache(c, cache); + // cache.close(); + // } + + @BeforeClass + public static void setupCache() throws Exception { + ALLOCATOR = new TopLevelAllocator(); + ICACHE = new ICache(CONFIG, ALLOCATOR); + ICACHE.run(); + } + + @AfterClass + public static void destroyCache() throws Exception { + ICACHE.close(); + ALLOCATOR.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java deleted file mode 100644 index 3e0be69..0000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache; - -import java.util.List; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.ExecTest; -import org.apache.drill.exec.cache.hazel.HazelCache; -import org.apache.drill.exec.cache.infinispan.ICache; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.memory.TopLevelAllocator; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.VectorAccessible; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.server.Drillbit; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.RemoteServiceSet; -import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.IntVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VarBinaryVector; -import org.junit.Test; - -import com.google.common.collect.Lists; - -public class TestVectorCache extends ExecTest{ - - private void testCache(DrillConfig config, DistributedCache dcache) throws Exception { - List<ValueVector> vectorList = Lists.newArrayList(); - RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - - try (Drillbit bit = new Drillbit(config, serviceSet); DistributedCache cache = dcache) { - bit.run(); - cache.run(); - - DrillbitContext context = bit.getContext(); - - - MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), - Types.required(TypeProtos.MinorType.INT)); - IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator()); - MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), - Types.required(TypeProtos.MinorType.VARBINARY)); - VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator()); - AllocationHelper.allocate(intVector, 4, 4); - AllocationHelper.allocate(binVector, 4, 5); - vectorList.add(intVector); - vectorList.add(binVector); - - intVector.getMutator().setSafe(0, 0); - binVector.getMutator().setSafe(0, "ZERO".getBytes()); - intVector.getMutator().setSafe(1, 1); - binVector.getMutator().setSafe(1, "ONE".getBytes()); - intVector.getMutator().setSafe(2, 2); - binVector.getMutator().setSafe(2, "TWO".getBytes()); - intVector.getMutator().setSafe(3, 3); - binVector.getMutator().setSafe(3, "THREE".getBytes()); - intVector.getMutator().setValueCount(4); - binVector.getMutator().setValueCount(4); - - VectorContainer container = new VectorContainer(); - container.addCollection(vectorList); - container.setRecordCount(4); - WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false); - CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getAllocator()); - - DistributedMultiMap<CachedVectorContainer> mmap = cache.getMultiMap(CachedVectorContainer.class); - mmap.put("vectors", wrap); - - CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next(); - - VectorAccessible newContainer = newWrap.get(); - for (VectorWrapper<?> w : newContainer) { - ValueVector vv = w.getValueVector(); - int values = vv.getAccessor().getValueCount(); - for (int i = 0; i < values; i++) { - Object o = vv.getAccessor().getObject(i); - if (o instanceof byte[]) { - System.out.println(new String((byte[]) o)); - } else { - System.out.println(o); - } - } - } - - newWrap.clear(); - } - - } - -// @Test -// public void testHazelVectorCache() throws Exception { -// DrillConfig c = DrillConfig.create(); -// HazelCache cache = new HazelCache(c, new TopLevelAllocator()); -// cache.run(); -// testCache(c, cache); -// cache.close(); -// } - - @Test - public void testICache() throws Exception { - DrillConfig c = DrillConfig.create(); - ICache cache = new ICache(c, new TopLevelAllocator()); - testCache(c, cache); - - } -}
