This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new ca31ab0275 [type:feat] add Logging-Kafka Plugin e2e and make
independent of Logging-rocketmq e2e (#5709)
ca31ab0275 is described below
commit ca31ab0275b799a56d704b145960911f32b75d09
Author: jakiuncle <[email protected]>
AuthorDate: Thu Mar 6 09:27:54 2025 +0800
[type:feat] add Logging-Kafka Plugin e2e and make independent of
Logging-rocketmq e2e (#5709)
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:fix] fix zookeeper healthy check
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:fix] fix http rocketmq
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* kafka e2e
* kafka e2e
* kafka e2e
* kafka e2e
* spilt logging rocketmq and logging kafka
* checkstyle
* e2e port
* e2e kafka logging
* e2e kafka logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* e2e rocketmq logging
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* [type:feat]add kafka logging e2e test
* add nonull check
* modify kafka namesrvAddr to bootstrapServer
* create kafka topic
* create kafka topic
* create kafka topic
* create kafka topic
* kafka connection
* kafka
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
* kafka connection
---------
Co-authored-by: aias00 <[email protected]>
Co-authored-by: xiaoyu <[email protected]>
---
.github/workflows/e2e-k8s.yml | 4 +
db/init/mysql/schema.sql | 2 +-
.../apache/shenyu/admin/ShenyuAdminBootstrap.java | 4 +-
.../ShenyuClientRegisterDivideServiceImpl.java | 3 +
shenyu-e2e/pom.xml | 1 +
shenyu-e2e/shenyu-e2e-case/pom.xml | 2 +
.../compose/script/e2e-http-sync-compose.sh | 4 -
.../k8s/script/e2e-http-sync.sh | 3 +
.../shenyu-e2e-case/shenyu-e2e-case-http/pom.xml | 8 --
.../e2e/testcase/http/DividePluginCases.java | 86 +----------
.../shenyu/e2e/testcase/http/DividePluginTest.java | 15 +-
.../compose/script/e2e-logging-kafka-compose.sh} | 28 ++--
.../compose/shenyu-examples-http-compose.yml | 41 ++++++
.../compose/shenyu-kafka-compose.yml | 74 ++++++++++
.../k8s/script/e2e-http-sync.sh | 13 +-
.../k8s/shenyu-kafka.yml | 101 +++++++++++++
.../pom.xml | 8 +-
.../testcase/logging/kafka/DividePluginCases.java | 157 +++++++++++++++++++++
.../testcase/logging/kafka}/DividePluginTest.java | 40 ++++--
.../script/e2e-logging-rocketmq-compose.sh} | 6 +-
.../compose/shenyu-examples-http-compose.yml | 41 ++++++
.../compose/shenyu-rocketmq-compose.yml | 51 +++++++
.../k8s/script/e2e-http-sync.sh | 13 +-
.../k8s/shenyu-kafka.yml | 101 +++++++++++++
.../pom.xml | 4 +-
.../logging/rocketmq}/DividePluginCases.java | 2 +-
.../logging/rocketmq}/DividePluginTest.java | 23 +--
.../org/apache/shenyu/e2e/client/WaitDataSync.java | 2 +-
.../shenyu/e2e/client/gateway/GatewayClient.java | 4 +
.../kafka/client/KafkaLogCollectClient.java | 11 +-
30 files changed, 674 insertions(+), 178 deletions(-)
diff --git a/.github/workflows/e2e-k8s.yml b/.github/workflows/e2e-k8s.yml
index a9577415aa..62739cb4b3 100644
--- a/.github/workflows/e2e-k8s.yml
+++ b/.github/workflows/e2e-k8s.yml
@@ -218,6 +218,10 @@ jobs:
script: e2e-cluster-jdbc-compose
- case: shenyu-e2e-case-cluster
script: e2e-cluster-zookeeper-compose
+ - case: shenyu-e2e-case-logging-rocketmq
+ script: e2e-logging-rocketmq-compose
+ - case: shenyu-e2e-case-logging-kafka
+ script: e2e-logging-kafka-compose
steps:
- uses: actions/checkout@v2
diff --git a/db/init/mysql/schema.sql b/db/init/mysql/schema.sql
index 4059214b5d..fafe5e102e 100644
--- a/db/init/mysql/schema.sql
+++ b/db/init/mysql/schema.sql
@@ -923,7 +923,7 @@ INSERT INTO `plugin` VALUES ('6', 'dubbo',
'{\"register\":\"zookeeper://localhos
INSERT INTO `plugin` VALUES ('8', 'springCloud', NULL, 'Proxy', 200, 0,
'2022-05-25 18:02:53', '2022-05-25 18:02:53',null);
INSERT INTO `plugin` VALUES ('9', 'hystrix', NULL, 'FaultTolerance', 130, 0,
'2022-05-25 18:02:53', '2022-05-25 18:02:53',null);
INSERT INTO `plugin` VALUES ('32',
'loggingElasticSearch','{\"host\":\"localhost\", \"port\": \"9200\"}',
'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00',null);
-INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"host\":\"localhost\",
\"port\": \"9092\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02
22:00:00',null);
+INSERT INTO `plugin` VALUES ('33',
'loggingKafka','{\"topic\":\"shenyu-access-logging\",\"namesrvAddr\":\"http://localhost:9092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}',
'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00',null);
INSERT INTO `plugin` VALUES ('34', 'loggingAliyunSls','{\"projectName\":
\"shenyu\", \"logStoreName\": \"shenyu-logstore\", \"topic\":
\"shenyu-topic\"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30
21:00:00',null);
INSERT INTO `plugin` VALUES ('35', 'loggingPulsar',
'{\"topic":\"shenyu-access-logging\", \"serviceUrl\":
\"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00',
'2022-06-30 21:00:00',null);
INSERT INTO `plugin` VALUES ('36', 'loggingTencentCls','{\"endpoint\":
\"ap-guangzhou.cls.tencentcs.com\", \"topic\": \"shenyu-topic\"}', 'Logging',
176, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00',null);
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
index 9bd5026202..796164adcf 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
@@ -22,7 +22,7 @@ import
org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
/**
- * shenyu admin start.
+ * shenyu admin startShenyuAdminBootstrap.
*/
@SpringBootApplication(exclude = {LdapAutoConfiguration.class})
public class ShenyuAdminBootstrap {
@@ -30,7 +30,7 @@ public class ShenyuAdminBootstrap {
/**
* Main entrance.
*
- * @param args startup arguments
+ * @param args startup arguments.
*/
public static void main(final String[] args) {
SpringApplication.run(ShenyuAdminBootstrap.class, args);
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
index 0ec8dcc1d0..33216d68ff 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
@@ -120,6 +120,9 @@ public class ShenyuClientRegisterDivideServiceImpl extends
AbstractContextPathRe
.collect(Collectors.toList());
final List<DivideUpstream> needToRemove =
buildDivideUpstreamList(validUriList);
List<DivideUpstream> existList =
GsonUtils.getInstance().fromCurrentList(selectorDO.getHandle(),
DivideUpstream.class);
+ if (CollectionUtils.isEmpty(existList)) {
+ return Constants.SUCCESS;
+ }
existList.removeAll(needToRemove);
final String handler = GsonUtils.getInstance().toJson(existList);
selectorDO.setHandle(handler);
diff --git a/shenyu-e2e/pom.xml b/shenyu-e2e/pom.xml
index 98fcecf3a0..e524324212 100644
--- a/shenyu-e2e/pom.xml
+++ b/shenyu-e2e/pom.xml
@@ -58,6 +58,7 @@
<guava.version>32.0.0-jre</guava.version>
<commons-collection.verion>4.4</commons-collection.verion>
<websocket.version>1.5.1</websocket.version>
+ <kafka-clients.version>3.7.1</kafka-clients.version>
</properties>
<modules>
diff --git a/shenyu-e2e/shenyu-e2e-case/pom.xml
b/shenyu-e2e/shenyu-e2e-case/pom.xml
index 901a5c5936..4ca3cc3d31 100644
--- a/shenyu-e2e/shenyu-e2e-case/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/pom.xml
@@ -32,6 +32,8 @@
<module>shenyu-e2e-case-cluster</module>
<module>shenyu-e2e-case-storage</module>
<module>shenyu-e2e-case-http</module>
+ <module>shenyu-e2e-case-logging-kafka</module>
+ <module>shenyu-e2e-case-logging-rocketmq</module>
<module>shenyu-e2e-case-spring-cloud</module>
<module>shenyu-e2e-case-apache-dubbo</module>
<module>shenyu-e2e-case-sofa</module>
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
index a0b102413c..446dae7dfb 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
@@ -37,7 +37,6 @@ for sync in "${SYNC_ARRAY[@]}"; do
sleep 30s
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31095/actuator/health
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31195/actuator/health
- docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull
docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d
--quiet-pull
sleep 30s
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31189/actuator/health
@@ -55,9 +54,6 @@ for sync in "${SYNC_ARRAY[@]}"; do
echo "shenyu-bootstrap log:"
echo "------------------"
docker compose -f
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs
shenyu-bootstrap
- echo "shenyu-rocketmq log:"
- echo "------------------"
- docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
echo "shenyu-examples-http log:"
echo "------------------"
docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs
shenyu-examples-http
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
index 25f37f01d0..95d7931c0b 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
@@ -37,6 +37,8 @@ for sync in ${SYNC_ARRAY[@]}; do
kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31877/actuator/health
+
sleep 30s
echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml
shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
# shellcheck disable=SC2199
@@ -75,6 +77,7 @@ for sync in ${SYNC_ARRAY[@]}; do
kubectl delete -f
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+
# shellcheck disable=SC2199
# shellcheck disable=SC2076
if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
index 2a3141908e..f326c86009 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
@@ -25,12 +25,4 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>shenyu-e2e-case-http</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.9.3</version>
- </dependency>
- </dependencies>
</project>
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
index d42bfc3aab..b6e3f2aab0 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
@@ -18,48 +18,22 @@
package org.apache.shenyu.e2e.testcase.http;
import com.google.common.collect.Lists;
-import io.restassured.http.Method;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider;
import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec;
import
org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec;
-import org.apache.shenyu.e2e.model.MatchMode;
-import org.apache.shenyu.e2e.model.Plugin;
-import org.apache.shenyu.e2e.model.data.Condition;
-import org.junit.jupiter.api.Assertions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
-import static
org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
-import static
org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
-import static
org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder;
public class DividePluginCases implements ShenYuScenarioProvider {
- private static final String NAMESERVER = "http://localhost:31876";
-
- private static final String CONSUMERGROUP =
"shenyu-plugin-logging-rocketmq";
-
- private static final String TOPIC = "shenyu-access-logging";
-
- private static final String TEST = "/http/order/findById?id=123";
-
- private static final Logger LOG =
LoggerFactory.getLogger(DividePluginCases.class);
-
@Override
public List<ScenarioSpec> get() {
return Lists.newArrayList(
- testDivideHello(),
- testRocketMQHello()
+ testDivideHello()
);
}
@@ -74,62 +48,4 @@ public class DividePluginCases implements
ShenYuScenarioProvider {
.build())
.build();
}
-
- private ShenYuScenarioSpec testRocketMQHello() {
- return ShenYuScenarioSpec.builder()
- .name("testRocketMQHello")
- .beforeEachSpec(
- ShenYuBeforeEachSpec.builder()
- .addSelectorAndRule(
- newSelectorBuilder("selector",
Plugin.LOGGING_ROCKETMQ)
- .name("1")
- .matchMode(MatchMode.OR)
-
.conditionList(newConditions(Condition.ParamType.URI,
Condition.Operator.STARTS_WITH, "/http"))
- .build(),
- newRuleBuilder("rule")
- .name("1")
- .matchMode(MatchMode.OR)
-
.conditionList(newConditions(Condition.ParamType.URI,
Condition.Operator.STARTS_WITH, "/http"))
- .build()
- )
- .checker(exists(TEST))
- .build()
- )
- .caseSpec(
- ShenYuCaseSpec.builder()
- .add(request -> {
- AtomicBoolean isLog = new
AtomicBoolean(false);
- try {
- Thread.sleep(1000 * 30);
- request.request(Method.GET,
"/http/order/findById?id=23");
- DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer(CONSUMERGROUP);
- consumer.setNamesrvAddr(NAMESERVER);
- consumer.subscribe(TOPIC, "*");
-
consumer.registerMessageListener((MessageListenerConcurrently) (msgs,
consumeConcurrentlyContext) -> {
- LOG.info("Msg:{}", msgs);
- if
(CollectionUtils.isNotEmpty(msgs)) {
- msgs.forEach(e -> {
- if (new
String(e.getBody()).contains("/http/order/findById?id=23")) {
- isLog.set(true);
- }
- });
- }
- return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- LOG.info("consumer.start ;
isLog.get():{}", isLog.get());
- consumer.start();
- Thread.sleep(1000 * 30);
- LOG.info("isLog.get():{}",
isLog.get());
- Assertions.assertTrue(isLog.get());
- } catch (Exception e) {
- LOG.error("error", e);
- Assertions.assertTrue(isLog.get());
- }
- })
- .build()
- )
-// .afterEachSpec(ShenYuAfterEachSpec.builder()
-// .deleteWaiting(notExists(TEST)).build())
- .build();
- }
}
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
index 1b18091e7c..b66fb619b3 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
import org.apache.shenyu.e2e.client.WaitDataSync;
import org.apache.shenyu.e2e.client.admin.AdminClient;
import org.apache.shenyu.e2e.client.gateway.GatewayClient;
-import org.apache.shenyu.e2e.constant.Constants;
import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
@@ -35,9 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import static
org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID;
@@ -98,23 +95,13 @@ public class DividePluginTest {
// selectorIds = Lists.newArrayList();
// }
+
@BeforeAll
void setup(final AdminClient adminClient, final GatewayClient
gatewayClient) throws Exception {
adminClient.login();
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors,
gatewayClient::getSelectorCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData,
gatewayClient::getMetaDataCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules,
gatewayClient::getRuleCache, adminClient);
- LOG.info("start loggingRocketMQ plugin");
- Map<String, String> reqBody = new HashMap<>();
- reqBody.put("pluginId", "29");
- reqBody.put("name", "loggingRocketMQ");
- reqBody.put("enabled", "true");
- reqBody.put("role", "Logging");
- reqBody.put("sort", "170");
- reqBody.put("namespaceId",
Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
- reqBody.put("config", "{\"topic\":\"shenyu-access-logging\",
\"namesrvAddr\":
\"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
- adminClient.changePluginStatus("1801816010882822166", reqBody);
- WaitDataSync.waitGatewayPluginUse(gatewayClient,
"org.apache.shenyu.plugin.logging.rocketmq");
}
@ShenYuScenario(provider = DividePluginCases.class)
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
similarity index 74%
copy from
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
copy to
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
index a0b102413c..3936dd821a 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/script/e2e-logging-kafka-compose.sh
@@ -20,6 +20,9 @@
SHENYU_TESTCASE_DIR=$(dirname "$(dirname "$(dirname "$(dirname "$0")")")")
bash "${SHENYU_TESTCASE_DIR}"/k8s/script/storage/storage_init_mysql.sh
+# init ip
+export HOST_IP=$(hostname -I | awk '{print $1}')
+
# init register center
CUR_PATH=$(readlink -f "$(dirname "$0")")
PRGDIR=$(dirname "$CUR_PATH")
@@ -35,16 +38,21 @@ for sync in "${SYNC_ARRAY[@]}"; do
echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml
shenyu-bootstrap-${sync}.yml "
docker compose -f
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml up -d --quiet-pull
sleep 30s
- sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31095/actuator/health
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31195/actuator/health
- docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml up -d --quiet-pull
- docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d
--quiet-pull
+ docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml up -d --quiet-pull
sleep 30s
+ # 创建kafka topic
+ echo "create kafka topic shenyu-access-logging"
+ docker exec shenyu-kafka kafka-topics --create --topic shenyu-access-logging
--partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
+
+# docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml up -d
--quiet-pull
+# sleep 30s
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31095/actuator/health
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31189/actuator/health
sleep 10s
docker ps -a
## run e2e-test
- ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http
-am test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl
shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
# shellcheck disable=SC2181
if (($?)); then
echo "${sync}-sync-e2e-test failed"
@@ -55,15 +63,15 @@ for sync in "${SYNC_ARRAY[@]}"; do
echo "shenyu-bootstrap log:"
echo "------------------"
docker compose -f
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml logs
shenyu-bootstrap
- echo "shenyu-rocketmq log:"
- echo "------------------"
- docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
- echo "shenyu-examples-http log:"
+ echo "shenyu-kafka log:"
echo "------------------"
- docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs
shenyu-examples-http
+ docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml logs
+# echo "kafka-console-consumer log:"
+ timeout 50s docker exec shenyu-kafka kafka-console-consumer --topic
shenyu-access-logging --bootstrap-server localhost:9092 --from-beginning
exit 1
fi
docker compose -f
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down
- docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
+ docker compose -f "${PRGDIR}"/shenyu-kafka-compose.yml down
+# docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml
shenyu-bootstrap-${sync}.yml "
done
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
new file mode 100644
index 0000000000..ce8c8a7e7b
--- /dev/null
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-examples-http-compose.yml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+ shenyu-examples-http:
+ image: shenyu-examples-http:latest
+ container_name: shenyu-examples-http
+ environment:
+ - shenyu.register.serverLists=http://shenyu-admin:9095
+ ports:
+ - "31189:8189"
+ healthcheck:
+ test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health
| grep UP || exit 1" ]
+ interval: 10s
+ timeout: 2s
+ retries: 3
+ start_period: 10s
+ restart: always
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: bridge
+ external: true
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
new file mode 100644
index 0000000000..8487aa1bae
--- /dev/null
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/compose/shenyu-kafka-compose.yml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+ shenyu-zk:
+ container_name: shenyu-zk
+ image: zookeeper:latest
+# network_mode: "host"
+ ports:
+ - "2181:2181"
+ restart: always
+ environment:
+ - ALLOW_ANONYMOUS_LOGIN=yes
+ - ZOO_PORT=2181
+ networks:
+ - shenyu
+
+ shenyu-kafka:
+ image: confluentinc/cp-kafka:latest
+ container_name: shenyu-kafka
+ extra_hosts:
+ - "shenyu-kafka:127.0.0.1"
+ depends_on:
+ - shenyu-zk
+ ports:
+ - "9092:9092"
+ - "29092:29092"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: shenyu-zk:2181
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS:
PLAINTEXT://shenyu-kafka:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ networks:
+ - shenyu
+
+ shenyu-examples-http:
+ image: shenyu-examples-http:latest
+ container_name: shenyu-examples-http
+ environment:
+ - shenyu.register.serverLists=http://shenyu-admin:9095
+ ports:
+ - "31189:8189"
+ healthcheck:
+ test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health
| grep UP || exit 1" ]
+ interval: 10s
+ timeout: 2s
+ retries: 3
+ start_period: 10s
+ restart: always
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: bridge
+ external: true
\ No newline at end of file
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
similarity index 91%
copy from
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
copy to
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
index 25f37f01d0..2ca65f63dd 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/script/e2e-http-sync.sh
@@ -33,9 +33,8 @@ SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
for sync in ${SYNC_ARRAY[@]}; do
echo -e "------------------\n"
- kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml
-
- kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
+ kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:9092/actuator/health
sleep 30s
echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml
shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
@@ -50,15 +49,13 @@ for sync in ${SYNC_ARRAY[@]}; do
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31095/actuator/health
kubectl apply -f
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31195/actuator/health
- kubectl apply -f "${PRGDIR}"/shenyu-examples-http.yml
- sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31189/actuator/health
sleep 10s
kubectl get pod -o wide
kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print
$1}')"
## run e2e-test
- ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http
-am test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl
shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
# shellcheck disable=SC2181
if (($?)); then
echo "${sync}-sync-e2e-test failed"
@@ -73,8 +70,8 @@ for sync in ${SYNC_ARRAY[@]}; do
kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml
kubectl delete -f
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
kubectl delete -f
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
- kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
- kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+ kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml
+
# shellcheck disable=SC2199
# shellcheck disable=SC2076
if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml
new file mode 100644
index 0000000000..9c5c05fad9
--- /dev/null
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/k8s/shenyu-kafka.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: zookeeper
+ template:
+ metadata:
+ labels:
+ app: zookeeper
+ spec:
+ containers:
+ - name: zookeeper
+ image: zookeeper:3.7
+ ports:
+ - containerPort: 2181
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ ports:
+ - port: 2181
+ name: client
+ selector:
+ app: zookeeper
+
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: kafka
+ template:
+ metadata:
+ labels:
+ app: kafka
+ spec:
+ containers:
+ - name: kafka
+ image: bitnami/kafka:3.6.2
+ env:
+ - name: KAFKA_ADVERTISED_LISTENERS
+ value: PLAINTEXT://kafka:9092
+ - name: KAFKA_ZOOKEEPER_CONNECT
+ value: zookeeper:2181
+ ports:
+ - containerPort: 9092
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ type: NodePort
+ ports:
+ - port: 9092
+ name: client
+ protocol: TCP
+ targetPort: 9092
+ nodePort: 9092
+ selector:
+ app: kafka
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
similarity index 86%
copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
index 2a3141908e..90ca689ce6 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/pom.xml
@@ -24,13 +24,13 @@
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>shenyu-e2e-case-http</artifactId>
+ <artifactId>shenyu-e2e-case-logging-kafka</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.9.3</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-clients.version}</version>
</dependency>
</dependencies>
</project>
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
new file mode 100644
index 0000000000..010c54867f
--- /dev/null
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginCases.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.e2e.testcase.logging.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import io.restassured.http.Method;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.shenyu.e2e.engine.scenario.ShenYuScenarioProvider;
+import org.apache.shenyu.e2e.engine.scenario.specification.ScenarioSpec;
+import
org.apache.shenyu.e2e.engine.scenario.specification.ShenYuBeforeEachSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuCaseSpec;
+import org.apache.shenyu.e2e.engine.scenario.specification.ShenYuScenarioSpec;
+import org.apache.shenyu.e2e.model.MatchMode;
+import org.apache.shenyu.e2e.model.Plugin;
+import org.apache.shenyu.e2e.model.data.Condition;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static
org.apache.shenyu.e2e.engine.scenario.function.HttpCheckers.exists;
+import static
org.apache.shenyu.e2e.template.ResourceDataTemplate.newConditions;
+import static
org.apache.shenyu.e2e.template.ResourceDataTemplate.newRuleBuilder;
+import static
org.apache.shenyu.e2e.template.ResourceDataTemplate.newSelectorBuilder;
+
+public class DividePluginCases implements ShenYuScenarioProvider {
+
+ private static final String TOPIC = "shenyu-access-logging";
+
+ private static final String TEST = "/http/order/findById?id=123";
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DividePluginCases.class);
+
+ @Override
+ public List<ScenarioSpec> get() {
+ return Lists.newArrayList(
+ testDivideHello(),
+ testKafkaHello()
+ );
+ }
+
+ private ShenYuScenarioSpec testDivideHello() {
+ return ShenYuScenarioSpec.builder()
+ .name("http client hello1")
+ .beforeEachSpec(ShenYuBeforeEachSpec.builder()
+ .checker(exists(TEST))
+ .build())
+ .caseSpec(ShenYuCaseSpec.builder()
+ .addExists(TEST)
+ .build())
+ .build();
+ }
+
+ private ShenYuScenarioSpec testKafkaHello() {
+ return ShenYuScenarioSpec.builder()
+ .name("testKafkaHello")
+ .beforeEachSpec(
+ ShenYuBeforeEachSpec.builder()
+ .addSelectorAndRule(
+ newSelectorBuilder("selector",
Plugin.LOGGING_KAFKA)
+ .name("2")
+ .matchMode(MatchMode.OR)
+
.conditionList(newConditions(Condition.ParamType.URI,
Condition.Operator.STARTS_WITH, "/http"))
+ .build(),
+ newRuleBuilder("rule")
+ .name("2")
+ .matchMode(MatchMode.OR)
+
.conditionList(newConditions(Condition.ParamType.URI,
Condition.Operator.STARTS_WITH, "/http"))
+ .build()
+ )
+ .checker(exists(TEST))
+ .build()
+ )
+ .caseSpec(
+ ShenYuCaseSpec.builder()
+ .add(request -> {
+ AtomicBoolean messageFound = new
AtomicBoolean(false);
+ try {
+ // Send request first
+ request.request(Method.GET,
"/http/order/findById?id=23");
+
+ Properties properties = new
Properties();
+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "shenyu-consumer-group");
+
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
+
+ try (KafkaConsumer<String, String>
consumer = new KafkaConsumer<>(properties)) {
+
consumer.subscribe(Arrays.asList(TOPIC));
+
+ Instant start = Instant.now();
+ // Set timeout to 30 seconds
+ while (Duration.between(start,
Instant.now()).getSeconds() < 90) {
+ ConsumerRecords<String,
String> records = consumer.poll(Duration.ofMillis(1000));
+ LOG.info("records.count:{}",
records.count());
+
+ for (var record : records) {
+ String message =
record.value();
+ LOG.info("kafka
message:{}", message);
+ if
(message.contains("/http/order/findById")) {
+ messageFound.set(true);
+ consumer.commitSync();
+ break;
+ }
+ }
+
+ if (messageFound.get()) {
+ break;
+ }
+ }
+
+ if (!messageFound.get()) {
+ LOG.error("Timeout waiting for
kafka message");
+ Assertions.fail("Did not
receive expected message within timeout period");
+ }
+
+
Assertions.assertTrue(messageFound.get(), "Expected message was not found in
Kafka topic");
+ }
+ } catch (Exception e) {
+ LOG.error("Error during kafka message
consumption", e);
+ throw new RuntimeException("Failed to
consume kafka message", e);
+ }
+ }).build()
+ ).build();
+ }
+}
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
similarity index 80%
copy from
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
copy to
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
index 1b18091e7c..ea32a50e4c 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-kafka/src/test/java/org/apache/shenyu/e2e/testcase/logging/kafka/DividePluginTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shenyu.e2e.testcase.http;
+package org.apache.shenyu.e2e.testcase.logging.kafka;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import org.apache.shenyu.e2e.client.WaitDataSync;
import org.apache.shenyu.e2e.client.admin.AdminClient;
@@ -24,12 +25,14 @@ import org.apache.shenyu.e2e.client.gateway.GatewayClient;
import org.apache.shenyu.e2e.constant.Constants;
import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
+import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec;
import org.apache.shenyu.e2e.enums.ServiceTypeEnum;
import org.apache.shenyu.e2e.model.ResourcesData;
import org.apache.shenyu.e2e.model.data.BindingData;
import org.apache.shenyu.e2e.model.response.SelectorDTO;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
@@ -65,7 +68,9 @@ import static
org.apache.shenyu.e2e.constant.Constants.SYS_DEFAULT_NAMESPACE_NAM
public class DividePluginTest {
private static final Logger LOG =
LoggerFactory.getLogger(DividePluginTest.class);
-
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
private List<String> selectorIds = Lists.newArrayList();
@BeforeEach
@@ -91,30 +96,35 @@ public class DividePluginTest {
spec.getWaiting().waitFor(gateway);
}
-// @AfterEach
-// void after(final AdminClient client, final GatewayClient gateway, final
AfterEachSpec spec) {
-// spec.getDeleter().delete(client, selectorIds);
-// spec.deleteWaiting().waitFor(gateway);
-// selectorIds = Lists.newArrayList();
-// }
+ @AfterEach
+ void after(final AdminClient client, final GatewayClient gateway, final
AfterEachSpec spec) {
+ spec.getDeleter().delete(client, selectorIds);
+ spec.deleteWaiting().waitFor(gateway);
+ selectorIds = Lists.newArrayList();
+ }
@BeforeAll
void setup(final AdminClient adminClient, final GatewayClient
gatewayClient) throws Exception {
adminClient.login();
+
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors,
gatewayClient::getSelectorCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData,
gatewayClient::getMetaDataCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules,
gatewayClient::getRuleCache, adminClient);
- LOG.info("start loggingRocketMQ plugin");
+
Map<String, String> reqBody = new HashMap<>();
- reqBody.put("pluginId", "29");
- reqBody.put("name", "loggingRocketMQ");
+ LOG.info("start loggingKafka plugin");
+ reqBody.put("pluginId", "33");
+ reqBody.put("name", "loggingKafka");
reqBody.put("enabled", "true");
reqBody.put("role", "Logging");
- reqBody.put("sort", "170");
+ reqBody.put("sort", "180");
reqBody.put("namespaceId",
Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
- reqBody.put("config", "{\"topic\":\"shenyu-access-logging\",
\"namesrvAddr\":
\"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
- adminClient.changePluginStatus("1801816010882822166", reqBody);
- WaitDataSync.waitGatewayPluginUse(gatewayClient,
"org.apache.shenyu.plugin.logging.rocketmq");
+ reqBody.put("config",
+
"{\"topic\":\"shenyu-access-logging\",\"bootstrapServer\":\"shenyu-kafka:29092\",\"sampleRate\":\"1\",\"maxResponseBody\":524288,\"maxRequestBody\":524288,\"compressAlg\":\"none\"}");
+ adminClient.changePluginStatus("1801816010882822171", reqBody);
+// TimeUnit.SECONDS.sleep(5);
+// Map<String, Integer> plugins = gatewayClient.getPlugins();
+ WaitDataSync.waitGatewayPluginUse(gatewayClient,
"org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin");
}
@ShenYuScenario(provider = DividePluginCases.class)
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
similarity index 94%
copy from
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
copy to
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
index a0b102413c..f62d9b33d1 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/compose/script/e2e-http-sync-compose.sh
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/script/e2e-logging-rocketmq-compose.sh
@@ -44,7 +44,7 @@ for sync in "${SYNC_ARRAY[@]}"; do
sleep 10s
docker ps -a
## run e2e-test
- ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http
-am test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl
shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq -am test
# shellcheck disable=SC2181
if (($?)); then
echo "${sync}-sync-e2e-test failed"
@@ -58,12 +58,10 @@ for sync in "${SYNC_ARRAY[@]}"; do
echo "shenyu-rocketmq log:"
echo "------------------"
docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml logs
- echo "shenyu-examples-http log:"
- echo "------------------"
- docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml logs
shenyu-examples-http
exit 1
fi
docker compose -f
"$SHENYU_TESTCASE_DIR"/compose/sync/shenyu-sync-"${sync}".yml down
+ docker compose -f "${PRGDIR}"/shenyu-rocketmq-compose.yml down
docker compose -f "${PRGDIR}"/shenyu-examples-http-compose.yml down
echo "[Remove ${sync} synchronous] delete shenyu-admin-${sync}.yml
shenyu-bootstrap-${sync}.yml "
done
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
new file mode 100644
index 0000000000..ce8c8a7e7b
--- /dev/null
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-examples-http-compose.yml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+ shenyu-examples-http:
+ image: shenyu-examples-http:latest
+ container_name: shenyu-examples-http
+ environment:
+ - shenyu.register.serverLists=http://shenyu-admin:9095
+ ports:
+ - "31189:8189"
+ healthcheck:
+ test: [ "CMD-SHELL", "wget -q -O - http://localhost:8189/actuator/health
| grep UP || exit 1" ]
+ interval: 10s
+ timeout: 2s
+ retries: 3
+ start_period: 10s
+ restart: always
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: bridge
+ external: true
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
new file mode 100644
index 0000000000..5f34a739ca
--- /dev/null
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/compose/shenyu-rocketmq-compose.yml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.9'
+
+services:
+ rocketmq-dialevoneid:
+ image: rocketmqinc/rocketmq:4.4.0
+ container_name: rocketmq-dialevoneid
+ command: [ "/bin/sh", "mqnamesrv" ]
+ ports:
+ - "31876:9876"
+ environment:
+ - TZ=Asia/Shanghai
+ restart: always
+ networks:
+ - shenyu
+
+ rocketmq-broker:
+ image: rocketmqinc/rocketmq:4.4.0
+ container_name: rocketmq-broker
+ command: [ "/bin/sh", "mqbroker" ]
+ ports:
+ - "10909:10909"
+ - "10911:10911"
+ - "10912:10912"
+ environment:
+ - NAMESRV_ADDR=rocketmq-dialevoneid:9876
+ - TZ=Asia/Shanghai
+ restart: always
+ networks:
+ - shenyu
+
+networks:
+ shenyu:
+ name: shenyu
+ driver: bridge
+ external: true
\ No newline at end of file
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
similarity index 91%
copy from
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
copy to
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
index 25f37f01d0..2ca65f63dd 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/k8s/script/e2e-http-sync.sh
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/script/e2e-http-sync.sh
@@ -33,9 +33,8 @@ SYNC_ARRAY=("websocket" "http" "zookeeper" "etcd")
MIDDLEWARE_SYNC_ARRAY=("zookeeper" "etcd" "nacos")
for sync in ${SYNC_ARRAY[@]}; do
echo -e "------------------\n"
- kubectl apply -f "$SHENYU_TESTCASE_DIR"/k8s/shenyu-mysql.yml
-
- kubectl apply -f "${PRGDIR}"/shenyu-rocketmq.yml
+ kubectl apply -f "${PRGDIR}"/shenyu-kafka.yml
+ sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:9092/actuator/health
sleep 30s
echo "[Start ${sync} synchronous] create shenyu-admin-${sync}.yml
shenyu-bootstrap-${sync}.yml shenyu-examples-springcloud.yml"
@@ -50,15 +49,13 @@ for sync in ${SYNC_ARRAY[@]}; do
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31095/actuator/health
kubectl apply -f
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31195/actuator/health
- kubectl apply -f "${PRGDIR}"/shenyu-examples-http.yml
- sh "$SHENYU_TESTCASE_DIR"/k8s/script/healthcheck.sh
http://localhost:31189/actuator/health
sleep 10s
kubectl get pod -o wide
kubectl logs "$(kubectl get pod -o wide | grep shenyu-admin | awk '{print
$1}')"
## run e2e-test
- ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-case/shenyu-e2e-case-http
-am test
+ ./mvnw -B -f ./shenyu-e2e/pom.xml -pl
shenyu-e2e-case/shenyu-e2e-case-logging-kafka -am test
# shellcheck disable=SC2181
if (($?)); then
echo "${sync}-sync-e2e-test failed"
@@ -73,8 +70,8 @@ for sync in ${SYNC_ARRAY[@]}; do
kubectl delete -f "${SHENYU_TESTCASE_DIR}"/k8s/shenyu-mysql.yml
kubectl delete -f
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-admin-"${sync}".yml
kubectl delete -f
"${SHENYU_TESTCASE_DIR}"/k8s/sync/shenyu-bootstrap-"${sync}".yml
- kubectl delete -f "${PRGDIR}"/shenyu-examples-http.yml
- kubectl delete -f "${PRGDIR}"/shenyu-rocketmq.yml
+ kubectl delete -f "${PRGDIR}"/shenyu-kafka.yml
+
# shellcheck disable=SC2199
# shellcheck disable=SC2076
if [[ "${MIDDLEWARE_SYNC_ARRAY[@]}" =~ "${sync}" ]]; then
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
new file mode 100644
index 0000000000..94a63c58ae
--- /dev/null
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/k8s/shenyu-kafka.yml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: zookeeper
+ template:
+ metadata:
+ labels:
+ app: zookeeper
+ spec:
+ containers:
+ - name: zookeeper
+ image: zookeeper:3.7
+ ports:
+ - containerPort: 2181
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: zookeeper
+ namespace: default
+ labels:
+ app: zookeeper
+spec:
+ ports:
+ - port: 2181
+ name: client
+ selector:
+ app: zookeeper
+
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: kafka
+ template:
+ metadata:
+ labels:
+ app: kafka
+ spec:
+ containers:
+ - name: kafka
+ image: bitnami/kafka:3.6.2
+ env:
+ - name: KAFKA_ADVERTISED_LISTENERS
+ value: PLAINTEXT://kafka:9092
+ - name: KAFKA_ZOOKEEPER_CONNECT
+ value: zookeeper:2181
+ ports:
+ - containerPort: 31877
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: kafka
+ namespace: default
+ labels:
+ app: kafka
+spec:
+ type: NodePort
+ ports:
+ - port: 31877
+ name: client
+ protocol: TCP
+ targetPort: 31877
+ nodePort: 31877
+ selector:
+ app: kafka
\ No newline at end of file
diff --git a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
similarity index 95%
copy from shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
copy to shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
index 2a3141908e..f9e8eef963 100644
--- a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/pom.xml
+++ b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/pom.xml
@@ -24,8 +24,8 @@
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>shenyu-e2e-case-http</artifactId>
-
+ <artifactId>shenyu-e2e-case-logging-rocketmq</artifactId>
+
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
similarity index 99%
copy from
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
copy to
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
index d42bfc3aab..bc24271d34 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginCases.java
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginCases.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shenyu.e2e.testcase.http;
+package org.apache.shenyu.e2e.testcase.logging.rocketmq;
import com.google.common.collect.Lists;
import io.restassured.http.Method;
diff --git
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
similarity index 89%
copy from
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
copy to
shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
index 1b18091e7c..66f6ee5bbe 100644
---
a/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-http/src/test/java/org/apache/shenyu/e2e/testcase/http/DividePluginTest.java
+++
b/shenyu-e2e/shenyu-e2e-case/shenyu-e2e-case-logging-rocketmq/src/test/java/org/apache/shenyu/e2e/testcase/logging/rocketmq/DividePluginTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shenyu.e2e.testcase.http;
+package org.apache.shenyu.e2e.testcase.logging.rocketmq;
import com.google.common.collect.Lists;
import org.apache.shenyu.e2e.client.WaitDataSync;
@@ -24,12 +24,14 @@ import org.apache.shenyu.e2e.client.gateway.GatewayClient;
import org.apache.shenyu.e2e.constant.Constants;
import org.apache.shenyu.e2e.engine.annotation.ShenYuScenario;
import org.apache.shenyu.e2e.engine.annotation.ShenYuTest;
+import org.apache.shenyu.e2e.engine.scenario.specification.AfterEachSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.BeforeEachSpec;
import org.apache.shenyu.e2e.engine.scenario.specification.CaseSpec;
import org.apache.shenyu.e2e.enums.ServiceTypeEnum;
import org.apache.shenyu.e2e.model.ResourcesData;
import org.apache.shenyu.e2e.model.data.BindingData;
import org.apache.shenyu.e2e.model.response.SelectorDTO;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
@@ -91,19 +93,21 @@ public class DividePluginTest {
spec.getWaiting().waitFor(gateway);
}
-// @AfterEach
-// void after(final AdminClient client, final GatewayClient gateway, final
AfterEachSpec spec) {
-// spec.getDeleter().delete(client, selectorIds);
-// spec.deleteWaiting().waitFor(gateway);
-// selectorIds = Lists.newArrayList();
-// }
+ @AfterEach
+ void after(final AdminClient client, final GatewayClient gateway, final
AfterEachSpec spec) {
+ spec.getDeleter().delete(client, selectorIds);
+ spec.deleteWaiting().waitFor(gateway);
+ selectorIds = Lists.newArrayList();
+ }
@BeforeAll
void setup(final AdminClient adminClient, final GatewayClient
gatewayClient) throws Exception {
adminClient.login();
+
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllSelectors,
gatewayClient::getSelectorCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllMetaData,
gatewayClient::getMetaDataCache, adminClient);
WaitDataSync.waitAdmin2GatewayDataSyncEquals(adminClient::listAllRules,
gatewayClient::getRuleCache, adminClient);
+
LOG.info("start loggingRocketMQ plugin");
Map<String, String> reqBody = new HashMap<>();
reqBody.put("pluginId", "29");
@@ -114,7 +118,10 @@ public class DividePluginTest {
reqBody.put("namespaceId",
Constants.SYS_DEFAULT_NAMESPACE_NAMESPACE_ID);
reqBody.put("config", "{\"topic\":\"shenyu-access-logging\",
\"namesrvAddr\":
\"rocketmq-dialevoneid:9876\",\"producerGroup\":\"shenyu-plugin-logging-rocketmq\"}");
adminClient.changePluginStatus("1801816010882822166", reqBody);
- WaitDataSync.waitGatewayPluginUse(gatewayClient,
"org.apache.shenyu.plugin.logging.rocketmq");
+ Map<String, Integer> plugins = gatewayClient.getPlugins();
+ LOG.info("shenyu e2e plugin list ={}", plugins);
+ WaitDataSync.waitGatewayPluginUse(gatewayClient,
"org.apache.shenyu.plugin.logging.rocketmq.LoggingRocketMQPlugin");
+
}
@ShenYuScenario(provider = DividePluginCases.class)
diff --git
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
index ce3efdd91c..518eb461e4 100644
---
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
+++
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/WaitDataSync.java
@@ -83,7 +83,7 @@ public class WaitDataSync {
Map<String, Integer> pluginMap = gatewayClient.getPlugins();
int retryNum = 0;
boolean existPlugin = false;
- while (!existPlugin && retryNum < 5) {
+ while (!existPlugin && retryNum < 10) {
for (String plugin : pluginMap.keySet()) {
if (plugin.startsWith(pluginClass)) {
existPlugin = true;
diff --git
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
index 5ffd1a61eb..50afe58dd7 100644
---
a/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
+++
b/shenyu-e2e/shenyu-e2e-client/src/main/java/org/apache/shenyu/e2e/client/gateway/GatewayClient.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.restassured.response.Response;
import io.restassured.specification.RequestSpecification;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.e2e.annotation.ShenYuGatewayClient;
import org.apache.shenyu.e2e.client.BaseClient;
import org.apache.shenyu.e2e.common.RequestLogConsumer;
@@ -178,6 +179,9 @@ public class GatewayClient extends BaseClient {
List<SelectorCacheData> selectorDataList = new ArrayList<>();
for (Map.Entry entry : s.entrySet()) {
List list = (List) entry.getValue();
+ if (CollectionUtils.isEmpty(list)) {
+ continue;
+ }
String json = MAPPER.writeValueAsString(list.get(0));
SelectorCacheData selectorData = MAPPER.readValue(json,
SelectorCacheData.class);
selectorDataList.add(selectorData);
diff --git
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
index 6d12c447ff..42c3bf33a4 100644
---
a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
+++
b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/client/KafkaLogCollectClient.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.JsonUtils;
import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
@@ -65,12 +66,13 @@ public class KafkaLogCollectClient extends
AbstractLogConsumeClient<KafkaLogColl
*/
@Override
public void initClient0(@NonNull final
KafkaLogCollectConfig.KafkaLogConfig config) {
- if (Objects.isNull(config)
- || StringUtils.isBlank(config.getBootstrapServer())
- || StringUtils.isBlank(config.getTopic())) {
+ if (StringUtils.isBlank(config.getBootstrapServer()) ||
StringUtils.isBlank(config.getTopic())) {
LOG.error("kafka props is empty. failed init kafka producer");
return;
}
+
+ LOG.info("initClient0:{}", GsonUtils.getInstance().toJson(config));
+
String topic = config.getTopic();
String nameserverAddress = config.getBootstrapServer();
@@ -122,11 +124,14 @@ public class KafkaLogCollectClient extends
AbstractLogConsumeClient<KafkaLogColl
.map(apiConfig ->
StringUtils.defaultIfBlank(apiConfig.getTopic(), topic)
).orElse(topic);
try {
+ LOG.info("logTopic:{}, log:{}", logTopic, log);
producer.send(toProducerRecord(logTopic, log), (metadata,
exception) -> {
+ LOG.info("kafka push logs metadata:{}",
GsonUtils.getInstance().toJson(metadata));
if (Objects.nonNull(exception)) {
LOG.error("kafka push logs error", exception);
}
});
+ producer.flush();
} catch (Exception e) {
LOG.error("kafka push logs error", e);
}