jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/370778 )
Change subject: Add new Load Shedding Router ...................................................................... Add new Load Shedding Router Add a new load shedding router that can decide on a query to run based on load information about the local node. This should help to ride out problems with less user visible effect. TODO (maybe later): * Custom stats output about how many times it was triggered. Unfortunately I haven't found any good way to hook into elasticsearch's stats collection yet. * Use the size of some thread pool queue as a trigger condition. Have not investigated how viable this is. * Use latency percentiles (p95, etc) as a trigger condition Change-Id: I0abe91ccf2f628b03621a7f7f0fe634c52c039af --- A docs/degraded_router.md M src/main/java/org/wikimedia/search/extra/ExtraPlugin.java A src/main/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilder.java M src/test/java/org/wikimedia/search/extra/QueryBuilderTestUtils.java A src/test/java/org/wikimedia/search/extra/router/DegradedRouterBuilderESTest.java A src/test/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilderParserTest.java M src/test/java/org/wikimedia/search/extra/router/TokenCountRouterBuilderESTest.java 7 files changed, 445 insertions(+), 3 deletions(-) Approvals: jenkins-bot: Verified DCausse: Looks good to me, approved diff --git a/docs/degraded_router.md b/docs/degraded_router.md new file mode 100644 index 0000000..c727c2f --- /dev/null +++ b/docs/degraded_router.md @@ -0,0 +1,48 @@ +degraded_router +=============== + +The ```degraded_router``` is a simple query wrapper that allows +routing queries based on the load of an individual node. It's useful +to prevent overloaded servers from having an outsized effect on end +user latency by running a cheaper query when the server is loaded. 
+ +Example +------- + +GET /_search +{ + "degraded_router": { + "fallback": { + "match_phrase": { + "content": "what should we do today?" + } + }, + "conditions": [ + { + "gte": 70, + "type": "cpu", + "query": { + "match": { + "content": "what should we do today?" + } + } + } + ] + } +} + +A match query will be issued if system cpu usage is at or above 70%. Otherwise +a phrase match query will be issued. + +Options +------- + +* `fallback` The query to apply if none of the conditions applies. +* `conditions` Array of conditions (the first that matches wins): + * `type`: The type of metric to compare against. Can be `cpu` for cpu%, or + `load` for the 1 minute load average. + * `predicate` : can be `eq`, `gt`, `gte`, `lt`, or `lte`, the value is the number + to compare against the value reported by `type` + * `query` The query to apply if the condition is met. + +Note that the query parser does not check the coherence of the conditions diff --git a/src/main/java/org/wikimedia/search/extra/ExtraPlugin.java b/src/main/java/org/wikimedia/search/extra/ExtraPlugin.java index 8b69695..229ccaa 100644 --- a/src/main/java/org/wikimedia/search/extra/ExtraPlugin.java +++ b/src/main/java/org/wikimedia/search/extra/ExtraPlugin.java @@ -1,7 +1,9 @@ package org.wikimedia.search.extra; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.analysis.TokenFilterFactory; import org.elasticsearch.indices.analysis.AnalysisModule.AnalysisProvider; +import org.elasticsearch.monitor.os.OsService; import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ScriptPlugin; @@ -11,6 +13,7 @@ import org.wikimedia.search.extra.fuzzylike.FuzzyLikeThisQueryBuilder; import org.wikimedia.search.extra.levenshtein.LevenshteinDistanceScoreBuilder; import org.wikimedia.search.extra.regex.SourceRegexQueryBuilder; +import org.wikimedia.search.extra.router.DegradedRouterQueryBuilder; import 
org.wikimedia.search.extra.router.TokenCountRouterQueryBuilder; import org.wikimedia.search.extra.superdetectnoop.ChangeHandler; import org.wikimedia.search.extra.superdetectnoop.SetHandler; @@ -31,6 +34,14 @@ * Setup the Elasticsearch plugin. */ public class ExtraPlugin extends Plugin implements SearchPlugin, AnalysisPlugin, ScriptPlugin { + + private OsService osService; + + public ExtraPlugin(Settings settings) { + // TODO: This collects way more info than we care about + osService = new OsService(settings); + } + /** * Register our parsers. */ @@ -40,7 +51,8 @@ return Arrays.asList( new QuerySpec<>(SourceRegexQueryBuilder.NAME, SourceRegexQueryBuilder::new, SourceRegexQueryBuilder::fromXContent), new QuerySpec<>(FuzzyLikeThisQueryBuilder.NAME, FuzzyLikeThisQueryBuilder::new, FuzzyLikeThisQueryBuilder::fromXContent), - new QuerySpec<>(TokenCountRouterQueryBuilder.NAME, TokenCountRouterQueryBuilder::new, TokenCountRouterQueryBuilder::fromXContent) + new QuerySpec<>(TokenCountRouterQueryBuilder.NAME, TokenCountRouterQueryBuilder::new, TokenCountRouterQueryBuilder::fromXContent), + new QuerySpec<>(DegradedRouterQueryBuilder.NAME, (in) -> new DegradedRouterQueryBuilder(in, osService), (pc) -> DegradedRouterQueryBuilder.fromXContent(pc, osService)) ); } diff --git a/src/main/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilder.java b/src/main/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilder.java new file mode 100644 index 0000000..9aa0f92 --- /dev/null +++ b/src/main/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilder.java @@ -0,0 +1,172 @@ +package org.wikimedia.search.extra.router; + +import com.google.common.annotations.VisibleForTesting; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import 
org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.monitor.os.OsService; +import org.elasticsearch.monitor.os.OsStats; + +import org.wikimedia.search.extra.router.AbstractRouterQueryBuilder.Condition; +import org.wikimedia.search.extra.router.DegradedRouterQueryBuilder.DegradedCondition; + +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +/** + * Builds a token_count_router query + * + * Getter/Setter are only for testing + */ +@Getter +@Setter +@Accessors(fluent = true, chain = true) +public class DegradedRouterQueryBuilder extends AbstractRouterQueryBuilder<DegradedCondition, DegradedRouterQueryBuilder> { + public static final ParseField NAME = new ParseField("degraded_router"); + private static final ParseField TYPE = new ParseField("type"); + + private final static ObjectParser<DegradedRouterQueryBuilder, QueryParseContext> PARSER; + private final static ObjectParser<DegradedConditionParserState, QueryParseContext> COND_PARSER; + + static { + COND_PARSER = new ObjectParser<>("condition", DegradedConditionParserState::new); + COND_PARSER.declareString((cps, value) -> cps.setType(DegradedConditionType.valueOf(value)), TYPE); + declareConditionFields(COND_PARSER); + + PARSER = new ObjectParser<>(NAME.getPreferredName(), DegradedRouterQueryBuilder::new); + declareStandardFields(PARSER); + declareRouterFields(PARSER, (p, pc) -> parseCondition(COND_PARSER, p, pc)); + } + + // This intentional is not considered in doEquals, as + // it's not part of the definition of the qb but a helper service. 
+ private OsService osService; + + DegradedRouterQueryBuilder() { + super(); + } + + public DegradedRouterQueryBuilder(StreamInput in, OsService osService) throws IOException { + super(in, DegradedCondition::new); + this.osService = osService; + } + + @Override + public String getWriteableName() { + return NAME.getPreferredName(); + } + + public static Optional<DegradedRouterQueryBuilder> fromXContent(QueryParseContext parseContext, OsService osService) throws IOException { + final Optional<DegradedRouterQueryBuilder> builder = AbstractRouterQueryBuilder.fromXContent(PARSER, parseContext); + builder.ifPresent((b) -> b.osService = osService); + return builder; + } + + @Override + public QueryBuilder doRewrite(QueryRewriteContext context) throws IOException { + // The nowInMillis call tells certain implementations of QueryRewriteContext + // that the results of this rewrite are not cacheable. + context.nowInMillis(); + OsStats.Cpu cpu = osService.stats().getCpu(); + return super.doRewrite(condition -> condition.test(cpu)); + } + + @EqualsAndHashCode(callSuper = true) + @Getter + static class DegradedCondition extends Condition { + private final DegradedConditionType type; + + DegradedCondition(StreamInput in) throws IOException { + super(in); + type = DegradedConditionType.readFrom(in); + } + + DegradedCondition(ConditionDefinition definition, DegradedConditionType type, int value, QueryBuilder query) { + super(definition, value, query); + this.type = Objects.requireNonNull(type); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + type.writeTo(out); + } + + public boolean test(OsStats.Cpu cpu) { + return test(type.extract(cpu)); + } + + void addXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(TYPE.getPreferredName(), type); + } + } + + @FunctionalInterface + private interface CpuStatExtractor { + int extract(OsStats.Cpu cpu); + } + + enum DegradedConditionType implements 
CpuStatExtractor, Writeable { + cpu(OsStats.Cpu::getPercent), + load((cpu) -> (int) Math.round(cpu.getLoadAverage()[0])); + + private final CpuStatExtractor extractor; + + DegradedConditionType(CpuStatExtractor extractor) { + this.extractor = extractor; + } + + public int extract(OsStats.Cpu cpu) { + return extractor.extract(cpu); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(ordinal()); + } + + static DegradedConditionType readFrom(StreamInput in) throws IOException { + int ord = in.readVInt(); + if (ord < 0 || ord >= values().length) { + throw new IOException("Unknown ConditionDefinition ordinal [" + ord + "]"); + } + return values()[ord]; + } + } + + private static class DegradedConditionParserState extends AbstractConditionParserState<DegradedCondition> { + private DegradedConditionType type; + + DegradedCondition condition() { + return new DegradedCondition(definition, type, value, query); + } + + void setType(DegradedConditionType type) { + this.type = type; + } + + @Override + void checkValid() throws IllegalArgumentException { + super.checkValid(); + if (type == null) { + throw new IllegalArgumentException("Missing field [type] in condition"); + } + } + } + + @VisibleForTesting + void condition(ConditionDefinition def, DegradedConditionType type, int value, QueryBuilder query) { + condition(new DegradedCondition(def, type, value, query)); + } +} diff --git a/src/test/java/org/wikimedia/search/extra/QueryBuilderTestUtils.java b/src/test/java/org/wikimedia/search/extra/QueryBuilderTestUtils.java index a245944..4ba6137 100644 --- a/src/test/java/org/wikimedia/search/extra/QueryBuilderTestUtils.java +++ b/src/test/java/org/wikimedia/search/extra/QueryBuilderTestUtils.java @@ -24,7 +24,7 @@ private final NamedXContentRegistry xContentRegistry; private QueryBuilderTestUtils() { - SearchModule module = new SearchModule(Settings.EMPTY, false, Collections.singletonList(new ExtraPlugin())); + SearchModule module = new 
SearchModule(Settings.EMPTY, false, Collections.singletonList(new ExtraPlugin(Settings.EMPTY))); xContentRegistry = new NamedXContentRegistry(module.getNamedXContents()); } diff --git a/src/test/java/org/wikimedia/search/extra/router/DegradedRouterBuilderESTest.java b/src/test/java/org/wikimedia/search/extra/router/DegradedRouterBuilderESTest.java new file mode 100644 index 0000000..6e44839 --- /dev/null +++ b/src/test/java/org/wikimedia/search/extra/router/DegradedRouterBuilderESTest.java @@ -0,0 +1,153 @@ +package org.wikimedia.search.extra.router; + +import org.apache.lucene.index.Term; +import org.apache.lucene.index.memory.MemoryIndex; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.lucene.search.MatchNoDocsQuery; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.*; +import org.elasticsearch.monitor.os.OsService; +import org.elasticsearch.monitor.os.OsStats; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.test.AbstractQueryTestCase; +import org.junit.runner.RunWith; +import org.wikimedia.search.extra.ExtraPlugin; +import org.wikimedia.search.extra.router.AbstractRouterQueryBuilder.ConditionDefinition; +import org.wikimedia.search.extra.router.DegradedRouterQueryBuilder.DegradedConditionType; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.wikimedia.search.extra.router.AbstractRouterQueryBuilder.ConditionDefinition.gt; +import static org.wikimedia.search.extra.router.AbstractRouterQueryBuilder.ConditionDefinition.gte; + 
+@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class) +public class DegradedRouterBuilderESTest extends AbstractQueryTestCase<DegradedRouterQueryBuilder>{ + protected Collection<Class<? extends Plugin>> getPlugins() { + return Collections.singleton(ExtraPlugin.class); + } + + @Override + protected boolean builderGeneratesCacheableQueries() { + return false; + } + + @Override + protected DegradedRouterQueryBuilder doCreateTestQueryBuilder() { + DegradedRouterQueryBuilder builder = newBuilder(); + builder.fallback(new MatchNoneQueryBuilder()); + + for (int i = randomIntBetween(1, 10); i > 0; i--) { + DegradedConditionType type = randomFrom(DegradedConditionType.values()); + ConditionDefinition cond = randomFrom(ConditionDefinition.values()); + int value = randomInt(10); + builder.condition(cond, type, value, new TermQueryBuilder( + type.name() + ":" + cond.name(), String.valueOf(value))); + } + + return builder; + } + + @Override + protected void doAssertLuceneQuery(DegradedRouterQueryBuilder builder, Query query, SearchContext context) throws IOException { + OsStats.Cpu cpu = builder.osService().stats().getCpu(); + + Optional<DegradedRouterQueryBuilder.DegradedCondition> cond = builder.conditionStream() + .filter(x -> x.test(cpu)) + .findFirst(); + + query = rewrite(query); + + if(cond.isPresent()) { + assertThat(query, instanceOf(TermQuery.class)); + TermQuery tq = (TermQuery) query; + String expect = cond.get().type().name() + ":" + cond.get().definition().name(); + assertEquals(new Term(expect, String.valueOf(cond.get().value())), tq.getTerm()); + } else { + assertThat(query, instanceOf(MatchNoDocsQuery.class)); + } + } + + public void testRequiredFields() throws IOException { + final DegradedRouterQueryBuilder builder = new DegradedRouterQueryBuilder(); + assertThat(expectThrows(ParsingException.class, () -> parseQuery(builder)).getMessage(), + containsString("No conditions defined")); + builder.condition(gt, DegradedConditionType.cpu, 1, new 
MatchNoneQueryBuilder()); + + assertThat(expectThrows(ParsingException.class, () -> parseQuery(builder)).getMessage(), + containsString("No fallback query defined")); + builder.fallback(new MatchNoneQueryBuilder()); + + parseQuery(builder); + } + + @Override + public void testMustRewrite() throws IOException { + DegradedRouterQueryBuilder builder = newBuilder(); + QueryBuilder toRewrite = new TermQueryBuilder("fallback", "fallback"); + builder.fallback(new WrapperQueryBuilder(toRewrite.toString())); + for(int i = randomIntBetween(1,10); i > 0; i--) { + ConditionDefinition cond = randomFrom(ConditionDefinition.values()); + DegradedConditionType type = randomFrom(DegradedConditionType.values()); + int value = randomInt(10); + builder.condition(cond, type, value, new WrapperQueryBuilder(toRewrite.toString())); + } + QueryBuilder rewrittenBuilder = QueryBuilder.rewriteQuery(builder, createShardContext()); + assertEquals(rewrittenBuilder, toRewrite); + } + + private DegradedRouterQueryBuilder newBuilder() { + DegradedRouterQueryBuilder builder = new DegradedRouterQueryBuilder(); + builder.osService(mockOsService()); + return builder; + } + + @Override + protected Query rewrite(Query query) throws IOException { + if (query != null) { + // When rewriting q QueryBuilder with a boost or a name + // we end up with a wrapping bool query. 
+ // see doRewrite + // rewrite as lucene does to have the real inner query + MemoryIndex idx = new MemoryIndex(); + return idx.createSearcher().rewrite(query); + } + return new MatchAllDocsQuery(); // null == *:* + } + + + private OsService mockOsService() { + double loadAvg = randomDoubleBetween(0D, 50D, false); + double[] loadAvgs = {loadAvg, loadAvg, loadAvg}; + OsStats stats = new OsStats(0, + new OsStats.Cpu((short) randomIntBetween(0, 100), loadAvgs), + new OsStats.Mem(0, 0), + new OsStats.Swap(0L, 0L), + new OsStats.Cgroup("", 0L, "", 0L, 0L, + new OsStats.Cgroup.CpuStat(0L, 0L, 0L))); + return new MockOsService(stats); + } + + private class MockOsService extends OsService { + OsStats stats; + + MockOsService(OsStats stats) { + super(Settings.EMPTY); + this.stats = stats; + } + + @Override + public synchronized OsStats stats() { + return stats; + } + } +} diff --git a/src/test/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilderParserTest.java b/src/test/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilderParserTest.java new file mode 100644 index 0000000..9ed8c1d --- /dev/null +++ b/src/test/java/org/wikimedia/search/extra/router/DegradedRouterQueryBuilderParserTest.java @@ -0,0 +1,57 @@ +package org.wikimedia.search.extra.router; + +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.index.query.MatchNoneQueryBuilder; +import org.elasticsearch.index.query.MatchPhraseQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.junit.Test; +import org.wikimedia.search.extra.QueryBuilderTestUtils; +import org.wikimedia.search.extra.router.AbstractRouterQueryBuilder.ConditionDefinition; +import org.wikimedia.search.extra.router.DegradedRouterQueryBuilder.DegradedConditionType; + +import java.io.IOException; +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.instanceOf; + +public class DegradedRouterQueryBuilderParserTest extends LuceneTestCase { + @Test + public void 
testParseExample() throws IOException { + String json = "{\"degraded_router\": {\n" + + " \"conditions\" : [\n" + + " {\n"+ + " \"lt\": 70,\n" + + " \"type\": \"cpu\"," + + " \"query\": {\n" + + " \"match_phrase\": {\n" + + " \"text\": \"input query\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"fallback\": {\n" + + " \"match_none\": {}\n" + + " }\n" + + "}}"; + + Optional<QueryBuilder> optional = QueryBuilderTestUtils.FULLY_FEATURED.parseQuery(json); + assertTrue(optional.isPresent()); + QueryBuilder builder = optional.get(); + assertThat(builder, instanceOf(DegradedRouterQueryBuilder.class)); + DegradedRouterQueryBuilder qb = (DegradedRouterQueryBuilder) builder; + assertNotNull(qb.osService()); + assertEquals(1, qb.conditionStream().count()); + DegradedRouterQueryBuilder.DegradedCondition cond = qb.conditionStream().findFirst().get(); + assertEquals(DegradedConditionType.cpu, cond.type()); + assertThat(cond.query(), instanceOf(MatchPhraseQueryBuilder.class)); + assertEquals(ConditionDefinition.lt, cond.definition()); + assertEquals(70, cond.value()); + assertThat(qb.fallback(),instanceOf(MatchNoneQueryBuilder.class)); + + DegradedRouterQueryBuilder expected = new DegradedRouterQueryBuilder(); + expected.condition(ConditionDefinition.lt, DegradedConditionType.cpu, 70, + new MatchPhraseQueryBuilder("text", "input query")); + expected.fallback(new MatchNoneQueryBuilder()); + assertEquals(expected, qb); + } +} diff --git a/src/test/java/org/wikimedia/search/extra/router/TokenCountRouterBuilderESTest.java b/src/test/java/org/wikimedia/search/extra/router/TokenCountRouterBuilderESTest.java index a34b2f5..b568b42 100644 --- a/src/test/java/org/wikimedia/search/extra/router/TokenCountRouterBuilderESTest.java +++ b/src/test/java/org/wikimedia/search/extra/router/TokenCountRouterBuilderESTest.java @@ -193,7 +193,7 @@ assertThat(t.getMessage(), equalTo("Cannot set extra predicate [gt] on condition: [gte] already set")); } - @Override + @Override public void 
testMustRewrite() throws IOException { TokenCountRouterQueryBuilder builder = new TokenCountRouterQueryBuilder(); builder.text(randomAlphaOfLength(20)); -- To view, visit https://gerrit.wikimedia.org/r/370778 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I0abe91ccf2f628b03621a7f7f0fe634c52c039af Gerrit-PatchSet: 6 Gerrit-Project: search/extra Gerrit-Branch: master Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> Gerrit-Reviewer: DCausse <dcau...@wikimedia.org> Gerrit-Reviewer: EBernhardson <ebernhard...@wikimedia.org> Gerrit-Reviewer: Gehel <guillaume.leder...@wikimedia.org> Gerrit-Reviewer: Smalyshev <smalys...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits