Looks like we added a ZK PStore instead of an HBase PStore?

Tim

On Tue, Jun 10, 2014 at 8:52 PM,  <[email protected]> wrote:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
>  
> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> new file mode 100644
> index 0000000..eb21c70
> --- /dev/null
> +++ 
> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
> @@ -0,0 +1,180 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys.zk;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.List;
> +import java.util.Map.Entry;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.drill.exec.store.sys.PStore;
> +import org.apache.drill.exec.store.sys.PStoreConfig;
> +import org.apache.zookeeper.CreateMode;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class ZkPStore<V> implements PStore<V>{
> +
> +  private CuratorFramework framework;
> +  private PStoreConfig<V> config;
> +  private String prefix;
> +  private String parent;
> +
> +  ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws 
> IOException {
> +    this.parent = "/" + config.getName();
> +    this.prefix = parent + "/";
> +    this.framework = framework;
> +    this.config = config;
> +
> +    // make sure the parent node exists.
> +    try{
> +      if(framework.checkExists().forPath(parent) == null) {
> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
> +      }
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +
> +  }
> +
> +  @Override
> +  public Iterator<Entry<String, V>> iterator() {
> +    try{
> +      List<String> children = framework.getChildren().forPath(parent);
> +      return new Iter(children.iterator());
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +
> +  }
> +
> +  private String p(String key){
> +    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys 
> that have slashes in them when using the Zookeeper SystemTable storage 
> interface.");
> +    return prefix + key;
> +  }
> +
> +  @Override
> +  public V get(String key) {
> +    try{
> +      byte[] bytes = framework.getData().forPath(p(key));
> +      if(bytes == null){
> +        return null;
> +      }
> +      return config.getSerializer().deserialize(bytes);
> +
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +  }
> +
> +  @Override
> +  public void put(String key, V value) {
> +    try{
> +      if(framework.checkExists().forPath(p(key)) != null) {
> +        framework.setData().forPath(p(key), 
> config.getSerializer().serialize(value));
> +      }else{
> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
> config.getSerializer().serialize(value));
> +      }
> +
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +
> +  }
> +
> +  @Override
> +  public boolean putIfAbsent(String key, V value) {
> +    try{
> +      if(framework.checkExists().forPath(p(key)) != null) {
> +        return false;
> +      }else{
> +        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
> config.getSerializer().serialize(value));
> +        return true;
> +      }
> +
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +  }
> +
> +  @Override
> +  public void delete(String key) {
> +    try{
> +      framework.delete().forPath(p(key));
> +    }catch(Exception e){
> +      throw new RuntimeException("Failure while accessing Zookeeper", e);
> +    }
> +  }
> +
> +  private class Iter implements Iterator<Entry<String, V>>{
> +
> +    private Iterator<String> keys;
> +    private String current;
> +
> +    public Iter(Iterator<String> keys) {
> +      super();
> +      this.keys = keys;
> +    }
> +
> +    @Override
> +    public boolean hasNext() {
> +      return keys.hasNext();
> +    }
> +
> +    @Override
> +    public Entry<String, V> next() {
> +      current = keys.next();
> +      return new DeferredEntry(current);
> +    }
> +
> +    @Override
> +    public void remove() {
> +      delete(current);
> +      keys.remove();
> +    }
> +
> +    private class DeferredEntry implements Entry<String, V>{
> +
> +      private String name;
> +
> +      public DeferredEntry(String name) {
> +        super();
> +        this.name = name;
> +      }
> +
> +      @Override
> +      public String getKey() {
> +        return name;
> +      }
> +
> +      @Override
> +      public V getValue() {
> +        return get(name);
> +      }
> +
> +      @Override
> +      public V setValue(V value) {
> +        throw new UnsupportedOperationException();
> +      }
> +
> +    }
> +
> +  }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
>  
> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> new file mode 100644
> index 0000000..f4513c2
> --- /dev/null
> +++ 
> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
> @@ -0,0 +1,61 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys.zk;
> +
> +import java.io.IOException;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.drill.exec.coord.ClusterCoordinator;
> +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
> +import org.apache.drill.exec.exception.DrillbitStartupException;
> +import org.apache.drill.exec.store.sys.PStore;
> +import org.apache.drill.exec.store.sys.PStoreConfig;
> +import org.apache.drill.exec.store.sys.PStoreProvider;
> +import org.apache.drill.exec.store.sys.PStoreRegistry;
> +
> +public class ZkPStoreProvider implements PStoreProvider{
> +  static final org.slf4j.Logger logger = 
> org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
> +
> +  private final CuratorFramework curator;
> +
> +  public ZkPStoreProvider(PStoreRegistry registry) throws 
> DrillbitStartupException {
> +    ClusterCoordinator coord = registry.getClusterCoordinator();
> +    if (!(coord instanceof ZKClusterCoordinator)) {
> +      throw new DrillbitStartupException("A ZkPStoreProvider was created 
> without a ZKClusterCoordinator.");
> +    }
> +    this.curator = 
> ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
> +  }
> +
> +  public ZkPStoreProvider(CuratorFramework curator) {
> +    this.curator = curator;
> +  }
> +
> +  @Override
> +  public void close() {
> +  }
> +
> +  @Override
> +  public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
> +    return new ZkPStore<V>(curator, store);
> +  }
> +
> +  @Override
> +  public void start() {
> +  }
> +
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
>  
> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> deleted file mode 100644
> index e2a6ecf..0000000
> --- 
> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPTable.java
> +++ /dev/null
> @@ -1,182 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys.zk;
> -
> -import java.io.IOException;
> -import java.util.Iterator;
> -import java.util.List;
> -import java.util.Map.Entry;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.drill.exec.store.sys.PTable;
> -import org.apache.drill.exec.store.sys.PTableConfig;
> -import org.apache.zookeeper.CreateMode;
> -
> -import com.google.common.base.Preconditions;
> -
> -public class ZkPTable<V> implements PTable<V>{
> -
> -  private CuratorFramework framework;
> -  private PTableConfig<V> config;
> -  private String prefix;
> -  private String parent;
> -
> -  ZkPTable(CuratorFramework framework, PTableConfig<V> config) throws 
> IOException {
> -    super();
> -    this.parent = "/" + config.getName();
> -    this.prefix = parent + "/";
> -    this.framework = framework;
> -    this.config = config;
> -
> -    // make sure the parent node exists.
> -    try{
> -      if(framework.checkExists().forPath(parent) == null) {
> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
> -      }
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -
> -  }
> -
> -  @Override
> -  public Iterator<Entry<String, V>> iterator() {
> -    try{
> -      List<String> children = framework.getChildren().forPath(parent);
> -      return new Iter(children.iterator());
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -
> -  }
> -
> -  private String p(String key){
> -    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys 
> that have slashes in them when using the Zookeeper SystemTable storage 
> interface.");
> -    return prefix + key;
> -  }
> -
> -  @Override
> -  public V get(String key) {
> -    try{
> -      byte[] bytes = framework.getData().forPath(p(key));
> -      if(bytes == null){
> -        return null;
> -      }
> -      return config.getSerializer().deserialize(bytes);
> -
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -  }
> -
> -  @Override
> -  public void put(String key, V value) {
> -    try{
> -      if(framework.checkExists().forPath(p(key)) != null) {
> -        framework.setData().forPath(p(key), 
> config.getSerializer().serialize(value));
> -      }else{
> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
> config.getSerializer().serialize(value));
> -      }
> -
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -
> -  }
> -
> -  @Override
> -  public boolean putIfAbsent(String key, V value) {
> -    try{
> -      if(framework.checkExists().forPath(p(key)) != null) {
> -        return false;
> -      }else{
> -        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
> config.getSerializer().serialize(value));
> -        return true;
> -      }
> -
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -  }
> -
> -  @Override
> -  public void delete(String key) {
> -    try{
> -      framework.delete().forPath(p(key));
> -    }catch(Exception e){
> -      throw new RuntimeException("Failure while accessing Zookeeper", e);
> -    }
> -  }
> -
> -
> -  private class Iter implements Iterator<Entry<String, V>>{
> -
> -    private Iterator<String> keys;
> -    private String current;
> -
> -    public Iter(Iterator<String> keys) {
> -      super();
> -      this.keys = keys;
> -    }
> -
> -    @Override
> -    public boolean hasNext() {
> -      return keys.hasNext();
> -    }
> -
> -    @Override
> -    public Entry<String, V> next() {
> -      current = keys.next();
> -      return new DeferredEntry(current);
> -    }
> -
> -    @Override
> -    public void remove() {
> -      delete(current);
> -      keys.remove();
> -    }
> -
> -
> -    private class DeferredEntry implements Entry<String, V>{
> -
> -      private String name;
> -
> -
> -      public DeferredEntry(String name) {
> -        super();
> -        this.name = name;
> -      }
> -
> -      @Override
> -      public String getKey() {
> -        return name;
> -      }
> -
> -      @Override
> -      public V getValue() {
> -        return get(name);
> -      }
> -
> -      @Override
> -      public V setValue(V value) {
> -        throw new UnsupportedOperationException();
> -      }
> -
> -    }
> -  }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
>  
> b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> deleted file mode 100644
> index 8d2e153..0000000
> --- 
> a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkTableProvider.java
> +++ /dev/null
> @@ -1,50 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys.zk;
> -
> -import java.io.IOException;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.drill.exec.store.sys.PTable;
> -import org.apache.drill.exec.store.sys.PTableConfig;
> -import org.apache.drill.exec.store.sys.TableProvider;
> -
> -public class ZkTableProvider implements TableProvider{
> -  static final org.slf4j.Logger logger = 
> org.slf4j.LoggerFactory.getLogger(ZkTableProvider.class);
> -
> -  private final CuratorFramework curator;
> -
> -  public ZkTableProvider(CuratorFramework curator){
> -    this.curator = curator;
> -  }
> -
> -  @Override
> -  public void close() {
> -  }
> -
> -  @Override
> -  public <V> PTable<V> getPTable(PTableConfig<V> table) throws IOException {
> -    return new ZkPTable<V>(curator, table);
> -  }
> -
> -  @Override
> -  public void start() {
> -  }
> -
> -
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java 
> b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> index 1eae8c5..71e4e8e 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
> @@ -41,7 +41,7 @@ import org.apache.drill.exec.rpc.data.DataResponseHandler;
>  import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
>  import org.apache.drill.exec.server.BootStrapContext;
>  import org.apache.drill.exec.server.DrillbitContext;
> -import org.apache.drill.exec.store.sys.TableProvider;
> +import org.apache.drill.exec.store.sys.PStoreProvider;
>  import org.apache.drill.exec.work.batch.ControlHandlerImpl;
>  import org.apache.drill.exec.work.batch.ControlMessageHandler;
>  import org.apache.drill.exec.work.foreman.Foreman;
> @@ -87,7 +87,7 @@ public class WorkManager implements Closeable{
>      this.dataHandler = new DataResponseHandlerImpl(bee);
>    }
>
> -  public void start(DrillbitEndpoint endpoint, DistributedCache cache, 
> Controller controller, DataConnectionCreator data, ClusterCoordinator coord, 
> TableProvider provider){
> +  public void start(DrillbitEndpoint endpoint, DistributedCache cache, 
> Controller controller, DataConnectionCreator data, ClusterCoordinator coord, 
> PStoreProvider provider){
>      this.dContext = new DrillbitContext(endpoint, bContext, coord, 
> controller, data, cache, workBus, provider);
>   //   executor = 
> Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
>      executor = Executors.newCachedThreadPool(new 
> NamedThreadFactory("WorkManager-"));
> @@ -113,7 +113,9 @@ public class WorkManager implements Closeable{
>    @Override
>    public void close() throws IOException {
>      try {
> -      executor.awaitTermination(1, TimeUnit.SECONDS);
> +      if (executor != null) {
> +        executor.awaitTermination(1, TimeUnit.SECONDS);
> +      }
>      } catch (InterruptedException e) {
>        logger.warn("Executor interrupted while awaiting termination");
>      }
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/main/resources/drill-module.conf
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
> b/exec/java-exec/src/main/resources/drill-module.conf
> index 6a7c9d5..9ce22c7 100644
> --- a/exec/java-exec/src/main/resources/drill-module.conf
> +++ b/exec/java-exec/src/main/resources/drill-module.conf
> @@ -94,7 +94,8 @@ drill.exec: {
>      affinity.factor: 1.2,
>      executor.threads: 4
>    },
> -  sys.tables: {
> +  sys.store.provider: {
> +    class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
>      local: {
>        path: "/tmp/drill",
>        write: true
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java 
> b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> index 547af34..ad114ab 100644
> --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
> @@ -43,7 +43,7 @@ import org.apache.drill.exec.server.options.OptionManager;
>  import org.apache.drill.exec.server.options.SessionOptionManager;
>  import org.apache.drill.exec.server.options.SystemOptionManager;
>  import org.apache.drill.exec.store.StoragePluginRegistry;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>  import org.junit.Rule;
>  import org.junit.rules.TestRule;
>
> @@ -72,7 +72,7 @@ public class PlanningBase extends ExecTest{
>      final DistributedCache cache = new LocalCache();
>      cache.run();
>
> -    final LocalTableProvider provider = new LocalTableProvider(config);
> +    final LocalPStoreProvider provider = new LocalPStoreProvider(config);
>      provider.start();
>
>      final SystemOptionManager opt = new SystemOptionManager(config, 
> provider);
> @@ -91,7 +91,7 @@ public class PlanningBase extends ExecTest{
>          result = opt;
>          dbContext.getCache();
>          result = cache;
> -        dbContext.getSystemTableProvider();
> +        dbContext.getPersistentStoreProvider();
>          result = provider;
>        }
>      };
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
>  
> b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> index 8fc37f3..199ecfc 100644
> --- 
> a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> +++ 
> b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
> @@ -58,7 +58,7 @@ import org.apache.drill.exec.server.Drillbit;
>  import org.apache.drill.exec.server.DrillbitContext;
>  import org.apache.drill.exec.server.RemoteServiceSet;
>  import org.apache.drill.exec.store.StoragePluginRegistry;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
>  import org.apache.drill.exec.vector.ValueVector;
>  import org.apache.drill.exec.vector.VarBinaryVector;
>  import org.junit.AfterClass;
> @@ -109,7 +109,7 @@ public class TestOptiqPlans extends ExecTest {
>        }
>      };
>      RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
> -    DrillbitContext bitContext = new 
> DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, 
> controller, com, cache, workBus, new 
> LocalTableProvider(DrillConfig.create()));
> +    DrillbitContext bitContext = new 
> DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, 
> controller, com, cache, workBus, new 
> LocalPStoreProvider(DrillConfig.create()));
>      QueryContext qc = new QueryContext(new UserSession(null, null, null), 
> QueryId.getDefaultInstance(), bitContext);
>      PhysicalPlanReader reader = bitContext.getPlanReader();
>      LogicalPlan plan = 
> reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), 
> Charsets.UTF_8));
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
>  
> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> new file mode 100644
> index 0000000..6f7794b
> --- /dev/null
> +++ 
> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
> @@ -0,0 +1,66 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys;
> +
> +import static org.junit.Assert.assertEquals;
> +import static org.junit.Assert.assertFalse;
> +import static org.junit.Assert.assertTrue;
> +
> +import java.util.Iterator;
> +import java.util.Map;
> +import java.util.Map.Entry;
> +
> +import com.fasterxml.jackson.databind.ObjectMapper;
> +import com.google.common.collect.Maps;
> +
> +public class PStoreTestUtil {
> +  static final org.slf4j.Logger logger = 
> org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
> +
> +  public static void test(PStoreProvider provider) throws Exception{
> +    PStore<String> store = 
> provider.getPStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), 
> String.class).name("sys.test").build());
> +    String[] keys = {"first", "second"};
> +    String[] values = {"value1", "value2"};
> +    Map<String, String> expectedMap = Maps.newHashMap();
> +
> +    for(int i =0; i < keys.length; i++){
> +      expectedMap.put(keys[i], values[i]);
> +      store.put(keys[i], values[i]);
> +    }
> +
> +    {
> +      Iterator<Map.Entry<String, String>> iter = store.iterator();
> +      for(int i =0; i < keys.length; i++){
> +        Entry<String, String> e = iter.next();
> +        assertTrue(expectedMap.containsKey(e.getKey()));
> +        assertEquals(expectedMap.get(e.getKey()), e.getValue());
> +      }
> +
> +      assertFalse(iter.hasNext());
> +    }
> +
> +    {
> +      Iterator<Map.Entry<String, String>> iter = store.iterator();
> +      while(iter.hasNext()){
> +        iter.next();
> +        iter.remove();
> +      }
> +    }
> +
> +    assertFalse(store.iterator().hasNext());
> +  }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
>  
> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> deleted file mode 100644
> index 47a783b..0000000
> --- 
> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PTableTestUtil.java
> +++ /dev/null
> @@ -1,66 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys;
> -
> -import static org.junit.Assert.assertEquals;
> -import static org.junit.Assert.assertFalse;
> -import static org.junit.Assert.assertTrue;
> -
> -import java.util.Iterator;
> -import java.util.Map;
> -import java.util.Map.Entry;
> -
> -import com.fasterxml.jackson.databind.ObjectMapper;
> -import com.google.common.collect.Maps;
> -
> -public class PTableTestUtil {
> -  static final org.slf4j.Logger logger = 
> org.slf4j.LoggerFactory.getLogger(PTableTestUtil.class);
> -
> -  public static void test(TableProvider provider) throws Exception{
> -    PTable<String> table = 
> provider.getPTable(PTableConfig.newJacksonBuilder(new ObjectMapper(), 
> String.class).name("sys.test").build());
> -    String[] keys = {"first", "second"};
> -    String[] values = {"value1", "value2"};
> -    Map<String, String> expectedMap = Maps.newHashMap();
> -
> -    for(int i =0; i < keys.length; i++){
> -      expectedMap.put(keys[i], values[i]);
> -      table.put(keys[i], values[i]);
> -    }
> -
> -    {
> -      Iterator<Map.Entry<String, String>> iter = table.iterator();
> -      for(int i =0; i < keys.length; i++){
> -        Entry<String, String> e = iter.next();
> -        assertTrue(expectedMap.containsKey(e.getKey()));
> -        assertEquals(expectedMap.get(e.getKey()), e.getValue());
> -      }
> -
> -      assertFalse(iter.hasNext());
> -    }
> -
> -    {
> -      Iterator<Map.Entry<String, String>> iter = table.iterator();
> -      while(iter.hasNext()){
> -        iter.next();
> -        iter.remove();
> -      }
> -    }
> -
> -    assertFalse(table.iterator().hasNext());
> -  }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> ----------------------------------------------------------------------
> diff --git 
> a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
>  
> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> new file mode 100644
> index 0000000..18d87c7
> --- /dev/null
> +++ 
> b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
> @@ -0,0 +1,58 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.drill.exec.store.sys;
> +
> +import org.apache.curator.framework.CuratorFramework;
> +import org.apache.curator.framework.CuratorFrameworkFactory;
> +import org.apache.curator.retry.RetryNTimes;
> +import org.apache.drill.common.config.DrillConfig;
> +import org.apache.drill.exec.ExecConstants;
> +import org.apache.drill.exec.TestWithZookeeper;
> +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
> +import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
> +import org.junit.Test;
> +
> +public class TestPStoreProviders extends TestWithZookeeper {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
> +
> +  static LocalPStoreProvider provider;
> +
> +  @Test
> +  public void verifyLocalStore() throws Exception {
> +    try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){
> +      PStoreTestUtil.test(provider);
> +    }
> +  }
> +
> +  @Test
> +  public void verifyZkStore() throws Exception {
> +    DrillConfig config = getConfig();
> +    String connect = config.getString(ExecConstants.ZK_CONNECTION);
> +    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
> +    .namespace(config.getString(ExecConstants.ZK_ROOT))
> +    .retryPolicy(new RetryNTimes(1, 100))
> +    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
> +    .connectString(connect);
> +
> +    try(CuratorFramework curator = builder.build()){
> +      curator.start();
> +      ZkPStoreProvider provider = new ZkPStoreProvider(curator);
> +      PStoreTestUtil.test(provider);
> +    }
> +  }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> deleted file mode 100644
> index b7d92fe..0000000
> --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestTableProviders.java
> +++ /dev/null
> @@ -1,58 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.drill.exec.store.sys;
> -
> -import org.apache.curator.framework.CuratorFramework;
> -import org.apache.curator.framework.CuratorFrameworkFactory;
> -import org.apache.curator.retry.RetryNTimes;
> -import org.apache.drill.common.config.DrillConfig;
> -import org.apache.drill.exec.ExecConstants;
> -import org.apache.drill.exec.TestWithZookeeper;
> -import org.apache.drill.exec.store.sys.local.LocalTableProvider;
> -import org.apache.drill.exec.store.sys.zk.ZkTableProvider;
> -import org.junit.Test;
> -
> -public class TestTableProviders extends TestWithZookeeper {
> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTableProviders.class);
> -
> -  static LocalTableProvider provider;
> -
> -  @Test
> -  public void verifyLocalTable() throws Exception {
> -    try(LocalTableProvider provider = new LocalTableProvider(DrillConfig.create())){
> -      PTableTestUtil.test(provider);
> -    }
> -  }
> -
> -  @Test
> -  public void verifyZkTable() throws Exception {
> -    DrillConfig config = getConfig();
> -    String connect = config.getString(ExecConstants.ZK_CONNECTION);
> -    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
> -    .namespace(config.getString(ExecConstants.ZK_ROOT))
> -    .retryPolicy(new RetryNTimes(1, 100))
> -    .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
> -    .connectString(connect);
> -
> -    try(CuratorFramework curator = builder.build()){
> -      curator.start();
> -      ZkTableProvider provider = new ZkTableProvider(curator);
> -      PTableTestUtil.test(provider);
> -    }
> -  }
> -}
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/76d0e6f2/pom.xml
> ----------------------------------------------------------------------
> diff --git a/pom.xml b/pom.xml
> index b2289f3..500f0fd 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -262,7 +262,7 @@
>            <artifactId>maven-surefire-plugin</artifactId>
>            <version>2.17</version>
>            <configuration>
> -            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.tables.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
> +            <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -Ddrill.exec.sys.store.provider.local.write=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine>
>              <forkCount>4</forkCount>
>              <reuseForks>true</reuseForks>
>              <additionalClasspathElements>
>

Reply via email to