Jackie-Jiang commented on a change in pull request #6930:
URL: https://github.com/apache/incubator-pinot/pull/6930#discussion_r633937767
##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String,
String>> currentResults,
unnestResults(newCurrentResults, nestedResultsList, index + 1,
nonNestedResult, outputResults);
}
}
+
+ public static Schema getPinotSchemaFromJsonFile(File jsonFile,
Review comment:
I feel this method is not really useful. It requires the whole JSON file
to be a single object. In regular cases, the file should contain a list of
records.
##########
File path:
pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
##########
@@ -93,48 +94,7 @@
//@formatter:off
@Argument(handler = SubCommandHandler.class, metaVar = "<subCommand>")
- @SubCommands({
- @SubCommand(name = "QuickStart", impl = QuickStartCommand.class),
- @SubCommand(name = "OperateClusterConfig", impl =
OperateClusterConfigCommand.class),
- @SubCommand(name = "GenerateData", impl = GenerateDataCommand.class),
- @SubCommand(name = "LaunchDataIngestionJob", impl =
LaunchDataIngestionJobCommand.class),
- @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class),
- @SubCommand(name = "ImportData", impl = ImportDataCommand.class),
- @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class),
- @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class),
- @SubCommand(name = "StreamAvroIntoKafka", impl =
StreamAvroIntoKafkaCommand.class),
- @SubCommand(name = "StartController", impl =
StartControllerCommand.class),
- @SubCommand(name = "StartBroker", impl = StartBrokerCommand.class),
- @SubCommand(name = "StartServer", impl = StartServerCommand.class),
- @SubCommand(name = "StartMinion", impl = StartMinionCommand.class),
- @SubCommand(name = "StartServiceManager", impl =
StartServiceManagerCommand.class),
- @SubCommand(name = "AddTable", impl = AddTableCommand.class),
- @SubCommand(name = "ChangeTableState", impl = ChangeTableState.class),
- @SubCommand(name = "AddTenant", impl = AddTenantCommand.class),
- @SubCommand(name = "AddSchema", impl = AddSchemaCommand.class),
- @SubCommand(name = "UpdateSchema", impl = AddSchemaCommand.class),
- @SubCommand(name = "UploadSegment", impl = UploadSegmentCommand.class),
- @SubCommand(name = "PostQuery", impl = PostQueryCommand.class),
- @SubCommand(name = "StopProcess", impl = StopProcessCommand.class),
- @SubCommand(name = "DeleteCluster", impl = DeleteClusterCommand.class),
- @SubCommand(name = "ShowClusterInfo", impl =
ShowClusterInfoCommand.class),
- @SubCommand(name = "AvroSchemaToPinotSchema", impl =
AvroSchemaToPinotSchema.class),
- @SubCommand(name = "RebalanceTable", impl = RebalanceTableCommand.class),
- @SubCommand(name = "ChangeNumReplicas", impl =
ChangeNumReplicasCommand.class),
- @SubCommand(name = "ValidateConfig", impl = ValidateConfigCommand.class),
- @SubCommand(name = "VerifySegmentState", impl =
VerifySegmentState.class),
- @SubCommand(name = "ConvertPinotSegment", impl =
PinotSegmentConvertCommand.class),
- @SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class),
- @SubCommand(name = "VerifyClusterState", impl =
VerifyClusterStateCommand.class),
- @SubCommand(name = "RealtimeProvisioningHelper", impl =
RealtimeProvisioningHelperCommand.class),
- @SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class),
- @SubCommand(name = "CheckOfflineSegmentIntervals", impl =
OfflineSegmentIntervalCheckerCommand.class),
- @SubCommand(name = "AnonymizeData", impl = AnonymizeDataCommand.class),
- @SubCommand(name = "GitHubEventsQuickStart", impl =
GitHubEventsQuickStartCommand.class),
- @SubCommand(name = "StreamGitHubEvents", impl =
StreamGitHubEventsCommand.class),
- @SubCommand(name = "BootstrapTable", impl = BootstrapTableCommand.class),
- @SubCommand(name = "SegmentProcessorFramework", impl =
SegmentProcessorFrameworkCommand.class)
- })
+ @SubCommands({@SubCommand(name = "QuickStart", impl =
QuickStartCommand.class), @SubCommand(name = "OperateClusterConfig", impl =
OperateClusterConfigCommand.class), @SubCommand(name = "GenerateData", impl =
GenerateDataCommand.class), @SubCommand(name = "LaunchDataIngestionJob", impl =
LaunchDataIngestionJobCommand.class), @SubCommand(name = "CreateSegment", impl
= CreateSegmentCommand.class), @SubCommand(name = "ImportData", impl =
ImportDataCommand.class), @SubCommand(name = "StartZookeeper", impl =
StartZookeeperCommand.class), @SubCommand(name = "StartKafka", impl =
StartKafkaCommand.class), @SubCommand(name = "StreamAvroIntoKafka", impl =
StreamAvroIntoKafkaCommand.class), @SubCommand(name = "StartController", impl =
StartControllerCommand.class), @SubCommand(name = "StartBroker", impl =
StartBrokerCommand.class), @SubCommand(name = "StartServer", impl =
StartServerCommand.class), @SubCommand(name = "StartMinion", impl =
StartMinionCommand.class), @SubCommand(name = "StartServiceManager",
impl = StartServiceManagerCommand.class), @SubCommand(name =
"AddTable", impl = AddTableCommand.class), @SubCommand(name =
"ChangeTableState", impl = ChangeTableState.class), @SubCommand(name =
"AddTenant", impl = AddTenantCommand.class), @SubCommand(name = "AddSchema",
impl = AddSchemaCommand.class), @SubCommand(name = "UpdateSchema", impl =
AddSchemaCommand.class), @SubCommand(name = "UploadSegment", impl =
UploadSegmentCommand.class), @SubCommand(name = "PostQuery", impl =
PostQueryCommand.class), @SubCommand(name = "StopProcess", impl =
StopProcessCommand.class), @SubCommand(name = "DeleteCluster", impl =
DeleteClusterCommand.class), @SubCommand(name = "ShowClusterInfo", impl =
ShowClusterInfoCommand.class), @SubCommand(name = "AvroSchemaToPinotSchema",
impl = AvroSchemaToPinotSchema.class), @SubCommand(name = "JsonToPinotSchema",
impl = JsonToPinotSchema.class), @SubCommand(name = "RebalanceTable", impl =
RebalanceTableCommand.class), @SubCommand(name = "ChangeNumReplicas",
impl = ChangeNumReplicasCommand.class), @SubCommand(name =
"ValidateConfig", impl = ValidateConfigCommand.class), @SubCommand(name =
"VerifySegmentState", impl = VerifySegmentState.class), @SubCommand(name =
"ConvertPinotSegment", impl = PinotSegmentConvertCommand.class),
@SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class),
@SubCommand(name = "VerifyClusterState", impl =
VerifyClusterStateCommand.class), @SubCommand(name =
"RealtimeProvisioningHelper", impl = RealtimeProvisioningHelperCommand.class),
@SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class),
@SubCommand(name = "CheckOfflineSegmentIntervals", impl =
OfflineSegmentIntervalCheckerCommand.class), @SubCommand(name =
"AnonymizeData", impl = AnonymizeDataCommand.class), @SubCommand(name =
"GitHubEventsQuickStart", impl = GitHubEventsQuickStartCommand.class),
@SubCommand(name = "StreamGitHubEvents", impl =
StreamGitHubEventsCommand.class), @SubCommand(name = "BootstrapTable", impl =
BootstrapTableCommand.class), @SubCommand(name = "SegmentProcessorFramework",
impl = SegmentProcessorFrameworkCommand.class)})
Review comment:
Revert (the formatter should already be turned off by the annotation)
##########
File path:
pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/JsonToPinotSchema.java
##########
@@ -0,0 +1,143 @@
+package org.apache.pinot.tools.admin.command;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class for command to infer pinot schema from Json data. Given that it is
not always possible to
+ * automatically do this, the intention is to get most of the work done by
this class, and require any
+ * manual editing on top.
+ */
+public class JsonToPinotSchema extends AbstractBaseAdminCommand implements
Command {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JsonToPinotSchema.class);
+
+ @Option(name = "-jsonFile", required = true, metaVar = "<String>", usage =
"Path to json file.")
+ String _jsonFile;
+
+ @Option(name = "-outputDir", required = true, metaVar = "<string>", usage =
"Path to output directory")
+ String _outputDir;
+
+ @Option(name = "-pinotSchemaName", required = true, metaVar = "<string>",
usage = "Pinot schema name")
+ String _pinotSchemaName;
+
+ @Option(name = "-dimensions", metaVar = "<string>", usage = "Comma separated
dimension column names.")
+ String _dimensions;
+
+ @Option(name = "-metrics", metaVar = "<string>", usage = "Comma separated
metric column names.")
+ String _metrics;
+
+ @Option(name = "-timeColumnName", metaVar = "<string>", usage = "Name of the
time column.")
Review comment:
We already deprecated the TIME field; should we change it to `-dateTimeColumns`?
##########
File path: pinot-spi/src/test/resources/json_util_test.json
##########
@@ -0,0 +1,25 @@
+{
+ "entries": [
+ {
+ "id": 1234,
+ "description": "entry1"
+ },
+ {
+ "id": 1235,
+ "description": "entry2"
+ }
+ ],
+ "tuple": {
+ "address": {
+ "streetaddress": "1st Ave",
+ "city": "Palo Alto"
+ }
+ },
+ "d2": [
+ 1,
+ 2
+ ],
+ "d1": "dim1",
+ "hoursSinceEpoch": 1621286582,
+ "m1": 12
+}
Review comment:
New line (add a trailing newline at the end of the file)
##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String,
String>> currentResults,
unnestResults(newCurrentResults, nestedResultsList, index + 1,
nonNestedResult, outputResults);
}
}
+
+ public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit, List<String> unnestFields,
Review comment:
unnestFields should also be nullable
##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String,
String>> currentResults,
unnestResults(newCurrentResults, nestedResultsList, index + 1,
nonNestedResult, outputResults);
}
}
+
+ public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit, List<String> unnestFields,
+ String delimiter)
+ throws IOException {
+ JsonNode jsonNode = fileToJsonNode(jsonFile);
+ Preconditions.checkState(jsonNode.isObject(), "the JSON data shall be an
object");
+ return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit,
unnestFields, delimiter);
+ }
+
+ public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit, List<String> unnestFields,
+ String delimiter) {
+ Schema pinotSchema = new Schema();
+ Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+ while (fieldIterator.hasNext()) {
+ Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+ JsonNode childNode = fieldEntry.getValue();
+ inferPinotSchemaFromJsonNode(childNode, pinotSchema,
fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
+ delimiter);
+ }
+ return pinotSchema;
+ }
+
+ private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema
pinotSchema, String path,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit, List<String> unnestFields,
+ String delimiter) {
+ if (jsonNode.isNull()) {
+ // do nothing
+ return;
Review comment:
Should we throw an exception when the schema cannot be inferred?
##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
##########
@@ -380,4 +389,120 @@ private static void unnestResults(List<Map<String,
String>> currentResults,
unnestResults(newCurrentResults, nestedResultsList, index + 1,
nonNestedResult, outputResults);
}
}
+
+ public static Schema getPinotSchemaFromJsonFile(File jsonFile,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit, List<String> unnestFields,
+ String delimiter)
+ throws IOException {
+ JsonNode jsonNode = fileToJsonNode(jsonFile);
+ Preconditions.checkState(jsonNode.isObject(), "the JSON data shall be an
object");
+ return getPinotSchemaFromJsonNode(jsonNode, fieldTypeMap, timeUnit,
unnestFields, delimiter);
+ }
+
+ public static Schema getPinotSchemaFromJsonNode(JsonNode jsonNode,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit, List<String> unnestFields,
+ String delimiter) {
+ Schema pinotSchema = new Schema();
+ Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+ while (fieldIterator.hasNext()) {
+ Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+ JsonNode childNode = fieldEntry.getValue();
+ inferPinotSchemaFromJsonNode(childNode, pinotSchema,
fieldEntry.getKey(), fieldTypeMap, timeUnit, unnestFields,
+ delimiter);
+ }
+ return pinotSchema;
+ }
+
+ private static void inferPinotSchemaFromJsonNode(JsonNode jsonNode, Schema
pinotSchema, String path,
+ @Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable
TimeUnit timeUnit, List<String> unnestFields,
+ String delimiter) {
+ if (jsonNode.isNull()) {
+ // do nothing
+ return;
+ } else if (jsonNode.isValueNode()) {
+ DataType dataType = valueOf(jsonNode);
+ addFieldToPinotSchema(pinotSchema, dataType, path, true, fieldTypeMap,
timeUnit);
+ } else if (jsonNode.isArray()) {
+ int numChildren = jsonNode.size();
+ if (numChildren == 0) {
+ // do nothing
+ return;
+ }
+ JsonNode childNode = jsonNode.get(0);
+
+ if (unnestFields.contains(path)) {
+ inferPinotSchemaFromJsonNode(childNode, pinotSchema, path,
fieldTypeMap, timeUnit, unnestFields, delimiter);
+ } else if (childNode.isValueNode()) {
+ addFieldToPinotSchema(pinotSchema, valueOf(childNode), path, false,
fieldTypeMap, timeUnit);
+ } else {
+ addFieldToPinotSchema(pinotSchema, DataType.STRING, path, true,
fieldTypeMap, timeUnit);
+ }
+ } else if (jsonNode.isObject()) {
+ Iterator<Map.Entry<String, JsonNode>> fieldIterator = jsonNode.fields();
+ while (fieldIterator.hasNext()) {
+ Map.Entry<String, JsonNode> fieldEntry = fieldIterator.next();
+ JsonNode childNode = fieldEntry.getValue();
+ inferPinotSchemaFromJsonNode(childNode, pinotSchema,
String.join(delimiter, path, fieldEntry.getKey()),
+ fieldTypeMap, timeUnit, unnestFields, delimiter);
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("Unsupported json node
type", jsonNode.getClass()));
+ }
+ }
+
+ /**
+ * Returns the data type stored in Pinot that is associated with the given
Avro type.
+ */
+ public static DataType valueOf(JsonNode jsonNode) {
+ if (jsonNode.isInt()) {
+ return DataType.INT;
+ } else if (jsonNode.isLong()) {
+ return DataType.LONG;
+ } else if (jsonNode.isFloat()) {
+ return DataType.FLOAT;
+ } else if (jsonNode.isDouble()) {
+ return DataType.DOUBLE;
+ } else if (jsonNode.isBoolean()) {
+ return DataType.BOOLEAN;
+ } else if (jsonNode.isBinary()) {
+ return DataType.BYTES;
+ } else {
+ return DataType.STRING;
+ }
+ }
+
+ private static void addFieldToPinotSchema(Schema pinotSchema, DataType
dataType, String name,
+ boolean isSingleValueField, @Nullable Map<String, FieldSpec.FieldType>
fieldTypeMap,
+ @Nullable TimeUnit timeUnit) {
+ if (fieldTypeMap == null) {
+ pinotSchema.addField(new DimensionFieldSpec(name, dataType,
isSingleValueField));
+ } else {
+ FieldSpec.FieldType fieldType =
+ fieldTypeMap.containsKey(name) ? fieldTypeMap.get(name) :
FieldSpec.FieldType.DIMENSION;
+ Preconditions.checkNotNull(fieldType, "Field type not specified for
field: %s", name);
+ switch (fieldType) {
+ case DIMENSION:
+ pinotSchema.addField(new DimensionFieldSpec(name, dataType,
isSingleValueField));
+ break;
+ case METRIC:
+ Preconditions.checkState(isSingleValueField, "Metric field: %s
cannot be multi-valued", name);
+ pinotSchema.addField(new MetricFieldSpec(name, dataType));
+ break;
+ case TIME:
Review comment:
We already deprecated the TIME field. Let's only support `DATE_TIME` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]