This is an automated email from the ASF dual-hosted git repository.
epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 8692086eab2 SOLR-18198 Add support for "missing" stats in rollup for
streaming expressions (#4286)
8692086eab2 is described below
commit 8692086eab2eaffcea5339cc001eb016920c8a5f
Author: Khush Jain <[email protected]>
AuthorDate: Sat May 2 07:52:03 2026 -0400
SOLR-18198 Add support for "missing" stats in rollup for streaming
expressions (#4286)
Co-authored-by: Eric Pugh <[email protected]>
---
...18198-support-missing-stats-count-in-rollup.yml | 8 ++
.../pages/stream-decorator-reference.adoc | 5 +-
.../java/org/apache/solr/client/solrj/io/Lang.java | 2 +
.../solrj/io/stream/ParallelMetricsRollup.java | 5 ++
.../solrj/io/stream/metrics/MissingMetric.java | 87 ++++++++++++++++++++++
.../org/apache/solr/client/solrj/io/TestLang.java | 3 +-
.../solr/client/solrj/io/stream/StreamingTest.java | 81 +++++++++++++++++---
7 files changed, 176 insertions(+), 15 deletions(-)
diff --git
a/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml
b/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml
new file mode 100644
index 00000000000..f614517e368
--- /dev/null
+++ b/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml
@@ -0,0 +1,8 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: "Support 'missing' stats count in rollup function for streaming
expressions"
+type: added
+authors:
+ - name: khushjain
+links:
+ - name: SOLR-18198
+ url: https://issues.apache.org/jira/browse/SOLR-18198
diff --git
a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
index 03740f8f1d9..7f14117ef1d 100644
---
a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
+++
b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
@@ -1448,7 +1448,7 @@ For faster aggregation over low to moderate cardinality
fields, the `facet` func
* `StreamExpression` (Mandatory)
* `over`: (Mandatory) A list of fields to group by.
* `metrics`: (Mandatory) The list of metrics to compute.
-Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`,
`max(col)`, `count(*)`.
+Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`,
`max(col)`, `count(*)`, `missing(col)`. The `missing(col)` metric counts the tuples in each group that have no value for `col`.
=== rollup Syntax
@@ -1465,7 +1465,8 @@ rollup(
max(a_f),
avg(a_i),
avg(a_f),
- count(*)
+ count(*),
+ missing(a_i)
)
----
diff --git
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java
index 927fb1eef5d..2c0e3a4de7a 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -325,6 +325,7 @@ import
org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric;
import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric;
import org.apache.solr.client.solrj.io.stream.metrics.StdMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
@@ -406,6 +407,7 @@ public class Lang {
.withFunctionName("std", StdMetric.class)
.withFunctionName("count", CountMetric.class)
.withFunctionName("countDist", CountDistinctMetric.class)
+ .withFunctionName("missing", MissingMetric.class)
// tuple manipulation operations
.withFunctionName("replace", ReplaceOperation.class)
diff --git
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
index 752581b2f8f..584932c1e01 100644
---
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
+++
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java
@@ -26,6 +26,7 @@ import
org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.io.stream.metrics.WeightedSumMetric;
@@ -133,6 +134,10 @@ public interface ParallelMetricsRollup {
// can't properly rollup mean metrics w/o a count (reqd by
WeightedSumMetric)
return Optional.empty();
}
+ } else if (next instanceof MissingMetric) {
+ // sum of missing counts
+ nextRollup = new SumMetric(next.getIdentifier());
+ nextRollup.outputLong = next.outputLong;
} else if (next instanceof CountDistinctMetric) {
// rollup of count distinct is the max across the tiers
nextRollup = new MaxMetric(next.getIdentifier());
diff --git
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/MissingMetric.java
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/MissingMetric.java
new file mode 100644
index 00000000000..8f60dfc5f3b
--- /dev/null
+++
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/MissingMetric.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream.metrics;
+
+import java.io.IOException;
+import java.util.Locale;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+
+/**
+ * Streaming-expression metric that counts how many tuples have no value for a given column.
+ *
+ * <p>Expression syntax: {@code missing(columnName)}; exactly one column operand is accepted. A
+ * tuple is counted when {@code tuple.get(columnName)} returns {@code null}. The result is always
+ * reported as a long ({@code outputLong} is set to {@code true}), and the metric's identifier is
+ * {@code functionName(columnName)}, e.g. {@code missing(b_f)}.
+ *
+ * <p>Each instance accumulates a running count in {@link #update(Tuple)}; {@link #newInstance()}
+ * returns a new instance for the same column whose count starts at zero.
+ */
+public class MissingMetric extends Metric {
+ /** Name of the tuple field inspected by {@link #update(Tuple)}. */
+ private String columnName;
+ /** Number of tuples passed to {@link #update(Tuple)} whose column value was null. */
+ private long count;
+
+ /**
+ * Creates a {@code missing} metric for the given column.
+ *
+ * @param columnName the tuple field to check for missing (null) values
+ */
+ public MissingMetric(String columnName) {
+ init("missing", columnName);
+ }
+
+ /**
+ * Creates the metric from a parsed {@code missing(columnName)} expression.
+ *
+ * @param expression the parsed expression; must have exactly one parameter, the column name
+ * @param factory used to read the column-name operand from the expression
+ * @throws IOException if the column operand is absent or additional operands are present
+ */
+ public MissingMetric(StreamExpression expression, StreamFactory factory)
throws IOException {
+ String functionName = expression.getFunctionName();
+ String columnName = factory.getValueOperand(expression, 0);
+
+ if (null == columnName) {
+ throw new IOException(
+ String.format(
+ Locale.ROOT,
+ "Invalid expression %s - expected %s(columnName)",
+ expression,
+ functionName));
+ }
+ if (1 != expression.getParameters().size()) {
+ throw new IOException(
+ String.format(Locale.ROOT, "Invalid expression %s - unknown operands
found", expression));
+ }
+
+ init(functionName, columnName);
+ }
+
+ /**
+ * Shared constructor logic: records the column, marks the output as a long, and sets the
+ * function name and the {@code functionName(columnName)} identifier.
+ */
+ private void init(String functionName, String columnName) {
+ this.columnName = columnName;
+ // A count is integral, so report it as a Long rather than a Double.
+ this.outputLong = true;
+ setFunctionName(functionName);
+ setIdentifier(functionName, "(", columnName, ")");
+ }
+
+ /** @return a single-element array containing the inspected column name */
+ @Override
+ public String[] getColumns() {
+ return new String[] {columnName};
+ }
+
+ /**
+ * Increments the count when the tuple's value for the column is {@code null}.
+ *
+ * <p>NOTE(review): presumably a field absent from the tuple also yields {@code null} from
+ * {@code Tuple.get}, so absent and explicitly-null values are counted alike — confirm against
+ * {@code Tuple}.
+ */
+ @Override
+ public void update(Tuple tuple) {
+ if (tuple.get(columnName) == null) {
+ ++count;
+ }
+ }
+
+ /** @return the number of tuples seen so far whose column value was null (0 if none) */
+ @Override
+ public Long getValue() {
+ return count;
+ }
+
+ /** @return a new {@code missing} metric for the same column, with its count starting at zero */
+ @Override
+ public Metric newInstance() {
+ return new MissingMetric(columnName);
+ }
+
+ /** Serializes this metric back to a {@code functionName(columnName)} expression. */
+ @Override
+ public StreamExpressionParameter toExpression(StreamFactory factory) throws
IOException {
+ return new StreamExpression(getFunctionName()).withParameter(columnName);
+ }
+}
diff --git
a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/TestLang.java
b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/TestLang.java
index 3a6f58580ef..9afb1fabc3f 100644
---
a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/TestLang.java
+++
b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/TestLang.java
@@ -351,7 +351,8 @@ public class TestLang extends SolrTestCase {
"std",
"drill",
"input",
- "countDist"
+ "countDist",
+ "missing"
};
@Test
diff --git
a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index a0b72d8d2a1..601a4fe6f67 100644
---
a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++
b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -46,6 +46,7 @@ import
org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -145,16 +146,16 @@ public class StreamingTest extends SolrCloudTestCase {
// Update request shared by many of the tests
private final UpdateRequest helloDocsUpdateRequest =
new UpdateRequest()
- .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+ .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "b_f", "1.5")
.add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
- .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+ .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "b_f", "4.5")
.add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
- .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
- .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+ .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6", "b_f", "6.5")
+ .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7", "b_f", "7.5")
.add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
.add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
- .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
+ .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10", "b_f",
"10.5");
@Before
public void clearCollection() throws Exception {
@@ -1646,7 +1647,7 @@ public class StreamingTest extends SolrCloudTestCase {
streamContext.setSolrClientCache(solrClientCache);
try {
- SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort",
"a_s asc");
+ SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,b_f",
"sort", "a_s asc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS,
sParamsA);
Bucket[] buckets = {new Bucket("a_s")};
@@ -1660,7 +1661,8 @@ public class StreamingTest extends SolrCloudTestCase {
new MaxMetric("a_f"),
new MeanMetric("a_i"),
new MeanMetric("a_f"),
- new CountMetric()
+ new CountMetric(),
+ new MissingMetric("b_f")
};
RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
@@ -1682,6 +1684,7 @@ public class StreamingTest extends SolrCloudTestCase {
Double avgi = tuple.getDouble("avg(a_i)");
Double avgf = tuple.getDouble("avg(a_f)");
Double count = tuple.getDouble("count(*)");
+ Double missingBf = tuple.getDouble("missing(b_f)");
assertEquals("hello0", bucket);
assertEquals(17, sumi, 0.001);
@@ -1693,6 +1696,7 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(4.25, avgi, 0.001);
assertEquals(4.5, avgf, 0.001);
assertEquals(4, count, 0.001);
+ assertEquals(2, missingBf, 0.001);
tuple = tuples.get(1);
bucket = tuple.getString("a_s");
@@ -1705,6 +1709,7 @@ public class StreamingTest extends SolrCloudTestCase {
avgi = tuple.getDouble("avg(a_i)");
avgf = tuple.getDouble("avg(a_f)");
count = tuple.getDouble("count(*)");
+ missingBf = tuple.getDouble("missing(b_f)");
assertEquals("hello3", bucket);
assertEquals(38, sumi, 0.001);
@@ -1716,6 +1721,7 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(9.5, avgi, 0.001);
assertEquals(6.5, avgf, 0.001);
assertEquals(4, count, 0.001);
+ assertEquals(3, missingBf, 0.001);
tuple = tuples.get(2);
bucket = tuple.getString("a_s");
@@ -1728,6 +1734,7 @@ public class StreamingTest extends SolrCloudTestCase {
avgi = tuple.getDouble("avg(a_i)");
avgf = tuple.getDouble("avg(a_f)");
count = tuple.getDouble("count(*)");
+ missingBf = tuple.getDouble("missing(b_f)");
assertEquals("hello4", bucket);
assertEquals(15, sumi.longValue());
@@ -1739,6 +1746,7 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(7.5, avgi, 0.01);
assertEquals(5.5, avgf, 0.01);
assertEquals(2, count, 0.01);
+ assertEquals(0, missingBf, 0.01);
// Test will null metrics
rollupStream = new RollupStream(stream, buckets, metrics);
@@ -1763,7 +1771,7 @@ public class StreamingTest extends SolrCloudTestCase {
.add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
- sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc",
"qt", "/export");
+ sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,b_f", "sort", "a_s
asc", "qt", "/export");
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
Bucket[] buckets1 = {new Bucket("a_s")};
@@ -1776,7 +1784,8 @@ public class StreamingTest extends SolrCloudTestCase {
new MaxMetric("a_f"),
new MeanMetric("a_i"),
new MeanMetric("a_f"),
- new CountMetric()
+ new CountMetric(),
+ new MissingMetric("b_f")
};
rollupStream = new RollupStream(stream, buckets1, metrics1);
@@ -1796,6 +1805,7 @@ public class StreamingTest extends SolrCloudTestCase {
avgi = tuple.getDouble("avg(a_i)");
avgf = tuple.getDouble("avg(a_f)");
count = tuple.getDouble("count(*)");
+ missingBf = tuple.getDouble("missing(b_f)");
assertEquals(14, sumi, 0.01);
assertEquals(10, sumf, 0.01);
@@ -1806,6 +1816,7 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(14, avgi, 0.01);
assertEquals(10, avgf, 0.01);
assertEquals(1, count, 0.01);
+ assertEquals(1, missingBf, 0.01);
} finally {
solrClientCache.close();
}
@@ -1956,12 +1967,51 @@ public class StreamingTest extends SolrCloudTestCase {
"expr",
"rollup(search("
+ COLLECTIONORALIAS
- + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s
desc\",partitionKeys=\"a_s\", qt=\"/export\"),over=\"a_s\")\n");
+ + ",q=\"*:*\",fl=\"a_s,a_i,a_f,b_f\",sort=\"a_s
asc\",partitionKeys=\"a_s\",
qt=\"/export\"),over=\"a_s\",sum(a_i),sum(a_f),min(a_i),min(a_f),max(a_i),max(a_f),avg(a_i),avg(a_f),count(*),missing(b_f))\n");
SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
streamContext = new StreamContext();
solrStream.setStreamContext(streamContext);
tuples = getTuples(solrStream);
assertEquals(3, tuples.size());
+
+ Tuple exprTuple = tuples.get(0);
+ assertEquals("hello0", exprTuple.getString("a_s"));
+ assertEquals(17, exprTuple.getDouble("sum(a_i)"), 0.001);
+ assertEquals(18, exprTuple.getDouble("sum(a_f)"), 0.001);
+ assertEquals(0, exprTuple.getDouble("min(a_i)"), 0.001);
+ assertEquals(1, exprTuple.getDouble("min(a_f)"), 0.001);
+ assertEquals(14, exprTuple.getDouble("max(a_i)"), 0.001);
+ assertEquals(10, exprTuple.getDouble("max(a_f)"), 0.001);
+ assertEquals(4.25, exprTuple.getDouble("avg(a_i)"), 0.001);
+ assertEquals(4.5, exprTuple.getDouble("avg(a_f)"), 0.001);
+ assertEquals(4, exprTuple.getDouble("count(*)"), 0.001);
+ assertEquals(2, exprTuple.getDouble("missing(b_f)"), 0.001);
+
+ exprTuple = tuples.get(1);
+ assertEquals("hello3", exprTuple.getString("a_s"));
+ assertEquals(38, exprTuple.getDouble("sum(a_i)"), 0.001);
+ assertEquals(26, exprTuple.getDouble("sum(a_f)"), 0.001);
+ assertEquals(3, exprTuple.getDouble("min(a_i)"), 0.001);
+ assertEquals(3, exprTuple.getDouble("min(a_f)"), 0.001);
+ assertEquals(13, exprTuple.getDouble("max(a_i)"), 0.001);
+ assertEquals(9, exprTuple.getDouble("max(a_f)"), 0.001);
+ assertEquals(9.5, exprTuple.getDouble("avg(a_i)"), 0.001);
+ assertEquals(6.5, exprTuple.getDouble("avg(a_f)"), 0.001);
+ assertEquals(4, exprTuple.getDouble("count(*)"), 0.001);
+ assertEquals(3, exprTuple.getDouble("missing(b_f)"), 0.001);
+
+ exprTuple = tuples.get(2);
+ assertEquals("hello4", exprTuple.getString("a_s"));
+ assertEquals(15, exprTuple.getDouble("sum(a_i)"), 0.001);
+ assertEquals(11, exprTuple.getDouble("sum(a_f)"), 0.001);
+ assertEquals(4, exprTuple.getDouble("min(a_i)"), 0.001);
+ assertEquals(4, exprTuple.getDouble("min(a_f)"), 0.001);
+ assertEquals(11, exprTuple.getDouble("max(a_i)"), 0.001);
+ assertEquals(7, exprTuple.getDouble("max(a_f)"), 0.001);
+ assertEquals(7.5, exprTuple.getDouble("avg(a_i)"), 0.001);
+ assertEquals(5.5, exprTuple.getDouble("avg(a_f)"), 0.001);
+ assertEquals(2, exprTuple.getDouble("count(*)"), 0.001);
+ assertEquals(0, exprTuple.getDouble("missing(b_f)"), 0.001);
} finally {
solrClientCache.close();
}
@@ -1982,7 +2032,7 @@ public class StreamingTest extends SolrCloudTestCase {
"q",
"*:*",
"fl",
- "a_s,a_i,a_f",
+ "a_s,a_i,a_f,b_f",
"sort",
"a_s asc",
"partitionKeys",
@@ -2002,7 +2052,8 @@ public class StreamingTest extends SolrCloudTestCase {
new MaxMetric("a_f"),
new MeanMetric("a_i"),
new MeanMetric("a_f"),
- new CountMetric()
+ new CountMetric(),
+ new MissingMetric("b_f")
};
RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
@@ -2027,6 +2078,7 @@ public class StreamingTest extends SolrCloudTestCase {
Double avgi = tuple.getDouble("avg(a_i)");
Double avgf = tuple.getDouble("avg(a_f)");
Double count = tuple.getDouble("count(*)");
+ Double missingBf = tuple.getDouble("missing(b_f)");
assertEquals("hello0", bucket);
assertEquals(17, sumi, 0.001);
@@ -2038,6 +2090,7 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(4.25, avgi, 0.001);
assertEquals(4.5, avgf, 0.001);
assertEquals(4, count, 0.001);
+ assertEquals(2, missingBf, 0.001);
tuple = tuples.get(1);
bucket = tuple.getString("a_s");
@@ -2050,6 +2103,7 @@ public class StreamingTest extends SolrCloudTestCase {
avgi = tuple.getDouble("avg(a_i)");
avgf = tuple.getDouble("avg(a_f)");
count = tuple.getDouble("count(*)");
+ missingBf = tuple.getDouble("missing(b_f)");
assertEquals("hello3", bucket);
assertEquals(38, sumi, 0.001);
@@ -2061,6 +2115,7 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(9.5, avgi, 0.001);
assertEquals(6.5, avgf, 0.001);
assertEquals(4, count, 0.001);
+ assertEquals(3, missingBf, 0.001);
tuple = tuples.get(2);
bucket = tuple.getString("a_s");
@@ -2073,6 +2128,7 @@ public class StreamingTest extends SolrCloudTestCase {
avgi = tuple.getDouble("avg(a_i)");
avgf = tuple.getDouble("avg(a_f)");
count = tuple.getDouble("count(*)");
+ missingBf = tuple.getDouble("missing(b_f)");
assertEquals("hello4", bucket);
assertEquals(15, sumi.longValue());
@@ -2084,6 +2140,7 @@ public class StreamingTest extends SolrCloudTestCase {
assertEquals(7.5, avgi, 0.001);
assertEquals(5.5, avgf, 0.001);
assertEquals(2, count, 0.001);
+ assertEquals(0, missingBf, 0.001);
} finally {
solrClientCache.close();
}