This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1bfac499b5 [INLONG-10173][Sort] SortStandalone support request unified
configuration (#10174)
1bfac499b5 is described below
commit 1bfac499b538032aaad64f1f7936bac75b9fd893
Author: vernedeng <[email protected]>
AuthorDate: Sat May 11 21:54:59 2024 +0800
[INLONG-10173][Sort] SortStandalone support request unified configuration
(#10174)
---
.../apache/inlong/common/pojo/sort/SortConfig.java | 8 +-
.../SortConfigResponse.java | 2 +-
.../pojo/sortstandalone/SortClusterConfig.java | 1 +
.../pojo/sortstandalone/SortClusterResponse.java | 1 +
.../common/pojo/sortstandalone/SortTaskConfig.java | 1 +
.../inlong/manager/service/core/SortService.java | 2 +-
.../manager/service/core/impl/SortServiceImpl.java | 4 +-
.../web/controller/openapi/SortController.java | 2 +-
.../config/holder/SortClusterConfigHolder.java | 1 +
.../SortConfigHolder.java} | 96 ++++++----------
.../ClassResourceSortClusterConfigLoader.java | 1 +
.../loader/ManagerSortClusterConfigLoader.java | 1 +
.../config/loader/SortClusterConfigLoader.java | 1 +
.../ClassResourceSortClusterConfigLoader.java | 35 ++----
.../{ => v2}/ManagerSortClusterConfigLoader.java | 52 +++------
.../config/loader/v2/SortConfigLoader.java | 13 +--
.../standalone/utils/FlumeConfigGenerator.java | 1 +
.../utils/{ => v2}/FlumeConfigGenerator.java | 82 +++-----------
.../apache/inlong/sort/standalone/SortCluster.java | 1 +
.../apache/inlong/sort/standalone/SortTask.java | 1 +
.../sort/standalone/{ => v2}/SortCluster.java | 53 +++------
.../inlong/sort/standalone/{ => v2}/SortTask.java | 125 +++++++--------------
22 files changed, 160 insertions(+), 324 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
index ad3a6bb892..680827843d 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
@@ -17,14 +17,20 @@
package org.apache.inlong.common.pojo.sort;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
public class SortConfig implements Serializable {
private String sortClusterName;
- private List<SortTaskConfig> clusters;
+ private List<SortTaskConfig> tasks;
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortConfigResponse.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
similarity index 95%
rename from
inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortConfigResponse.java
rename to
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
index ea0a230e57..327898ede0 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortConfigResponse.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.common.pojo.sortstandalone;
+package org.apache.inlong.common.pojo.sort;
import lombok.AllArgsConstructor;
import lombok.Builder;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
index 905cd61ae3..957eda406b 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
@@ -28,6 +28,7 @@ import java.util.List;
@Builder
@NoArgsConstructor
@AllArgsConstructor
+@Deprecated
public class SortClusterConfig {
String clusterName;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
index 679031e16f..c0bcfc88d0 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
@@ -26,6 +26,7 @@ import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
+@Deprecated
public class SortClusterResponse {
public static final int SUCC = 0;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
index 20fa5f06e0..ae165eb214 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
@@ -29,6 +29,7 @@ import java.util.Map;
@Builder
@NoArgsConstructor
@AllArgsConstructor
+@Deprecated
public class SortTaskConfig {
String name;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 011c903c2e..be7fcb032c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -18,8 +18,8 @@
package org.apache.inlong.manager.service.core;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 8a9c9691a3..d820e37129 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sort.SortClusterConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
import org.apache.inlong.common.pojo.sort.node.NodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.plugin.Plugin;
@@ -305,7 +305,7 @@ public class SortServiceImpl implements SortService,
PluginBinder {
}
map.add(sortTaskConfig);
}
- sortConfig.setClusters(temp.get(sortClusterName));
+ sortConfig.setTasks(temp.get(sortClusterName));
try {
String configStr = objectMapper.writeValueAsString(sortConfig);
sortConfigs.put(sortClusterName, configStr);
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
index 2c9d994d1c..a8d5faad54 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
@@ -18,8 +18,8 @@
package org.apache.inlong.manager.web.controller.openapi;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
import org.apache.inlong.manager.service.core.SortService;
import io.swagger.annotations.Api;
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
index eeec29863e..9d82bde48e 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -39,6 +39,7 @@ import static
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
*
* SortClusterConfigHolder
*/
+@Deprecated
public final class SortClusterConfigHolder {
public static final Logger LOG =
InlongLoggerFactory.getLogger(SortClusterConfigHolder.class);
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
similarity index 60%
copy from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
copy to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index eeec29863e..ef9106cc97 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.config.holder;
+package org.apache.inlong.sort.standalone.config.holder.v2;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import
org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader;
-import
org.apache.inlong.sort.standalone.config.loader.ManagerSortClusterConfigLoader;
-import org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
+import
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
+import
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
+import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
-import org.slf4j.Logger;
import java.util.Date;
import java.util.Timer;
@@ -35,39 +36,31 @@ import java.util.TimerTask;
import static
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
-/**
- *
- * SortClusterConfigHolder
- */
-public final class SortClusterConfigHolder {
-
- public static final Logger LOG =
InlongLoggerFactory.getLogger(SortClusterConfigHolder.class);
+@Slf4j
+public class SortConfigHolder {
- private static SortClusterConfigHolder instance;
+ private static SortConfigHolder instance;
private long reloadInterval;
private Timer reloadTimer;
- private SortClusterConfigLoader loader;
- private SortClusterConfig config;
+ private SortConfigLoader loader;
+ private SortConfig config;
- /**
- * Constructor
- */
- private SortClusterConfigHolder() {
+ private SortConfigHolder() {
}
- /**
- * getInstance
- *
- * @return
- */
- private static SortClusterConfigHolder get() {
+ private static SortConfigHolder get() {
if (instance != null) {
return instance;
}
- synchronized (SortClusterConfigHolder.class) {
- instance = new SortClusterConfigHolder();
+
+ synchronized (SortConfigHolder.class) {
+ if (instance != null) {
+ return instance;
+ }
+
+ instance = new SortConfigHolder();
instance.reloadInterval =
CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
String loaderType = CommonPropertiesHolder
.getString(SortClusterConfigType.KEY_TYPE,
SortClusterConfigType.MANAGER.name());
@@ -81,11 +74,11 @@ public final class SortClusterConfigHolder {
try {
Class<?> loaderClass = ClassUtils.getClass(loaderType);
Object loaderObject =
loaderClass.getDeclaredConstructor().newInstance();
- if (loaderObject instanceof SortClusterConfigLoader) {
- instance.loader = (SortClusterConfigLoader)
loaderObject;
+ if (loaderObject instanceof SortConfigLoader) {
+ instance.loader = (SortConfigLoader) loaderObject;
}
} catch (Throwable t) {
- LOG.error("Fail to init loader,loaderType:{},error:{}",
loaderType, t.getMessage());
+ log.error("failed to init loader, loaderType={}",
loaderType);
}
}
if (instance.loader == null) {
@@ -96,22 +89,17 @@ public final class SortClusterConfigHolder {
instance.reload();
instance.setReloadTimer();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ log.error("failed to reload instance", e);
}
}
return instance;
+
}
- /**
- * setReloadTimer
- */
private void setReloadTimer() {
reloadTimer = new Timer(true);
TimerTask task = new TimerTask() {
- /**
- * run
- */
public void run() {
reload();
}
@@ -119,40 +107,26 @@ public final class SortClusterConfigHolder {
reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
reloadInterval), reloadInterval);
}
- /**
- * reload
- */
private void reload() {
try {
- SortClusterConfig newConfig = this.loader.load();
+ SortConfig newConfig = this.loader.load();
if (newConfig != null) {
this.config = newConfig;
}
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ log.error("failed to reload sort config", e);
}
}
- /**
- * getClusterConfig
- *
- * @return
- */
- public static SortClusterConfig getClusterConfig() {
+ public static SortConfig getSortConfig() {
return get().config;
}
- /**
- * getTaskConfig
- *
- * @param sortTaskName
- * @return
- */
public static SortTaskConfig getTaskConfig(String sortTaskName) {
- SortClusterConfig config = get().config;
- if (config != null && config.getSortTasks() != null) {
- for (SortTaskConfig task : config.getSortTasks()) {
- if (StringUtils.equals(sortTaskName, task.getName())) {
+ SortConfig config = get().config;
+ if (config != null && config.getTasks() != null) {
+ for (SortTaskConfig task : config.getTasks()) {
+ if (StringUtils.equals(sortTaskName, task.getSortTaskName())) {
return task;
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
index 294fdf2cbf..8fb4a6f8ab 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
@@ -33,6 +33,7 @@ import java.nio.charset.Charset;
*
* ClassResourceCommonPropertiesLoader
*/
+@Deprecated
public class ClassResourceSortClusterConfigLoader implements
SortClusterConfigLoader {
public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
index 83724b32d9..d72061353f 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
*
* ManagerSortClusterConfigLoader
*/
+@Deprecated
public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader
{
public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
index 8751d8f732..4405a4a0db 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
@@ -25,6 +25,7 @@ import org.apache.flume.conf.Configurable;
*
* SortClusterConfigLoader
*/
+@Deprecated
public interface SortClusterConfigLoader extends Configurable {
/**
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
similarity index 69%
copy from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
copy to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
index 294fdf2cbf..629d8fef54 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.sort.standalone.config.loader.v2;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -26,26 +26,16 @@ import org.apache.commons.io.IOUtils;
import org.apache.flume.Context;
import org.slf4j.Logger;
-import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
-/**
- *
- * ClassResourceCommonPropertiesLoader
- */
-public class ClassResourceSortClusterConfigLoader implements
SortClusterConfigLoader {
+public class ClassResourceSortClusterConfigLoader implements SortConfigLoader {
public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
-
private Context context;
+ private ObjectMapper objectMapper = new ObjectMapper();
- /**
- * load
- *
- * @return
- */
@Override
- public SortClusterConfig load() {
+ public SortConfig load() {
String fileName = SortClusterConfigType.DEFAULT_FILE;
try {
if (context != null) {
@@ -55,22 +45,13 @@ public class ClassResourceSortClusterConfigLoader
implements SortClusterConfigLo
Charset.defaultCharset());
int index = confString.indexOf('{');
confString = confString.substring(index);
- ObjectMapper objectMapper = new ObjectMapper();
- SortClusterConfig config = objectMapper.readValue(confString,
SortClusterConfig.class);
- return config;
- } catch (UnsupportedEncodingException e) {
- LOG.error("fail to load properties, file ={}, and e= {}",
fileName, e);
+ return objectMapper.readValue(confString, SortConfig.class);
} catch (Exception e) {
- LOG.error("fail to load properties, file ={}, and e= {}",
fileName, e);
+ LOG.error("failed to load properties, file ={}", fileName, e);
}
- return SortClusterConfig.builder().build();
+ return SortConfig.builder().build();
}
- /**
- * configure
- *
- * @param context
- */
@Override
public void configure(Context context) {
this.context = context;
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
similarity index 71%
copy from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
copy to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
index 83724b32d9..7882e4e310 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.sort.standalone.config.loader.v2;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.http.HttpHeaders;
@@ -33,28 +33,17 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
-/**
- *
- * ManagerSortClusterConfigLoader
- */
-public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader
{
-
- public static final Logger LOG =
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+@Slf4j
+public class ManagerSortClusterConfigLoader implements SortConfigLoader {
private Context context;
private CloseableHttpClient httpClient;
private ObjectMapper objectMapper = new ObjectMapper();
private String md5;
- /**
- * constructHttpClient
- *
- * @return
- */
private static synchronized CloseableHttpClient constructHttpClient() {
long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
RequestConfig requestConfig = RequestConfig.custom()
@@ -65,54 +54,43 @@ public class ManagerSortClusterConfigLoader implements
SortClusterConfigLoader {
return httpClientBuilder.build();
}
- /**
- * configure
- *
- * @param context
- */
@Override
public void configure(Context context) {
this.context = context;
this.httpClient = constructHttpClient();
}
- /**
- * load
- *
- * @return
- */
@Override
- public SortClusterConfig load() {
+ public SortConfig load() {
HttpGet httpGet = null;
try {
String clusterName =
this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
- String url = ManagerUrlHandler.getSortClusterConfigUrl() +
"?apiVersion=1.0&clusterName="
+ String url = ManagerUrlHandler.getSortClusterConfigUrl() +
"?clusterName="
+ clusterName + "&md5=";
if (StringUtils.isNotBlank(this.md5)) {
url += this.md5;
}
- LOG.info("start to request {} to get config info", url);
+ log.info("start to request {} to get config info", url);
httpGet = new HttpGet(url);
httpGet.addHeader(HttpHeaders.CONNECTION, "close");
// request with get
CloseableHttpResponse response = httpClient.execute(httpGet);
String returnStr = EntityUtils.toString(response.getEntity());
- LOG.info("end to request {},result:{}", url, returnStr);
- // get groupId <-> topic and m value.
+ log.info("end to request {}, result={}", url, returnStr);
- SortClusterResponse clusterResponse =
objectMapper.readValue(returnStr, SortClusterResponse.class);
+ SortConfigResponse clusterResponse =
objectMapper.readValue(returnStr, SortConfigResponse.class);
int errCode = clusterResponse.getCode();
- if (errCode != SortClusterResponse.SUCC && errCode !=
SortClusterResponse.NOUPDATE) {
- LOG.info("Fail to get config info from url:{}, error code is
{}, msg is {}",
+ if (errCode != SortConfigResponse.SUCC && errCode !=
SortConfigResponse.NO_UPDATE) {
+ log.error("failed to get config info from url={}, error
code={}, msg={}",
url, clusterResponse.getCode(),
clusterResponse.getMsg());
return null;
}
this.md5 = clusterResponse.getMd5();
- return clusterResponse.getData();
+ return objectMapper.readValue(clusterResponse.getData(),
SortConfig.class);
} catch (Exception ex) {
- LOG.error("exception caught", ex);
+ log.error("exception caught", ex);
return null;
} finally {
if (httpGet != null) {
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/SortConfigLoader.java
similarity index 76%
copy from
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
copy to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/SortConfigLoader.java
index ad3a6bb892..7ec4753c87 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/SortConfigLoader.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.common.pojo.sort;
+package org.apache.inlong.sort.standalone.config.loader.v2;
-import lombok.Data;
+import org.apache.inlong.common.pojo.sort.SortConfig;
-import java.io.Serializable;
-import java.util.List;
+import org.apache.flume.conf.Configurable;
-@Data
-public class SortConfig implements Serializable {
+public interface SortConfigLoader extends Configurable {
- private String sortClusterName;
- private List<SortTaskConfig> clusters;
+ SortConfig load();
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index 7ba7843655..8a5e46fdf6 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -27,6 +27,7 @@ import java.util.Optional;
/**
* generator for flume config
*/
+@Deprecated
public class FlumeConfigGenerator {
public static final String KEY_TASK_NAME = "taskName";
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
similarity index 70%
copy from
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
copy to
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
index 7ba7843655..3edf16af72 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
@@ -15,18 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone.utils;
+package org.apache.inlong.sort.standalone.utils.v2;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-/**
- * generator for flume config
- */
public class FlumeConfigGenerator {
public static final String KEY_TASK_NAME = "taskName";
@@ -41,48 +38,28 @@ public class FlumeConfigGenerator {
public static Map<String, String>
generateFlumeConfiguration(SortTaskConfig taskConfig) {
Map<String, String> flumeConf = new HashMap<>();
- String name = taskConfig.getName();
- Map<String, String> sinkParams = taskConfig.getSinkParams();
- // channels
- appendChannels(flumeConf, name, sinkParams);
- // sinks
- appendSinks(flumeConf, name, sinkParams);
- // sources
- appendSources(flumeConf, name, sinkParams);
+ String sortTaskName = taskConfig.getSortTaskName();
+ appendChannels(flumeConf, sortTaskName);
+ appendSinks(flumeConf, sortTaskName);
+ appendSources(flumeConf, sortTaskName);
return flumeConf;
}
- /**
- * append channels config
- *
- * @param flumeConf final config of flume
- * @param name sort task name
- * @param sinkParams sink params of this task
- */
- private static void appendChannels(Map<String, String> flumeConf, String
name, Map<String, String> sinkParams) {
+ private static void appendChannels(Map<String, String> flumeConf, String
name) {
StringBuilder builder = new StringBuilder();
String channelName = name + "Channel";
flumeConf.put(name + ".channels", channelName);
String prefix =
builder.append(name).append(".channels.").append(channelName).append(".").toString();
builder.setLength(0);
String channelType = builder.append(prefix).append("type").toString();
- String channelClass = sinkParams.getOrDefault(KEY_SORT_CHANNEL_TYPE,
- CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE));
+ String channelClass =
CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
flumeConf.put(channelType, channelClass);
- appendCommon(flumeConf, prefix, null, name);
+ appendCommon(flumeConf, prefix, name);
}
- /**
- * appendCommon config
- *
- * @param flumeConf final config of flume
- * @param prefix prefix of common properties
- * @param componentParams common properties
- */
private static void appendCommon(
Map<String, String> flumeConf,
String prefix,
- Map<String, String> componentParams,
String name) {
StringBuilder builder = new StringBuilder();
String taskName =
builder.append(prefix).append(KEY_TASK_NAME).toString();
@@ -93,24 +70,9 @@ public class FlumeConfigGenerator {
String key =
builder.append(prefix).append(entry.getKey()).toString();
flumeConf.put(key, entry.getValue());
}
- // componentParams
- if (componentParams != null) {
- for (Map.Entry<String, String> entry : componentParams.entrySet()) {
- builder.setLength(0);
- String key = builder.append(prefix).append(entry.getKey()).toString();
- flumeConf.put(key, entry.getValue());
- }
- }
}
- /**
- * append sink config
- *
- * @param flumeConf final config of flume
- * @param name sort task name
- * @param sinkParams sink params of this task
- */
- private static void appendSinks(Map<String, String> flumeConf, String name, Map<String, String> sinkParams) {
+ private static void appendSinks(Map<String, String> flumeConf, String name) {
// sinks
String sinkName = name + "Sink";
flumeConf.put(name + ".sinks", sinkName);
@@ -119,28 +81,19 @@ public class FlumeConfigGenerator {
// type
builder.setLength(0);
String sinkType = builder.append(prefix).append("type").toString();
- String sinkClass = sinkParams.getOrDefault(KEY_SORT_SINK_TYPE,
- CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE));
+ String sinkClass = CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE);
flumeConf.put(sinkType, sinkClass);
// channel
builder.setLength(0);
String channelKey =
builder.append(prefix).append("channel").toString();
String channelName = name + "Channel";
flumeConf.put(channelKey, channelName);
- //
- appendCommon(flumeConf, prefix, sinkParams, name);
+
+ // common
+ appendCommon(flumeConf, prefix, name);
}
- /**
- * append source config
- *
- * @param flumeConf final config of flume
- * @param name sort task name
- * @param sinkParams sink params of this task
- */
- private static void appendSources(
- Map<String, String> flumeConf,
- String name, Map<String, String> sinkParams) {
+ private static void appendSources(Map<String, String> flumeConf, String name) {
// sources
String sourceName = name + "Source";
flumeConf.put(name + ".sources", sourceName);
@@ -149,8 +102,7 @@ public class FlumeConfigGenerator {
// type
builder.setLength(0);
String sourceType = builder.append(prefix).append("type").toString();
- String sourceClass = sinkParams.getOrDefault(KEY_SORT_SOURCE_TYPE,
- CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE));
+ String sourceClass = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE);
flumeConf.put(sourceType, sourceClass);
// channel
builder.setLength(0);
@@ -183,6 +135,6 @@ public class FlumeConfigGenerator {
Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_STOP_TIME))
.map(stopTime -> flumeConf.put(stopTimeKey, stopTime));
- appendCommon(flumeConf, prefix, null, name);
+ appendCommon(flumeConf, prefix, name);
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
index 0a3ae734b2..dcd2dd3ca3 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
@@ -41,6 +41,7 @@ import static org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
/**
* SortCluster
*/
+@Deprecated
public class SortCluster {
public static final Logger LOG =
InlongLoggerFactory.getLogger(SortCluster.class);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
index ee6510be89..71db1c7024 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock;
*
* SortTask
*/
+@Deprecated
public class SortTask {
public static final Logger LOG =
InlongLoggerFactory.getLogger(SortTask.class);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
similarity index 73%
copy from inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
copy to inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
index 0a3ae734b2..f7e2e58cc6 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
@@ -15,44 +15,35 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone;
+package org.apache.inlong.sort.standalone.v2;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
import org.apache.inlong.sdk.commons.admin.AdminTask;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flume.Context;
-import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
-/**
- * SortCluster
- */
+@Slf4j
public class SortCluster {
- public static final Logger LOG =
InlongLoggerFactory.getLogger(SortCluster.class);
-
private Timer reloadTimer;
private Map<String, SortTask> taskMap = new ConcurrentHashMap<>();
private List<SortTask> deletingTasks = new ArrayList<>();
private AdminTask adminTask;
- /**
- * start
- */
public void start() {
try {
this.reload();
@@ -61,18 +52,15 @@ public class SortCluster {
this.adminTask = new AdminTask(new
Context(CommonPropertiesHolder.get()));
this.adminTask.start();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ log.error("failed to start sort cluster", e);
}
}
- /**
- * close
- */
public void close() {
try {
this.reloadTimer.cancel();
// stop sort task
- for (Entry<String, SortTask> entry : this.taskMap.entrySet()) {
+ for (Map.Entry<String, SortTask> entry : this.taskMap.entrySet()) {
entry.getValue().stop();
}
// stop admin task
@@ -80,20 +68,14 @@ public class SortCluster {
this.adminTask.stop();
}
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ log.error("failed to close sort cluster", e);
}
}
- /**
- * setReloadTimer
- */
private void setReloadTimer() {
reloadTimer = new Timer(true);
TimerTask task = new TimerTask() {
- /**
- * run
- */
public void run() {
reload();
}
@@ -102,19 +84,16 @@ public class SortCluster {
reloadTimer.schedule(task, new Date(System.currentTimeMillis() +
reloadInterval), reloadInterval);
}
- /**
- * reload
- */
public void reload() {
try {
// get new config
- SortClusterConfig newConfig =
SortClusterConfigHolder.getClusterConfig();
+ SortConfig newConfig = SortConfigHolder.getSortConfig();
if (newConfig == null) {
return;
}
// add new task
- for (SortTaskConfig taskConfig : newConfig.getSortTasks()) {
- String newTaskName = taskConfig.getName();
+ for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+ String newTaskName = taskConfig.getSortTaskName();
if (taskMap.containsKey(newTaskName)) {
continue;
}
@@ -124,11 +103,11 @@ public class SortCluster {
}
// remove task
deletingTasks.clear();
- for (Entry<String, SortTask> entry : taskMap.entrySet()) {
+ for (Map.Entry<String, SortTask> entry : taskMap.entrySet()) {
String taskName = entry.getKey();
boolean isFound = false;
- for (SortTaskConfig taskConfig : newConfig.getSortTasks()) {
- if (taskName.equals(taskConfig.getName())) {
+ for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+ if (taskName.equals(taskConfig.getSortTaskName())) {
isFound = true;
break;
}
@@ -143,7 +122,7 @@ public class SortCluster {
taskMap.remove(task.getTaskName());
}
} catch (Throwable e) {
- LOG.error(e.getMessage(), e);
+ log.error("failed to reload cluster", e);
}
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
similarity index 57%
copy from inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
copy to inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
index ee6510be89..d94ba6ffdf 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
@@ -15,81 +15,54 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.standalone;
+package org.apache.inlong.sort.standalone.v2;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.utils.FlumeConfigGenerator;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.sort.standalone.PropertiesConfigurationProvider;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flume.Channel;
import org.apache.flume.SinkRunner;
import org.apache.flume.SourceRunner;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
-import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
import org.apache.flume.node.MaterializedConfiguration;
-import org.slf4j.Logger;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantLock;
-/**
- *
- * SortTask
- */
+@Slf4j
public class SortTask {
- public static final Logger LOG =
InlongLoggerFactory.getLogger(SortTask.class);
-
private final String taskName;
private final LifecycleSupervisor supervisor;
private MaterializedConfiguration materializedConfiguration;
private final ReentrantLock lifecycleLock = new ReentrantLock();
- /**
- * Constructor
- *
- * @param taskName
- */
public SortTask(String taskName) {
this.taskName = taskName;
this.supervisor = new LifecycleSupervisor();
}
- /**
- * get taskName
- *
- * @return the taskName
- */
- public String getTaskName() {
- return taskName;
- }
-
- /**
- * start
- */
public void start() {
- SortTaskConfig config =
SortClusterConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig config = SortConfigHolder.getTaskConfig(taskName);
if (config == null) {
return;
}
-
- //
Map<String, String> flumeConfiguration =
FlumeConfigGenerator.generateFlumeConfiguration(config);
- LOG.info("Start sort task:{},config:{}", taskName, flumeConfiguration);
+ log.info("start sort task={}, config={}", taskName, flumeConfiguration);
PropertiesConfigurationProvider configurationProvider = new
PropertiesConfigurationProvider(
- config.getName(), flumeConfiguration);
+ config.getSortTaskName(), flumeConfiguration);
this.handleConfigurationEvent(configurationProvider.getConfiguration());
}
- /**
- * handleConfigurationEvent
- *
- * @param conf
- */
+ public String getTaskName() {
+ return taskName;
+ }
+
@Subscribe
public void handleConfigurationEvent(MaterializedConfiguration conf) {
try {
@@ -97,8 +70,7 @@ public class SortTask {
stopAllComponents();
startAllComponents(conf);
} catch (InterruptedException e) {
- LOG.info("Interrupted while trying to handle configuration event");
- return;
+ log.error("interrupted while trying to handle configuration event", e);
} finally {
// If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
if (lifecycleLock.isHeldByCurrentThread()) {
@@ -107,9 +79,6 @@ public class SortTask {
}
}
- /**
- * stop
- */
public void stop() {
lifecycleLock.lock();
stopAllComponents();
@@ -120,94 +89,84 @@ public class SortTask {
}
}
- /**
- * stopAllComponents
- */
private void stopAllComponents() {
if (this.materializedConfiguration != null) {
- LOG.info("Shutting down configuration: {}",
this.materializedConfiguration);
- for (Entry<String, SourceRunner> entry :
this.materializedConfiguration.getSourceRunners().entrySet()) {
+ log.info("shutting down configuration: {}",
this.materializedConfiguration);
+ for (Map.Entry<String, SourceRunner> entry :
this.materializedConfiguration.getSourceRunners().entrySet()) {
try {
- LOG.info("Stopping Source " + entry.getKey());
+ log.info("stopping Source " + entry.getKey());
supervisor.unsupervise(entry.getValue());
} catch (Exception e) {
- LOG.error("Error while stopping {}", entry.getValue(), e);
+ log.error("error while stopping {}", entry.getValue(), e);
}
}
- for (Entry<String, SinkRunner> entry :
this.materializedConfiguration.getSinkRunners().entrySet()) {
+ for (Map.Entry<String, SinkRunner> entry :
this.materializedConfiguration.getSinkRunners().entrySet()) {
try {
- LOG.info("Stopping Sink " + entry.getKey());
+ log.info("stopping Sink " + entry.getKey());
supervisor.unsupervise(entry.getValue());
} catch (Exception e) {
- LOG.error("Error while stopping {}", entry.getValue(), e);
+ log.error("error while stopping {}", entry.getValue(), e);
}
}
- for (Entry<String, Channel> entry :
this.materializedConfiguration.getChannels().entrySet()) {
+ for (Map.Entry<String, Channel> entry :
this.materializedConfiguration.getChannels().entrySet()) {
try {
- LOG.info("Stopping Channel " + entry.getKey());
+ log.info("stopping Channel " + entry.getKey());
supervisor.unsupervise(entry.getValue());
} catch (Exception e) {
- LOG.error("Error while stopping {}", entry.getValue(), e);
+ log.error("error while stopping {}", entry.getValue(), e);
}
}
}
}
- /**
- * startAllComponents
- *
- * @param materializedConfiguration
- */
private void startAllComponents(MaterializedConfiguration
materializedConfiguration) {
- LOG.info("Starting new configuration:{}", materializedConfiguration);
+ log.info("starting new configuration:{}", materializedConfiguration);
this.materializedConfiguration = materializedConfiguration;
- for (Entry<String, Channel> entry :
materializedConfiguration.getChannels().entrySet()) {
+ for (Map.Entry<String, Channel> entry :
materializedConfiguration.getChannels().entrySet()) {
try {
- LOG.info("Starting Channel " + entry.getKey());
+ log.info("starting channel " + entry.getKey());
supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
- LOG.error("Error while starting {}", entry.getValue(), e);
+ log.error("error while starting {}", entry.getValue(), e);
}
}
- /*
- * Wait for all channels to start.
- */
+ // Wait for all channels to start.
for (Channel ch : materializedConfiguration.getChannels().values()) {
while (ch.getLifecycleState() != LifecycleState.START
&& !supervisor.isComponentInErrorState(ch)) {
try {
- LOG.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms");
+ log.info("waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms");
Thread.sleep(500);
} catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for channel to start.", e);
+ log.error("interrupted while waiting for channel to start.", e);
}
}
}
- for (Entry<String, SinkRunner> entry :
materializedConfiguration.getSinkRunners().entrySet()) {
+ for (Map.Entry<String, SinkRunner> entry :
materializedConfiguration.getSinkRunners().entrySet()) {
try {
- LOG.info("Starting Sink " + entry.getKey());
+ log.info("starting sink " + entry.getKey());
supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
- LOG.error("Error while starting {}", entry.getValue(), e);
+ log.error("error while starting {}", entry.getValue(), e);
}
}
- for (Entry<String, SourceRunner> entry :
materializedConfiguration.getSourceRunners().entrySet()) {
+ for (Map.Entry<String, SourceRunner> entry :
materializedConfiguration.getSourceRunners().entrySet()) {
try {
- LOG.info("Starting Source " + entry.getKey());
+ log.info("starting source " + entry.getKey());
supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
- LOG.error("Error while starting {}", entry.getValue(), e);
+ log.error("error while starting {}", entry.getValue(), e);
}
}
}
-}
\ No newline at end of file
+}