ACCUMULO-2836: Added context classpath support to BloomFilter, AggregatingIterator, and TableLoadBalancer


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c9c3fdd4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c9c3fdd4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c9c3fdd4

Branch: refs/heads/ACCUMULO-378
Commit: c9c3fdd4f5fc648ccc5e3a905236a82e88524251
Parents: eb6b325
Author: Dave Marion <dlmar...@hotmail.com>
Authored: Fri May 23 22:31:23 2014 -0400
Committer: Dave Marion <dlmar...@hotmail.com>
Committed: Fri May 23 22:31:23 2014 -0400

----------------------------------------------------------------------
 .../accumulo/core/file/BloomFilterLayer.java     | 19 ++++++++++++++++---
 .../core/iterators/AggregatingIterator.java      |  6 +++++-
 .../aggregation/conf/AggregatorSet.java          |  3 ++-
 .../iterators/conf/ColumnToClassMapping.java     | 16 +++++++++++++---
 .../master/balancer/TableLoadBalancer.java       |  9 ++++++++-
 .../java/org/apache/accumulo/master/Master.java  | 14 ++++++++++++++
 6 files changed, 58 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java 
b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 5829ce6..d0e736c 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -114,7 +114,14 @@ public class BloomFilterLayer {
        * load KeyFunctor
        */
       try {
-        Class<? extends KeyFunctor> clazz = 
AccumuloVFSClassLoader.loadClass(acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR), 
KeyFunctor.class);
+        String context = acuconf.get(Property.TABLE_CLASSPATH);
+        String classname = acuconf.get(Property.TABLE_BLOOM_KEY_FUNCTOR);
+        Class<? extends KeyFunctor> clazz;
+        if (context != null && !context.equals(""))
+          clazz = 
AccumuloVFSClassLoader.getContextManager().loadClass(context, classname, 
KeyFunctor.class);
+        else
+          clazz = AccumuloVFSClassLoader.loadClass(classname, 
KeyFunctor.class);
+
         transformer = clazz.newInstance();
         
       } catch (Exception e) {
@@ -186,6 +193,8 @@ public class BloomFilterLayer {
       
       loadThreshold = acuconf.getCount(Property.TABLE_BLOOM_LOAD_THRESHOLD);
       
+      final String context = acuconf.get(Property.TABLE_CLASSPATH);
+
       loadTask = new Runnable() {
         @Override
         public void run() {
@@ -208,8 +217,12 @@ public class BloomFilterLayer {
              * Load classname for keyFunctor
              */
             ClassName = in.readUTF();
-            
-            Class<? extends KeyFunctor> clazz = 
AccumuloVFSClassLoader.loadClass(ClassName, KeyFunctor.class);
+
+            Class<? extends KeyFunctor> clazz;
+            if (context != null && !context.equals(""))
+              clazz = 
AccumuloVFSClassLoader.getContextManager().loadClass(context, ClassName, 
KeyFunctor.class);
+            else
+              clazz = AccumuloVFSClassLoader.loadClass(ClassName, 
KeyFunctor.class);
             transformer = clazz.newInstance();
             
             /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java
index c5c034e..9b89b47 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -167,7 +168,10 @@ public class AggregatingIterator implements 
SortedKeyValueIterator<Key,Value>, O
     this.iterator = source;
     
     try {
-      this.aggregators = new ColumnToClassMapping<Aggregator>(options, 
Aggregator.class);
+      String context = null;
+      if (null != env)
+        context = env.getConfig().get(Property.TABLE_CLASSPATH);
+      this.aggregators = new ColumnToClassMapping<Aggregator>(options, 
Aggregator.class, context);
     } catch (ClassNotFoundException e) {
       log.error(e.toString());
       throw new IllegalArgumentException(e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java
index afa7587..ad33fa2 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/aggregation/conf/AggregatorSet.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.iterators.aggregation.conf;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.accumulo.core.data.Key;
@@ -27,7 +28,7 @@ import 
org.apache.accumulo.core.iterators.conf.ColumnToClassMapping;
  */
 @Deprecated
 public class AggregatorSet extends ColumnToClassMapping<Aggregator> {
-  public AggregatorSet(Map<String,String> opts) throws InstantiationException, 
IllegalAccessException, ClassNotFoundException {
+  public AggregatorSet(Map<String,String> opts) throws InstantiationException, 
IllegalAccessException, ClassNotFoundException, IOException {
     super(opts, Aggregator.class);
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java
index c835b9d..97f242b 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/conf/ColumnToClassMapping.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.iterators.conf;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -41,8 +42,13 @@ public class ColumnToClassMapping<K> {
   }
   
   public ColumnToClassMapping(Map<String,String> objectStrings, Class<? 
extends K> c) throws InstantiationException, IllegalAccessException,
-      ClassNotFoundException {
-    this();
+      ClassNotFoundException, IOException {
+         this(objectStrings, c, null);
+  }
+
+  public ColumnToClassMapping(Map<String,String> objectStrings, Class<? 
extends K> c, String context) throws InstantiationException, 
IllegalAccessException,
+  ClassNotFoundException, IOException {
+         this();
     
     for (Entry<String,String> entry : objectStrings.entrySet()) {
       String column = entry.getKey();
@@ -50,7 +56,11 @@ public class ColumnToClassMapping<K> {
       
       Pair<Text,Text> pcic = ColumnSet.decodeColumns(column);
       
-      Class<? extends K> clazz = AccumuloVFSClassLoader.loadClass(className, 
c);
+      Class<? extends K> clazz;
+      if (context != null && !context.equals(""))
+        clazz = (Class<? extends K>) 
AccumuloVFSClassLoader.getContextManager().getClassLoader(context).loadClass(className);
+      else
+        clazz = AccumuloVFSClassLoader.loadClass(className, c);
       
       if (pcic.getSecond() == null) {
         addObject(pcic.getFirst(), clazz.newInstance());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
 
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
index f2478b1..d96f9b0 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java
@@ -46,7 +46,14 @@ public class TableLoadBalancer extends TabletBalancer {
   Map<String,TabletBalancer> perTableBalancers = new 
HashMap<String,TabletBalancer>();
 
   private TabletBalancer constructNewBalancerForTable(String clazzName, String 
table) throws Exception {
-    Class<? extends TabletBalancer> clazz = 
AccumuloVFSClassLoader.loadClass(clazzName, TabletBalancer.class);
+    String context = null;
+    if (null != configuration)
+      context = 
configuration.getTableConfiguration(table).get(Property.TABLE_CLASSPATH);
+    Class<? extends TabletBalancer> clazz;
+    if (context != null && !context.equals(""))
+      clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, 
clazzName, TabletBalancer.class);
+    else
+      clazz = AccumuloVFSClassLoader.loadClass(clazzName, 
TabletBalancer.class);
     Constructor<? extends TabletBalancer> constructor = 
clazz.getConstructor(String.class);
     return constructor.newInstance(table);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c9c3fdd4/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java 
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 2440ee4..797e066 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -115,6 +115,8 @@ import 
org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.start.classloader.vfs.ContextManager;
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -477,6 +479,18 @@ public class Master implements LiveTServerSet.Listener, 
TableObserver, CurrentSt
     tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this);
     this.tabletBalancer = 
aconf.instantiateClassProperty(Property.MASTER_TABLET_BALANCER, 
TabletBalancer.class, new DefaultLoadBalancer());
     this.tabletBalancer.init(serverConfig);
+
+    try {
+      AccumuloVFSClassLoader.getContextManager().setContextConfig(new 
ContextManager.DefaultContextsConfig(new Iterable<Entry<String,String>>() {
+        @Override
+        public Iterator<Entry<String,String>> iterator() {
+          return getSystemConfiguration().iterator();
+        }
+      }));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
   }
 
   public TServerConnection getConnection(TServerInstance server) {

Reply via email to