xingfudeshi commented on code in PR #6830: URL: https://github.com/apache/incubator-seata/pull/6830#discussion_r1799038446
########## config/seata-config-raft/src/main/java/org/apache/seata/config/raft/RaftConfigurationClient.java: ########## @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.config.raft; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.entity.ContentType; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.config.ConfigDataResponse; +import org.apache.seata.common.exception.AuthenticationFailedException; +import org.apache.seata.common.exception.ErrorCode; +import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.common.exception.RetryableException; +import org.apache.seata.common.exception.SeataRuntimeException; +import org.apache.seata.common.metadata.Metadata; +import org.apache.seata.common.metadata.MetadataResponse; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.common.util.HttpClientUtil; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.AbstractConfiguration; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationChangeEvent; +import org.apache.seata.config.ConfigurationChangeListener; +import org.apache.seata.config.ConfigurationChangeType; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.config.dto.ConfigurationInfoDto; +import org.apache.seata.config.dto.ConfigurationItem; +import org.apache.seata.config.store.ConfigStoreManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.seata.common.ConfigurationKeys.CONFIG_STORE_DATA_ID; +import static org.apache.seata.common.ConfigurationKeys.CONFIG_STORE_NAMESPACE; +import static org.apache.seata.common.Constants.DEFAULT_STORE_DATA_ID; +import static org.apache.seata.common.Constants.DEFAULT_STORE_NAMESPACE; +import static org.apache.seata.common.Constants.RAFT_CONFIG_GROUP; +import static org.apache.seata.common.DefaultValues.DEFAULT_SEATA_GROUP; + +/** + * The type Raft configuration of client. + * + */ +public class RaftConfigurationClient extends AbstractConfiguration { + private static final Logger LOGGER = LoggerFactory.getLogger(RaftConfigurationClient.class); + private static final String CONFIG_TYPE = "raft"; + private static final String SERVER_ADDR_KEY = "serverAddr"; + private static final String RAFT_GROUP = RAFT_CONFIG_GROUP; + private static final String RAFT_CLUSTER = DEFAULT_SEATA_GROUP; + private static final String CONFIG_NAMESPACE; + private static final String CONFIG_DATA_ID; + private static final String USERNAME_KEY = "username"; + private static final String PASSWORD_KEY = "password"; + private static final String CONFIG_KEY = "config"; + private static final String VERSION_KEY = "version"; + private static final String AUTHORIZATION_HEADER = "Authorization"; + private static final String TOKEN_VALID_TIME_MS_KEY = "tokenValidityInMilliseconds"; + private static volatile RaftConfigurationClient instance; + private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE; + private static final String USERNAME; + private static final String PASSWORD; + private static final long TOKEN_EXPIRE_TIME_IN_MILLISECONDS; + private static long tokenTimeStamp = -1; + private static final String IP_PORT_SPLIT_CHAR = ":"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Map<String, List<InetSocketAddress>> INIT_ADDRESSES = new HashMap<>(); + private static final Metadata METADATA = new Metadata(); + private static volatile ThreadPoolExecutor REFRESH_METADATA_EXECUTOR; + private static volatile ThreadPoolExecutor REFRESH_CONFIG_EXECUTOR; + private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + private static final AtomicBoolean CONFIG_CLOSED = new AtomicBoolean(false); + private static volatile Properties seataConfig = new Properties(); + private static final AtomicLong CONFIG_VERSION = new AtomicLong(0); + private static final int MAP_INITIAL_CAPACITY = 8; + private static final ConcurrentMap<String, ConcurrentMap<ConfigurationChangeListener, ConfigStoreListener>> CONFIG_LISTENERS_MAP + = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); + + private static ConfigStoreListener CONFIG_LISTENER; + static { + USERNAME = FILE_CONFIG.getConfig(getRaftUsernameKey()); + PASSWORD = FILE_CONFIG.getConfig(getRaftPasswordKey()); + TOKEN_EXPIRE_TIME_IN_MILLISECONDS = FILE_CONFIG.getLong(getTokenExpireTimeInMillisecondsKey(), 29 * 60 * 1000L); + CONFIG_NAMESPACE = FILE_CONFIG.getConfig(CONFIG_STORE_NAMESPACE, DEFAULT_STORE_NAMESPACE); + CONFIG_DATA_ID = FILE_CONFIG.getConfig(CONFIG_STORE_DATA_ID, DEFAULT_STORE_DATA_ID); + } + public static String jwtToken; + public static RaftConfigurationClient getInstance() { + if (instance == null) { + synchronized (RaftConfigurationClient.class) { + if (instance == null) { + instance = new RaftConfigurationClient(); + } + } + } + return instance; + } + + private RaftConfigurationClient() { + initClusterMetaData(); + initClientConfig(); + } + + private static void initClientConfig() { + try { + Map<String, Object> configMap = acquireClusterConfigData(RAFT_CLUSTER, RAFT_GROUP, CONFIG_NAMESPACE, CONFIG_DATA_ID); + if (configMap != null) { + seataConfig.putAll(configMap); + } + CONFIG_LISTENER = new ConfigStoreListener(CONFIG_NAMESPACE, null); + startQueryConfigData(); + } catch (RetryableException e) { + LOGGER.error("init config properties error", e); + } + + } + private static String queryHttpAddress(String clusterName, String group) { + List<Node> nodeList = METADATA.getNodes(clusterName, group); + List<String> addressList = null; + Stream<InetSocketAddress> stream = null; + if (CollectionUtils.isNotEmpty(nodeList)) { + addressList = + nodeList.stream().map(node -> node.getControl().createAddress()).collect(Collectors.toList()); + } else { + stream = INIT_ADDRESSES.get(clusterName).stream(); + } + if (addressList != null) { + return addressList.get(ThreadLocalRandom.current().nextInt(addressList.size())); + } else { + Map<String, Node> map = new HashMap<>(); + if (CollectionUtils.isNotEmpty(nodeList)) { + for (Node node : nodeList) { + map.put(new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()).getAddress().getHostAddress() + + IP_PORT_SPLIT_CHAR + node.getTransaction().getPort(), node); + } + } + addressList = stream.map(inetSocketAddress -> { + String host = inetSocketAddress.getAddress().getHostAddress(); + Node node = map.get(host + IP_PORT_SPLIT_CHAR + inetSocketAddress.getPort()); + return host + IP_PORT_SPLIT_CHAR + + (node != null ? node.getControl().getPort() : inetSocketAddress.getPort()); + }).collect(Collectors.toList()); + return addressList.get(ThreadLocalRandom.current().nextInt(addressList.size())); + } + } + private static void acquireClusterMetaData(String clusterName, String group) throws RetryableException { + String tcAddress = queryHttpAddress(clusterName, group); + Map<String, String> header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()); + if (isTokenExpired()) { + refreshToken(tcAddress); + } + if (StringUtils.isNotBlank(jwtToken)) { + header.put(AUTHORIZATION_HEADER, jwtToken); + } + if (StringUtils.isNotBlank(tcAddress)) { + Map<String, String> param = new HashMap<>(); + // param.put("group", group); + String response = null; + try (CloseableHttpResponse httpResponse = + HttpClientUtil.doGet("http://" + tcAddress + "/metadata/v1/config/cluster", param, header, 1000)) { + if (httpResponse != null) { + if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8); + } else if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) { + if (StringUtils.isNotBlank(USERNAME) && StringUtils.isNotBlank(PASSWORD)) { + throw new RetryableException("Authentication failed!"); + } else { + throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password."); + } + } + } + MetadataResponse metadataResponse; + if (StringUtils.isNotBlank(response)) { + try { + metadataResponse = OBJECT_MAPPER.readValue(response, MetadataResponse.class); + METADATA.refreshMetadata(clusterName, metadataResponse); + } catch (JsonProcessingException e) { + LOGGER.error(e.getMessage(), e); + } + } + } catch (IOException e) { + throw new RetryableException(e.getMessage(), e); + } + } + } + + @SuppressWarnings("unchecked") + private static Map<String, Object> acquireClusterConfigData(String clusterName, String group, String configNamespace, String configDataId) throws RetryableException { + String tcAddress = queryHttpAddress(clusterName, group); + Map<String, String> header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()); + if (isTokenExpired()) { + refreshToken(tcAddress); + } + if (StringUtils.isNotBlank(jwtToken)) { + header.put(AUTHORIZATION_HEADER, jwtToken); + } + if (StringUtils.isNotBlank(tcAddress)) { + Map<String, String> param = new HashMap<>(); + param.put("namespace", configNamespace); + param.put("dataId", configDataId); + String response = null; + try (CloseableHttpResponse httpResponse = + HttpClientUtil.doGet("http://" + tcAddress + "/metadata/v1/config/getAll", param, header, 1000)) { + if (httpResponse != null) { + if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8); + } else if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) { + if (StringUtils.isNotBlank(USERNAME) && StringUtils.isNotBlank(PASSWORD)) { + throw new RetryableException("Authentication failed!"); + } else { + throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password."); + } + } + } + + ConfigDataResponse<ConfigurationInfoDto> configDataResponse; + if (StringUtils.isNotBlank(response)) { + try { + configDataResponse = OBJECT_MAPPER.readValue(response, new TypeReference<ConfigDataResponse<ConfigurationInfoDto>>() { + }); + if (configDataResponse.getSuccess()) { + ConfigurationInfoDto configurationInfoDto = configDataResponse.getResult(); + Map<String, ConfigurationItem> configItemMap = configurationInfoDto.getConfig(); + Map<String, Object> configMap = configItemMap.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, entry -> entry.getValue().getValue())); + Long version = configurationInfoDto.getVersion() == null ? -1 : configurationInfoDto.getVersion(); + Long currentVersion = CONFIG_VERSION.get(); + if (version < currentVersion) { + LOGGER.info("The configuration version: {} of the server is lower than the current configuration: {} , it may be expired configuration.", version, CONFIG_VERSION.get()); + throw new RetryableException("Expired configuration!"); + } else { + CONFIG_VERSION.set(version); + return configMap; + } + } else { + throw new RetryableException(configDataResponse.getErrMsg()); + } + } catch (JsonProcessingException e) { + LOGGER.error(e.getMessage(), e); + } + } + } catch (IOException e) { + throw new RetryableException(e.getMessage(), e); + } + } + return null; + } + + protected static void startQueryMetadata() { + if (REFRESH_METADATA_EXECUTOR == null) { + synchronized (INIT_ADDRESSES) { + if (REFRESH_METADATA_EXECUTOR == null) { + REFRESH_METADATA_EXECUTOR = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new NamedThreadFactory("refreshMetadata", 1, true)); + REFRESH_METADATA_EXECUTOR.execute(() -> { + long metadataMaxAgeMs = FILE_CONFIG.getLong(ConfigurationKeys.CLIENT_METADATA_MAX_AGE_MS, 30000L); + long currentTime = System.currentTimeMillis(); + while (!CLOSED.get()) { + try { + // Forced refresh of metadata information after set age + boolean fetch = System.currentTimeMillis() - currentTime > metadataMaxAgeMs; + String clusterName = RAFT_CLUSTER; + if (!fetch) { + fetch = watch(); + } + // Cluster changes or reaches timeout refresh time + if (fetch) { + for (String group : METADATA.groups(clusterName)) { + try { + acquireClusterMetaData(clusterName, group); + } catch (Exception e) { + // prevents an exception from being thrown that causes the thread to break + if (e instanceof RetryableException) { + throw e; + } else { + LOGGER.error("failed to get the leader address,error: {}", e.getMessage()); + } + } + } + currentTime = System.currentTimeMillis(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("refresh seata cluster metadata time: {}", currentTime); + } + } + } catch (RetryableException e) { + LOGGER.error(e.getMessage(), e); + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } + } + } + }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + CLOSED.compareAndSet(false, true); + // is there should be shutdown gracefully? + REFRESH_METADATA_EXECUTOR.shutdown(); + })); + } + } + } + } + + protected static void startQueryConfigData() { + if (REFRESH_CONFIG_EXECUTOR == null) { + synchronized (RaftConfigurationClient.class) { + if (REFRESH_CONFIG_EXECUTOR == null) { + REFRESH_CONFIG_EXECUTOR = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new NamedThreadFactory("refreshConfig", 1, true)); + REFRESH_CONFIG_EXECUTOR.execute(() -> { + long metadataMaxAgeMs = FILE_CONFIG.getLong(ConfigurationKeys.CLIENT_METADATA_MAX_AGE_MS, 30000L); + long currentTime = System.currentTimeMillis(); + while (!CONFIG_CLOSED.get()) { + try { + // Forced refresh of metadata information after set age + boolean fetch = System.currentTimeMillis() - currentTime > metadataMaxAgeMs; + if (!fetch) { + fetch = configWatch(); + } + // Cluster config changes or reaches timeout refresh time + if (fetch) { + try { + Map<String, Object> configMap = acquireClusterConfigData(RAFT_CLUSTER, RAFT_GROUP, CONFIG_NAMESPACE, CONFIG_DATA_ID); + if (CollectionUtils.isNotEmpty(configMap)) { + notifyConfigMayChange(configMap); + } + } catch (Exception e) { + // prevents an exception from being thrown that causes the thread to break + if (e instanceof RetryableException) { + throw e; + } else { + LOGGER.error("failed to get the config ,error: {}", e.getMessage()); Review Comment: Use LOGGER.error("failed to get the config ,error: {}", e.getMessage(),e); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org