Looks like we added a ZK PStore instead of HBase? Tim
On Tue, Jun 10, 2014 at 8:52 PM, <[email protected]> wrote: > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java > > b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java > new file mode 100644 > index 0000000..eb21c70 > --- /dev/null > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java > @@ -0,0 +1,180 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.drill.exec.store.sys.zk; > + > +import java.io.IOException; > +import java.util.Iterator; > +import java.util.List; > +import java.util.Map.Entry; > + > +import org.apache.curator.framework.CuratorFramework; > +import org.apache.drill.exec.store.sys.PStore; > +import org.apache.drill.exec.store.sys.PStoreConfig; > +import org.apache.zookeeper.CreateMode; > + > +import com.google.common.base.Preconditions; > + > +public class ZkPStore<V> implements PStore<V>{ > + > + private CuratorFramework framework; > + private PStoreConfig<V> config; > + private String prefix; > + private String parent; > + > + ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws > IOException { > + this.parent = "/" + config.getName(); > + this.prefix = parent + "/"; > + this.framework = framework; > + this.config = config; > + > + // make sure the parent node exists. > + try{ > + if(framework.checkExists().forPath(parent) == null) { > + framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); > + } > + }catch(Exception e){ > + throw new RuntimeException("Failure while accessing Zookeeper", e); > + } > + > + } > + > + @Override > + public Iterator<Entry<String, V>> iterator() { > + try{ > + List<String> children = framework.getChildren().forPath(parent); > + return new Iter(children.iterator()); > + }catch(Exception e){ > + throw new RuntimeException("Failure while accessing Zookeeper", e); > + } > + > + } > + > + private String p(String key){ > + Preconditions.checkArgument(!key.contains("/"), "You cannot use keys > that have slashes in them when using the Zookeeper SystemTable storage > interface."); > + return prefix + key; > + } > + > + @Override > + public V get(String key) { > + try{ > + byte[] bytes = framework.getData().forPath(p(key)); > + if(bytes == null){ > + return null; > + } > + return config.getSerializer().deserialize(bytes); > + > + }catch(Exception e){ > + throw new RuntimeException("Failure while accessing Zookeeper", 
e); > + } > + } > + > + @Override > + public void put(String key, V value) { > + try{ > + if(framework.checkExists().forPath(p(key)) != null) { > + framework.setData().forPath(p(key), > config.getSerializer().serialize(value)); > + }else{ > + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), > config.getSerializer().serialize(value)); > + } > + > + }catch(Exception e){ > + throw new RuntimeException("Failure while accessing Zookeeper", e); > + } > + > + } > + > + @Override > + public boolean putIfAbsent(String key, V value) { > + try{ > + if(framework.checkExists().forPath(p(key)) != null) { > + return false; > + }else{ > + framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), > config.getSerializer().serialize(value)); > + return true; > + } > + > + }catch(Exception e){ > + throw new RuntimeException("Failure while accessing Zookeeper", e); > + } > + } > + > + @Override > + public void delete(String key) { > + try{ > + framework.delete().forPath(p(key)); > + }catch(Exception e){ > + throw new RuntimeException("Failure while accessing Zookeeper", e); > + } > + } > + > + private class Iter implements Iterator<Entry<String, V>>{ > + > + private Iterator<String> keys; > + private String current; > + > + public Iter(Iterator<String> keys) { > + super(); > + this.keys = keys; > + } > + > + @Override > + public boolean hasNext() { > + return keys.hasNext(); > + } > + > + @Override > + public Entry<String, V> next() { > + current = keys.next(); > + return new DeferredEntry(current); > + } > + > + @Override > + public void remove() { > + delete(current); > + keys.remove(); > + } > + > + private class DeferredEntry implements Entry<String, V>{ > + > + private String name; > + > + public DeferredEntry(String name) { > + super(); > + this.name = name; > + } > + > + @Override > + public String getKey() { > + return name; > + } > + > + @Override > + public V getValue() { > + return get(name); > + } > + > + @Override > + public V setValue(V value) { 
> + throw new UnsupportedOperationException(); > + } > + > + } > + > + } > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java > > b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java > new file mode 100644 > index 0000000..f4513c2 > --- /dev/null > +++ > b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java > @@ -0,0 +1,61 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.drill.exec.store.sys.zk; > + > +import java.io.IOException; > + > +import org.apache.curator.framework.CuratorFramework; > +import org.apache.drill.exec.coord.ClusterCoordinator; > +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; > +import org.apache.drill.exec.exception.DrillbitStartupException; > +import org.apache.drill.exec.store.sys.PStore; > +import org.apache.drill.exec.store.sys.PStoreConfig; > +import org.apache.drill.exec.store.sys.PStoreProvider; > +import org.apache.drill.exec.store.sys.PStoreRegistry; > + > +public class ZkPStoreProvider implements PStoreProvider{ > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class); > + > + private final CuratorFramework curator; > + > + public ZkPStoreProvider(PStoreRegistry registry) throws > DrillbitStartupException { > + ClusterCoordinator coord = registry.getClusterCoordinator(); > + if (!(coord instanceof ZKClusterCoordinator)) { > + throw new DrillbitStartupException("A ZkPStoreProvider was created > without a ZKClusterCoordinator."); > + } > + this.curator = > ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator(); > + } > + > + public ZkPStoreProvider(CuratorFramework curator) { > + this.curator = curator; > + } > + > + @Override > + public void close() { > + } > + > + @Override > + public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException { > + return new ZkPStore<V>(curator, store); > + } > + > + @Override > + public void start() { > + } > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java > > b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java > deleted file mode 100644 > index 
e2a6ecf..0000000 > --- > a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java > +++ /dev/null > @@ -1,182 +0,0 @@ > -/** > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > -package org.apache.drill.exec.store.sys.zk; > - > -import java.io.IOException; > -import java.util.Iterator; > -import java.util.List; > -import java.util.Map.Entry; > - > -import org.apache.curator.framework.CuratorFramework; > -import org.apache.drill.exec.store.sys.PTable; > -import org.apache.drill.exec.store.sys.PTableConfig; > -import org.apache.zookeeper.CreateMode; > - > -import com.google.common.base.Preconditions; > - > -public class ZkPTable<V> implements PTable<V>{ > - > - private CuratorFramework framework; > - private PTableConfig<V> config; > - private String prefix; > - private String parent; > - > - ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws > IOException { > - super(); > - this.parent = "/" + config.getName(); > - this.prefix = parent + "/"; > - this.framework = framework; > - this.config = config; > - > - // make sure the parent node exists. 
> - try{ > - if(framework.checkExists().forPath(parent) == null) { > - framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); > - } > - }catch(Exception e){ > - throw new RuntimeException("Failure while accessing Zookeeper", e); > - } > - > - } > - > - @Override > - public Iterator<Entry<String, V>> iterator() { > - try{ > - List<String> children = framework.getChildren().forPath(parent); > - return new Iter(children.iterator()); > - }catch(Exception e){ > - throw new RuntimeException("Failure while accessing Zookeeper", e); > - } > - > - } > - > - private String p(String key){ > - Preconditions.checkArgument(!key.contains("/"), "You cannot use keys > that have slashes in them when using the Zookeeper SystemTable storage > interface."); > - return prefix + key; > - } > - > - @Override > - public V get(String key) { > - try{ > - byte[] bytes = framework.getData().forPath(p(key)); > - if(bytes == null){ > - return null; > - } > - return config.getSerializer().deserialize(bytes); > - > - }catch(Exception e){ > - throw new RuntimeException("Failure while accessing Zookeeper", e); > - } > - } > - > - @Override > - public void put(String key, V value) { > - try{ > - if(framework.checkExists().forPath(p(key)) != null) { > - framework.setData().forPath(p(key), > config.getSerializer().serialize(value)); > - }else{ > - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), > config.getSerializer().serialize(value)); > - } > - > - }catch(Exception e){ > - throw new RuntimeException("Failure while accessing Zookeeper", e); > - } > - > - } > - > - @Override > - public boolean putIfAbsent(String key, V value) { > - try{ > - if(framework.checkExists().forPath(p(key)) != null) { > - return false; > - }else{ > - framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), > config.getSerializer().serialize(value)); > - return true; > - } > - > - }catch(Exception e){ > - throw new RuntimeException("Failure while accessing Zookeeper", e); > - } > - } > 
- > - @Override > - public void delete(String key) { > - try{ > - framework.delete().forPath(p(key)); > - }catch(Exception e){ > - throw new RuntimeException("Failure while accessing Zookeeper", e); > - } > - } > - > - > - private class Iter implements Iterator<Entry<String, V>>{ > - > - private Iterator<String> keys; > - private String current; > - > - public Iter(Iterator<String> keys) { > - super(); > - this.keys = keys; > - } > - > - @Override > - public boolean hasNext() { > - return keys.hasNext(); > - } > - > - @Override > - public Entry<String, V> next() { > - current = keys.next(); > - return new DeferredEntry(current); > - } > - > - @Override > - public void remove() { > - delete(current); > - keys.remove(); > - } > - > - > - private class DeferredEntry implements Entry<String, V>{ > - > - private String name; > - > - > - public DeferredEntry(String name) { > - super(); > - this.name = name; > - } > - > - @Override > - public String getKey() { > - return name; > - } > - > - @Override > - public V getValue() { > - return get(name); > - } > - > - @Override > - public V setValue(V value) { > - throw new UnsupportedOperationException(); > - } > - > - } > - } > -} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java > > b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java > deleted file mode 100644 > index 8d2e153..0000000 > --- > a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java > +++ /dev/null > @@ -1,50 +0,0 @@ > -/** > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. 
See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > -package org.apache.drill.exec.store.sys.zk; > - > -import java.io.IOException; > - > -import org.apache.curator.framework.CuratorFramework; > -import org.apache.drill.exec.store.sys.PTable; > -import org.apache.drill.exec.store.sys.PTableConfig; > -import org.apache.drill.exec.store.sys.TableProvider; > - > -public class ZkTableProvider implements TableProvider{ > - static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class); > - > - private final CuratorFramework curator; > - > - public ZkTableProvider(CuratorFramework curator){ > - this.curator = curator; > - } > - > - @Override > - public void close() { > - } > - > - @Override > - public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException { > - return new ZkPTable<V>(curator, table); > - } > - > - @Override > - public void start() { > - } > - > - > -} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java > b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java > index 
1eae8c5..71e4e8e 100644 > --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java > +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java > @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler; > import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl; > import org.apache.drill.exec.server.BootStrapContext; > import org.apache.drill.exec.server.DrillbitContext; > -import org.apache.drill.exec.store.sys.TableProvider; > +import org.apache.drill.exec.store.sys.PStoreProvider; > import org.apache.drill.exec.work.batch.ControlHandlerImpl; > import org.apache.drill.exec.work.batch.ControlMessageHandler; > import org.apache.drill.exec.work.foreman.Foreman; > @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{ > this.dataHandler = new DataResponseHandlerImpl(bee); > } > > - public void start(DrillbitEndpoint endpoint, DistributedCache cache, > Controller controller, DataConnectionCreator data, ClusterCoordinator coord, > TableProvider provider){ > + public void start(DrillbitEndpoint endpoint, DistributedCache cache, > Controller controller, DataConnectionCreator data, ClusterCoordinator coord, > PStoreProvider provider){ > this.dContext = new DrillbitContext(endpoint, bContext, coord, > controller, data, cache, workBus, provider); > // executor = > Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) > executor = Executors.newCachedThreadPool(new > NamedThreadFactory("WorkManager-")); > @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{ > @Override > public void close() throws IOException { > try { > - executor.awaitTermination(1, TimeUnit.SECONDS); > + if (executor != null) { > + executor.awaitTermination(1, TimeUnit.SECONDS); > + } > } catch (InterruptedException e) { > logger.warn("Executor interrupted while awaiting termination"); > } > > 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf > ---------------------------------------------------------------------- > diff --git a/exec/java-exec/src/main/resources/drill-module.conf > b/exec/java-exec/src/main/resources/drill-module.conf > index 6a7c9d5..9ce22c7 100644 > --- a/exec/java-exec/src/main/resources/drill-module.conf > +++ b/exec/java-exec/src/main/resources/drill-module.conf > @@ -94,7 +94,8 @@ drill.exec: { > affinity.factor: 1.2, > executor.threads: 4 > }, > - sys.tables: { > + sys.store.provider: { > + class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider", > local: { > path: "/tmp/drill", > write: true > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > ---------------------------------------------------------------------- > diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > index 547af34..ad114ab 100644 > --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java > @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager; > import org.apache.drill.exec.server.options.SessionOptionManager; > import org.apache.drill.exec.server.options.SystemOptionManager; > import org.apache.drill.exec.store.StoragePluginRegistry; > -import org.apache.drill.exec.store.sys.local.LocalTableProvider; > +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; > import org.junit.Rule; > import org.junit.rules.TestRule; > > @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{ > final DistributedCache cache = new LocalCache(); > cache.run(); > > - final LocalTableProvider provider = new LocalTableProvider(config); > + final LocalPStoreProvider provider = new LocalPStoreProvider(config); > 
provider.start(); > > final SystemOptionManager opt = new SystemOptionManager(config, > provider); > @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{ > result = opt; > dbContext.getCache(); > result = cache; > - dbContext.getSystemTableProvider(); > + dbContext.getPersistentStoreProvider(); > result = provider; > } > }; > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java > > b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java > index 8fc37f3..199ecfc 100644 > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java > @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit; > import org.apache.drill.exec.server.DrillbitContext; > import org.apache.drill.exec.server.RemoteServiceSet; > import org.apache.drill.exec.store.StoragePluginRegistry; > -import org.apache.drill.exec.store.sys.local.LocalTableProvider; > +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; > import org.apache.drill.exec.vector.ValueVector; > import org.apache.drill.exec.vector.VarBinaryVector; > import org.junit.AfterClass; > @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest { > } > }; > RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet(); > - DrillbitContext bitContext = new > DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, > controller, com, cache, workBus, new > LocalTableProvider(DrillConfig.create())); > + DrillbitContext bitContext = new > DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, > controller, com, cache, workBus, new > 
LocalPStoreProvider(DrillConfig.create())); > QueryContext qc = new QueryContext(new UserSession(null, null, null), > QueryId.getDefaultInstance(), bitContext); > PhysicalPlanReader reader = bitContext.getPlanReader(); > LogicalPlan plan = > reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), > Charsets.UTF_8)); > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java > > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java > new file mode 100644 > index 0000000..6f7794b > --- /dev/null > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java > @@ -0,0 +1,66 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.drill.exec.store.sys; > + > +import static org.junit.Assert.assertEquals; > +import static org.junit.Assert.assertFalse; > +import static org.junit.Assert.assertTrue; > + > +import java.util.Iterator; > +import java.util.Map; > +import java.util.Map.Entry; > + > +import com.fasterxml.jackson.databind.ObjectMapper; > +import com.google.common.collect.Maps; > + > +public class PStoreTestUtil { > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class); > + > + public static void test(PStoreProvider provider) throws Exception{ > + PStore<String> store = > provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), > String.class).name("sys.test").build()); > + String[] keys = {"first", "second"}; > + String[] values = {"value1", "value2"}; > + Map<String, String> expectedMap = Maps.newHashMap(); > + > + for(int i =0; i < keys.length; i++){ > + expectedMap.put(keys[i], values[i]); > + store.put(keys[i], values[i]); > + } > + > + { > + Iterator<Map.Entry<String, String>> iter = store.iterator(); > + for(int i =0; i < keys.length; i++){ > + Entry<String, String> e = iter.next(); > + assertTrue(expectedMap.containsKey(e.getKey())); > + assertEquals(expectedMap.get(e.getKey()), e.getValue()); > + } > + > + assertFalse(iter.hasNext()); > + } > + > + { > + Iterator<Map.Entry<String, String>> iter = store.iterator(); > + while(iter.hasNext()){ > + iter.next(); > + iter.remove(); > + } > + } > + > + assertFalse(store.iterator().hasNext()); > + } > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java > > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java > deleted file mode 100644 > index 
47a783b..0000000 > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java > +++ /dev/null > @@ -1,66 +0,0 @@ > -/** > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > -package org.apache.drill.exec.store.sys; > - > -import static org.junit.Assert.assertEquals; > -import static org.junit.Assert.assertFalse; > -import static org.junit.Assert.assertTrue; > - > -import java.util.Iterator; > -import java.util.Map; > -import java.util.Map.Entry; > - > -import com.fasterxml.jackson.databind.ObjectMapper; > -import com.google.common.collect.Maps; > - > -public class PTableTestUtil { > - static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class); > - > - public static void test(TableProvider provider) throws Exception{ > - PTable<String> table = > provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), > String.class).name("sys.test").build()); > - String[] keys = {"first", "second"}; > - String[] values = {"value1", "value2"}; > - Map<String, String> expectedMap = Maps.newHashMap(); > - > - for(int i =0; i < keys.length; i++){ > - expectedMap.put(keys[i], values[i]); > - table.put(keys[i], values[i]); > - } > 
- > - { > - Iterator<Map.Entry<String, String>> iter = table.iterator(); > - for(int i =0; i < keys.length; i++){ > - Entry<String, String> e = iter.next(); > - assertTrue(expectedMap.containsKey(e.getKey())); > - assertEquals(expectedMap.get(e.getKey()), e.getValue()); > - } > - > - assertFalse(iter.hasNext()); > - } > - > - { > - Iterator<Map.Entry<String, String>> iter = table.iterator(); > - while(iter.hasNext()){ > - iter.next(); > - iter.remove(); > - } > - } > - > - assertFalse(table.iterator().hasNext()); > - } > -} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java > > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java > new file mode 100644 > index 0000000..18d87c7 > --- /dev/null > +++ > b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java > @@ -0,0 +1,58 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.drill.exec.store.sys; > + > +import org.apache.curator.framework.CuratorFramework; > +import org.apache.curator.framework.CuratorFrameworkFactory; > +import org.apache.curator.retry.RetryNTimes; > +import org.apache.drill.common.config.DrillConfig; > +import org.apache.drill.exec.ExecConstants; > +import org.apache.drill.exec.TestWithZookeeper; > +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; > +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider; > +import org.junit.Test; > + > +public class TestPStoreProviders extends TestWithZookeeper { > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class); > + > + static LocalPStoreProvider provider; > + > + @Test > + public void verifyLocalStore() throws Exception { > + try(LocalPStoreProvider provider = new > LocalPStoreProvider(DrillConfig.create())){ > + PStoreTestUtil.test(provider); > + } > + } > + > + @Test > + public void verifyZkStore() throws Exception { > + DrillConfig config = getConfig(); > + String connect = config.getString(ExecConstants.ZK_CONNECTION); > + CuratorFrameworkFactory.Builder builder = > CuratorFrameworkFactory.builder() > + .namespace(config.getString(ExecConstants.ZK_ROOT)) > + .retryPolicy(new RetryNTimes(1, 100)) > + .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) > + .connectString(connect); > + > + try(CuratorFramework curator = builder.build()){ > + curator.start(); > + ZkPStoreProvider provider = new ZkPStoreProvider(curator); > + PStoreTestUtil.test(provider); > + } > + } > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java > ---------------------------------------------------------------------- > diff --git > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java > > 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java > deleted file mode 100644 > index b7d92fe..0000000 > --- > a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java > +++ /dev/null > @@ -1,58 +0,0 @@ > -/** > - * Licensed to the Apache Software Foundation (ASF) under one > - * or more contributor license agreements. See the NOTICE file > - * distributed with this work for additional information > - * regarding copyright ownership. The ASF licenses this file > - * to you under the Apache License, Version 2.0 (the > - * "License"); you may not use this file except in compliance > - * with the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. 
> - */ > -package org.apache.drill.exec.store.sys; > - > -import org.apache.curator.framework.CuratorFramework; > -import org.apache.curator.framework.CuratorFrameworkFactory; > -import org.apache.curator.retry.RetryNTimes; > -import org.apache.drill.common.config.DrillConfig; > -import org.apache.drill.exec.ExecConstants; > -import org.apache.drill.exec.TestWithZookeeper; > -import org.apache.drill.exec.store.sys.local.LocalTableProvider; > -import org.apache.drill.exec.store.sys.zk.ZkTableProvider; > -import org.junit.Test; > - > -public class TestTableProviders extends TestWithZookeeper { > - static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(TestTableProviders.class); > - > - static LocalTableProvider provider; > - > - @Test > - public void verifyLocalTable() throws Exception { > - try(LocalTableProvider provider = new > LocalTableProvider(DrillConfig.create())){ > - PTableTestUtil.test(provider); > - } > - } > - > - @Test > - public void verifyZkTable() throws Exception { > - DrillConfig config = getConfig(); > - String connect = config.getString(ExecConstants.ZK_CONNECTION); > - CuratorFrameworkFactory.Builder builder = > CuratorFrameworkFactory.builder() > - .namespace(config.getString(ExecConstants.ZK_ROOT)) > - .retryPolicy(new RetryNTimes(1, 100)) > - .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT)) > - .connectString(connect); > - > - try(CuratorFramework curator = builder.build()){ > - curator.start(); > - ZkTableProvider provider = new ZkTableProvider(curator); > - PTableTestUtil.test(provider); > - } > - } > -} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml > ---------------------------------------------------------------------- > diff --git a/pom.xml b/pom.xml > index b2289f3..500f0fd 100644 > --- a/pom.xml > +++ b/pom.xml > @@ -262,7 +262,7 @@ > <artifactId>maven-surefire-plugin</artifactId> > <version>2.17</version> > <configuration> > - <argLine>-Xms512m -Xmx1g 
-Ddrill.exec.http.enabled=false > -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M > -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine> > + <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false > -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M > -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine> > <forkCount>4</forkCount> > <reuseForks>true</reuseForks> > <additionalClasspathElements> >
