suryaprasanna commented on code in PR #5681:
URL: https://github.com/apache/hudi/pull/5681#discussion_r921413669
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########

@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which sends the table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry %d more times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management service %s after retrying %d times", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public void register() {
+    try {
+      executeRequest(REGISTER_ENDPOINT, getDefaultParams(null));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitCompaction() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .filterPendingCompactionTimeline()
+          .getInstants()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(SUBMIT_COMPACTION, getDefaultParams(instantRange));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitClean() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .getCleanerTimeline()
+          .filterInflightsAndRequested()
+          .getInstants()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(SUBMIT_CLEAN, getDefaultParams(instantRange));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitClustering() {
+    try {
+      metaClient.reloadActiveTimeline();
+      String instantRange = StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient)

Review Comment:
Here is my understanding: we have 3 components, the scheduler (the Hudi table, via the write API), storage (a state manager or metaserver, which can be MySQL or a Hudi table), and execution (the table services).

> When the table service is unavailable, the timeline will not be able to generate the requested instant, so it will not be able to operate when the table service is restored, which has a greater impact on the day level write job.

- If the table service is down, there is no one to execute the plan, so there is no need to create the requested file. The table generated the plan initially, and it will likewise try to regenerate it every time until the table service accepts the request.
- There is an advantage to this approach: the latest plan we generate will also include the latest changes that happened on the table during the period the table service was down.

> When the table service receives the request, it needs to scan the timeline corresponding to the hudi table to determine whether it is invalid, which will make table management service appear a lot of additional operations.

- I agree, the table service has to be as dumb as possible. Here, its duty is to read from storage and execute the Spark job. So basically we are starting the Spark job, and the job will not do anything because it cannot find the .requested file for the instant; it will just quit.
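To make the "it will just quit" behavior concrete, here is a minimal sketch of the guard the execution side could run before doing any real work. It relies only on timeline APIs already used in this diff; the class and method names are hypothetical, not part of this PR:

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;

// Hypothetical guard, not part of this PR: the executor checks the timeline
// and quits when the instant it was asked to run is no longer pending.
public final class TableServiceJobGuard {

  /**
   * Returns true only if the instant named in the request is still pending,
   * i.e. its .requested/.inflight file exists on the timeline. If the writer
   * never created the .requested file (or has since removed it), the job
   * should exit without doing anything.
   */
  public static boolean shouldExecute(HoodieTableMetaClient metaClient, String instantTime) {
    return metaClient.reloadActiveTimeline()
        .filterPendingReplaceTimeline() // clustering; compaction would use filterPendingCompactionTimeline()
        .getInstants()
        .anyMatch(instant -> instant.getTimestamp().equals(instantTime));
  }
}
```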
Another approach I can think of is to have an async thread that starts on the write path and incrementally posts the requests to the table service by reading through the timeline. We can even update the file's extraMetadata map with any information if needed.

I am inclined towards your approach because it is a clean way to do it, but we need to address the following things:
1. I have seen people configure clustering on non-partitioned tables. If we schedule clustering on them and fail to update the table services, then no other clustering job can be scheduled on them, because we do not schedule clustering on file groups with pending clustering. So the replacecommit.requested file can be left stranded.
2. For passing job-specific parameters like executor count, either we need to pass this information while calling the table service APIs, or the table service needs to read through the plan before scheduling a Spark job. Ideally we do not want the table service to read through clustering plans (see the sketch after this list).
3. Also, we can schedule clustering jobs on small groups of file groups one after the other, and these clustering plans can be smaller in size and greater in number. If the table service is running slow, more and more .requested files will be created and more requests will be sent to the table service.
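On point 2, the job settings could travel with the submit call itself, reusing the param-building helper from this diff so the table service never has to open the clustering plan. A rough sketch of an extra method on `HoodieTableManagerClient`, using only the constants and helpers defined above; the method name and all parameter values are illustrative, not part of this PR:

```java
// Hypothetical variant of submitClustering() that forwards execution settings
// as query parameters, so the table service can schedule the Spark job without
// deserializing the clustering plan. All parameter values are illustrative.
public void submitClusteringWithJobParams(String instantRange) {
  try {
    Map<String, String> params = getParamsWithAdditionalParams(
        new String[] {INSTANT_PARAM, PARALLELISM, RESOURCE, EXECUTION_ENGINE, EXTRA_PARAMS},
        new String[] {instantRange, "200", "executor_memory:4g,executor_cores:2", "spark",
            "spark.executor.instances=20"});
    executeRequest(SUBMIT_CLUSTERING, params);
  } catch (IOException e) {
    throw new HoodieRemoteException(e);
  }
}
```

That would keep the service a thin scheduler: it only copies these values into the job it launches and never needs to understand the plan contents.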