kfaraz commented on code in PR #18692: URL: https://github.com/apache/druid/pull/18692#discussion_r2471709080
########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemPropertiesTableTest.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Map; + +public class SystemPropertiesTableTest extends EmbeddedClusterTestBase +{ + private static final String HOST = "localhost"; + private static final String BROKER_PORT = "8082"; + private static final String OVERLORD_PORT = "8090"; + private static final String BROKER_SERVICE = 
"test/broker"; + private static final String OVERLORD_SERVICE = "test/overlord"; + + private final EmbeddedBroker broker = new EmbeddedBroker() + .addProperty("test.onlyBroker", "brokerValue") + .addProperty("mytestbrokerproperty", "mytestbrokervalue") + .addProperty("druid.host", HOST) + .addProperty("druid.plaintextPort", BROKER_PORT) Review Comment: Maybe override this to a value which is not already the default port for Broker (8082). ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemPropertiesTableTest.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Map; + +public class SystemPropertiesTableTest extends EmbeddedClusterTestBase +{ + private static final String HOST = "localhost"; + private static final String BROKER_PORT = "8082"; + private static final String OVERLORD_PORT = "8090"; + private static final String BROKER_SERVICE = "test/broker"; + private static final String OVERLORD_SERVICE = "test/overlord"; + + private final EmbeddedBroker broker = new EmbeddedBroker() + .addProperty("test.onlyBroker", "brokerValue") + .addProperty("mytestbrokerproperty", "mytestbrokervalue") + .addProperty("druid.host", HOST) + .addProperty("druid.plaintextPort", BROKER_PORT) + .addProperty("druid.service", BROKER_SERVICE); + + private final EmbeddedOverlord overlord = new EmbeddedOverlord() + .addProperty("druid.service", OVERLORD_SERVICE) + .addProperty("druid.host", HOST) Review Comment: I don't think there is much point in trying to override the host since it always needs to be `localhost` for the test to work. ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemPropertiesTableTest.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Map; + +public class SystemPropertiesTableTest extends EmbeddedClusterTestBase +{ + private static final String HOST = "localhost"; + private static final String BROKER_PORT = "8082"; + private static final String OVERLORD_PORT = "8090"; + private static final String BROKER_SERVICE = "test/broker"; + private static final String OVERLORD_SERVICE = "test/overlord"; + + private final EmbeddedBroker broker = new EmbeddedBroker() + .addProperty("test.onlyBroker", "brokerValue") + .addProperty("mytestbrokerproperty", "mytestbrokervalue") 
+ .addProperty("druid.host", HOST) + .addProperty("druid.plaintextPort", BROKER_PORT) + .addProperty("druid.service", BROKER_SERVICE); + + private final EmbeddedOverlord overlord = new EmbeddedOverlord() + .addProperty("druid.service", OVERLORD_SERVICE) + .addProperty("druid.host", HOST) + .addProperty("druid.plaintextPort", OVERLORD_PORT) + .addProperty("test.onlyOverlord", "overlordValue"); + + + @Override + protected EmbeddedDruidCluster createCluster() + { + return EmbeddedDruidCluster + .withZookeeper() + .addServer(new EmbeddedCoordinator()) + .addServer(overlord) + .addServer(broker) + .addCommonProperty("commonProperty", "commonValue"); + } + + @Test + public void test_serverPropertiesTable() + { + final Map<String, String> overlordProps = cluster.callApi().serviceClient().onLeaderOverlord( + mapper -> new RequestBuilder(HttpMethod.GET, "/status/properties"), + new TypeReference<>(){} + ); + verifyPropertiesForServer(overlordProps, OVERLORD_SERVICE, getHostAndPort(HOST, OVERLORD_PORT), NodeRole.OVERLORD_JSON_NAME); + + final Map<String, String> brokerProps = cluster.callApi().serviceClient().onAnyBroker( + mapper -> new RequestBuilder(HttpMethod.GET, "/status/properties"), + new TypeReference<>(){} + ); + verifyPropertiesForServer(brokerProps, BROKER_SERVICE, getHostAndPort(HOST, BROKER_PORT), NodeRole.BROKER_JSON_NAME); + final String test = cluster.runSql("SELECT * FROM sys.server_properties"); Review Comment: Did you mean to verify this result? 
########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java: ########## @@ -234,28 +236,31 @@ public SystemSchema( final CoordinatorClient coordinatorClient, final OverlordClient overlordClient, final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final @EscalatedClient HttpClient httpClient ) { Preconditions.checkNotNull(serverView, "serverView"); this.tableMap = ImmutableMap.of( - SEGMENTS_TABLE, - new SegmentsTable(druidSchema, metadataView, jsonMapper, authorizerMapper), - SERVERS_TABLE, - new ServersTable( - druidNodeDiscoveryProvider, - serverInventoryView, - authorizerMapper, - overlordClient, - coordinatorClient, - jsonMapper - ), - SERVER_SEGMENTS_TABLE, - new ServerSegmentsTable(serverView, authorizerMapper), - TASKS_TABLE, - new TasksTable(overlordClient, authorizerMapper), - SUPERVISOR_TABLE, - new SupervisorsTable(overlordClient, authorizerMapper) Review Comment: Nit: Please revert the formatting change here. ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. 
It contains all the properties of all druid servers. + */ +public class SystemPropertiesTable extends AbstractTable implements ScannableTable +{ + public static final String PROPERTIES_TABLE = "server_properties"; + + static final RowSignature PROPERTIES_SIGNATURE = RowSignature + .builder() + .add("service_name", ColumnType.STRING) + .add("server", ColumnType.STRING) + .add("node_roles", ColumnType.STRING) + .add("property", ColumnType.STRING) + .add("value", ColumnType.STRING) + .build(); + + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final AuthorizerMapper authorizerMapper; + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + + public SystemPropertiesTable( + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + AuthorizerMapper authorizerMapper, + HttpClient httpClient, + ObjectMapper jsonMapper + ) + { + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.authorizerMapper = authorizerMapper; + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return RowSignatures.toRelDataType(PROPERTIES_SIGNATURE, typeFactory); + } + + @Override + public Schema.TableType getJdbcTableType() + { + return Schema.TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable<Object[]> scan(DataContext root) + { + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); + SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final Iterator<DiscoveryDruidNode> druidServers = SystemSchema.getDruidServers(druidNodeDiscoveryProvider); + + final Map<String, Pair<StringBuilder, Stream<Object[]>>> serverToPropertiesMap = new HashMap<>(); Review Comment: Rather than a Pair, we can create a private class with some appropriate 
methods. Also, let's avoid the `StringBuilder` and just use a list. We can serialize it to a String when converting to the `Linq4j.asEnumerable()`. ```java private class ServerProperties { final String serviceName; final String hostAndPortToUse; final List<String> nodesRoles = new ArrayList<>(); final Map<String, String> properties; } ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java: ########## @@ -234,28 +236,31 @@ public SystemSchema( final CoordinatorClient coordinatorClient, final OverlordClient overlordClient, final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final @EscalatedClient HttpClient httpClient Review Comment: Nit: to adhere to the Druid style used elsewhere ```suggestion @EscalatedClient final HttpClient httpClient ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. 
It contains all the properties of all druid servers. Review Comment: ```suggestion * System schema table {@code sys.server_properties} that contains the properties of all Druid servers. * Each row contains the value of a single property. If a server has multiple node roles, all the rows for * that server would have multiple values in the column {@code node_roles} rather than duplicating all the * rows. ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. 
It contains all the properties of all druid servers. + */ +public class SystemPropertiesTable extends AbstractTable implements ScannableTable +{ + public static final String PROPERTIES_TABLE = "server_properties"; Review Comment: ```suggestion public static final String TABLE_NAME = "server_properties"; ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. 
It contains all the properties of all druid servers. + */ +public class SystemPropertiesTable extends AbstractTable implements ScannableTable +{ + public static final String PROPERTIES_TABLE = "server_properties"; + + static final RowSignature PROPERTIES_SIGNATURE = RowSignature Review Comment: ```suggestion static final RowSignature ROW_SIGNATURE = RowSignature ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. 
It contains all the properties of all druid servers. + */ +public class SystemPropertiesTable extends AbstractTable implements ScannableTable +{ + public static final String PROPERTIES_TABLE = "server_properties"; + + static final RowSignature PROPERTIES_SIGNATURE = RowSignature + .builder() + .add("service_name", ColumnType.STRING) + .add("server", ColumnType.STRING) + .add("node_roles", ColumnType.STRING) + .add("property", ColumnType.STRING) + .add("value", ColumnType.STRING) + .build(); + + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final AuthorizerMapper authorizerMapper; + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + + public SystemPropertiesTable( + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + AuthorizerMapper authorizerMapper, + HttpClient httpClient, + ObjectMapper jsonMapper + ) + { + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.authorizerMapper = authorizerMapper; + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return RowSignatures.toRelDataType(PROPERTIES_SIGNATURE, typeFactory); + } + + @Override + public Schema.TableType getJdbcTableType() + { + return Schema.TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable<Object[]> scan(DataContext root) + { + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); + SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final Iterator<DiscoveryDruidNode> druidServers = SystemSchema.getDruidServers(druidNodeDiscoveryProvider); + + final Map<String, Pair<StringBuilder, Stream<Object[]>>> serverToPropertiesMap = new HashMap<>(); + druidServers.forEachRemaining(discoveryDruidNode -> { + final DruidNode druidNode = 
discoveryDruidNode.getDruidNode(); + final Map<String, String> propertiesMap = getProperties(druidNode); + if (serverToPropertiesMap.containsKey(druidNode.getHostAndPortToUse())) { + Pair<StringBuilder, Stream<Object[]>> pair = serverToPropertiesMap.get(druidNode.getHostAndPortToUse()); + pair.lhs.append(StringUtils.format(",%s", discoveryDruidNode.getNodeRole().getJsonName())); + } else { + final StringBuilder builder = new StringBuilder(); + builder.append(StringUtils.format("%s", discoveryDruidNode.getNodeRole().getJsonName())); + serverToPropertiesMap.put(druidNode.getHostAndPortToUse(), Pair.of(builder, propertiesMap.entrySet().stream() + .map(entry -> new Object[]{ + druidNode.getServiceName(), + druidNode.getHostAndPortToUse(), + entry.getKey(), + entry.getValue() + }) + ) + ); + } + }); + return Linq4j.asEnumerable(serverToPropertiesMap.values().stream().flatMap(pair -> pair.rhs.map(entry -> new Object[]{entry[0], entry[1], pair.lhs.toString(), entry[2], entry[3]})).collect(Collectors.toList())); + } + + private Map<String, String> getProperties(DruidNode druidNode) + { + final String url = druidNode.getUriToUse().resolve("/status/properties").toString(); + try { + final Request request = new Request(HttpMethod.GET, new URL(url)); + final StringFullResponseHolder response; + response = httpClient + .go(request, new StringFullResponseHandler(StandardCharsets.UTF_8)) + .get(); + + if (response.getStatus().getCode() != HttpServletResponse.SC_OK) { + throw new RE( + "Failed to get properties from node at [%s]. Error code [%d], description [%s].", Review Comment: ```suggestion "Failed to get properties from node[%s]. Error code[%d], description[%s].", ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import 
org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. It contains all the properties of all druid servers. + */ +public class SystemPropertiesTable extends AbstractTable implements ScannableTable +{ + public static final String PROPERTIES_TABLE = "server_properties"; + + static final RowSignature PROPERTIES_SIGNATURE = RowSignature + .builder() + .add("service_name", ColumnType.STRING) + .add("server", ColumnType.STRING) + .add("node_roles", ColumnType.STRING) + .add("property", ColumnType.STRING) + .add("value", ColumnType.STRING) + .build(); + + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final AuthorizerMapper authorizerMapper; + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + + public SystemPropertiesTable( + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + AuthorizerMapper authorizerMapper, + HttpClient httpClient, + ObjectMapper jsonMapper + ) + { + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.authorizerMapper = authorizerMapper; + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return RowSignatures.toRelDataType(PROPERTIES_SIGNATURE, typeFactory); + } + + @Override + public Schema.TableType getJdbcTableType() + { + return 
Schema.TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable<Object[]> scan(DataContext root) + { + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); + SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final Iterator<DiscoveryDruidNode> druidServers = SystemSchema.getDruidServers(druidNodeDiscoveryProvider); + + final Map<String, Pair<StringBuilder, Stream<Object[]>>> serverToPropertiesMap = new HashMap<>(); + druidServers.forEachRemaining(discoveryDruidNode -> { + final DruidNode druidNode = discoveryDruidNode.getDruidNode(); + final Map<String, String> propertiesMap = getProperties(druidNode); + if (serverToPropertiesMap.containsKey(druidNode.getHostAndPortToUse())) { + Pair<StringBuilder, Stream<Object[]>> pair = serverToPropertiesMap.get(druidNode.getHostAndPortToUse()); + pair.lhs.append(StringUtils.format(",%s", discoveryDruidNode.getNodeRole().getJsonName())); + } else { + final StringBuilder builder = new StringBuilder(); + builder.append(StringUtils.format("%s", discoveryDruidNode.getNodeRole().getJsonName())); + serverToPropertiesMap.put(druidNode.getHostAndPortToUse(), Pair.of(builder, propertiesMap.entrySet().stream() + .map(entry -> new Object[]{ + druidNode.getServiceName(), + druidNode.getHostAndPortToUse(), + entry.getKey(), + entry.getValue() + }) + ) + ); + } + }); + return Linq4j.asEnumerable(serverToPropertiesMap.values().stream().flatMap(pair -> pair.rhs.map(entry -> new Object[]{entry[0], entry[1], pair.lhs.toString(), entry[2], entry[3]})).collect(Collectors.toList())); Review Comment: Please check if this can be formatted better by breaking up the argument of `asEnumerable` into multiple lines. 
########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. It contains all the properties of all druid servers. + */ +public class SystemPropertiesTable extends AbstractTable implements ScannableTable +{ + public static final String PROPERTIES_TABLE = "server_properties"; + + static final RowSignature PROPERTIES_SIGNATURE = RowSignature + .builder() + .add("service_name", ColumnType.STRING) + .add("server", ColumnType.STRING) + .add("node_roles", ColumnType.STRING) + .add("property", ColumnType.STRING) + .add("value", ColumnType.STRING) + .build(); + + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final AuthorizerMapper authorizerMapper; + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + + public SystemPropertiesTable( + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + AuthorizerMapper authorizerMapper, + HttpClient httpClient, + ObjectMapper jsonMapper + ) + { + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.authorizerMapper = authorizerMapper; + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + } + + @Override + public 
RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return RowSignatures.toRelDataType(PROPERTIES_SIGNATURE, typeFactory); + } + + @Override + public Schema.TableType getJdbcTableType() + { + return Schema.TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable<Object[]> scan(DataContext root) + { + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); + SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final Iterator<DiscoveryDruidNode> druidServers = SystemSchema.getDruidServers(druidNodeDiscoveryProvider); + + final Map<String, Pair<StringBuilder, Stream<Object[]>>> serverToPropertiesMap = new HashMap<>(); + druidServers.forEachRemaining(discoveryDruidNode -> { + final DruidNode druidNode = discoveryDruidNode.getDruidNode(); + final Map<String, String> propertiesMap = getProperties(druidNode); + if (serverToPropertiesMap.containsKey(druidNode.getHostAndPortToUse())) { + Pair<StringBuilder, Stream<Object[]>> pair = serverToPropertiesMap.get(druidNode.getHostAndPortToUse()); + pair.lhs.append(StringUtils.format(",%s", discoveryDruidNode.getNodeRole().getJsonName())); + } else { + final StringBuilder builder = new StringBuilder(); + builder.append(StringUtils.format("%s", discoveryDruidNode.getNodeRole().getJsonName())); + serverToPropertiesMap.put(druidNode.getHostAndPortToUse(), Pair.of(builder, propertiesMap.entrySet().stream() + .map(entry -> new Object[]{ + druidNode.getServiceName(), + druidNode.getHostAndPortToUse(), + entry.getKey(), + entry.getValue() + }) + ) + ); + } + }); + return Linq4j.asEnumerable(serverToPropertiesMap.values().stream().flatMap(pair -> pair.rhs.map(entry -> new Object[]{entry[0], entry[1], pair.lhs.toString(), entry[2], entry[3]})).collect(Collectors.toList())); + } + + private Map<String, String> getProperties(DruidNode druidNode) + 
{ + final String url = druidNode.getUriToUse().resolve("/status/properties").toString(); + try { + final Request request = new Request(HttpMethod.GET, new URL(url)); + final StringFullResponseHolder response; + response = httpClient + .go(request, new StringFullResponseHandler(StandardCharsets.UTF_8)) + .get(); + + if (response.getStatus().getCode() != HttpServletResponse.SC_OK) { + throw new RE( + "Failed to get properties from node at [%s]. Error code [%d], description [%s].", + url, + response.getStatus().getCode(), + response.getStatus().getReasonPhrase() + ); + } + return jsonMapper.readValue( + response.getContent(), + new TypeReference<Map<String, String>>() + { + } Review Comment: ```suggestion new TypeReference<>(){} ``` ########## sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemPropertiesTable.java: ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite.schema; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This table contains row per property. 
It contains all the properties of all druid servers. + */ +public class SystemPropertiesTable extends AbstractTable implements ScannableTable +{ + public static final String PROPERTIES_TABLE = "server_properties"; + + static final RowSignature PROPERTIES_SIGNATURE = RowSignature + .builder() + .add("service_name", ColumnType.STRING) + .add("server", ColumnType.STRING) + .add("node_roles", ColumnType.STRING) + .add("property", ColumnType.STRING) + .add("value", ColumnType.STRING) + .build(); + + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final AuthorizerMapper authorizerMapper; + private final HttpClient httpClient; + private final ObjectMapper jsonMapper; + + public SystemPropertiesTable( + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + AuthorizerMapper authorizerMapper, + HttpClient httpClient, + ObjectMapper jsonMapper + ) + { + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; + this.authorizerMapper = authorizerMapper; + this.httpClient = httpClient; + this.jsonMapper = jsonMapper; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return RowSignatures.toRelDataType(PROPERTIES_SIGNATURE, typeFactory); + } + + @Override + public Schema.TableType getJdbcTableType() + { + return Schema.TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable<Object[]> scan(DataContext root) + { + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); + SystemSchema.checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final Iterator<DiscoveryDruidNode> druidServers = SystemSchema.getDruidServers(druidNodeDiscoveryProvider); + + final Map<String, Pair<StringBuilder, Stream<Object[]>>> serverToPropertiesMap = new HashMap<>(); + druidServers.forEachRemaining(discoveryDruidNode -> { + final DruidNode druidNode = 
discoveryDruidNode.getDruidNode(); + final Map<String, String> propertiesMap = getProperties(druidNode); + if (serverToPropertiesMap.containsKey(druidNode.getHostAndPortToUse())) { + Pair<StringBuilder, Stream<Object[]>> pair = serverToPropertiesMap.get(druidNode.getHostAndPortToUse()); + pair.lhs.append(StringUtils.format(",%s", discoveryDruidNode.getNodeRole().getJsonName())); + } else { + final StringBuilder builder = new StringBuilder(); + builder.append(StringUtils.format("%s", discoveryDruidNode.getNodeRole().getJsonName())); + serverToPropertiesMap.put(druidNode.getHostAndPortToUse(), Pair.of(builder, propertiesMap.entrySet().stream() + .map(entry -> new Object[]{ + druidNode.getServiceName(), + druidNode.getHostAndPortToUse(), + entry.getKey(), + entry.getValue() + }) + ) + ); + } + }); + return Linq4j.asEnumerable(serverToPropertiesMap.values().stream().flatMap(pair -> pair.rhs.map(entry -> new Object[]{entry[0], entry[1], pair.lhs.toString(), entry[2], entry[3]})).collect(Collectors.toList())); + } + + private Map<String, String> getProperties(DruidNode druidNode) + { + final String url = druidNode.getUriToUse().resolve("/status/properties").toString(); + try { + final Request request = new Request(HttpMethod.GET, new URL(url)); + final StringFullResponseHolder response; + response = httpClient + .go(request, new StringFullResponseHandler(StandardCharsets.UTF_8)) + .get(); + + if (response.getStatus().getCode() != HttpServletResponse.SC_OK) { + throw new RE( + "Failed to get properties from node at [%s]. 
Error code [%d], description [%s].", + url, + response.getStatus().getCode(), + response.getStatus().getReasonPhrase() + ); + } + return jsonMapper.readValue( + response.getContent(), + new TypeReference<Map<String, String>>() + { + } + ); + } + catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + catch (ExecutionException e) { Review Comment: Let's club these to just catch `Exception` and throw a `DruidException` since this will be sent back to the user. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
