walterddr commented on code in PR #10673:
URL: https://github.com/apache/pinot/pull/10673#discussion_r1175825406
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -45,108 +43,100 @@ private QueryPlanSerDeUtils() {
public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
DistributedStagePlan distributedStagePlan = new
DistributedStagePlan(stagePlan.getStageId());
-
distributedStagePlan.setServer(stringToInstance(stagePlan.getInstanceId()));
+ distributedStagePlan.setServer(stringToAddress(stagePlan.getInstanceId()));
distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
- Map<Integer, Worker.StageMetadata> metadataMap =
stagePlan.getStageMetadataMap();
-
distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap));
+
distributedStagePlan.getMetadataMap().putAll(protoMapToMetadataMap(stagePlan.getWorkerMetadataMap()));
return distributedStagePlan;
}
public static Worker.StagePlan serialize(DistributedStagePlan
distributedStagePlan) {
return Worker.StagePlan.newBuilder()
.setStageId(distributedStagePlan.getStageId())
- .setInstanceId(instanceToString(distributedStagePlan.getServer()))
+ .setInstanceId(addressToString(distributedStagePlan.getServer()))
.setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractStageNode)
distributedStagePlan.getStageRoot()))
-
.putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
+
.putAllWorkerMetadata(metadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
}
private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
-
"(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)\\((?<grpc>[0-9]+):(?<service>[0-9]+):(?<mailbox>[0-9]+)\\)");
+ "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
- public static VirtualServer stringToInstance(String serverInstanceString) {
- Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(serverInstanceString);
+ public static VirtualServerAddress stringToAddress(String
serverAddressString) {
+ Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(serverAddressString);
if (!matcher.matches()) {
- throw new IllegalArgumentException("Unexpected serverInstanceString '" +
serverInstanceString + "'. This might "
+ throw new IllegalArgumentException("Unexpected serverAddressString '" +
serverAddressString + "'. This might "
+ "happen if you are upgrading from an old version of the multistage
engine to the current one in a rolling "
+ "fashion.");
}
// Skipped netty and grpc port as they are not used in worker instance.
- return new VirtualServer(new WorkerInstance(matcher.group("host"),
Integer.parseInt(matcher.group("port")),
- Integer.parseInt(matcher.group("grpc")),
Integer.parseInt(matcher.group("service")),
- Integer.parseInt(matcher.group("mailbox"))),
Integer.parseInt(matcher.group("virtualid")));
+ return new VirtualServerAddress(matcher.group("host"),
+ Integer.parseInt(matcher.group("port")),
Integer.parseInt(matcher.group("virtualid")));
}
- public static String instanceToString(VirtualServer serverInstance) {
- return String.format("%s@%s:%s(%s:%s:%s)", serverInstance.getVirtualId(),
serverInstance.getHostname(),
- serverInstance.getPort(), serverInstance.getGrpcPort(),
serverInstance.getQueryServicePort(),
- serverInstance.getQueryMailboxPort());
+ public static String addressToString(VirtualServerAddress serverAddress) {
+ return String.format("%s@%s:%s", serverAddress.workerId(),
serverAddress.hostname(), serverAddress.port());
}
- public static Map<Integer, StageMetadata>
protoMapToStageMetadataMap(Map<Integer, Worker.StageMetadata> protoMap) {
- Map<Integer, StageMetadata> metadataMap = new HashMap<>();
- for (Map.Entry<Integer, Worker.StageMetadata> e : protoMap.entrySet()) {
- metadataMap.put(e.getKey(), fromWorkerStageMetadata(e.getValue()));
+ public static Map<Integer, WorkerMetadata>
protoMapToMetadataMap(Map<Integer, Worker.WorkerMetadata> protoMap) {
+ Map<Integer, WorkerMetadata> metadataMap = new HashMap<>();
+ for (Map.Entry<Integer, Worker.WorkerMetadata> e : protoMap.entrySet()) {
+ metadataMap.put(e.getKey(), fromProtoWorkerMetadata(e.getValue()));
}
return metadataMap;
}
- private static StageMetadata fromWorkerStageMetadata(Worker.StageMetadata
workerStageMetadata) {
- StageMetadata stageMetadata = new StageMetadata();
+ private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata
protoWorkerMetadata) {
+ WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
+
builder.setQueryServerAddress(stringToAddress(protoWorkerMetadata.getInstances()));
// scanned table
-
stageMetadata.getScannedTables().addAll(workerStageMetadata.getDataSourcesList());
- // server instance to table-segments mapping
- for (String serverInstanceString : workerStageMetadata.getInstancesList())
{
-
stageMetadata.getServerInstances().add(stringToInstance(serverInstanceString));
+ if (!protoWorkerMetadata.getDataSources().isEmpty()) {
+ builder.setTableName(protoWorkerMetadata.getDataSources());
}
- for (Map.Entry<String, Worker.SegmentMap> instanceEntry
- : workerStageMetadata.getInstanceToSegmentMapMap().entrySet()) {
- Map<String, List<String>> tableToSegmentMap = new HashMap<>();
- for (Map.Entry<String, Worker.SegmentList> tableEntry
- :
instanceEntry.getValue().getTableTypeToSegmentListMap().entrySet()) {
- tableToSegmentMap.put(tableEntry.getKey(),
tableEntry.getValue().getSegmentsList());
+ // server instance to table-segments mapping
+ if (!protoWorkerMetadata.getInstanceToSegmentMap().isEmpty()) {
+ Map<String, List<String>> tableSegmentsMap = new HashMap<>();
+ for (Map.Entry<String, Worker.SegmentList> entry :
protoWorkerMetadata.getInstanceToSegmentMap().entrySet()) {
+ tableSegmentsMap.put(entry.getKey(),
entry.getValue().getSegmentsList());
}
- stageMetadata.getServerInstanceToSegmentsMap()
- .put(stringToInstance(instanceEntry.getKey()).getServer(),
tableToSegmentMap);
+ builder.setTableSegmentsMap(tableSegmentsMap);
}
// time boundary info
- if (!workerStageMetadata.getTimeColumn().isEmpty()) {
- stageMetadata.setTimeBoundaryInfo(new
TimeBoundaryInfo(workerStageMetadata.getTimeColumn(),
- workerStageMetadata.getTimeValue()));
+ if (!protoWorkerMetadata.getTimeColumn().isEmpty()) {
+ builder.setTimeBoundaryInfo(new
TimeBoundaryInfo(protoWorkerMetadata.getTimeColumn(),
+ protoWorkerMetadata.getTimeValue()));
}
- return stageMetadata;
+ return builder.build();
}
- public static Map<Integer, Worker.StageMetadata>
stageMetadataMapToProtoMap(Map<Integer, StageMetadata> metadataMap) {
- Map<Integer, Worker.StageMetadata> protoMap = new HashMap<>();
- for (Map.Entry<Integer, StageMetadata> e : metadataMap.entrySet()) {
+ public static Map<Integer, Worker.WorkerMetadata>
metadataMapToProtoMap(Map<Integer, WorkerMetadata> metadataMap) {
+ Map<Integer, Worker.WorkerMetadata> protoMap = new HashMap<>();
+ for (Map.Entry<Integer, WorkerMetadata> e : metadataMap.entrySet()) {
protoMap.put(e.getKey(), toWorkerStageMetadata(e.getValue()));
}
return protoMap;
}
- private static Worker.StageMetadata toWorkerStageMetadata(StageMetadata
stageMetadata) {
- Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
+ private static Worker.WorkerMetadata toWorkerStageMetadata(WorkerMetadata
workerMetadata) {
Review Comment:
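Suggest renaming this method from `toWorkerStageMetadata` to `toProtoWorkerMetadata`, so that it mirrors its counterpart `fromProtoWorkerMetadata` and no longer refers to the removed `StageMetadata` type.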
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]