This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ca78729254 Add a flexible json config way (#14229)
ca78729254 is described below
commit ca787292545c1f66ce12b76bfb19c1da5d2616cb
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Nov 22 16:33:26 2024 -0800
Add a flexible json config way (#14229)
* Add a flexible json config way
* fix style
* Fix import
* rerun
* rerun
* Add more comments in UT
* rerun
* rerun
---
.../pinot/spi/config/table/JsonIndexConfig.java | 17 +++
.../java/org/apache/pinot/spi/utils/JsonUtils.java | 134 +++++++++++++++++++--
.../org/apache/pinot/spi/utils/JsonUtilsTest.java | 123 +++++++++++++++++++
3 files changed, 265 insertions(+), 9 deletions(-)
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
index 4dbbf6c07d..27a31b5a03 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
@@ -39,6 +39,11 @@ import javax.annotation.Nullable;
* the excluded paths will also be excluded, e.g. "$.a.b.c"
will be excluded when "$.a.b" is configured
* to be excluded.
* - excludeFields: Exclude the given fields, e.g. "b", "c", even if it is
under the included paths.
+ * - indexPaths: Index the given paths, e.g. "*.*", "a.**". Paths that match the
index paths will be indexed.
+ * This config can work together with other configs, e.g.
includePaths, excludePaths, maxLevels, but
+ * usually does not have to because it should be flexible enough
to configure any scenario. By default, it
+ * works as "**", which allows everything.
+ * Check {@link org.apache.pinot.spi.utils.JsonSchemaTreeNode}
for more details.
* - maxValueLength: Exclude field values which are longer than this length. A
value of "0" disables this filter.
* Excluded values will be replaced with
JsonUtils.SKIPPED_VALUE_REPLACEMENT.
* - skipInvalidJson: If the raw data is not a valid json string, then replace
with {"":SKIPPED_VALUE_REPLACEMENT}
@@ -54,6 +59,7 @@ public class JsonIndexConfig extends IndexConfig {
private Set<String> _includePaths;
private Set<String> _excludePaths;
private Set<String> _excludeFields;
+ private Set<String> _indexPaths;
private int _maxValueLength = 0;
private boolean _skipInvalidJson = false;
@@ -72,6 +78,7 @@ public class JsonIndexConfig extends IndexConfig {
@JsonProperty("includePaths") @Nullable Set<String> includePaths,
@JsonProperty("excludePaths") @Nullable Set<String> excludePaths,
@JsonProperty("excludeFields") @Nullable Set<String> excludeFields,
+ @JsonProperty("indexPaths") @Nullable Set<String> indexPaths,
@JsonProperty("maxValueLength") int maxValueLength,
@JsonProperty("skipInvalidJson") boolean skipInvalidJson) {
super(disabled);
@@ -81,6 +88,7 @@ public class JsonIndexConfig extends IndexConfig {
_includePaths = includePaths;
_excludePaths = excludePaths;
_excludeFields = excludeFields;
+ _indexPaths = indexPaths;
_maxValueLength = maxValueLength;
_skipInvalidJson = skipInvalidJson;
}
@@ -141,6 +149,15 @@ public class JsonIndexConfig extends IndexConfig {
_excludeFields = excludeFields;
}
+ @Nullable
+ public Set<String> getIndexPaths() {
+ return _indexPaths;
+ }
+
+ public void setIndexPaths(@Nullable Set<String> indexPaths) {
+ _indexPaths = indexPaths;
+ }
+
public int getMaxValueLength() {
return _maxValueLength;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 3429d2ef7d..2dd3bedf39 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -53,8 +53,10 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
@@ -73,6 +75,7 @@ public class JsonUtils {
// For flattening
public static final String VALUE_KEY = "";
public static final String KEY_SEPARATOR = ".";
+ public static final String GLOBAL_WILDCARD = "**"; // represent all the
fields in the current or below levels
public static final String ARRAY_PATH = "[*]";
public static final String ARRAY_INDEX_KEY = ".$index";
public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$";
@@ -83,6 +86,7 @@ public class JsonUtils {
// For querying
public static final String WILDCARD = "*";
+
// NOTE: Do not expose the ObjectMapper to prevent configuration change
private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
public static final ObjectReader DEFAULT_READER = DEFAULT_MAPPER.reader();
@@ -366,7 +370,7 @@ public class JsonUtils {
*/
protected static List<Map<String, String>> flatten(JsonNode node,
JsonIndexConfig jsonIndexConfig) {
try {
- return flatten(node, jsonIndexConfig, 0, "$", false);
+ return flatten(node, jsonIndexConfig, 0, "$", false,
createTree(jsonIndexConfig));
} catch (OutOfMemoryError oom) {
throw new OutOfMemoryError(
String.format("Flattening JSON node: %s with config: %s requires too
much memory, please adjust the config",
@@ -378,13 +382,9 @@ public class JsonUtils {
}
private static List<Map<String, String>> flatten(JsonNode node,
JsonIndexConfig jsonIndexConfig, int level,
- String path, boolean includePathMatched) {
+ String path, boolean includePathMatched, JsonSchemaTreeNode
indexPathNode) {
// Null
- if (node.isNull()) {
- return Collections.emptyList();
- }
-
- if (node.isMissingNode()) {
+ if (node.isNull() || node.isMissingNode() || indexPathNode == null) {
return Collections.emptyList();
}
@@ -426,7 +426,8 @@ public class JsonUtils {
JsonNode childNode = node.get(i);
String arrayIndexValue = Integer.toString(i);
List<Map<String, String>> childResults =
- flatten(childNode, jsonIndexConfig, level + 1, childPath,
includeResult._includePathMatched);
+ flatten(childNode, jsonIndexConfig, level + 1, childPath,
includeResult._includePathMatched,
+ indexPathNode.getChild(""));
for (Map<String, String> childResult : childResults) {
Map<String, String> result = new TreeMap<>();
for (Map.Entry<String, String> entry : childResult.entrySet()) {
@@ -463,7 +464,8 @@ public class JsonUtils {
}
JsonNode childNode = fieldEntry.getValue();
List<Map<String, String>> childResults =
- flatten(childNode, jsonIndexConfig, level + 1, childPath,
includeResult._includePathMatched);
+ flatten(childNode, jsonIndexConfig, level + 1, childPath,
includeResult._includePathMatched,
+ indexPathNode.getChild(field));
int numChildResults = childResults.size();
// Empty list - skip
@@ -746,4 +748,118 @@ public class JsonUtils {
}
return JsonUtils.flatten(jsonNode, jsonIndexConfig);
}
+
+ /**
+ * Generates the JsonSchemaTreeNode tree from the indexPaths of the given json
index config. The tree represents which paths we
+ * should flatten/index in the json record.
+ * @param jsonIndexConfig the json index config that provides the indexPaths
+ * @return the root node of the json index paths tree
+ * @throws IllegalArgumentException
+ */
+ private static JsonSchemaTreeNode createTree(@Nonnull JsonIndexConfig
jsonIndexConfig)
+ throws IllegalArgumentException {
+ Set<String> indexPaths = jsonIndexConfig.getIndexPaths();
+ JsonSchemaTreeNode rootNode = new JsonSchemaTreeNode("");
+ // if no index paths are provided, return a global wildcard node
+ if (indexPaths == null || indexPaths.isEmpty()) {
+ rootNode.getAndCreateChild(GLOBAL_WILDCARD);
+ return rootNode;
+ }
+
+ for (String indexPath : indexPaths) {
+ String[] paths = StringUtils.splitPreserveAllTokens(indexPath,
KEY_SEPARATOR);
+ JsonSchemaTreeNode currentNode = rootNode;
+ for (String key : paths) {
+ currentNode = currentNode.getAndCreateChild(key);
+ if (GLOBAL_WILDCARD.equals(key)) {
+ break;
+ }
+ }
+ }
+
+ return rootNode;
+ }
+}
+
+/**
+ * JsonSchemaTreeNode represents the tree node when we construct the json
schema tree.
+ * This tree is used to represent how we want to flatten/index the json
according to the {@link JsonIndexConfig}.
+ * A node can be either a leaf node or a non-leaf node. Both types of node
can indicate whether
+ * we should flatten/index the json at this node.
+ * For example, the config with *.*, a.b.*.*, a.b.x, a.b.c.**, a.b.d.*.*,
e.f.g will have the following tree
+ * structure:
+ * root -- * -- *
+ *      -- a -- b -- * -- *
+ *                -- c -- **
+ *                -- d -- * -- *
+ *                -- x
+ *      -- e -- f -- g
+ * The path structure is defined as:
+ * each key is separated by '.'
+ * a key without a wildcard represents a single value field
+ * the key "*" represents any type of single node
+ * the key "**" represents any type of leaf node OR a subtree with all its
children
+ * When multiple conditions match, e.g. for a.b.c, they are matched with
priority:
+ * 1. **
+ * 2. exact match (either single value or array value)
+ * 3. *
+ */
+class JsonSchemaTreeNode {
+ private Map<String, JsonSchemaTreeNode> _children;
+ private JsonSchemaTreeNode _gloabalWildcardChild;
+ private JsonSchemaTreeNode _wildcardChild;
+ private String _key;
+
+ public JsonSchemaTreeNode(String key) {
+ _key = key;
+ _children = new HashMap<>();
+ }
+
+ /**
+ * If the child node does not exist, add a child node to the current node and
return the child node.
+ * If the child node already exists, return the existing child node.
+ * @param key
+ * @return child
+ */
+ public JsonSchemaTreeNode getAndCreateChild(String key) {
+ // if .** is already added, no need to add any child
+ if (_gloabalWildcardChild != null) {
+ return _gloabalWildcardChild;
+ }
+ switch (key) {
+ case JsonUtils.GLOBAL_WILDCARD:
+ if (_gloabalWildcardChild == null) {
+ _gloabalWildcardChild = new JsonSchemaTreeNode(key);
+ }
+ return _gloabalWildcardChild;
+ case JsonUtils.WILDCARD:
+ if (_wildcardChild == null) {
+ _wildcardChild = new JsonSchemaTreeNode(key);
+ }
+ return _wildcardChild;
+ default:
+ JsonSchemaTreeNode child = _children.get(key);
+ if (child == null) {
+ child = new JsonSchemaTreeNode(key);
+ _children.put(key, child);
+ }
+ return child;
+ }
+ }
+
+ @Nullable
+ public JsonSchemaTreeNode getChild(String key) {
+ if (JsonUtils.GLOBAL_WILDCARD.equals(_key)) {
+ return this;
+ }
+ if (_gloabalWildcardChild != null) {
+ return _gloabalWildcardChild;
+ }
+ if (_children.containsKey(key)) {
+ return _children.get(key);
+ }
+ if (_wildcardChild != null) {
+ return _wildcardChild;
+ }
+ return null;
+ }
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
index cf542926d3..fafa5f93a2 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/JsonUtilsTest.java
@@ -641,4 +641,127 @@ public class JsonUtilsTest {
List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode,
jsonIndexConfig);
assertTrue(flattenedRecords.isEmpty());
}
+
+ @Test
+ public void testFlattenWithIndexPaths()
+ throws IOException {
+ {
+ /* input json
+ [
+ {
+ "country": "us",
+ "street": "main st",
+ "number": 1
+ },
+ {
+ "country": "ca",
+ "street": "second st",
+ "number": 2
+ }
+ ]
+ */
+ JsonNode jsonNode = JsonUtils.stringToJsonNode(
+ "[{\"country\":\"us\",\"street\":\"main
st\",\"number\":1},{\"country\":\"ca\",\"street\":\"second st\","
+ + "\"number\":2}]");
+ JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+ List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode,
jsonIndexConfig);
+
+ // flatten everything within 2 layers
+ jsonIndexConfig.setIndexPaths(Collections.singleton("*.*"));
+ assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig),
flattenedRecords);
+
+ // flatten "a." prefix till 2 layers
+ jsonIndexConfig.setIndexPaths(Collections.singleton("a.*"));
+ flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+ assertTrue(flattenedRecords.isEmpty());
+ }
+ {
+ /* input json
+ {
+ "name": "adam",
+ "addresses": [
+ {
+ "country": "us",
+ "street": "main st",
+ "number": 1
+ },
+ {
+ "country": "ca",
+ "street": "second st",
+ "number": 2
+ }
+ ]
+ }
+ */
+ JsonNode jsonNode = JsonUtils.stringToJsonNode(
+
"{\"name\":\"adam\",\"addresses\":[{\"country\":\"us\",\"street\":\"main
st\",\"number\":1},"
+ + "{\"country\":\"ca\",\"street\":\"second
st\",\"number\":2}]}");
+ JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+ List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode,
jsonIndexConfig);
+
+ // flatten everything
+ jsonIndexConfig.setIndexPaths(Collections.singleton("**"));
+ assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig),
flattenedRecords);
+
+ // flatten everything within 3 layers
+ jsonIndexConfig.setIndexPaths(Collections.singleton("*.*.*"));
+ assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig),
flattenedRecords);
+
+ // flatten "name" 1 layer and "addresses" infinite layers
+ jsonIndexConfig.setIndexPaths(ImmutableSet.of("name", "addresses.**"));
+ assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig),
flattenedRecords);
+
+ // flatten everything within 2 layers
+ jsonIndexConfig.setIndexPaths(Collections.singleton("*.*"));
+ flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+ assertEquals(flattenedRecords.size(), 1);
+ assertEquals(flattenedRecords.get(0), Collections.singletonMap(".name",
"adam"));
+
+ // flatten "name." prefix with infinite layers
+ jsonIndexConfig.setIndexPaths(Collections.singleton("name.**"));
+ assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig),
flattenedRecords);
+ }
+ {
+ /* input json
+ {
+ "name": "charles",
+ "addresses": [
+ {
+ "country": "us",
+ "street": "main st",
+ "types": ["home", "office"]
+ },
+ {
+ "country": "ca",
+ "street": "second st"
+ }
+ ]
+ }
+ */
+ JsonNode jsonNode = JsonUtils.stringToJsonNode(
+
"{\"name\":\"charles\",\"addresses\":[{\"country\":\"us\",\"street\":\"main
st\",\"types\":[\"home\","
+ + "\"office\"]}," + "{\"country\":\"ca\",\"street\":\"second
st\"}]}");
+ JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+ List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonNode,
jsonIndexConfig);
+
+ // flatten everything
+ jsonIndexConfig.setIndexPaths(Collections.singleton("*.*.**"));
+ assertEquals(JsonUtils.flatten(jsonNode, jsonIndexConfig),
flattenedRecords);
+
+ // flatten addresses array with one more layer
+ jsonIndexConfig.setIndexPaths(Collections.singleton("addresses..*"));
+ flattenedRecords = JsonUtils.flatten(jsonNode, jsonIndexConfig);
+ assertEquals(flattenedRecords.size(), 2);
+ Map<String, String> flattenedRecord0 = flattenedRecords.get(0);
+ assertEquals(flattenedRecord0.size(), 3);
+ assertEquals(flattenedRecord0.get(".addresses.$index"), "0");
+ assertEquals(flattenedRecord0.get(".addresses..country"), "us");
+ assertEquals(flattenedRecord0.get(".addresses..street"), "main st");
+ Map<String, String> flattenedRecord1 = flattenedRecords.get(1);
+ assertEquals(flattenedRecord1.size(), 3);
+ assertEquals(flattenedRecord1.get(".addresses.$index"), "1");
+ assertEquals(flattenedRecord1.get(".addresses..country"), "ca");
+ assertEquals(flattenedRecord1.get(".addresses..street"), "second st");
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]