vinothchandar commented on code in PR #5681:
URL: https://github.com/apache/hudi/pull/5681#discussion_r924535825


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java:
##########
@@ -34,4 +35,12 @@ default boolean tableServicesEnabled(HoodieWriteConfig 
config) {
     }
     return enabled;
   }
+
+  default boolean delegateToTableManagerService(HoodieWriteConfig config, 
ActionType actionType) {
+    boolean supportAction = config.isTableManagerSupportAction(actionType);

Review Comment:
   rename: `supportsAction`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java:
##########
@@ -102,6 +104,16 @@ public Option<HoodieClusteringPlan> execute() {
         throw new HoodieIOException("Exception scheduling clustering", ioe);
       }
     }
+
+    if (config.isTableManagerSupportAction(ActionType.replacecommit)) {
+      createClusteringToService();

Review Comment:
   rename all such createXXX to delegateXXX to clearly capture what's going on. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;

Review Comment:
   Can we just align this with how the timeline server is built, i.e. via 
javalin, instead of the raw HTTP client?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java:
##########
@@ -101,9 +104,14 @@ public Option<HoodieCompactionPlan> execute() {
       } catch (IOException ioe) {
         throw new HoodieIOException("Exception scheduling compaction", ioe);
       }
-      return Option.of(plan);
+      option = Option.of(plan);
     }
-    return Option.empty();
+
+    if (config.isTableManagerSupportAction(ActionType.compaction)) {

Review Comment:
   @prasannarajaperumal can you think of any way we can avoid this `if` switch 
here? The `ActionExecutor` hierarchy is pretty deep, so I wonder if there is a 
better way to do this without writing a bunch of scaffolding, small subclasses, 
and such.
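
One possible shape for this, sketched under assumptions: wrap the existing executor in a decorator so the configuration check happens once, where the executor is constructed, instead of inside every `execute()`. All type and method names below (`PlanScheduler`, `DelegatingScheduler`, `maybeDelegate`) are hypothetical stand-ins, not Hudi classes. As in the diff above, the delegate runs whether or not a new plan was produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; none of these types are actual Hudi classes.
final class DelegatingExecutorSketch {

  /** Stand-in for the scheduling half of an ActionExecutor. */
  interface PlanScheduler<P> {
    Optional<P> execute();
  }

  /** Wraps any scheduler and notifies the table manager after scheduling. */
  static final class DelegatingScheduler<P> implements PlanScheduler<P> {
    private final PlanScheduler<P> inner;
    private final Runnable delegateToTableManager;

    DelegatingScheduler(PlanScheduler<P> inner, Runnable delegateToTableManager) {
      this.inner = inner;
      this.delegateToTableManager = delegateToTableManager;
    }

    @Override
    public Optional<P> execute() {
      Optional<P> plan = inner.execute();
      // Mirrors the diff: the delegation call is made even if no new plan was created.
      delegateToTableManager.run();
      return plan;
    }
  }

  /** The only place that branches on configuration. */
  static <P> PlanScheduler<P> maybeDelegate(
      PlanScheduler<P> scheduler, boolean delegationEnabled, Runnable delegateToTableManager) {
    return delegationEnabled
        ? new DelegatingScheduler<>(scheduler, delegateToTableManager)
        : scheduler;
  }

  public static void main(String[] args) {
    List<String> sent = new ArrayList<>();
    PlanScheduler<String> compaction = () -> Optional.of("compaction-plan");
    maybeDelegate(compaction, true, () -> sent.add("compaction")).execute();
    maybeDelegate(compaction, false, () -> sent.add("compaction")).execute();
    System.out.println("delegated calls: " + sent);
  }
}
```

Whether this is lighter than small subclasses depends on how the executors are constructed today; it only helps if there is a single factory point where the wrapper can be applied.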



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -152,9 +155,19 @@ protected Option<HoodieCleanerPlan> requestClean(String 
startCleanTime) {
         LOG.error("Got exception when saving cleaner requested file", e);
         throw new HoodieIOException(e.getMessage(), e);
       }
-      return Option.of(cleanerPlan);
+      option = Option.of(cleanerPlan);
     }
-    return Option.empty();
+
+    if (config.isTableManagerSupportAction(ActionType.clean)) {
+      createCleanToService();

Review Comment:
   rename: delegateCleanExecutionToTableManager()? In general, let's strive for 
consistent naming of similar concepts across the PR, e.g. pick one of the 
following: `TableManagerService`, `TableManager`, `Service` (probably not 
ideal since we don't know what "service" is).



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableManagerConfig.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Properties;
+
+/**
+ * Configurations used by the HUDI Table Management Service.
+ */
+@Immutable
+@ConfigClassProperty(name = "Table Management Service Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations used by the Hudi Table Management Service.")
+public class HoodieTableManagerConfig extends HoodieConfig {
+
+  public static final String TABLE_MANAGEMENT_SERVICE_PREFIX = 
"hoodie.table.management.service";

Review Comment:
   Same feedback: consistent naming, `table.manager.service` vs 
`table.management.service`. IMHO we can just keep it short as `table.manager` 
(it's conceivable enough that a manager is a service).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java:
##########
@@ -34,4 +35,12 @@ default boolean tableServicesEnabled(HoodieWriteConfig 
config) {
     }
     return enabled;
   }
+
+  default boolean delegateToTableManagerService(HoodieWriteConfig config, 
ActionType actionType) {
+    boolean supportAction = config.isTableManagerSupportAction(actionType);

Review Comment:
   rename: `isTableManagerSupportedAction`?
   
   Also, can this check move out of WriteConfig? We want to keep methods there 
pretty light: small validations on data type conversions.
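
A minimal sketch of what moving the check out could look like, assuming the set of delegated actions is stored as a comma-separated config string (that format is an assumption, not something the PR confirms). The helper class `TableManagerActionsSketch` and its methods are hypothetical; only the `ActionType` constant names are taken from the diffs in this thread.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper that lives outside the write config; names are illustrative only.
final class TableManagerActionsSketch {

  /** Local stand-in for the ActionType values used in this PR. */
  enum ActionType { compaction, replacecommit, clean }

  /** Parses e.g. "compaction, clean"; null or blank input means nothing is delegated. */
  static Set<ActionType> parseDelegatedActions(String csv) {
    Set<ActionType> actions = EnumSet.noneOf(ActionType.class);
    if (csv == null || csv.trim().isEmpty()) {
      return actions;
    }
    for (String token : csv.split(",")) {
      String name = token.trim();
      if (!name.isEmpty()) {
        // valueOf throws IllegalArgumentException for unknown action names.
        actions.add(ActionType.valueOf(name));
      }
    }
    return actions;
  }

  /** The check itself, kept out of the config class. */
  static boolean isTableManagerSupportedAction(String csv, ActionType actionType) {
    return parseDelegatedActions(csv).contains(actionType);
  }

  public static void main(String[] args) {
    String configured = "compaction, clean";
    System.out.println(isTableManagerSupportedAction(configured, ActionType.clean));
    System.out.println(isTableManagerSupportedAction(configured, ActionType.replacecommit));
  }
}
```

With this split the config class would only expose the raw string, and the interpretation of that string would sit next to the code that acts on it.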



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java:
##########
@@ -51,7 +51,7 @@ public HoodieSparkClusteringClient(
   public void cluster(HoodieInstant instant) throws IOException {
     LOG.info("Executing clustering instance " + instant);
     SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) 
clusteringClient;
-    Option<HoodieCommitMetadata> commitMetadata = 
writeClient.cluster(instant.getTimestamp(), true).getCommitMetadata();
+    Option<HoodieCommitMetadata> commitMetadata = 
writeClient.cluster(instant.getTimestamp()).getCommitMetadata();

Review Comment:
   Who calls this today? Are there side effects from not forcing the 
completion?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -257,6 +257,7 @@ public static HoodieTableMetaClient 
initTableIfNotExists(Configuration conf) thr
           
.setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
           .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
           .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
+          .setDatabaseName(conf.getString(FlinkOptions.DATABASE_NAME))

Review Comment:
   Any side effects?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management 
service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", 
BASE_URL, "register");
+
+  public static final String CREATE_COMPACTION = String.format("%s/%s", 
BASE_URL, "compact/create");
+  public static final String DELETE_COMPACTION = String.format("%s/%s", 
BASE_URL, "compact/delete");
+
+  public static final String CREATE_CLUSTERING = String.format("%s/%s", 
BASE_URL, "cluster/create");
+  public static final String DELETE_CLUSTERING = String.format("%s/%s", 
BASE_URL, "cluster/delete");
+
+  public static final String CREATE_CLEAN = String.format("%s/%s", BASE_URL, 
"clean/create");
+  public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, 
"clean/delete");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, 
HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> 
queryParameters) throws IOException {
+    URIBuilder builder =
+        new 
URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = 
Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry for %d 
times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management 
service %s after retry %d times", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] 
paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public void register() {
+    try {
+      executeRequest(REGISTER_ENDPOINT, getDefaultParams(null));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void createCompaction() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .filterPendingCompactionTimeline()
+          .getInstants()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(CREATE_COMPACTION, getDefaultParams(instantRange));

Review Comment:
   The naming of these endpoints can be more intuitive IMO. 
   Instead of CREATE_XXX, can we call them EXECUTE_XXX, where XXX is the action?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, 
"register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", 
BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", 
BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", 
BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", 
BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, 
"clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, 
"clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String OWNER = "owner";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static int failTimes;
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieTableManagerClient.class);

Review Comment:
   Let's actually fix this in this PR please, now that log4j2 has landed. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -581,6 +580,15 @@ protected void runTableServicesInline(HoodieTable table, 
HoodieCommitMetadata me
     }
   }
 
+  private void inlineCompaction(HoodieTable table, Option<Map<String, String>> 
extraMetadata) {

Review Comment:
   Reading this again, a separate client impl is probably cleaner even for this 
PR. These if checks will grow harder to maintain over time. 
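
To illustrate the trade-off, here is a rough sketch of a separate client implementation, with the caveat that `WriteClient`, `TableManagerWriteClient`, and the hook methods are hypothetical names rather than the real `BaseHoodieWriteClient` API. The base class keeps the inline behavior, and the subclass overrides the hooks to hand the work off, so no per-action `if` is needed in the shared code path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; class and method names do not match the real write client.
final class SeparateClientSketch {

  /** Default client: runs table services inline. */
  static class WriteClient {
    final List<String> log = new ArrayList<>();

    /** Shared driver with no branching on table manager configuration. */
    final void runTableServicesInline() {
      compact();
      clean();
    }

    protected void compact() {
      log.add("compact inline");
    }

    protected void clean() {
      log.add("clean inline");
    }
  }

  /** Alternative client: hands each service off to the table manager instead. */
  static class TableManagerWriteClient extends WriteClient {
    @Override
    protected void compact() {
      log.add("submit compaction to table manager");
    }

    @Override
    protected void clean() {
      log.add("submit clean to table manager");
    }
  }

  public static void main(String[] args) {
    WriteClient inline = new WriteClient();
    inline.runTableServicesInline();
    WriteClient delegating = new TableManagerWriteClient();
    delegating.runTableServicesInline();
    System.out.println(inline.log);
    System.out.println(delegating.log);
  }
}
```

The choice of client would then be made once at construction time. A mixed setup, where only some actions are delegated, would need either finer-grained hooks or composition rather than a single subclass.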



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management 
service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", 
BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", 
BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", 
BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", 
BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", 
BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, 
"clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, 
"clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, 
HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> 
queryParameters) throws IOException {
+    URIBuilder builder =
+        new 
URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = 
Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry for %d 
times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management 
service %s after retry %d times", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] 
paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public void register() {
+    try {
+      executeRequest(REGISTER_ENDPOINT, getDefaultParams(null));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitCompaction() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .filterPendingCompactionTimeline()
+          .getInstants()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(SUBMIT_COMPACTION, getDefaultParams(instantRange));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitClean() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .getCleanerTimeline()
+          .filterInflightsAndRequested()
+          .getInstants()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(SUBMIT_CLEAN, getDefaultParams(instantRange));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitClustering() {
+    try {
+      metaClient.reloadActiveTimeline();
+      String instantRange = 
StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient)

Review Comment:
   +1 on keeping the table manager/service decoupled as discussed. 
   1) Won't 1 be handled automatically, because we resend all the pending 
instants again to the table manager?
   
   2) I would imagine 2 to be a config specified at the table manager service 
level and not passed through from the writer.
   
   3) There is no backpressure mechanism like what you are describing today. I 
think your point exists in async clustering/compaction today, regardless of 
table service or not?
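
The argument in point 1 can be sketched as follows, assuming the table manager deduplicates instants it has already seen (that server-side behavior is an assumption here, and `TableManager` and `submit` are hypothetical names). Because each request carries the full set of pending instants, a request that never arrived is covered by the next one, and a repeated request changes nothing.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of an idempotent "resend everything pending" protocol.
final class ResendPendingSketch {

  /** Stand-in for the table manager's view of instants it must execute. */
  static final class TableManager {
    private final Set<String> known = new LinkedHashSet<>();

    /** Accepts the writer's full pending list; returns how many instants were new. */
    int submit(List<String> pendingInstants) {
      int added = 0;
      for (String instant : pendingInstants) {
        if (known.add(instant)) {
          added++;
        }
      }
      return added;
    }

    Set<String> known() {
      return known;
    }
  }

  public static void main(String[] args) {
    TableManager manager = new TableManager();
    // Suppose the request carrying only "001" was lost and never reached the manager.
    // The next request carries everything still pending, so "001" is picked up anyway.
    System.out.println("new instants: " + manager.submit(Arrays.asList("001", "002")));
    // A retry of the same request is harmless.
    System.out.println("new instants: " + manager.submit(Arrays.asList("001", "002")));
    System.out.println("known: " + manager.known());
  }
}
```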
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management 
service.
+ */
+public class HoodieTableManagerClient {

Review Comment:
   We already use javalin, and the REST endpoints are generated/wired that way. 
Can we just reuse that? It will save all the new bundling work needed for a new 
dependency. @prasannarajaperumal wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
