Repository: incubator-rya Updated Branches: refs/heads/develop 990f1ffe2 -> e6be84a40
RYA-12 Adding support for Additional Iterators on Core Tables. Note: the config file format may change in the future. This is really just a change to the client Config API and Query Engine. Also pulling in an orphan "Manual Flush" commit that did not make it into the repo. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e6be84a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e6be84a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e6be84a4 Branch: refs/heads/develop Commit: e6be84a407e05c66cb4b4b6ef225d7e07dd10fcf Parents: 990f1ff Author: Aaron Mihalik <miha...@alum.mit.edu> Authored: Fri Dec 4 20:25:51 2015 -0500 Committer: Aaron Mihalik <miha...@alum.mit.edu> Committed: Fri Dec 4 20:25:51 2015 -0500 ---------------------------------------------------------------------- .../rya/accumulo/AccumuloRdfConfiguration.java | 74 +++++++++++++++++++- .../java/mvm/rya/accumulo/AccumuloRyaDAO.java | 33 ++++----- .../accumulo/query/AccumuloRyaQueryEngine.java | 6 ++ .../accumulo/AccumuloRdfConfigurationTest.java | 34 ++++++--- .../mvm/rya/accumulo/AccumuloRyaDAOTest.java | 58 +++++++++++++-- .../accumulo/entity/AccumuloDocIndexerTest.java | 3 +- 6 files changed, 173 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java index 147228b..709ceb9 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java @@ -21,11 +21,17 @@ 
package mvm.rya.accumulo; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import mvm.rya.accumulo.experimental.AccumuloIndexer; import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.conf.Configuration; @@ -42,6 +48,17 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { public static final String MAXRANGES_SCANNER = "ac.query.maxranges"; public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers"; + + public static final String CONF_FLUSH_EACH_UPDATE = "ac.dao.flush"; + + public static final String ITERATOR_SETTINGS_SIZE = "ac.iterators.size"; + public static final String ITERATOR_SETTINGS_BASE = "ac.iterators.%d."; + public static final String ITERATOR_SETTINGS_NAME = ITERATOR_SETTINGS_BASE + "name"; + public static final String ITERATOR_SETTINGS_CLASS = ITERATOR_SETTINGS_BASE + "iteratorClass"; + public static final String ITERATOR_SETTINGS_PRIORITY = ITERATOR_SETTINGS_BASE + "priority"; + public static final String ITERATOR_SETTINGS_OPTIONS_SIZE = ITERATOR_SETTINGS_BASE + "optionsSize"; + public static final String ITERATOR_SETTINGS_OPTIONS_KEY = ITERATOR_SETTINGS_BASE + "option.%d.name"; + public static final String ITERATOR_SETTINGS_OPTIONS_VALUE = ITERATOR_SETTINGS_BASE + "option.%d.value"; public AccumuloRdfConfiguration() { super(); @@ -73,7 +90,7 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) { List<String> strs = Lists.newArrayList(); - for (Class ai : indexers){ + for (Class<? 
extends AccumuloIndexer> ai : indexers){ strs.add(ai.getName()); } @@ -83,4 +100,59 @@ public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { public List<AccumuloIndexer> getAdditionalIndexers() { return getInstances(CONF_ADDITIONAL_INDEXERS, AccumuloIndexer.class); } + public boolean flushEachUpdate(){ + return getBoolean(CONF_FLUSH_EACH_UPDATE, true); + } + + public void setFlush(boolean flush){ + setBoolean(CONF_FLUSH_EACH_UPDATE, flush); + } + + public void setAdditionalIterators(IteratorSetting... additionalIterators){ + //TODO do we need to worry about cleaning up + this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length)); + int i = 0; + for(IteratorSetting iterator : additionalIterators) { + this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName()); + this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass()); + this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority())); + Map<String, String> options = iterator.getOptions(); + + this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size())); + Iterator<Entry<String, String>> it = options.entrySet().iterator(); + int j = 0; + while(it.hasNext()) { + Entry<String, String> item = it.next(); + this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey()); + this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue()); + j++; + } + i++; + } + } + + public IteratorSetting[] getAdditionalIterators(){ + int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0")); + if(size == 0) { + return new IteratorSetting[0]; + } + + IteratorSetting[] settings = new IteratorSetting[size]; + for(int i = 0; i < size; i++) { + String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i)); + String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i)); + int priority = 
Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i))); + + int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i))); + Map<String, String> options = new HashMap<String, String>(optionsSize); + for(int j = 0; j < optionsSize; j++) { + String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j)); + String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j)); + options.put(key, value); + } + settings[i] = new IteratorSetting(priority, name, iteratorClass, options); + } + + return settings; + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java index 84fae68..8a6bd00 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java @@ -79,15 +79,11 @@ import mvm.rya.api.resolver.RyaTripleContext; import mvm.rya.api.resolver.triple.TripleRow; import mvm.rya.api.resolver.triple.TripleRowResolverException; -/** - * Class AccumuloRyaDAO - * Date: Feb 29, 2012 - * Time: 12:37:22 PM - */ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> { private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class); private boolean initialized = false; + private boolean flushEachUpdate = true; private Connector connector; private BatchWriterConfig batchWriterConfig; @@ -134,6 +130,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName secondaryIndexers = conf.getAdditionalIndexers(); + flushEachUpdate = conf.flushEachUpdate(); + TableOperations tableOperations = connector.tableOperations(); 
AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); @@ -151,9 +149,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo()); bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp()); - bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY, - MAX_TIME, 1); - + bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs()); + for (AccumuloIndexer index : secondaryIndexers) { index.setMultiTableBatchWriter(mt_bw); } @@ -193,7 +190,6 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName @Override public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException { this.delete(Iterators.singletonIterator(stmt), aconf); - //TODO currently all indexers do not support delete } @Override @@ -211,8 +207,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName index.deleteStatement(stmt); } } - mt_bw.flush(); - //TODO currently all indexers do not support delete + if (flushEachUpdate) { mt_bw.flush(); } } catch (Exception e) { throw new RyaDAOException(e); } @@ -299,7 +294,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName } } - mt_bw.flush(); + if (flushEachUpdate) { mt_bw.flush(); } } catch (Exception e) { throw new RyaDAOException(e); } @@ -314,10 +309,8 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName try { initialized = false; mt_bw.flush(); - bw_ns.flush(); mt_bw.close(); - bw_ns.close(); } catch (Exception e) { throw new RyaDAOException(e); } @@ -329,7 +322,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName Mutation m = new Mutation(new Text(pfx)); m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes())); bw_ns.addMutation(m); - bw_ns.flush(); + 
if (flushEachUpdate) { mt_bw.flush(); } } catch (Exception e) { throw new RyaDAOException(e); } @@ -360,7 +353,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName Mutation del = new Mutation(new Text(pfx)); del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); bw_ns.addMutation(del); - bw_ns.flush(); + if (flushEachUpdate) { mt_bw.flush(); } } catch (Exception e) { throw new RyaDAOException(e); } @@ -464,6 +457,14 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName this.queryEngine = queryEngine; } + public void flush() throws RyaDAOException { + try { + mt_bw.flush(); + } catch (MutationsRejectedException e) { + throw new RyaDAOException(e); + } + } + protected String[] getTables() { // core tables List<String> tableNames = Lists.newArrayList( http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java index 1d0d9c9..869a128 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/AccumuloRyaQueryEngine.java @@ -388,6 +388,12 @@ public class AccumuloRyaQueryEngine implements RyaQueryEngine<AccumuloRdfConfigu RegExFilter.setRegexs(setting, regex, null, null, null, false); scanner.addScanIterator(setting); } + if (conf instanceof AccumuloRdfConfiguration) { + //TODO should we take the iterator settings as is or should we adjust the priority based on the above? 
+ for (IteratorSetting itr : ((AccumuloRdfConfiguration)conf).getAdditionalIterators()) { + scanner.addScanIterator(itr); + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java index b7c9079..ffd316e 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java @@ -21,20 +21,19 @@ package mvm.rya.accumulo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.security.Authorizations; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Date: 1/28/13 - * Time: 8:36 AM - */ public class AccumuloRdfConfigurationTest { private static final Logger logger = LoggerFactory.getLogger(AccumuloRdfConfigurationTest.class); @@ -56,4 +55,21 @@ public class AccumuloRdfConfigurationTest { assertEquals(str, conf.getAuth()); assertEquals(auths, conf.getAuthorizations()); } + + @Test + public void testIterators() { + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + + Map<String, String> options = new HashMap<String, String>(); + options.put("key1", "value1"); + options.put("key2", "value2"); + IteratorSetting setting = new IteratorSetting(1, "test", "test2", options); + + conf.setAdditionalIterators(setting); + IteratorSetting[] 
iteratorSettings = conf.getAdditionalIterators(); + assertTrue(iteratorSettings.length == 1); + + assertEquals(setting, iteratorSettings[0]); + + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java index ab4528b..5c30e67 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java @@ -21,9 +21,18 @@ package mvm.rya.accumulo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import info.aduna.iteration.CloseableIteration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; -import mvm.rya.api.RdfCloudTripleStoreUtils; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.domain.RyaType; import mvm.rya.api.domain.RyaURI; @@ -33,10 +42,11 @@ import mvm.rya.api.resolver.RdfToRyaConversions; import mvm.rya.api.resolver.RyaContext; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.mock.MockInstance; -import org.calrissian.mango.collect.CloseableIterable; +import org.apache.accumulo.core.iterators.FirstEntryInRowIterator; import org.calrissian.mango.collect.FluentCloseableIterable; import org.junit.After; import org.junit.Before; @@ -44,11 +54,6 @@ import org.junit.Test; import org.openrdf.model.ValueFactory; 
import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.BindingSet; - -import java.util.*; - -import static org.junit.Assert.*; /** * Class AccumuloRdfDAOTest @@ -631,6 +636,45 @@ public class AccumuloRyaDAOTest { assertFalse(dao.isInitialized()); } + @Test + public void testQueryWithIterators() throws Exception { + RyaURI cpu = new RyaURI(litdupsNS + "cpu"); + RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc"); + RyaURI uri1 = new RyaURI(litdupsNS + "uri1"); + dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual1")); + dao.add(new RyaStatement(cpu, loadPerc, uri1, null, "qual2")); + + AccumuloRyaQueryEngine queryEngine = dao.getQueryEngine(); + + AccumuloRdfConfiguration queryConf = new AccumuloRdfConfiguration(conf); + IteratorSetting firstEntryInRow = new IteratorSetting(3 /* correct value?? */, FirstEntryInRowIterator.class); + queryConf.setAdditionalIterators(firstEntryInRow); + + Collection<RyaStatement> coll = new ArrayList<>(); + coll.add(new RyaStatement(null, loadPerc, uri1)); + CloseableIteration<RyaStatement, RyaDAOException> iter = queryEngine.batchQuery(coll, queryConf); + int count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(1, count); + + //Assert that without the iterator we get 2 + coll = new ArrayList<>(); + coll.add(new RyaStatement(null, loadPerc, uri1)); + iter = queryEngine.batchQuery(coll, conf); + count = 0; + while (iter.hasNext()) { + count++; + iter.next(); + } + iter.close(); + assertEquals(2, count); + + } + private boolean areTablesEmpty() throws TableNotFoundException { for (String table : dao.getTables()) { if (tableExists(table)) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e6be84a4/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java ---------------------------------------------------------------------- diff --git 
a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java index e7e06d9..6237697 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIndexerTest.java @@ -26,14 +26,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import junit.framework.Assert; import mvm.rya.accumulo.AccumuloRdfConfiguration; import mvm.rya.accumulo.RyaTableMutationsFactory; import mvm.rya.api.RdfCloudTripleStoreConstants; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.domain.RyaType; import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.layout.TablePrefixLayoutStrategy; import mvm.rya.api.resolver.RyaToRdfConversions; import mvm.rya.api.resolver.RyaTripleContext; import mvm.rya.indexing.accumulo.ConfigUtils; @@ -43,6 +41,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.openrdf.model.Value;