This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b2284808f [INLONG-5020][Manager] Refactoring InnerInlongManagerClient
of manager (#5021)
b2284808f is described below
commit b2284808fa77957425d288b0b3be3b5f2d558e33
Author: haifxu <[email protected]>
AuthorDate: Fri Jul 15 14:11:14 2022 +0800
[INLONG-5020][Manager] Refactoring InnerInlongManagerClient of manager
(#5021)
---
.../inlong/manager/client/cli/DescribeCommand.java | 64 +--
.../inlong/manager/client/cli/ListCommand.java | 66 +--
.../manager/client/cli/util/ClientUtils.java | 69 +--
.../api/impl/DefaultInlongStreamBuilder.java | 62 ++-
.../manager/client/api/impl/InlongClientImpl.java | 22 +-
.../manager/client/api/impl/InlongGroupImpl.java | 75 +--
.../manager/client/api/impl/InlongStreamImpl.java | 67 ++-
.../client/api/impl/LowLevelInlongClientImpl.java | 16 +-
.../client/api/inner/InnerInlongManagerClient.java | 580 ---------------------
.../client/api/inner/client/ClientFactory.java | 52 ++
.../api/inner/client/InlongClusterClient.java | 52 ++
.../client/api/inner/client/InlongGroupClient.java | 219 ++++++++
.../api/inner/client/InlongStreamClient.java | 112 ++++
.../client/api/inner/client/StreamSinkClient.java | 98 ++++
.../api/inner/client/StreamSourceClient.java | 90 ++++
.../api/inner/client/StreamTransformClient.java | 88 ++++
.../client/api/inner/client/WorkflowClient.java | 83 +++
.../manager/client/api/util/ClientUtils.java | 121 +++++
...nagerClientTest.java => ClientFactoryTest.java} | 62 ++-
19 files changed, 1181 insertions(+), 817 deletions(-)
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
index 461f2c2cc..38edd6d3e 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
@@ -20,8 +20,10 @@ package org.apache.inlong.manager.client.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
import org.apache.inlong.manager.client.cli.util.ClientUtils;
import org.apache.inlong.manager.client.cli.util.PrintUtils;
@@ -31,7 +33,6 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import java.io.IOException;
import java.util.List;
/**
@@ -45,41 +46,28 @@ public class DescribeCommand extends AbstractCommand {
public DescribeCommand() {
super("describe");
- InlongClientImpl inlongClient;
- try {
- inlongClient = ClientUtils.getClient();
- } catch (IOException e) {
- System.err.println("get inlong client error");
- System.err.println(e.getMessage());
- return;
- }
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(inlongClient.getConfiguration());
- jcommander.addCommand("stream", new DescribeStream(managerClient));
- jcommander.addCommand("group", new DescribeGroup(managerClient));
- jcommander.addCommand("sink", new DescribeSink(managerClient));
- jcommander.addCommand("source", new DescribeSource(managerClient));
+ jcommander.addCommand("stream", new DescribeStream());
+ jcommander.addCommand("group", new DescribeGroup());
+ jcommander.addCommand("sink", new DescribeSink());
+ jcommander.addCommand("source", new DescribeSource());
}
@Parameters(commandDescription = "Get stream details")
private static class DescribeStream extends AbstractCommandRunner {
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private java.util.List<String> params;
@Parameter(names = {"-g", "--group"}, required = true, description =
"inlong group id")
private String groupId;
- DescribeStream(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
- List<InlongStreamInfo> streamInfos =
managerClient.listStreamInfo(groupId);
+ ClientUtils.initClientFactory();
+ InlongStreamClient streamClient =
ClientUtils.clientFactory.getStreamClient();
+ List<InlongStreamInfo> streamInfos =
streamClient.listStreamInfo(groupId);
streamInfos.forEach(PrintUtils::printJson);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -90,8 +78,6 @@ public class DescribeCommand extends AbstractCommand {
@Parameters(commandDescription = "Get group details")
private static class DescribeGroup extends AbstractCommandRunner {
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private java.util.List<String> params;
@@ -104,16 +90,14 @@ public class DescribeCommand extends AbstractCommand {
@Parameter(names = {"-n", "--num"}, description = "the number
displayed")
private int pageSize;
- DescribeGroup(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
+ ClientUtils.initClientFactory();
+ InlongGroupClient groupClient =
ClientUtils.clientFactory.getGroupClient();
InlongGroupPageRequest pageRequest = new
InlongGroupPageRequest();
pageRequest.setKeyword(group);
- PageInfo<InlongGroupListResponse> pageInfo =
managerClient.listGroups(pageRequest);
+ PageInfo<InlongGroupListResponse> pageInfo =
groupClient.listGroups(pageRequest);
PrintUtils.print(pageInfo.getList(), GroupInfo.class);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -124,8 +108,6 @@ public class DescribeCommand extends AbstractCommand {
@Parameters(commandDescription = "Get sink details")
private static class DescribeSink extends AbstractCommandRunner {
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private java.util.List<String> params;
@@ -135,14 +117,12 @@ public class DescribeCommand extends AbstractCommand {
@Parameter(names = {"-g", "--group"}, required = true, description =
"inlong group id")
private String group;
- DescribeSink(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
- List<StreamSink> streamSinks = managerClient.listSinks(group,
stream);
+ ClientUtils.initClientFactory();
+ StreamSinkClient sinkClient =
ClientUtils.clientFactory.getSinkClient();
+ List<StreamSink> streamSinks = sinkClient.listSinks(group,
stream);
streamSinks.forEach(PrintUtils::printJson);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -153,8 +133,6 @@ public class DescribeCommand extends AbstractCommand {
@Parameters(commandDescription = "Get source details")
private static class DescribeSource extends AbstractCommandRunner {
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private java.util.List<String> params;
@@ -167,14 +145,12 @@ public class DescribeCommand extends AbstractCommand {
@Parameter(names = {"-t", "--type"}, description = "sink type")
private String type;
- DescribeSource(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
- List<StreamSource> sources = managerClient.listSources(group,
stream, type);
+ ClientUtils.initClientFactory();
+ StreamSourceClient sourceClient =
ClientUtils.clientFactory.getSourceClient();
+ List<StreamSource> sources = sourceClient.listSources(group,
stream, type);
sources.forEach(PrintUtils::printJson);
} catch (Exception e) {
System.out.println(e.getMessage());
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index a8702c104..a91e4925f 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -21,8 +21,10 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
import org.apache.inlong.manager.client.cli.pojo.SinkInfo;
import org.apache.inlong.manager.client.cli.pojo.SourceInfo;
@@ -35,7 +37,6 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import java.io.IOException;
import java.util.List;
/**
@@ -49,42 +50,28 @@ public class ListCommand extends AbstractCommand {
public ListCommand() {
super("list");
- InlongClientImpl inlongClient;
- try {
- inlongClient = ClientUtils.getClient();
- } catch (IOException e) {
- System.err.println("get inlong client error");
- System.err.println(e.getMessage());
- return;
- }
-
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(inlongClient.getConfiguration());
- jcommander.addCommand("stream", new ListStream(managerClient));
- jcommander.addCommand("group", new ListGroup(managerClient));
- jcommander.addCommand("sink", new ListSink(managerClient));
- jcommander.addCommand("source", new ListSource(managerClient));
+ jcommander.addCommand("stream", new ListStream());
+ jcommander.addCommand("group", new ListGroup());
+ jcommander.addCommand("sink", new ListSink());
+ jcommander.addCommand("source", new ListSource());
}
@Parameters(commandDescription = "Get stream summary information")
private static class ListStream extends AbstractCommandRunner {
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private List<String> params;
@Parameter(names = {"-g", "--group"}, required = true, description =
"inlong group id")
private String groupId;
- ListStream(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
- List<InlongStreamInfo> streamInfos =
managerClient.listStreamInfo(groupId);
+ ClientUtils.initClientFactory();
+ InlongStreamClient streamClient =
ClientUtils.clientFactory.getStreamClient();
+ List<InlongStreamInfo> streamInfos =
streamClient.listStreamInfo(groupId);
PrintUtils.print(streamInfos, StreamInfo.class);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -97,8 +84,6 @@ public class ListCommand extends AbstractCommand {
private static final int DEFAULT_PAGE_SIZE = 10;
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private List<String> params;
@@ -111,10 +96,6 @@ public class ListCommand extends AbstractCommand {
@Parameter(names = {"-n", "--num"}, description = "the number
displayed")
private int pageSize;
- ListGroup(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
@@ -129,7 +110,10 @@ public class ListCommand extends AbstractCommand {
List<Integer> statusList =
SimpleGroupStatus.parseStatusCodeByStr(status);
pageRequest.setStatusList(statusList);
- PageInfo<InlongGroupListResponse> groupPageInfo =
managerClient.listGroups(pageRequest);
+ ClientUtils.initClientFactory();
+ InlongGroupClient groupClient =
ClientUtils.clientFactory.getGroupClient();
+
+ PageInfo<InlongGroupListResponse> groupPageInfo =
groupClient.listGroups(pageRequest);
PrintUtils.print(groupPageInfo.getList(), GroupInfo.class);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -140,8 +124,6 @@ public class ListCommand extends AbstractCommand {
@Parameters(commandDescription = "Get sink summary information")
private static class ListSink extends AbstractCommandRunner {
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private List<String> params;
@@ -151,14 +133,12 @@ public class ListCommand extends AbstractCommand {
@Parameter(names = {"-g", "--group"}, required = true, description =
"group id")
private String group;
- ListSink(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
- List<StreamSink> streamSinks = managerClient.listSinks(group,
stream);
+ ClientUtils.initClientFactory();
+ StreamSinkClient sinkClient =
ClientUtils.clientFactory.getSinkClient();
+ List<StreamSink> streamSinks = sinkClient.listSinks(group,
stream);
PrintUtils.print(streamSinks, SinkInfo.class);
} catch (Exception e) {
System.out.println(e.getMessage());
@@ -169,8 +149,6 @@ public class ListCommand extends AbstractCommand {
@Parameters(commandDescription = "Get source summary information")
private static class ListSource extends AbstractCommandRunner {
- private final InnerInlongManagerClient managerClient;
-
@Parameter()
private List<String> params;
@@ -183,14 +161,12 @@ public class ListCommand extends AbstractCommand {
@Parameter(names = {"-t", "--type"}, description = "source type")
private String type;
- ListSource(InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
- }
-
@Override
void run() {
try {
- List<StreamSource> streamSources =
managerClient.listSources(group, stream, type);
+ ClientUtils.initClientFactory();
+ StreamSourceClient sourceClient =
ClientUtils.clientFactory.getSourceClient();
+ List<StreamSource> streamSources =
sourceClient.listSources(group, stream, type);
PrintUtils.print(streamSources, SourceInfo.class);
} catch (Exception e) {
System.out.println(e.getMessage());
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
index 289107777..5ade5e425 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
@@ -19,11 +19,11 @@ package org.apache.inlong.manager.client.cli.util;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import java.io.BufferedInputStream;
import java.io.File;
-import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -39,49 +39,58 @@ public class ClientUtils {
private static final String CONFIG_FILE = "application.properties";
- /**
- * Get an inlong client instance.
- */
- public static InlongClientImpl getClient() throws IOException {
- Properties properties = new Properties();
- String path =
Thread.currentThread().getContextClassLoader().getResource("").getPath() +
CONFIG_FILE;
- InputStream inputStream = new
BufferedInputStream(Files.newInputStream(Paths.get(path)));
- properties.load(inputStream);
+ private static ClientConfiguration configuration;
- String serviceUrl = properties.getProperty("server.host") + ":" +
properties.getProperty("server.port");
- String user = properties.getProperty("default.admin.user");
- String password = properties.getProperty("default.admin.password");
+ private static String serviceUrl;
- ClientConfiguration configuration = new ClientConfiguration();
- configuration.setAuthentication(new DefaultAuthentication(user,
password));
+ public static ClientFactory clientFactory;
+ /**
+ * Get an inlong client instance.
+ */
+ public static InlongClientImpl getClient() {
+ initClientConfiguration();
return new InlongClientImpl(serviceUrl, configuration);
}
+ public static void initClientFactory() {
+ clientFactory =
org.apache.inlong.manager.client.api.util.ClientUtils.getClientFactory(
+ getClient().getConfiguration());
+ }
+
/**
* Get the file content.
*/
public static String readFile(File file) {
if (!file.exists()) {
System.out.println("File does not exist.");
- } else {
- try {
- FileReader fileReader = new FileReader(file);
- Reader reader = new
InputStreamReader(Files.newInputStream(file.toPath()));
- int ch;
- StringBuilder stringBuilder = new StringBuilder();
- while ((ch = reader.read()) != -1) {
- stringBuilder.append((char) ch);
- }
- fileReader.close();
- reader.close();
-
- return stringBuilder.toString();
- } catch (Exception e) {
- System.out.println(e.getMessage());
+ return null;
+ }
+ StringBuilder stringBuilder = new StringBuilder();
+ try (Reader reader = new
InputStreamReader(Files.newInputStream(file.toPath()))) {
+ int ch;
+ while ((ch = reader.read()) != -1) {
+ stringBuilder.append((char) ch);
}
+ } catch (IOException e) {
+ System.out.println(e.getMessage());
}
- return null;
+ return stringBuilder.toString();
}
+ private static void initClientConfiguration() {
+ Properties properties = new Properties();
+ String path =
Thread.currentThread().getContextClassLoader().getResource("").getPath() +
CONFIG_FILE;
+ try (InputStream inputStream = new
BufferedInputStream(Files.newInputStream(Paths.get(path)))) {
+ properties.load(inputStream);
+ serviceUrl = properties.getProperty("server.host") + ":" +
properties.getProperty("server.port");
+ String user = properties.getProperty("default.admin.user");
+ String password = properties.getProperty("default.admin.password");
+
+ configuration = new ClientConfiguration();
+ configuration.setAuthentication(new DefaultAuthentication(user,
password));
+ } catch (IOException e) {
+ System.out.println(e.getMessage());
+ }
+ }
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 2c3245bcb..11aaa3313 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -22,11 +22,17 @@ import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamTransformClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -51,11 +57,21 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
private final InlongStreamImpl inlongStream;
private final InnerStreamContext streamContext;
- private final InnerInlongManagerClient managerClient;
+
+ private final InlongStreamClient streamClient;
+ private final StreamSourceClient sourceClient;
+ private final StreamSinkClient sinkClient;
+ private final StreamTransformClient transformClient;
public DefaultInlongStreamBuilder(InlongStreamInfo streamInfo,
InnerGroupContext groupContext,
- InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
+ ClientConfiguration configuration) {
+
+ ClientFactory clientFactory =
ClientUtils.getClientFactory(configuration);
+ this.streamClient = clientFactory.getStreamClient();
+ this.sourceClient = clientFactory.getSourceClient();
+ this.sinkClient = clientFactory.getSinkClient();
+ this.transformClient = clientFactory.getTransformClient();
+
if (MapUtils.isEmpty(groupContext.getStreamContextMap())) {
groupContext.setStreamContextMap(Maps.newHashMap());
}
@@ -68,7 +84,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
groupContext.setStreamContext(streamContext);
this.streamContext = streamContext;
- this.inlongStream = new InlongStreamImpl(groupId,
streamInfo.getInlongStreamId(), managerClient);
+ this.inlongStream = new InlongStreamImpl(groupId,
streamInfo.getInlongStreamId(), configuration);
if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
this.inlongStream.setStreamFields(streamInfo.getFieldList());
}
@@ -116,21 +132,21 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
StreamPipeline streamPipeline = inlongStream.createPipeline();
streamInfo.setExtParams(JsonUtils.toJsonString(streamPipeline));
- streamInfo.setId(managerClient.createStreamInfo(streamInfo));
+ streamInfo.setId(streamClient.createStreamInfo(streamInfo));
// Create source and update index
List<SourceRequest> sourceRequests =
Lists.newArrayList(streamContext.getSourceRequests().values());
for (SourceRequest sourceRequest : sourceRequests) {
- sourceRequest.setId(managerClient.createSource(sourceRequest));
+ sourceRequest.setId(sourceClient.createSource(sourceRequest));
}
// Create sink and update index
List<SinkRequest> sinkRequests =
Lists.newArrayList(streamContext.getSinkRequests().values());
for (SinkRequest sinkRequest : sinkRequests) {
- sinkRequest.setId(managerClient.createSink(sinkRequest));
+ sinkRequest.setId(sinkClient.createSink(sinkRequest));
}
// Create transform and update index
List<TransformRequest> transformRequests =
Lists.newArrayList(streamContext.getTransformRequests().values());
for (TransformRequest transformRequest : transformRequests) {
-
transformRequest.setId(managerClient.createTransform(transformRequest));
+
transformRequest.setId(transformClient.createTransform(transformRequest));
}
return inlongStream;
}
@@ -140,9 +156,9 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
StreamPipeline streamPipeline = inlongStream.createPipeline();
dataStreamInfo.setExtParams(JsonUtils.toJsonString(streamPipeline));
- Boolean isExist = managerClient.isStreamExists(dataStreamInfo);
+ Boolean isExist = streamClient.isStreamExists(dataStreamInfo);
if (isExist) {
- Pair<Boolean, String> updateMsg =
managerClient.updateStreamInfo(dataStreamInfo);
+ Pair<Boolean, String> updateMsg =
streamClient.updateStreamInfo(dataStreamInfo);
if (!updateMsg.getKey()) {
throw new RuntimeException(String.format("Update data stream
failed:%s", updateMsg.getValue()));
}
@@ -160,7 +176,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
- List<TransformResponse> transformResponses =
managerClient.listTransform(groupId, streamId);
+ List<TransformResponse> transformResponses =
transformClient.listTransform(groupId, streamId);
List<String> updateTransformNames = Lists.newArrayList();
for (TransformResponse transformResponse : transformResponses) {
StreamTransform transform =
StreamTransformTransfer.parseStreamTransform(transformResponse);
@@ -169,14 +185,14 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
if (transformRequests.get(transformName) == null) {
TransformRequest transformRequest =
StreamTransformTransfer.createTransformRequest(transform,
streamInfo);
- boolean isDelete =
managerClient.deleteTransform(transformRequest);
+ boolean isDelete =
transformClient.deleteTransform(transformRequest);
if (!isDelete) {
throw new RuntimeException(String.format("Delete
transform=%s failed", transformRequest));
}
} else {
TransformRequest transformRequest =
transformRequests.get(transformName);
transformRequest.setId(id);
- Pair<Boolean, String> updateState =
managerClient.updateTransform(transformRequest);
+ Pair<Boolean, String> updateState =
transformClient.updateTransform(transformRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update
transform=%s failed with err=%s", transformRequest,
updateState.getValue()));
@@ -191,7 +207,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
continue;
}
TransformRequest transformRequest = requestEntry.getValue();
-
transformRequest.setId(managerClient.createTransform(transformRequest));
+
transformRequest.setId(transformClient.createTransform(transformRequest));
}
}
@@ -200,21 +216,21 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
- List<StreamSource> streamSources = managerClient.listSources(groupId,
streamId);
+ List<StreamSource> streamSources = sourceClient.listSources(groupId,
streamId);
List<String> updateSourceNames = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(streamSources)) {
for (StreamSource source : streamSources) {
final String sourceName = source.getSourceName();
final int id = source.getId();
if (sourceRequests.get(sourceName) == null) {
- boolean isDelete = managerClient.deleteSource(id);
+ boolean isDelete = sourceClient.deleteSource(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete
source failed by id=%s", id));
}
} else {
SourceRequest sourceRequest =
sourceRequests.get(sourceName);
sourceRequest.setId(id);
- Pair<Boolean, String> updateState =
managerClient.updateSource(sourceRequest);
+ Pair<Boolean, String> updateState =
sourceClient.updateSource(sourceRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update
source=%s failed with err=%s", sourceRequest,
updateState.getValue()));
@@ -230,7 +246,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
continue;
}
SourceRequest sourceRequest = requestEntry.getValue();
- sourceRequest.setId(managerClient.createSource(sourceRequest));
+ sourceRequest.setId(sourceClient.createSource(sourceRequest));
}
}
@@ -239,20 +255,20 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
- List<StreamSink> streamSinks = managerClient.listSinks(groupId,
streamId);
+ List<StreamSink> streamSinks = sinkClient.listSinks(groupId, streamId);
List<String> updateSinkNames = Lists.newArrayList();
for (StreamSink sink : streamSinks) {
final String sinkName = sink.getSinkName();
final int id = sink.getId();
if (sinkRequests.get(sinkName) == null) {
- boolean isDelete = managerClient.deleteSink(id);
+ boolean isDelete = sinkClient.deleteSink(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete sink=%s
failed", sink));
}
} else {
SinkRequest sinkRequest = sinkRequests.get(sinkName);
sinkRequest.setId(id);
- Pair<Boolean, String> updateState =
managerClient.updateSink(sinkRequest);
+ Pair<Boolean, String> updateState =
sinkClient.updateSink(sinkRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update sink=%s
failed with err=%s", sinkRequest,
updateState.getValue()));
@@ -267,7 +283,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
continue;
}
SinkRequest sinkRequest = requestEntry.getValue();
- sinkRequest.setId(managerClient.createSink(sinkRequest));
+ sinkRequest.setId(sinkClient.createSink(sinkRequest));
}
}
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 0a2c52518..a03997a9f 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -30,7 +30,8 @@ import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
@@ -51,6 +52,7 @@ public class InlongClientImpl implements InlongClient {
private static final String HOST_SPLITTER = ":";
@Getter
private final ClientConfiguration configuration;
+ private final InlongGroupClient groupClient;
public InlongClientImpl(String serviceUrl, ClientConfiguration
configuration) {
Map<String, String> hostPorts =
Splitter.on(URL_SPLITTER).withKeyValueSeparator(HOST_SPLITTER)
@@ -74,37 +76,36 @@ public class InlongClientImpl implements InlongClient {
throw new RuntimeException(String.format("%s is not connective",
serviceUrl));
}
this.configuration = configuration;
+ groupClient =
ClientUtils.getClientFactory(configuration).getGroupClient();
}
@Override
public InlongGroup forGroup(InlongGroupInfo groupInfo) {
- return new InlongGroupImpl(groupInfo, this);
+ return new InlongGroupImpl(groupInfo, configuration);
}
@Override
public List<InlongGroup> listGroup(String expr, int status, int pageNum,
int pageSize) {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(this.configuration);
- PageInfo<InlongGroupListResponse> responsePageInfo =
managerClient.listGroups(expr, status, pageNum,
+ PageInfo<InlongGroupListResponse> responsePageInfo =
groupClient.listGroups(expr, status, pageNum,
pageSize);
if (CollectionUtils.isEmpty(responsePageInfo.getList())) {
return Lists.newArrayList();
} else {
return responsePageInfo.getList().stream().map(response -> {
String groupId = response.getInlongGroupId();
- InlongGroupInfo groupInfo =
managerClient.getGroupInfo(groupId);
- return new InlongGroupImpl(groupInfo, this);
+ InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
+ return new InlongGroupImpl(groupInfo, configuration);
}).collect(Collectors.toList());
}
}
@Override
public Map<String, SimpleGroupStatus> listGroupStatus(List<String>
groupIds) {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(this.configuration);
InlongGroupPageRequest request = new InlongGroupPageRequest();
request.setGroupIdList(groupIds);
request.setListSources(true);
- PageInfo<InlongGroupListResponse> pageInfo =
managerClient.listGroups(request);
+ PageInfo<InlongGroupListResponse> pageInfo =
groupClient.listGroups(request);
List<InlongGroupListResponse> groupListResponses = pageInfo.getList();
Map<String, SimpleGroupStatus> groupStatusMap = Maps.newHashMap();
if (CollectionUtils.isNotEmpty(groupListResponses)) {
@@ -121,12 +122,11 @@ public class InlongClientImpl implements InlongClient {
@Override
public InlongGroup getGroup(String groupId) {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(this.configuration);
- InlongGroupInfo groupInfo = managerClient.getGroupInfo(groupId);
+ InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
if (groupInfo == null) {
return new BlankInlongGroup();
}
- return new InlongGroupImpl(groupInfo, this);
+ return new InlongGroupImpl(groupInfo, configuration);
}
private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus
groupStatus, List<StreamSource> sources) {
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index bdeb65236..560b7dee8 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -22,13 +22,18 @@ import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.WorkflowClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessStatus;
@@ -62,30 +67,36 @@ public class InlongGroupImpl implements InlongGroup {
private final InnerGroupContext groupContext;
private InlongGroupInfo groupInfo;
- private InnerInlongManagerClient managerClient;
+ private final InlongGroupClient groupClient;
+ private final WorkflowClient workFlowClient;
+ private final InlongStreamClient streamClient;
+ private final ClientConfiguration configuration;
- public InlongGroupImpl(InlongGroupInfo groupInfo, InlongClientImpl
inlongClient) {
+ public InlongGroupImpl(InlongGroupInfo groupInfo, ClientConfiguration
configuration) {
this.groupInfo = groupInfo;
this.groupContext = new InnerGroupContext();
this.groupContext.setGroupInfo(groupInfo);
- if (this.managerClient == null) {
- this.managerClient = new
InnerInlongManagerClient(inlongClient.getConfiguration());
- }
+ this.configuration = configuration;
+
+ ClientFactory clientFactory =
ClientUtils.getClientFactory(configuration);
+ this.streamClient = clientFactory.getStreamClient();
+ this.groupClient = clientFactory.getGroupClient();
+ this.workFlowClient = clientFactory.getWorkflowClient();
- InlongGroupInfo newGroupInfo =
managerClient.getGroupIfExists(groupInfo.getInlongGroupId());
+ InlongGroupInfo newGroupInfo =
groupClient.getGroupIfExists(groupInfo.getInlongGroupId());
if (newGroupInfo != null) {
this.groupContext.setGroupInfo(groupInfo);
} else {
BaseSortConf sortConf = groupInfo.getSortConf();
InlongGroupTransfer.createGroupInfo(groupInfo, sortConf);
- String groupId = managerClient.createGroup(groupInfo.genRequest());
+ String groupId = groupClient.createGroup(groupInfo.genRequest());
groupInfo.setInlongGroupId(groupId);
}
}
@Override
public InlongStreamBuilder createStream(InlongStreamInfo streamInfo) {
- return new DefaultInlongStreamBuilder(streamInfo, this.groupContext,
this.managerClient);
+ return new DefaultInlongStreamBuilder(streamInfo, this.groupContext,
configuration);
}
@Override
@@ -96,7 +107,7 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext init() throws Exception {
InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
- WorkflowResult initWorkflowResult =
managerClient.initInlongGroup(groupInfo.genRequest());
+ WorkflowResult initWorkflowResult =
groupClient.initInlongGroup(groupInfo.genRequest());
List<TaskResponse> taskViews = initWorkflowResult.getNewTasks();
Preconditions.checkNotEmpty(taskViews, "init inlong group info
failed");
TaskResponse taskView = taskViews.get(0);
@@ -122,7 +133,7 @@ public class InlongGroupImpl implements InlongGroup {
formDataNew, NewGroupProcessForm.class);
Preconditions.checkNotNull(newGroupProcessForm, "NewGroupProcessForm
cannot be null");
groupContext.setInitMsg(newGroupProcessForm);
- WorkflowResult startWorkflowResult =
managerClient.startInlongGroup(taskId, newGroupProcessForm);
+ WorkflowResult startWorkflowResult =
workFlowClient.startInlongGroup(taskId, newGroupProcessForm);
processView = startWorkflowResult.getProcessInfo();
Preconditions.checkTrue(ProcessStatus.COMPLETED ==
processView.getStatus(),
String.format("inlong group status %s is incorrect, should be
COMPLETED", processView.getStatus()));
@@ -139,14 +150,14 @@ public class InlongGroupImpl implements InlongGroup {
Preconditions.checkTrue(groupId != null &&
groupId.equals(this.groupInfo.getInlongGroupId()),
"groupId must be same");
- InlongGroupInfo existGroupInfo = managerClient.getGroupInfo(groupId);
+ InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
SimpleGroupStatus status =
SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
"inlong group is in init status, should not be updated");
InlongGroupInfo groupInfo =
InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
InlongGroupRequest groupRequest = groupInfo.genRequest();
- Pair<String, String> idAndErr =
managerClient.updateGroup(groupRequest);
+ Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
String errMsg = idAndErr.getValue();
Preconditions.checkNull(errMsg, errMsg);
@@ -159,7 +170,7 @@ public class InlongGroupImpl implements InlongGroup {
Preconditions.checkNotNull(sortConf, "sort conf cannot be null");
final String groupId = this.groupInfo.getInlongGroupId();
- InlongGroupInfo groupInfo = managerClient.getGroupInfo(groupId);
+ InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
SimpleGroupStatus status =
SimpleGroupStatus.parseStatusByCode(groupInfo.getStatus());
Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
@@ -167,7 +178,7 @@ public class InlongGroupImpl implements InlongGroup {
groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo,
sortConf);
InlongGroupRequest groupRequest = groupInfo.genRequest();
- Pair<String, String> idAndErr =
managerClient.updateGroup(groupRequest);
+ Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
String errMsg = idAndErr.getValue();
Preconditions.checkNull(errMsg, errMsg);
this.groupContext.setGroupInfo(groupInfo);
@@ -177,7 +188,7 @@ public class InlongGroupImpl implements InlongGroup {
public InlongGroupContext reInitOnUpdate(InlongGroupInfo originGroupInfo,
BaseSortConf sortConf) throws Exception {
this.update(originGroupInfo, sortConf);
String inlongGroupId =
this.groupContext.getGroupInfo().getInlongGroupId();
- InlongGroupInfo newGroupInfo =
managerClient.getGroupIfExists(inlongGroupId);
+ InlongGroupInfo newGroupInfo =
groupClient.getGroupIfExists(inlongGroupId);
if (newGroupInfo != null) {
this.groupContext.setGroupInfo(newGroupInfo);
} else {
@@ -195,27 +206,27 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext suspend(boolean async) {
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
- Pair<String, String> idAndErr =
managerClient.updateGroup(groupInfo.genRequest());
+ Pair<String, String> idAndErr =
groupClient.updateGroup(groupInfo.genRequest());
final String errMsg = idAndErr.getValue();
final String groupId = idAndErr.getKey();
Preconditions.checkNull(errMsg, errMsg);
- managerClient.operateInlongGroup(groupId, SimpleGroupStatus.STOPPED,
async);
+ groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STOPPED,
async);
return generateSnapshot();
}
@Override
- public InlongGroupContext restart() throws Exception {
+ public InlongGroupContext restart() {
return restart(false);
}
@Override
- public InlongGroupContext restart(boolean async) throws Exception {
+ public InlongGroupContext restart(boolean async) {
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
- Pair<String, String> idAndErr =
managerClient.updateGroup(groupInfo.genRequest());
+ Pair<String, String> idAndErr =
groupClient.updateGroup(groupInfo.genRequest());
final String errMsg = idAndErr.getValue();
final String groupId = idAndErr.getKey();
Preconditions.checkNull(errMsg, errMsg);
- managerClient.operateInlongGroup(groupId, SimpleGroupStatus.STARTED,
async);
+ groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STARTED,
async);
return generateSnapshot();
}
@@ -226,8 +237,8 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext delete(boolean async) throws Exception {
- InlongGroupInfo groupInfo =
managerClient.getGroupInfo(groupContext.getGroupId());
- boolean isDeleted =
managerClient.deleteInlongGroup(groupInfo.getInlongGroupId(), async);
+ InlongGroupInfo groupInfo =
groupClient.getGroupInfo(groupContext.getGroupId());
+ boolean isDeleted =
groupClient.deleteInlongGroup(groupInfo.getInlongGroupId(), async);
if (isDeleted) {
groupInfo.setStatus(GroupStatus.DELETED.getCode());
}
@@ -235,23 +246,23 @@ public class InlongGroupImpl implements InlongGroup {
}
@Override
- public List<InlongStream> listStreams() throws Exception {
+ public List<InlongStream> listStreams() {
String inlongGroupId = this.groupContext.getGroupId();
return fetchInlongStreams(inlongGroupId);
}
@Override
- public InlongGroupContext reset(int rerun, int resetFinalStatus) throws
Exception {
+ public InlongGroupContext reset(int rerun, int resetFinalStatus) {
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
InlongGroupResetRequest request = new
InlongGroupResetRequest(groupInfo.getInlongGroupId(),
rerun, resetFinalStatus);
- managerClient.resetGroup(request);
+ groupClient.resetGroup(request);
return generateSnapshot();
}
private InlongGroupContext generateSnapshot() {
// fetch current group
- InlongGroupInfo groupInfo =
managerClient.getGroupInfo(groupContext.getGroupId());
+ InlongGroupInfo groupInfo =
groupClient.getGroupInfo(groupContext.getGroupId());
groupContext.setGroupInfo(groupInfo);
String inlongGroupId = groupInfo.getInlongGroupId();
// fetch stream in group
@@ -263,7 +274,7 @@ public class InlongGroupImpl implements InlongGroup {
// create group context
InlongGroupContext inlongGroupContext = new
InlongGroupContext(groupContext);
// fetch group logs
- List<EventLogView> logViews =
managerClient.getInlongGroupError(inlongGroupId);
+ List<EventLogView> logViews =
workFlowClient.getInlongGroupError(inlongGroupId);
if (CollectionUtils.isNotEmpty(logViews)) {
Map<String, List<String>> errMsgMap = Maps.newHashMap();
Map<String, List<String>> groupLogMap = Maps.newHashMap();
@@ -285,7 +296,7 @@ public class InlongGroupImpl implements InlongGroup {
// fetch stream logs
Map<String, InlongStream> streams =
inlongGroupContext.getInlongStreamMap();
streams.keySet().forEach(streamId -> {
- List<InlongStreamConfigLogListResponse> logList =
managerClient.getStreamLogs(inlongGroupId, streamId);
+ List<InlongStreamConfigLogListResponse> logList =
streamClient.getStreamLogs(inlongGroupId, streamId);
if (CollectionUtils.isNotEmpty(logList)) {
Map<String, List<String>> streamLogs = Maps.newHashMap();
logList.stream().filter(x ->
StringUtils.isNotEmpty(x.getComponentName()))
@@ -301,12 +312,12 @@ public class InlongGroupImpl implements InlongGroup {
}
private List<InlongStream> fetchInlongStreams(String groupId) {
- List<InlongStreamInfo> streamInfos =
managerClient.listStreamInfo(groupId);
+ List<InlongStreamInfo> streamInfos =
streamClient.listStreamInfo(groupId);
if (CollectionUtils.isEmpty(streamInfos)) {
return null;
}
return streamInfos.stream()
- .map(streamInfo -> new InlongStreamImpl(streamInfo,
managerClient))
+ .map(streamInfo -> new InlongStreamImpl(streamInfo,
configuration))
.collect(Collectors.toList());
}
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index d6ff3b047..f1ff69066 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -24,8 +24,14 @@ import lombok.Data;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamTransformClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
@@ -41,6 +47,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -50,7 +57,13 @@ import java.util.stream.Collectors;
@Data
public class InlongStreamImpl implements InlongStream {
- private InnerInlongManagerClient managerClient;
+ private InlongStreamClient streamClient;
+
+ private StreamSourceClient sourceClient;
+
+ private StreamSinkClient sinkClient;
+
+ private StreamTransformClient transformClient;
private String inlongGroupId;
@@ -67,10 +80,16 @@ public class InlongStreamImpl implements InlongStream {
/**
* Constructor of InlongStreamImpl.
*/
- public InlongStreamImpl(InlongStreamInfo streamInfo,
InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
+ public InlongStreamImpl(InlongStreamInfo streamInfo, ClientConfiguration
configuration) {
this.inlongGroupId = streamInfo.getInlongGroupId();
this.inlongStreamId = streamInfo.getInlongStreamId();
+
+ ClientFactory clientFactory =
ClientUtils.getClientFactory(configuration);
+ this.streamClient = clientFactory.getStreamClient();
+ this.sourceClient = clientFactory.getSourceClient();
+ this.sinkClient = clientFactory.getSinkClient();
+ this.transformClient = clientFactory.getTransformClient();
+
List<StreamField> streamFields = streamInfo.getFieldList();
if (CollectionUtils.isNotEmpty(streamFields)) {
this.streamFields = streamFields.stream()
@@ -108,8 +127,14 @@ public class InlongStreamImpl implements InlongStream {
}
}
- public InlongStreamImpl(String groupId, String streamId,
InnerInlongManagerClient managerClient) {
- this.managerClient = managerClient;
+ public InlongStreamImpl(String groupId, String streamId,
ClientConfiguration configuration) {
+ if (Optional.ofNullable(configuration).isPresent()) {
+ ClientFactory clientFactory =
ClientUtils.getClientFactory(configuration);
+ this.streamClient = clientFactory.getStreamClient();
+ this.sourceClient = clientFactory.getSourceClient();
+ this.sinkClient = clientFactory.getSinkClient();
+ this.transformClient = clientFactory.getTransformClient();
+ }
this.inlongGroupId = groupId;
this.inlongStreamId = streamId;
}
@@ -280,7 +305,7 @@ public class InlongStreamImpl implements InlongStream {
@Override
public InlongStream update() {
- InlongStreamInfo streamInfo =
managerClient.getStreamInfo(inlongGroupId, inlongStreamId);
+ InlongStreamInfo streamInfo =
streamClient.getStreamInfo(inlongGroupId, inlongStreamId);
if (streamInfo == null) {
throw new IllegalArgumentException(
String.format("Stream not exists for group=%s and
stream=%s", inlongGroupId, inlongStreamId));
@@ -289,7 +314,7 @@ public class InlongStreamImpl implements InlongStream {
streamInfo.setFieldList(this.streamFields);
StreamPipeline streamPipeline = createPipeline();
streamInfo.setExtParams(JsonUtils.toJsonString(streamPipeline));
- Pair<Boolean, String> updateMsg =
managerClient.updateStreamInfo(streamInfo);
+ Pair<Boolean, String> updateMsg =
streamClient.updateStreamInfo(streamInfo);
if (!updateMsg.getKey()) {
throw new RuntimeException(String.format("Update data stream
failed: %s", updateMsg.getValue()));
}
@@ -300,7 +325,7 @@ public class InlongStreamImpl implements InlongStream {
}
private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
- List<TransformResponse> transformResponses =
managerClient.listTransform(inlongGroupId, inlongStreamId);
+ List<TransformResponse> transformResponses =
transformClient.listTransform(inlongGroupId, inlongStreamId);
List<String> updateTransformNames = Lists.newArrayList();
for (TransformResponse transformResponse : transformResponses) {
StreamTransform transform =
StreamTransformTransfer.parseStreamTransform(transformResponse);
@@ -309,7 +334,7 @@ public class InlongStreamImpl implements InlongStream {
if (this.streamTransforms.get(transformName) == null) {
TransformRequest transformRequest =
StreamTransformTransfer.createTransformRequest(transform,
streamInfo);
- boolean isDelete =
managerClient.deleteTransform(transformRequest);
+ boolean isDelete =
transformClient.deleteTransform(transformRequest);
if (!isDelete) {
throw new RuntimeException(String.format("Delete
transform=%s failed", transformRequest));
}
@@ -318,7 +343,7 @@ public class InlongStreamImpl implements InlongStream {
TransformRequest transformRequest =
StreamTransformTransfer.createTransformRequest(newTransform,
streamInfo);
transformRequest.setId(id);
- Pair<Boolean, String> updateState =
managerClient.updateTransform(transformRequest);
+ Pair<Boolean, String> updateState =
transformClient.updateTransform(transformRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update
transform=%s failed with err=%s", transformRequest,
updateState.getValue()));
@@ -334,18 +359,18 @@ public class InlongStreamImpl implements InlongStream {
StreamTransform transform = transformEntry.getValue();
TransformRequest transformRequest =
StreamTransformTransfer.createTransformRequest(transform,
streamInfo);
- managerClient.createTransform(transformRequest);
+ transformClient.createTransform(transformRequest);
}
}
private void initOrUpdateSource(InlongStreamInfo streamInfo) {
- List<StreamSource> streamSources =
managerClient.listSources(inlongGroupId, inlongStreamId);
+ List<StreamSource> streamSources =
sourceClient.listSources(inlongGroupId, inlongStreamId);
List<String> updateSourceNames = Lists.newArrayList();
for (StreamSource source : streamSources) {
final String sourceName = source.getSourceName();
final int id = source.getId();
if (this.streamSources.get(sourceName) == null) {
- boolean isDelete = managerClient.deleteSource(id);
+ boolean isDelete = sourceClient.deleteSource(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete source=%s
failed", source));
}
@@ -354,7 +379,7 @@ public class InlongStreamImpl implements InlongStream {
streamSource.setId(id);
streamSource.setInlongGroupId(streamInfo.getInlongGroupId());
streamSource.setInlongStreamId(streamInfo.getInlongStreamId());
- Pair<Boolean, String> updateState =
managerClient.updateSource(streamSource.genSourceRequest());
+ Pair<Boolean, String> updateState =
sourceClient.updateSource(streamSource.genSourceRequest());
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update source=%s
failed with err=%s", streamSource,
updateState.getValue()));
@@ -370,19 +395,19 @@ public class InlongStreamImpl implements InlongStream {
StreamSource streamSource = sourceEntry.getValue();
streamSource.setInlongGroupId(streamInfo.getInlongGroupId());
streamSource.setInlongStreamId(streamInfo.getInlongStreamId());
- managerClient.createSource(streamSource.genSourceRequest());
+ sourceClient.createSource(streamSource.genSourceRequest());
}
}
private void initOrUpdateSink(InlongStreamInfo streamInfo) {
- List<StreamSink> streamSinks = managerClient.listSinks(inlongGroupId,
inlongStreamId);
+ List<StreamSink> streamSinks = sinkClient.listSinks(inlongGroupId,
inlongStreamId);
// delete or update the sink info
List<String> updateSinkNames = Lists.newArrayList();
for (StreamSink sink : streamSinks) {
final String sinkName = sink.getSinkName();
final int id = sink.getId();
if (this.streamSinks.get(sinkName) == null) {
- boolean isDelete = managerClient.deleteSink(id);
+ boolean isDelete = sinkClient.deleteSink(id);
if (!isDelete) {
throw new RuntimeException(String.format("Delete sink=%s
failed", sink));
}
@@ -391,7 +416,7 @@ public class InlongStreamImpl implements InlongStream {
streamSink.setId(id);
streamSink.setInlongGroupId(streamInfo.getInlongGroupId());
streamSink.setInlongStreamId(streamInfo.getInlongStreamId());
- Pair<Boolean, String> updateState =
managerClient.updateSink(streamSink.genSinkRequest());
+ Pair<Boolean, String> updateState =
sinkClient.updateSink(streamSink.genSinkRequest());
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update sink=%s
failed with err=%s", streamSink,
updateState.getValue()));
@@ -409,7 +434,7 @@ public class InlongStreamImpl implements InlongStream {
StreamSink streamSink = sinkEntry.getValue();
streamSink.setInlongGroupId(streamInfo.getInlongGroupId());
streamSink.setInlongStreamId(streamInfo.getInlongStreamId());
- managerClient.createSink(streamSink.genSinkRequest());
+ sinkClient.createSink(streamSink.genSinkRequest());
}
}
@@ -422,7 +447,7 @@ public class InlongStreamImpl implements InlongStream {
.filter(streamSink -> streamSink.getId().equals(sinkId))
.findAny()
// Try to get from db, if it doesn't exist in cache
- .orElseGet(() -> managerClient.getSinkInfo(sinkId));
+ .orElseGet(() -> sinkClient.getSinkInfo(sinkId));
}
@Override
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
index 0873702be..dc9f05314 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
@@ -24,7 +24,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.LowLevelInlongClient;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
@@ -71,14 +73,14 @@ public class LowLevelInlongClientImpl implements
LowLevelInlongClient {
}
@Override
- public Integer saveCluster(ClusterRequest request) throws Exception {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(this.configuration);
- return managerClient.saveCluster(request);
+ public Integer saveCluster(ClusterRequest request) {
+ InlongClusterClient clusterClient =
ClientUtils.getClientFactory(configuration).getClusterClient();
+ return clusterClient.saveCluster(request);
}
@Override
- public PageInfo<InlongGroupListResponse> listGroup(InlongGroupPageRequest
request) throws Exception {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(this.configuration);
- return managerClient.listGroups(request);
+ public PageInfo<InlongGroupListResponse> listGroup(InlongGroupPageRequest
request) {
+ InlongGroupClient groupClient =
ClientUtils.getClientFactory(configuration).getGroupClient();
+ return groupClient.listGroups(request);
}
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
deleted file mode 100644
index c9bd0787a..000000000
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.client.api.inner;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.github.pagehelper.PageInfo;
-import com.google.common.collect.Lists;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.service.AuthInterceptor;
-import org.apache.inlong.manager.client.api.service.InlongClusterApi;
-import org.apache.inlong.manager.client.api.service.InlongGroupApi;
-import org.apache.inlong.manager.client.api.service.InlongStreamApi;
-import org.apache.inlong.manager.client.api.service.StreamSinkApi;
-import org.apache.inlong.manager.client.api.service.StreamSourceApi;
-import org.apache.inlong.manager.client.api.service.StreamTransformApi;
-import org.apache.inlong.manager.client.api.service.WorkflowApi;
-import org.apache.inlong.manager.common.auth.Authentication;
-import org.apache.inlong.manager.common.auth.DefaultAuthentication;
-import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
-import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.SourceRequest;
-import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import
org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
-import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
-import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
-import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
-import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import
org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.springframework.boot.configurationprocessor.json.JSONObject;
-import retrofit2.Call;
-import retrofit2.Retrofit;
-import retrofit2.converter.jackson.JacksonConverterFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
-import static
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
-
-/**
- * InnerInlongManagerClient is used to invoke http api of inlong manager.
- */
-@Slf4j
-public class InnerInlongManagerClient {
-
- private static final String REQUEST_FAILED_MSG = "Request to Inlong %s
failed: %s";
-
- protected final String host;
- protected final int port;
-
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- private final InlongClusterApi inlongClusterApi;
- private final InlongStreamApi inlongStreamApi;
- private final InlongGroupApi inlongGroupApi;
- private final StreamSourceApi streamSourceApi;
- private final StreamTransformApi streamTransformApi;
- private final StreamSinkApi streamSinkApi;
- private final WorkflowApi workflowApi;
-
- public InnerInlongManagerClient(ClientConfiguration configuration) {
- this.host = configuration.getBindHost();
- this.port = configuration.getBindPort();
-
- Authentication authentication = configuration.getAuthentication();
- Preconditions.checkNotNull(authentication, "inlong should be
authenticated");
- Preconditions.checkTrue(authentication instanceof
DefaultAuthentication,
- "inlong only support default authentication");
- DefaultAuthentication defaultAuthentication = (DefaultAuthentication)
authentication;
-
- OkHttpClient okHttpClient = new OkHttpClient.Builder()
- .addInterceptor(
- new
AuthInterceptor(defaultAuthentication.getUsername(),
defaultAuthentication.getPassword()))
- .connectTimeout(configuration.getConnectTimeout(),
configuration.getTimeUnit())
- .readTimeout(configuration.getReadTimeout(),
configuration.getTimeUnit())
- .writeTimeout(configuration.getWriteTimeout(),
configuration.getTimeUnit())
- .retryOnConnectionFailure(true)
- .build();
-
- Retrofit retrofit = new Retrofit.Builder()
- .baseUrl("http://" + host + ":" + port +
"/api/inlong/manager/")
-
.addConverterFactory(JacksonConverterFactory.create(JsonUtils.OBJECT_MAPPER))
- .client(okHttpClient)
- .build();
-
- inlongStreamApi = retrofit.create(InlongStreamApi.class);
- inlongGroupApi = retrofit.create(InlongGroupApi.class);
- streamSinkApi = retrofit.create(StreamSinkApi.class);
- streamSourceApi = retrofit.create(StreamSourceApi.class);
- streamTransformApi = retrofit.create(StreamTransformApi.class);
- workflowApi = retrofit.create(WorkflowApi.class);
- inlongClusterApi = retrofit.create(InlongClusterApi.class);
- }
-
- /**
- * Save component cluster for Inlong
- *
- * @param request cluster create request
- * @return clusterIndex
- */
- public Integer saveCluster(ClusterRequest request) {
- Preconditions.checkNotEmpty(request.getName(), "cluster name should
not be empty");
- Preconditions.checkNotEmpty(request.getType(), "cluster type should
not be empty");
- Preconditions.checkNotEmpty(request.getClusterTags(), "cluster tags
should not be empty");
- Response<Integer> clusterIndexResponse =
executeHttpCall(inlongClusterApi.save(request));
- assertRespSuccess(clusterIndexResponse);
- return clusterIndexResponse.getData();
- }
-
- /**
- * Get inlong group by the given inlong group id.
- *
- * @param inlongGroupId the given inlong group id
- * @return inlong group info if exists, null will be returned if not exits
- */
- public InlongGroupInfo getGroupIfExists(String inlongGroupId) {
- if (this.isGroupExists(inlongGroupId)) {
- return getGroupInfo(inlongGroupId);
- }
- return null;
- }
-
- /**
- * Check whether a group exists based on the group ID.
- */
- public Boolean isGroupExists(String inlongGroupId) {
- Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not
be empty");
-
- Response<Boolean> response =
executeHttpCall(inlongGroupApi.isGroupExists(inlongGroupId));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Get info of group.
- */
- @SneakyThrows
- public InlongGroupInfo getGroupInfo(String inlongGroupId) {
- Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not
be empty");
-
- Response<Object> responseBody =
executeHttpCall(inlongGroupApi.getGroupInfo(inlongGroupId));
- if (responseBody.isSuccess()) {
- JSONObject groupInfoJson = JsonUtils.parseObject(
-
JsonUtils.toJsonString(JsonUtils.toJsonString(responseBody.getData())),
- JSONObject.class);
- if (groupInfoJson.has(MQ_FIELD_OLD) &&
!groupInfoJson.has(MQ_FIELD)) {
- groupInfoJson.put(MQ_FIELD, groupInfoJson.get(MQ_FIELD_OLD));
- }
- return JsonUtils.parseObject(groupInfoJson.toString(),
InlongGroupInfo.class);
- }
-
- if (responseBody.getErrMsg().contains("not exist")) {
- return null;
- } else {
- throw new RuntimeException(responseBody.getErrMsg());
- }
- }
-
- /**
- * Get inlong group list.
- */
- public PageInfo<InlongGroupListResponse> listGroups(String keyword, int
status, int pageNum, int pageSize) {
- InlongGroupPageRequest request = InlongGroupPageRequest.builder()
- .keyword(keyword)
- .status(status)
- .build();
- request.setPageNum(pageNum <= 0 ? 1 : pageNum);
- request.setPageSize(pageSize);
-
- Response<PageInfo<InlongGroupListResponse>> pageInfoResponse =
executeHttpCall(
- inlongGroupApi.listGroups(request));
-
- if (pageInfoResponse.isSuccess()) {
- return pageInfoResponse.getData();
- }
- if (pageInfoResponse.getErrMsg().contains("not exist")) {
- return null;
- } else {
- throw new RuntimeException(pageInfoResponse.getErrMsg());
- }
- }
-
- /**
- * List inlong group by the page request
- *
- * @param pageRequest The pageRequest
- * @return Response encapsulate of inlong group list
- */
- public PageInfo<InlongGroupListResponse> listGroups(InlongGroupPageRequest
pageRequest) {
- Response<PageInfo<InlongGroupListResponse>> pageInfoResponse =
executeHttpCall(
- inlongGroupApi.listGroups(pageRequest));
- assertRespSuccess(pageInfoResponse);
- return pageInfoResponse.getData();
- }
-
- /**
- * Create an inlong group
- */
- public String createGroup(InlongGroupRequest groupInfo) {
- Response<String> response =
executeHttpCall(inlongGroupApi.createGroup(groupInfo));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Update inlong group info
- *
- * @return groupId && errMsg
- */
- public Pair<String, String> updateGroup(InlongGroupRequest groupRequest) {
- Response<String> response =
executeHttpCall(inlongGroupApi.updateGroup(groupRequest));
- return Pair.of(response.getData(), response.getErrMsg());
- }
-
- /**
- * Reset inlong group info
- */
- public boolean resetGroup(InlongGroupResetRequest resetRequest) {
- Response<Boolean> response =
executeHttpCall(inlongGroupApi.resetGroup(resetRequest));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Create an inlong stream.
- */
- public Integer createStreamInfo(InlongStreamInfo streamInfo) {
- Response<Integer> response =
executeHttpCall(inlongStreamApi.createStream(streamInfo));
- assertRespSuccess(response);
- return response.getData();
- }
-
- public Boolean isStreamExists(InlongStreamInfo streamInfo) {
- final String groupId = streamInfo.getInlongGroupId();
- final String streamId = streamInfo.getInlongStreamId();
- Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be
empty");
- Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be
empty");
-
- Response<Boolean> response =
executeHttpCall(inlongStreamApi.isStreamExists(groupId, streamId));
- assertRespSuccess(response);
- return response.getData();
- }
-
- public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo)
{
- Response<Boolean> resp =
executeHttpCall(inlongStreamApi.updateStream(streamInfo));
-
- if (resp.getData() != null) {
- return Pair.of(resp.getData(), resp.getErrMsg());
- } else {
- return Pair.of(false, resp.getErrMsg());
- }
- }
-
- /**
- * Get inlong stream by the given groupId and streamId.
- */
- public InlongStreamInfo getStreamInfo(String groupId, String streamId) {
- Response<InlongStreamInfo> response =
executeHttpCall(inlongStreamApi.getStream(groupId, streamId));
-
- if (response.isSuccess()) {
- return response.getData();
- }
- if (response.getErrMsg().contains("not exist")) {
- return null;
- } else {
- throw new RuntimeException(response.getErrMsg());
- }
- }
-
- /**
- * Get inlong stream info.
- */
- public List<InlongStreamInfo> listStreamInfo(String inlongGroupId) {
- InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
- pageRequest.setInlongGroupId(inlongGroupId);
-
- Response<PageInfo<InlongStreamInfo>> response =
executeHttpCall(inlongStreamApi.listStream(pageRequest));
- assertRespSuccess(response);
- return response.getData().getList();
- }
-
- /**
- * Create an inlong stream source.
- */
- public Integer createSource(SourceRequest request) {
- Response<Integer> response =
executeHttpCall(streamSourceApi.createSource(request));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * List stream sources by the given groupId and streamId.
- */
- public List<StreamSource> listSources(String groupId, String streamId) {
- return listSources(groupId, streamId, null);
- }
-
- /**
- * List stream sources by the specified source type.
- */
- public List<StreamSource> listSources(String groupId, String streamId,
String sourceType) {
- Response<PageInfo<StreamSource>> response = executeHttpCall(
- streamSourceApi.listSources(groupId, streamId, sourceType));
- assertRespSuccess(response);
- return response.getData().getList();
- }
-
- /**
- * Update the stream source info.
- */
- public Pair<Boolean, String> updateSource(SourceRequest request) {
- Response<Boolean> response =
executeHttpCall(streamSourceApi.updateSource(request));
- if (response.getData() != null) {
- return Pair.of(response.getData(), response.getErrMsg());
- } else {
- return Pair.of(false, response.getErrMsg());
- }
- }
-
- /**
- * Delete the stream source info by id.
- */
- public boolean deleteSource(int id) {
- Preconditions.checkTrue(id > 0, "sourceId is illegal");
- Response<Boolean> response =
executeHttpCall(streamSourceApi.deleteSource(id));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Create a conversion function info.
- */
- public Integer createTransform(TransformRequest transformRequest) {
- Response<Integer> response =
executeHttpCall(streamTransformApi.createTransform(transformRequest));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Get all conversion function info.
- */
- public List<TransformResponse> listTransform(String groupId, String
streamId) {
- Response<List<TransformResponse>> response = executeHttpCall(
- streamTransformApi.listTransform(groupId, streamId));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Update conversion function info.
- */
- public Pair<Boolean, String> updateTransform(TransformRequest
transformRequest) {
- Response<Boolean> response =
executeHttpCall(streamTransformApi.updateTransform(transformRequest));
-
- if (response.getData() != null) {
- return Pair.of(response.getData(), response.getErrMsg());
- } else {
- return Pair.of(false, response.getErrMsg());
- }
- }
-
- /**
- * Delete conversion function info.
- */
- public boolean deleteTransform(TransformRequest transformRequest) {
- Preconditions.checkNotEmpty(transformRequest.getInlongGroupId(),
"inlongGroupId should not be null");
- Preconditions.checkNotEmpty(transformRequest.getInlongStreamId(),
"inlongStreamId should not be null");
- Preconditions.checkNotEmpty(transformRequest.getTransformName(),
"transformName should not be null");
-
- Response<Boolean> response = executeHttpCall(
-
streamTransformApi.deleteTransform(transformRequest.getInlongGroupId(),
- transformRequest.getInlongStreamId(),
transformRequest.getTransformName()));
- assertRespSuccess(response);
- return response.getData();
- }
-
- public Integer createSink(SinkRequest sinkRequest) {
- Response<Integer> response =
executeHttpCall(streamSinkApi.createSink(sinkRequest));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Delete stream sink info by ID.
- */
- public boolean deleteSink(int id) {
- Preconditions.checkTrue(id > 0, "sinkId is illegal");
- Response<Boolean> response =
executeHttpCall(streamSinkApi.deleteSink(id));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * List stream sinks by the given groupId and streamId.
- */
- public List<StreamSink> listSinks(String groupId, String streamId) {
- return listSinks(groupId, streamId, null);
- }
-
- /**
- * List stream sinks by the specified sink type.
- */
- public List<StreamSink> listSinks(String groupId, String streamId, String
sinkType) {
- Response<PageInfo<StreamSink>> response = executeHttpCall(
- streamSinkApi.listSinks(groupId, streamId, sinkType));
- assertRespSuccess(response);
- return response.getData().getList();
- }
-
- /**
- * Get detail information of data sink.
- */
- public StreamSink getSinkInfo(Integer sinkId) {
- Response<StreamSink> response =
executeHttpCall(streamSinkApi.getSinkInfo(sinkId));
- assertRespSuccess(response);
- return response.getData();
- }
-
- /**
- * Update the stream sink info.
- */
- public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
- Response<Boolean> responseBody =
executeHttpCall(streamSinkApi.updateSink(sinkRequest));
- assertRespSuccess(responseBody);
-
- if (responseBody.getData() != null) {
- return Pair.of(responseBody.getData(), responseBody.getErrMsg());
- } else {
- return Pair.of(false, responseBody.getErrMsg());
- }
- }
-
- public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) {
- Response<WorkflowResult> responseBody = executeHttpCall(
- inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId()));
- assertRespSuccess(responseBody);
- return responseBody.getData();
- }
-
- public WorkflowResult startInlongGroup(int taskId, NewGroupProcessForm
newGroupProcessForm) {
- ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
- workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
- workflowTaskOperation.put("remark", "approved by system");
-
- ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
- inlongGroupApproveForm.putPOJO("groupApproveInfo",
newGroupProcessForm.getGroupInfo());
- inlongGroupApproveForm.putPOJO("streamApproveInfoList",
newGroupProcessForm.getStreamInfoList());
- inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
- workflowTaskOperation.set("form", inlongGroupApproveForm);
-
- log.info("startInlongGroup workflowTaskOperation: {}",
inlongGroupApproveForm);
-
- Map<String, Object> requestMap =
JsonUtils.OBJECT_MAPPER.convertValue(workflowTaskOperation,
- new TypeReference<Map<String, Object>>() {
- });
- Response<WorkflowResult> response =
executeHttpCall(workflowApi.startInlongGroup(taskId, requestMap));
- assertRespSuccess(response);
-
- return response.getData();
- }
-
- public boolean operateInlongGroup(String groupId, SimpleGroupStatus
status) {
- return operateInlongGroup(groupId, status, false);
- }
-
- public boolean operateInlongGroup(String groupId, SimpleGroupStatus
status, boolean async) {
- Call<Response<String>> responseCall;
- if (status == SimpleGroupStatus.STOPPED) {
- if (async) {
- responseCall = inlongGroupApi.suspendProcessAsync(groupId);
- } else {
- responseCall = inlongGroupApi.suspendProcess(groupId);
- }
- } else if (status == SimpleGroupStatus.STARTED) {
- if (async) {
- responseCall = inlongGroupApi.restartProcessAsync(groupId);
- } else {
- responseCall = inlongGroupApi.restartProcess(groupId);
- }
- } else {
- throw new IllegalArgumentException(String.format("Unsupported
inlong group status: %s", status));
- }
-
- Response<String> responseBody = executeHttpCall(responseCall);
-
- String errMsg = responseBody.getErrMsg();
- return responseBody.isSuccess()
- || errMsg == null
- || !errMsg.contains("not allowed");
- }
-
- public boolean deleteInlongGroup(String groupId) {
- return deleteInlongGroup(groupId, false);
- }
-
- public boolean deleteInlongGroup(String groupId, boolean async) {
- if (async) {
- Response<String> response =
executeHttpCall(inlongGroupApi.deleteGroupAsync(groupId));
- assertRespSuccess(response);
- return groupId.equals(response.getData());
- } else {
- Response<Boolean> response =
executeHttpCall(inlongGroupApi.deleteGroup(groupId));
- assertRespSuccess(response);
- return response.getData();
- }
- }
-
- /**
- * get inlong group error messages
- */
- public List<EventLogView> getInlongGroupError(String inlongGroupId) {
- Response<PageInfo<EventLogView>> response =
executeHttpCall(workflowApi.getInlongGroupError(inlongGroupId, -1));
- assertRespSuccess(response);
- return response.getData().getList();
- }
-
- /**
- * get inlong group error messages
- */
- public List<InlongStreamConfigLogListResponse> getStreamLogs(String
inlongGroupId, String inlongStreamId) {
- Response<PageInfo<InlongStreamConfigLogListResponse>> response =
executeHttpCall(
- inlongStreamApi.getStreamLogs(inlongGroupId, inlongStreamId));
- assertRespSuccess(response);
- return response.getData().getList();
- }
-
- private <T> T executeHttpCall(Call<T> call) {
- Request request = call.request();
- String url = request.url().encodedPath();
- try {
- retrofit2.Response<T> response = call.execute();
- Preconditions.checkTrue(response.isSuccessful(),
- String.format(REQUEST_FAILED_MSG, url,
response.message()));
- return response.body();
- } catch (IOException e) {
- log.error(String.format(REQUEST_FAILED_MSG, url, e.getMessage()),
e);
- throw new RuntimeException(String.format(REQUEST_FAILED_MSG, url,
e.getMessage()), e);
- }
- }
-
- private void assertRespSuccess(Response<?> response) {
- Preconditions.checkTrue(response.isSuccess(),
String.format(REQUEST_FAILED_MSG, response.getErrMsg(), null));
- }
-
-}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
new file mode 100644
index 000000000..9ac932fe1
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import lombok.Getter;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+
+/**
+ * Factory for {@link org.apache.inlong.manager.client.api.inner.client}.
+ */
+@Getter
+public class ClientFactory {
+
+ private final InlongGroupClient groupClient;
+
+ private final InlongStreamClient streamClient;
+
+ private final StreamSinkClient sinkClient;
+
+ private final StreamSourceClient sourceClient;
+
+ private final InlongClusterClient clusterClient;
+
+ private final StreamTransformClient transformClient;
+
+ private final WorkflowClient workflowClient;
+
+ public ClientFactory(ClientConfiguration configuration) {
+ groupClient = new InlongGroupClient(configuration);
+ streamClient = new InlongStreamClient(configuration);
+ sourceClient = new StreamSourceClient(configuration);
+ sinkClient = new StreamSinkClient(configuration);
+ clusterClient = new InlongClusterClient(configuration);
+ transformClient = new StreamTransformClient(configuration);
+ workflowClient = new WorkflowClient(configuration);
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
new file mode 100644
index 000000000..1e5e976be
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InlongClusterApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+/**
+ * Client for {@link InlongClusterApi}.
+ */
+public class InlongClusterClient {
+
+ private final InlongClusterApi inlongClusterApi;
+
+ public InlongClusterClient(ClientConfiguration configuration) {
+ inlongClusterApi =
ClientUtils.createRetrofit(configuration).create(InlongClusterApi.class);
+ }
+
+ /**
+ * Save component cluster for Inlong
+ *
+ * @param request cluster create request
+ * @return clusterIndex
+ */
+ public Integer saveCluster(ClusterRequest request) {
+ Preconditions.checkNotEmpty(request.getName(), "cluster name should
not be empty");
+ Preconditions.checkNotEmpty(request.getType(), "cluster type should
not be empty");
+ Preconditions.checkNotEmpty(request.getClusterTags(), "cluster tags
should not be empty");
+ Response<Integer> clusterIndexResponse =
ClientUtils.executeHttpCall(inlongClusterApi.save(request));
+ ClientUtils.assertRespSuccess(clusterIndexResponse);
+ return clusterIndexResponse.getData();
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
new file mode 100644
index 000000000..0aff6ef4b
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.client.api.service.InlongGroupApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
+import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.springframework.boot.configurationprocessor.json.JSONObject;
+import retrofit2.Call;
+
+import static
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
+import static
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
+
+/**
+ * Client for {@link InlongGroupApi}.
+ */
+public class InlongGroupClient {
+
+ private final InlongGroupApi inlongGroupApi;
+
+ public InlongGroupClient(ClientConfiguration configuration) {
+ inlongGroupApi =
ClientUtils.createRetrofit(configuration).create(InlongGroupApi.class);
+ }
+
+ /**
+ * Check whether a group exists based on the group ID.
+ */
+ public Boolean isGroupExists(String inlongGroupId) {
+ Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not
be empty");
+
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongGroupApi.isGroupExists(inlongGroupId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Get inlong group by the given inlong group id.
+ *
+ * @param inlongGroupId the given inlong group id
+ * @return inlong group info if exists, null will be returned if not exits
+ */
+ public InlongGroupInfo getGroupIfExists(String inlongGroupId) {
+ if (this.isGroupExists(inlongGroupId)) {
+ return getGroupInfo(inlongGroupId);
+ }
+ return null;
+ }
+
+ /**
+ * Get info of group.
+ */
+ @SneakyThrows
+ public InlongGroupInfo getGroupInfo(String inlongGroupId) {
+ Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not
be empty");
+
+ Response<Object> responseBody =
ClientUtils.executeHttpCall(inlongGroupApi.getGroupInfo(inlongGroupId));
+ if (responseBody.isSuccess()) {
+ JSONObject groupInfoJson = JsonUtils.parseObject(
+
JsonUtils.toJsonString(JsonUtils.toJsonString(responseBody.getData())),
+ JSONObject.class);
+ if (groupInfoJson.has(MQ_FIELD_OLD) &&
!groupInfoJson.has(MQ_FIELD)) {
+ groupInfoJson.put(MQ_FIELD, groupInfoJson.get(MQ_FIELD_OLD));
+ }
+ return JsonUtils.parseObject(groupInfoJson.toString(),
InlongGroupInfo.class);
+ }
+
+ if (responseBody.getErrMsg().contains("not exist")) {
+ return null;
+ } else {
+ throw new RuntimeException(responseBody.getErrMsg());
+ }
+ }
+
+ /**
+ * Get inlong group list.
+ */
+ public PageInfo<InlongGroupListResponse> listGroups(String keyword, int
status, int pageNum, int pageSize) {
+ InlongGroupPageRequest request = InlongGroupPageRequest.builder()
+ .keyword(keyword)
+ .status(status)
+ .build();
+ request.setPageNum(pageNum <= 0 ? 1 : pageNum);
+ request.setPageSize(pageSize);
+
+ Response<PageInfo<InlongGroupListResponse>> pageInfoResponse =
ClientUtils.executeHttpCall(
+ inlongGroupApi.listGroups(request));
+
+ if (pageInfoResponse.isSuccess()) {
+ return pageInfoResponse.getData();
+ }
+ if (pageInfoResponse.getErrMsg().contains("not exist")) {
+ return null;
+ } else {
+ throw new RuntimeException(pageInfoResponse.getErrMsg());
+ }
+ }
+
+ /**
+ * List inlong group by the page request
+ *
+ * @param pageRequest The pageRequest
+ * @return Response encapsulate of inlong group list
+ */
+ public PageInfo<InlongGroupListResponse> listGroups(InlongGroupPageRequest
pageRequest) {
+ Response<PageInfo<InlongGroupListResponse>> pageInfoResponse =
ClientUtils.executeHttpCall(
+ inlongGroupApi.listGroups(pageRequest));
+ ClientUtils.assertRespSuccess(pageInfoResponse);
+ return pageInfoResponse.getData();
+ }
+
+ /**
+ * Create an inlong group
+ */
+ public String createGroup(InlongGroupRequest groupInfo) {
+ Response<String> response =
ClientUtils.executeHttpCall(inlongGroupApi.createGroup(groupInfo));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Update inlong group info
+ *
+ * @return groupId && errMsg
+ */
+ public Pair<String, String> updateGroup(InlongGroupRequest groupRequest) {
+ Response<String> response =
ClientUtils.executeHttpCall(inlongGroupApi.updateGroup(groupRequest));
+ return Pair.of(response.getData(), response.getErrMsg());
+ }
+
+ /**
+ * Reset inlong group info
+ */
+ public boolean resetGroup(InlongGroupResetRequest resetRequest) {
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongGroupApi.resetGroup(resetRequest));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) {
+ Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
+ inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId()));
+ ClientUtils.assertRespSuccess(responseBody);
+ return responseBody.getData();
+ }
+
+ public boolean operateInlongGroup(String groupId, SimpleGroupStatus
status) {
+ return operateInlongGroup(groupId, status, false);
+ }
+
+ public boolean operateInlongGroup(String groupId, SimpleGroupStatus
status, boolean async) {
+ Call<Response<String>> responseCall;
+ if (status == SimpleGroupStatus.STOPPED) {
+ if (async) {
+ responseCall = inlongGroupApi.suspendProcessAsync(groupId);
+ } else {
+ responseCall = inlongGroupApi.suspendProcess(groupId);
+ }
+ } else if (status == SimpleGroupStatus.STARTED) {
+ if (async) {
+ responseCall = inlongGroupApi.restartProcessAsync(groupId);
+ } else {
+ responseCall = inlongGroupApi.restartProcess(groupId);
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("Unsupported
inlong group status: %s", status));
+ }
+
+ Response<String> responseBody =
ClientUtils.executeHttpCall(responseCall);
+
+ String errMsg = responseBody.getErrMsg();
+ return responseBody.isSuccess()
+ || errMsg == null
+ || !errMsg.contains("not allowed");
+ }
+
+ public boolean deleteInlongGroup(String groupId) {
+ return deleteInlongGroup(groupId, false);
+ }
+
+ public boolean deleteInlongGroup(String groupId, boolean async) {
+ if (async) {
+ Response<String> response =
ClientUtils.executeHttpCall(inlongGroupApi.deleteGroupAsync(groupId));
+ ClientUtils.assertRespSuccess(response);
+ return groupId.equals(response.getData());
+ } else {
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongGroupApi.deleteGroup(groupId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
new file mode 100644
index 000000000..0b661df85
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InlongStreamApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import
org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client for {@link InlongStreamApi}.
+ */
+public class InlongStreamClient {
+
+ private final InlongStreamApi inlongStreamApi;
+
+ public InlongStreamClient(ClientConfiguration configuration) {
+ inlongStreamApi =
ClientUtils.createRetrofit(configuration).create(InlongStreamApi.class);
+ }
+
+ /**
+ * Create an inlong stream.
+ */
+ public Integer createStreamInfo(InlongStreamInfo streamInfo) {
+ Response<Integer> response =
ClientUtils.executeHttpCall(inlongStreamApi.createStream(streamInfo));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ public Boolean isStreamExists(InlongStreamInfo streamInfo) {
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be
empty");
+ Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be
empty");
+
+ Response<Boolean> response =
ClientUtils.executeHttpCall(inlongStreamApi.isStreamExists(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo)
{
+ Response<Boolean> resp =
ClientUtils.executeHttpCall(inlongStreamApi.updateStream(streamInfo));
+
+ if (resp.getData() != null) {
+ return Pair.of(resp.getData(), resp.getErrMsg());
+ } else {
+ return Pair.of(false, resp.getErrMsg());
+ }
+ }
+
+ /**
+ * Get inlong stream by the given groupId and streamId.
+ */
+ public InlongStreamInfo getStreamInfo(String groupId, String streamId) {
+ Response<InlongStreamInfo> response =
ClientUtils.executeHttpCall(inlongStreamApi.getStream(groupId, streamId));
+
+ if (response.isSuccess()) {
+ return response.getData();
+ }
+ if (response.getErrMsg().contains("not exist")) {
+ return null;
+ } else {
+ throw new RuntimeException(response.getErrMsg());
+ }
+ }
+
+ /**
+ * Get inlong stream info.
+ */
+ public List<InlongStreamInfo> listStreamInfo(String inlongGroupId) {
+ InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
+ pageRequest.setInlongGroupId(inlongGroupId);
+
+ Response<PageInfo<InlongStreamInfo>> response =
ClientUtils.executeHttpCall(
+ inlongStreamApi.listStream(pageRequest));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData().getList();
+ }
+
+ /**
+ * get inlong group error messages
+ */
+ public List<InlongStreamConfigLogListResponse> getStreamLogs(String
inlongGroupId, String inlongStreamId) {
+ Response<PageInfo<InlongStreamConfigLogListResponse>> response =
ClientUtils.executeHttpCall(
+ inlongStreamApi.getStreamLogs(inlongGroupId, inlongStreamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData().getList();
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
new file mode 100644
index 000000000..96074baa9
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.StreamSinkApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client for {@link StreamSinkApi}.
+ */
+public class StreamSinkClient {
+
+ private final StreamSinkApi streamSinkApi;
+
+ public StreamSinkClient(ClientConfiguration configuration) {
+ streamSinkApi =
ClientUtils.createRetrofit(configuration).create(StreamSinkApi.class);
+ }
+
+ public Integer createSink(SinkRequest sinkRequest) {
+ Response<Integer> response =
ClientUtils.executeHttpCall(streamSinkApi.createSink(sinkRequest));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Delete stream sink info by ID.
+ */
+ public boolean deleteSink(int id) {
+ Preconditions.checkTrue(id > 0, "sinkId is illegal");
+ Response<Boolean> response =
ClientUtils.executeHttpCall(streamSinkApi.deleteSink(id));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * List stream sinks by the given groupId and streamId.
+ */
+ public List<StreamSink> listSinks(String groupId, String streamId) {
+ return listSinks(groupId, streamId, null);
+ }
+
+ /**
+ * List stream sinks by the specified sink type.
+ */
+ public List<StreamSink> listSinks(String groupId, String streamId, String
sinkType) {
+ Response<PageInfo<StreamSink>> response = ClientUtils.executeHttpCall(
+ streamSinkApi.listSinks(groupId, streamId, sinkType));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData().getList();
+ }
+
+ /**
+ * Update the stream sink info.
+ */
+ public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
+ Response<Boolean> responseBody =
ClientUtils.executeHttpCall(streamSinkApi.updateSink(sinkRequest));
+ ClientUtils.assertRespSuccess(responseBody);
+
+ if (responseBody.getData() != null) {
+ return Pair.of(responseBody.getData(), responseBody.getErrMsg());
+ } else {
+ return Pair.of(false, responseBody.getErrMsg());
+ }
+ }
+
+ /**
+ * Get detail information of data sink.
+ */
+ public StreamSink getSinkInfo(Integer sinkId) {
+ Response<StreamSink> response =
ClientUtils.executeHttpCall(streamSinkApi.getSinkInfo(sinkId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
new file mode 100644
index 000000000..0cddd86a7
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.StreamSourceApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client for {@link StreamSourceApi}.
+ */
+public class StreamSourceClient {
+
+ private final StreamSourceApi streamSourceApi;
+
+ public StreamSourceClient(ClientConfiguration configuration) {
+ streamSourceApi =
ClientUtils.createRetrofit(configuration).create(StreamSourceApi.class);
+ }
+
+ /**
+ * Create an inlong stream source.
+ */
+ public Integer createSource(SourceRequest request) {
+ Response<Integer> response =
ClientUtils.executeHttpCall(streamSourceApi.createSource(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * List stream sources by the given groupId and streamId.
+ */
+ public List<StreamSource> listSources(String groupId, String streamId) {
+ return listSources(groupId, streamId, null);
+ }
+
+ /**
+ * List stream sources by the specified source type.
+ */
+ public List<StreamSource> listSources(String groupId, String streamId,
String sourceType) {
+ Response<PageInfo<StreamSource>> response =
ClientUtils.executeHttpCall(
+ streamSourceApi.listSources(groupId, streamId, sourceType));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData().getList();
+ }
+
+ /**
+ * Update the stream source info.
+ */
+ public Pair<Boolean, String> updateSource(SourceRequest request) {
+ Response<Boolean> response =
ClientUtils.executeHttpCall(streamSourceApi.updateSource(request));
+ if (response.getData() != null) {
+ return Pair.of(response.getData(), response.getErrMsg());
+ } else {
+ return Pair.of(false, response.getErrMsg());
+ }
+ }
+
+ /**
+ * Delete data source information by id.
+ */
+ public boolean deleteSource(int id) {
+ Preconditions.checkTrue(id > 0, "sourceId is illegal");
+ Response<Boolean> response =
ClientUtils.executeHttpCall(streamSourceApi.deleteSource(id));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
new file mode 100644
index 000000000..a2bd8fba9
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.StreamTransformApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client for {@link StreamTransformApi}.
+ */
+public class StreamTransformClient {
+
+ private final StreamTransformApi streamTransformApi;
+
+ public StreamTransformClient(ClientConfiguration configuration) {
+ streamTransformApi =
ClientUtils.createRetrofit(configuration).create(StreamTransformApi.class);
+ }
+
+ /**
+ * Create a conversion function info.
+ */
+ public Integer createTransform(TransformRequest transformRequest) {
+ Response<Integer> response =
ClientUtils.executeHttpCall(streamTransformApi.createTransform(transformRequest));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Get all conversion function info.
+ */
+ public List<TransformResponse> listTransform(String groupId, String
streamId) {
+ Response<List<TransformResponse>> response =
ClientUtils.executeHttpCall(
+ streamTransformApi.listTransform(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ /**
+ * Update conversion function info.
+ */
+ public Pair<Boolean, String> updateTransform(TransformRequest
transformRequest) {
+ Response<Boolean> response =
ClientUtils.executeHttpCall(streamTransformApi.updateTransform(transformRequest));
+
+ if (response.getData() != null) {
+ return Pair.of(response.getData(), response.getErrMsg());
+ } else {
+ return Pair.of(false, response.getErrMsg());
+ }
+ }
+
+ /**
+ * Delete conversion function info.
+ */
+ public boolean deleteTransform(TransformRequest transformRequest) {
+ Preconditions.checkNotEmpty(transformRequest.getInlongGroupId(),
"inlongGroupId should not be null");
+ Preconditions.checkNotEmpty(transformRequest.getInlongStreamId(),
"inlongStreamId should not be null");
+ Preconditions.checkNotEmpty(transformRequest.getTransformName(),
"transformName should not be null");
+
+ Response<Boolean> response = ClientUtils.executeHttpCall(
+
streamTransformApi.deleteTransform(transformRequest.getInlongGroupId(),
+ transformRequest.getInlongStreamId(),
transformRequest.getTransformName()));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
new file mode 100644
index 000000000..82994129f
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.WorkflowApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
+import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import
org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Client for {@link WorkflowApi}.
+ */
+@Slf4j
+public class WorkflowClient {
+
+ private final WorkflowApi workflowApi;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public WorkflowClient(ClientConfiguration configuration) {
+ workflowApi =
ClientUtils.createRetrofit(configuration).create(WorkflowApi.class);
+ }
+
+ public WorkflowResult startInlongGroup(int taskId, NewGroupProcessForm
newGroupProcessForm) {
+ ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
+ workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
+ workflowTaskOperation.put("remark", "approved by system");
+
+ ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
+ inlongGroupApproveForm.putPOJO("groupApproveInfo",
newGroupProcessForm.getGroupInfo());
+ inlongGroupApproveForm.putPOJO("streamApproveInfoList",
newGroupProcessForm.getStreamInfoList());
+ inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
+ workflowTaskOperation.set("form", inlongGroupApproveForm);
+
+ log.info("startInlongGroup workflowTaskOperation: {}",
inlongGroupApproveForm);
+
+ Map<String, Object> requestMap =
JsonUtils.OBJECT_MAPPER.convertValue(workflowTaskOperation,
+ new TypeReference<Map<String, Object>>() {
+ });
+ Response<WorkflowResult> response = ClientUtils.executeHttpCall(
+ workflowApi.startInlongGroup(taskId, requestMap));
+ ClientUtils.assertRespSuccess(response);
+
+ return response.getData();
+ }
+
+ /**
+ * get inlong group error messages
+ */
+ public List<EventLogView> getInlongGroupError(String inlongGroupId) {
+ Response<PageInfo<EventLogView>> response =
ClientUtils.executeHttpCall(
+ workflowApi.getInlongGroupError(inlongGroupId, -1));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData().getList();
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/ClientUtils.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/ClientUtils.java
new file mode 100644
index 000000000..6aaed4be2
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/ClientUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.util;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.service.AuthInterceptor;
+import org.apache.inlong.manager.common.auth.Authentication;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import retrofit2.Call;
+import retrofit2.Retrofit;
+import retrofit2.converter.jackson.JacksonConverterFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Utils for client
+ */
+@Slf4j
+@UtilityClass
+public class ClientUtils {
+
+ private static final String REQUEST_FAILED_MSG = "Request to Inlong %s
failed: %s";
+
+ private static ClientFactory clientFactory;
+
+ /**
+ * Get factory for {@link
org.apache.inlong.manager.client.api.inner.client}.
+ *
+ * @param configuration client configuration
+ * @return ClientFactory
+ */
+ public static ClientFactory getClientFactory(ClientConfiguration
configuration) {
+ return Optional.ofNullable(clientFactory).orElse(new
ClientFactory(configuration));
+ }
+
+ /**
+ * Get retrofit to instantiate Client API.
+ *
+ * @param configuration client configuration
+ * @return Retrofit
+ */
+ public static Retrofit createRetrofit(ClientConfiguration configuration) {
+ String host = configuration.getBindHost();
+ int port = configuration.getBindPort();
+
+ Authentication authentication = configuration.getAuthentication();
+ Preconditions.checkNotNull(authentication, "inlong should be
authenticated");
+ Preconditions.checkTrue(authentication instanceof
DefaultAuthentication,
+ "inlong only support default authentication");
+ DefaultAuthentication defaultAuthentication = (DefaultAuthentication)
authentication;
+
+ OkHttpClient okHttpClient = new OkHttpClient.Builder()
+ .addInterceptor(
+ new
AuthInterceptor(defaultAuthentication.getUsername(),
defaultAuthentication.getPassword()))
+ .connectTimeout(configuration.getConnectTimeout(),
configuration.getTimeUnit())
+ .readTimeout(configuration.getReadTimeout(),
configuration.getTimeUnit())
+ .writeTimeout(configuration.getWriteTimeout(),
configuration.getTimeUnit())
+ .retryOnConnectionFailure(true)
+ .build();
+
+ return new Retrofit.Builder()
+ .baseUrl("http://" + host + ":" + port +
"/api/inlong/manager/")
+
.addConverterFactory(JacksonConverterFactory.create(JsonUtils.OBJECT_MAPPER))
+ .client(okHttpClient)
+ .build();
+ }
+
+ /**
+ * Send http request.
+ *
+ * @param call http request
+ * @param <T> T
+ * @return T
+ */
+ public static <T> T executeHttpCall(Call<T> call) {
+ Request request = call.request();
+ String url = request.url().encodedPath();
+ try {
+ retrofit2.Response<T> response = call.execute();
+ Preconditions.checkTrue(response.isSuccessful(),
+ String.format(REQUEST_FAILED_MSG, url,
response.message()));
+ return response.body();
+ } catch (IOException e) {
+ log.error(String.format(REQUEST_FAILED_MSG, url, e.getMessage()),
e);
+ throw new RuntimeException(String.format(REQUEST_FAILED_MSG, url,
e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Assert if the response is successful.
+ *
+ * @param response response
+ */
+ public static void assertRespSuccess(Response<?> response) {
+ Preconditions.checkTrue(response.isSuccess(),
String.format(REQUEST_FAILED_MSG, response.getErrMsg(), null));
+ }
+}
diff --git
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
similarity index 91%
rename from
inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
rename to
inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 7d8295d6c..de019484d 100644
---
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
+++
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -26,6 +26,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.SinkType;
@@ -73,14 +79,17 @@ import static
com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static
com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
/**
- * Unit test for {@link InnerInlongManagerClient}.
+ * Unit test for {@link ClientFactory}.
*/
@Slf4j
-class InnerInlongManagerClientTest {
+class ClientFactoryTest {
private static final int SERVICE_PORT = 8085;
private static WireMockServer wireMockServer;
- private static InnerInlongManagerClient innerInlongManagerClient;
+ private static InlongGroupClient groupClient;
+ private static InlongStreamClient streamClient;
+ private static StreamSinkClient sinkClient;
+ private static InlongClusterClient clusterClient;
@BeforeAll
static void setup() {
@@ -92,7 +101,12 @@ class InnerInlongManagerClientTest {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setAuthentication(new DefaultAuthentication("admin",
"inlong"));
InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl,
configuration);
- innerInlongManagerClient = new
InnerInlongManagerClient(inlongClient.getConfiguration());
+ ClientFactory clientFactory =
ClientUtils.getClientFactory(inlongClient.getConfiguration());
+ groupClient = clientFactory.getGroupClient();
+ streamClient = clientFactory.getStreamClient();
+ sinkClient = clientFactory.getSinkClient();
+ streamClient = clientFactory.getStreamClient();
+ clusterClient = clientFactory.getClusterClient();
}
@AfterAll
@@ -108,7 +122,7 @@ class InnerInlongManagerClientTest {
okJson(JsonUtils.toJsonString(Response.success(true)))
)
);
- Boolean groupExists = innerInlongManagerClient.isGroupExists("123");
+ Boolean groupExists = groupClient.isGroupExists("123");
Assertions.assertTrue(groupExists);
}
@@ -136,7 +150,7 @@ class InnerInlongManagerClientTest {
)
);
- InlongGroupInfo groupInfo = innerInlongManagerClient.getGroupInfo("1");
+ InlongGroupInfo groupInfo = groupClient.getGroupInfo("1");
Assertions.assertTrue(groupInfo instanceof InlongPulsarInfo);
Assertions.assertEquals(JsonUtils.toJsonString(inlongGroupResponse),
JsonUtils.toJsonString(groupInfo));
}
@@ -168,7 +182,7 @@ class InnerInlongManagerClientTest {
)
);
- PageInfo<InlongGroupListResponse> listResponse =
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+ PageInfo<InlongGroupListResponse> listResponse =
groupClient.listGroups("keyword", 1, 1, 10);
Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
JsonUtils.toJsonString(listResponse.getList()));
}
@@ -204,7 +218,7 @@ class InnerInlongManagerClientTest {
)
);
- PageInfo<InlongGroupListResponse> listResponse =
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+ PageInfo<InlongGroupListResponse> listResponse =
groupClient.listGroups("keyword", 1, 1, 10);
Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
JsonUtils.toJsonString(listResponse.getList()));
}
@@ -242,7 +256,7 @@ class InnerInlongManagerClientTest {
)
);
- PageInfo<InlongGroupListResponse> listResponse =
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+ PageInfo<InlongGroupListResponse> listResponse =
groupClient.listGroups("keyword", 1, 1, 10);
Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
JsonUtils.toJsonString(listResponse.getList()));
}
@@ -282,7 +296,7 @@ class InnerInlongManagerClientTest {
)
);
- PageInfo<InlongGroupListResponse> listResponse =
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+ PageInfo<InlongGroupListResponse> listResponse =
groupClient.listGroups("keyword", 1, 1, 10);
Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
JsonUtils.toJsonString(listResponse.getList()));
}
@@ -354,7 +368,7 @@ class InnerInlongManagerClientTest {
)
);
- PageInfo<InlongGroupListResponse> listResponse =
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+ PageInfo<InlongGroupListResponse> listResponse =
groupClient.listGroups("keyword", 1, 1, 10);
Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
JsonUtils.toJsonString(listResponse.getList()));
}
@@ -370,7 +384,7 @@ class InnerInlongManagerClientTest {
)
);
- PageInfo<InlongGroupListResponse> listResponse =
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+ PageInfo<InlongGroupListResponse> listResponse =
groupClient.listGroups("keyword", 1, 1, 10);
Assertions.assertNull(listResponse);
}
@@ -383,7 +397,7 @@ class InnerInlongManagerClientTest {
)
);
- String groupId = innerInlongManagerClient.createGroup(new
InlongPulsarRequest());
+ String groupId = groupClient.createGroup(new InlongPulsarRequest());
Assertions.assertEquals("1111", groupId);
}
@@ -396,7 +410,7 @@ class InnerInlongManagerClientTest {
)
);
- Pair<String, String> updateGroup =
innerInlongManagerClient.updateGroup(new InlongPulsarRequest());
+ Pair<String, String> updateGroup = groupClient.updateGroup(new
InlongPulsarRequest());
Assertions.assertEquals("1111", updateGroup.getKey());
Assertions.assertTrue(StringUtils.isBlank(updateGroup.getValue()));
}
@@ -410,7 +424,7 @@ class InnerInlongManagerClientTest {
)
);
- Integer groupId = innerInlongManagerClient.createStreamInfo(new
InlongStreamInfo());
+ Integer groupId = streamClient.createStreamInfo(new
InlongStreamInfo());
Assertions.assertEquals(11, groupId);
}
@@ -426,7 +440,7 @@ class InnerInlongManagerClientTest {
InlongStreamInfo streamInfo = new InlongStreamInfo();
streamInfo.setInlongGroupId("123");
streamInfo.setInlongStreamId("11");
- Boolean groupExists =
innerInlongManagerClient.isStreamExists(streamInfo);
+ Boolean groupExists = streamClient.isStreamExists(streamInfo);
Assertions.assertTrue(groupExists);
}
@@ -461,7 +475,7 @@ class InnerInlongManagerClientTest {
)
);
- InlongStreamInfo inlongStreamInfo =
innerInlongManagerClient.getStreamInfo("123", "11");
+ InlongStreamInfo inlongStreamInfo = streamClient.getStreamInfo("123",
"11");
Assertions.assertNotNull(inlongStreamInfo);
}
@@ -475,7 +489,7 @@ class InnerInlongManagerClientTest {
)
);
- InlongStreamInfo inlongStreamInfo =
innerInlongManagerClient.getStreamInfo("123", "11");
+ InlongStreamInfo inlongStreamInfo = streamClient.getStreamInfo("123",
"11");
Assertions.assertNull(inlongStreamInfo);
}
@@ -565,7 +579,7 @@ class InnerInlongManagerClientTest {
)
);
- List<InlongStreamInfo> streamInfos =
innerInlongManagerClient.listStreamInfo("11");
+ List<InlongStreamInfo> streamInfos = streamClient.listStreamInfo("11");
Assertions.assertEquals(JsonUtils.toJsonString(streamInfo),
JsonUtils.toJsonString(streamInfos.get(0)));
}
@@ -624,7 +638,7 @@ class InnerInlongManagerClientTest {
)
);
- List<StreamSink> sinks = innerInlongManagerClient.listSinks("11",
"11");
+ List<StreamSink> sinks = sinkClient.listSinks("11", "11");
Assertions.assertEquals(JsonUtils.toJsonString(sinkList),
JsonUtils.toJsonString(sinks));
}
@@ -640,7 +654,7 @@ class InnerInlongManagerClientTest {
);
RuntimeException exception =
Assertions.assertThrows(IllegalArgumentException.class,
- () -> innerInlongManagerClient.listSinks("", "11"));
+ () -> sinkClient.listSinks("", "11"));
Assertions.assertTrue(exception.getMessage().contains("groupId should
not empty"));
}
@@ -655,7 +669,7 @@ class InnerInlongManagerClientTest {
)
);
- boolean isReset = innerInlongManagerClient.resetGroup(new
InlongGroupResetRequest());
+ boolean isReset = groupClient.resetGroup(new
InlongGroupResetRequest());
Assertions.assertTrue(isReset);
}
@@ -672,7 +686,7 @@ class InnerInlongManagerClientTest {
ClusterRequest request = new PulsarClusterRequest();
request.setName("pulsar");
request.setClusterTags("test_cluster");
- Integer clusterIndex = innerInlongManagerClient.saveCluster(request);
+ Integer clusterIndex = clusterClient.saveCluster(request);
Assertions.assertEquals(1, (int) clusterIndex);
}
@@ -710,7 +724,7 @@ class InnerInlongManagerClientTest {
))
);
- StreamSink sinkInfo = innerInlongManagerClient.getSinkInfo(1);
+ StreamSink sinkInfo = sinkClient.getSinkInfo(1);
Assertions.assertEquals(1, sinkInfo.getId());
Assertions.assertTrue(sinkInfo instanceof MySQLSink);
}