This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 395c3f325828bce0497fa5f54a385f071d02484d Author: zh378814 <[email protected]> AuthorDate: Fri Apr 22 14:04:13 2022 +0800 Remove redundant verification, complete document example parameters, and optimize RocketMQ's key and tag acquisition logic. --- .../aliyun/rocketmq-connect-dingtalk/README.md | 6 +- .../connect/dingtalk/sink/DingTalkSinkTask.java | 5 -- connectors/aliyun/rocketmq-connect-fc/README.md | 5 +- .../rocketmq/connect/fc/sink/FcSinkTask.java | 8 --- connectors/aliyun/rocketmq-connect-mns/README.md | 5 +- .../rocketmq/connect/mns/source/MNSSourceTask.java | 8 --- .../aliyun/rocketmq-connect-rocketmq/README.md | 24 ++++---- .../connect/rocketmq/RocketMQSinkConnector.java | 25 ++++++++ .../connect/rocketmq/RocketMQSinkTask.java | 39 ++---------- .../connect/rocketmq/RocketMQSourceConnector.java | 34 +++++++++++ .../connect/rocketmq/RocketMQSourceTask.java | 70 ++++++---------------- .../connect/rocketmq/common/RocketMQConstant.java | 4 ++ .../rocketmq/RocketMQSinkConnectorTest.java | 3 +- .../rocketmq/RocketMQSourceConnectorTest.java | 12 ++++ connectors/rocketmq-connect-http/README.md | 5 +- .../rocketmq/connect/http/sink/HttpSinkTask.java | 8 +-- 16 files changed, 127 insertions(+), 134 deletions(-) diff --git a/connectors/aliyun/rocketmq-connect-dingtalk/README.md b/connectors/aliyun/rocketmq-connect-dingtalk/README.md index 127036a..2ab5385 100644 --- a/connectors/aliyun/rocketmq-connect-dingtalk/README.md +++ b/connectors/aliyun/rocketmq-connect-dingtalk/README.md @@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-ding-talk-sink-connector-name} -?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector",“webHook”:"${webHook}",“msgtype”:"${msgtype}","secretKey":"${secretKey}"} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","connect-topicname" : "${connect-topicname}","webHook":"${webHook}","msgtype":"${msgtype}","secretKey":"${secretKey}"} ``` 例子 ``` http://localhost:8081/connectors/dingTalkConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", -"connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","webHook":"192.168.1.2","msgtype":"text","secretKey":"xxxx"} +"connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","connect-topicname" : "ding-talk-topic","webHook":"192.168.1.2","msgtype":"text","secretKey":"xxxx"} ``` >**注:** `rocketmq-ding-talk-connect` >的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 @@ -40,4 +40,4 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/ |webHook | String | YES | 机器人的Webhook地址 | https://oapi.dingtalk.com/robot/send?access_token=XXXXXX | |msgtype | String | NO | 消息类型 | text | | |secretKey | String | NO | 密钥 | SC | - +|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx | diff --git a/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java b/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java index cb4e158..98e52fb 100644 --- a/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java +++ b/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java @@ -7,7 +7,6 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.errors.ConnectException; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,10 +56,6 @@ public class DingTalkSinkTask extends SinkTask { @Override public void validate(KeyValue config) { - if (StringUtils.isBlank(config.getString(DingTalkConstant.WEB_HOOK)) || - StringUtils.isBlank(config.getString(DingTalkConstant.SECRET_KEY))) { - throw new RuntimeException("ding talk required parameter is null !"); - } } @Override diff --git a/connectors/aliyun/rocketmq-connect-fc/README.md b/connectors/aliyun/rocketmq-connect-fc/README.md index b22979e..69dc110 100644 --- a/connectors/aliyun/rocketmq-connect-fc/README.md +++ b/connectors/aliyun/rocketmq-connect-fc/README.md @@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-sink-connector-name} -?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}", "qualifier":"${qualifier}"} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector","connect-topicname" : "${connect-topicname}",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}", "qualifier":"${qualifier}"} ``` 例子 ``` http://localhost:8081/connectors/fcConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", -"connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"", "qualifier":"LATEST"} +"connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector","connect-topicname" : "fc-topic",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"", "qualifier":"LATEST"} ``` >**注:** `rocketmq-fc-connect` >的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 @@ -45,4 +45,5 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-connector-name}/st |functionName | String | YES | 函数名称 | xxxx | |invocationType | String | NO | 同步或者异步 | null | |qualifier | String | NO | 服务版本和别名 | LATEST | +|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx | diff --git a/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java b/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java index 8080b82..a56739f 100644 --- a/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java +++ b/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java @@ -80,14 +80,6 @@ public class FcSinkTask extends SinkTask { @Override public void validate(KeyValue config) { - if (StringUtils.isBlank(config.getString(FcConstant.REGION_ID_CONSTANT)) - || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_ID_CONSTANT)) - || StringUtils.isBlank(config.getString(FcConstant.ACCESS__KEY_SECRET_CONSTANT)) - || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT)) - || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT)) - || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) { - throw new RuntimeException("fc required parameter is null !"); - } try { GetServiceRequest getServiceRequest = new GetServiceRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT)); getServiceRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT)); diff --git a/connectors/aliyun/rocketmq-connect-mns/README.md b/connectors/aliyun/rocketmq-connect-mns/README.md index 8c2effd..bc143b6 100644 --- a/connectors/aliyun/rocketmq-connect-mns/README.md +++ b/connectors/aliyun/rocketmq-connect-mns/README.md @@ -15,14 +15,14 @@ mvn clean install -Dmaven.test.skip=true ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-source-connector-name} -?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","connect-topicname" : "${connect-topicname}",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"} ``` 例子 ``` http://localhost:8081/connectors/mnsConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", -"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx", +"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","connect-topicname" : "mns-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx", "accountId":"xxxx","batchSize":"8","isBase64Decode":"true"} ``` @@ -46,3 +46,4 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-connector-name}/s | accountId | String | YES | 阿里云yourAccountId | 10000000 | | batchSize | Integer | NO | 批量接受消息数量 | 8 | | isBase64Decode | String | NO | 是否开启Base64解码 | true | +|connect-topicname | String | YES | source需要处理数据消息topic | xxxx | \ No newline at end of file diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java index ce4a588..d54092c 100644 --- a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java +++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java @@ -13,7 +13,6 @@ import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.source.SourceTask; import io.openmessaging.connector.api.component.task.source.SourceTaskContext; import io.openmessaging.connector.api.data.ConnectRecord; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,13 +85,6 @@ public class MNSSourceTask extends SourceTask { @Override public void validate(KeyValue config) { - if (StringUtils.isBlank(config.getString(ACCESS_KEY_ID)) - || StringUtils.isBlank(config.getString(ACCESS_KEY_SECRET)) - || StringUtils.isBlank(config.getString(ACCOUNT_ENDPOINT)) - || StringUtils.isBlank(config.getString(QUEUE_NAME)) - || StringUtils.isBlank(config.getString(ACCOUNT_ID))) { - throw new RuntimeException("mns required parameter is null !"); - } // 检测队列名称是否存在 PagingListResult<QueueMeta> queueMetaPagingListResult = mnsClient.listQueue(queueName, null, 1); List<QueueMeta> result = queueMetaPagingListResult.getResult(); diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/README.md b/connectors/aliyun/rocketmq-connect-rocketmq/README.md index cb41006..025a044 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/README.md +++ b/connectors/aliyun/rocketmq-connect-rocketmq/README.md @@ -11,14 +11,14 @@ mvn clean install -Dmaven.test.skip=true ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-source-connector-name} -?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","connect-topicname" : "${connect-topicname}",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"} ``` 例子 ``` http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", -"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic", +"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","connect-topicname" : "rocketmq-source-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic", "instanceId":"xxxx", "consumerGroup":"xxxx"} ``` @@ -26,13 +26,13 @@ http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketm ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-sink-connector-name} -?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","connect-topicname" : "${connect-topicname}", "accessKeyId":"${accessKeyId}", "accessKeySecret":"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"} ``` 例子 ``` http://localhost:8081/connectors/rocketmqConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", -"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic", +"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","connect-topicname" : "rocketmq-sink-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic", "instanceId":"xxxx"} ``` @@ -47,14 +47,15 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-na ## rocketmq-connect-rocketmq 参数说明 * **rocketmq-source-connector 参数说明** -| KEY | TYPE | Must be filled | Description| Example -|------------------------|---------|----------------|------------|---| -| accessKeyId | String | YES | AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | -| accessKeySecret | String | YES | AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | +| KEY | TYPE | Must be filled | Description | Example +|------------------------|---------|----------------|--------------------------------------------|---| +| accessKeyId | String | YES | AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | +| accessKeySecret | String | YES | AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx | | namesrvAddr | String | YES | 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx | -| topic | String | YES | 消息主题 | xxxx | -| instanceId | String | NO | 阿里云MQ控制台的实例Id | xxxx | -| consumerGroup | String | YES | 消息订阅者 | xxxx | +| topic | String | YES | 消息主题 | xxxx | +| instanceId | String | NO | 阿里云MQ控制台的实例Id | xxxx | +| consumerGroup | String | YES | 消息订阅者 | xxxx | +|connect-topicname | String | YES | source需要处理数据消息topic | xxxx | ``` 注:1. source/sink配置文件说明是以rocketmq-connect-rocketmq为demo,不同source/sink connector配置有差异,请以具体sourc/sink connector为准 @@ -68,4 +69,5 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-na | namesrvAddr | String | YES | 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx | | topic | String | YES | 消息主题 | xxxx | | instanceId | String | NO | 阿里云MQ控制台的实例Id | xxxx | +|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx | diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java index 790c6dd..0d13598 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java @@ -1,17 +1,25 @@ package org.apache.rocketmq.connect.rocketmq; +import com.aliyun.ons20190214.Client; +import com.aliyun.ons20190214.models.OnsTopicListRequest; +import com.aliyun.ons20190214.models.OnsTopicListResponse; import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; +import com.aliyun.teaopenapi.models.Config; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.Task; import io.openmessaging.connector.api.component.task.sink.SinkConnector; import io.openmessaging.internal.DefaultKeyValue; import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; +import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; public class RocketMQSinkConnector extends SinkConnector { + private static final Logger log = LoggerFactory.getLogger(RocketMQSinkConnector.class); private String accessKeyId; @@ -59,6 +67,23 @@ public class RocketMQSinkConnector extends SinkConnector { || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) { throw new RuntimeException("rocketmq required parameter is null !"); } + try { + Config onsConfig = new Config() + .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)); + onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR)); + final Client client = new Client(onsConfig); + OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest() + .setTopic(config.getString(RocketMQConstant.TOPIC)) + .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)); + final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest); + if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) { + throw new RuntimeException("rocketmq required parameter topic does not exist !"); + } + } catch (Exception e) { + log.error("RocketMQSinkTask | validate | error => ", e); + throw new RuntimeException(e.getMessage()); + } } @Override diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java index 6c8eb36..13f2623 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java @@ -1,22 +1,18 @@ package org.apache.rocketmq.connect.rocketmq; -import com.aliyun.ons20190214.Client; -import com.aliyun.ons20190214.models.OnsTopicListRequest; -import com.aliyun.ons20190214.models.OnsTopicListResponse; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; +import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON; import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; -import com.aliyun.teaopenapi.models.Config; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.errors.ConnectException; import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; -import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,14 +41,15 @@ public class RocketMQSinkTask extends SinkTask { sinkRecords.forEach(connectRecord -> { Message message = new Message(); message.setBody(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8)); - // TODO message.setKey(); - // TODO message.setTag(); final KeyValue extensions = connectRecord.getExtensions(); if (extensions != null) { + message.setKey(extensions.getString(RocketMQConstant.KEY)); + message.setTag(extensions.getString(RocketMQConstant.TAG)); extensions.keySet().forEach(key -> message.putUserProperties(key, extensions.getString(key))); } message.setTopic(topic); - producer.send(message); + final SendResult send = producer.send(message); + log.info("RocketMQSinkTask | put | send : {}", send); }); } catch (Exception e) { log.error("RocketMQSinkTask | put | error => ", e); @@ -72,30 +69,6 @@ public class RocketMQSinkTask extends SinkTask { @Override public void validate(KeyValue config) { - if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID)) - || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)) - || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR)) - || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) { - throw new RuntimeException("rocketmq required parameter is null !"); - } - // 检查topic是否存在 - try { - Config onsConfig = new Config() - .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID)) - .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)); - onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR)); - final Client client = new Client(onsConfig); - OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest() - .setTopic(config.getString(RocketMQConstant.TOPIC)) - .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)); - final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest); - if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) { - throw new RuntimeException("rocketmq required parameter topic does not exist !"); - } - } catch (Exception e) { - log.error("RocketMQSinkTask | validate | error => ", e); - throw new RuntimeException(e.getMessage()); - } } @Override @@ -118,7 +91,7 @@ public class RocketMQSinkTask extends SinkTask { properties.put(PropertyKeyConst.AccessKey, accessKeyId); properties.put(PropertyKeyConst.SecretKey, accessKeySecret); if (StringUtils.isNotBlank(instanceId)) { - properties.put(PropertyKeyConst.INSTANCE_ID, instanceId); + properties.put(PropertyKeyConst.INSTANCE_ID, instanceId); } properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); producer = ONSFactory.createProducer(properties); diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java index 778aa2b..3197837 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java @@ -1,16 +1,26 @@ package org.apache.rocketmq.connect.rocketmq; +import com.aliyun.ons20190214.Client; +import com.aliyun.ons20190214.models.OnsGroupListRequest; +import com.aliyun.ons20190214.models.OnsGroupListResponse; +import com.aliyun.ons20190214.models.OnsTopicListRequest; +import com.aliyun.ons20190214.models.OnsTopicListResponse; import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; +import com.aliyun.teaopenapi.models.Config; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.Task; import io.openmessaging.connector.api.component.task.source.SourceConnector; import io.openmessaging.internal.DefaultKeyValue; import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; +import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; public class RocketMQSourceConnector extends SourceConnector { + private static final Logger log = LoggerFactory.getLogger(RocketMQSourceConnector.class); private String accessKeyId; @@ -62,6 +72,30 @@ public class RocketMQSourceConnector extends SourceConnector { || StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) { throw new RuntimeException("rocketmq required parameter is null !"); } + try { + Config onsConfig = new Config() + .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID)) + .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)); + onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR)); + final Client client = new Client(onsConfig); + OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest() + .setTopic(config.getString(RocketMQConstant.TOPIC)) + .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)); + final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest); + if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) { + throw new RuntimeException("rocketmq required parameter topic does not exist !"); + } + OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest() + .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)) + .setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP)); + final OnsGroupListResponse onsGroupListResponse = client.onsGroupList(onsGroupListRequest); + if (onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) { + throw new RuntimeException("rocketmq required parameter consumerGroup does not exist !"); + } + } catch (Exception e) { + log.error("RocketMQSinkTask | validate | error => ", e); + throw new RuntimeException(e.getMessage()); + } } @Override diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java index 9de3ace..cf8def1 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java @@ -1,17 +1,11 @@ package org.apache.rocketmq.connect.rocketmq; -import com.aliyun.ons20190214.Client; -import com.aliyun.ons20190214.models.OnsGroupListRequest; -import com.aliyun.ons20190214.models.OnsGroupListResponse; -import com.aliyun.ons20190214.models.OnsTopicListRequest; -import com.aliyun.ons20190214.models.OnsTopicListResponse; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.shade.com.google.common.collect.Maps; import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils; -import com.aliyun.teaopenapi.models.Config; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.source.SourceTask; import io.openmessaging.connector.api.component.task.source.SourceTaskContext; @@ -19,7 +13,6 @@ import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.data.RecordOffset; import io.openmessaging.connector.api.data.RecordPartition; import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; -import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils; import org.assertj.core.util.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,9 +49,6 @@ public class RocketMQSourceTask extends SourceTask { @Override public List<ConnectRecord> poll() throws InterruptedException { - if (consumer == null) { - initConsumer(); - } List<ConnectRecord> connectRecords = Lists.newArrayList(); blockingQueue.drainTo(connectRecords, BATCH_POLL_SIZE); return connectRecords; @@ -76,38 +66,6 @@ public class RocketMQSourceTask extends SourceTask { @Override public void validate(KeyValue config) { - if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID)) - || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)) - || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR)) - || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC)) - || StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) { - throw new RuntimeException("rocketmq required parameter is null !"); - } - // 检查topic和consumer group是否存在 - try { - Config onsConfig = new Config() - .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID)) - .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET)); - onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR)); - final Client client = new Client(onsConfig); - OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest() - .setTopic(config.getString(RocketMQConstant.TOPIC)) - .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)); - final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest); - if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) { - throw new RuntimeException("rocketmq required parameter topic does not exist !"); - } - OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest() - .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID)) - .setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP)); - final OnsGroupListResponse onsGroupListResponse = client.onsGroupList(onsGroupListRequest); - if (onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) { - throw new RuntimeException("rocketmq required parameter consumerGroup does not exist !"); - } - } catch (Exception e) { - log.error("RocketMQSinkTask | validate | error => ", e); - throw new RuntimeException(e.getMessage()); - } } @Override @@ -122,17 +80,10 @@ public class RocketMQSourceTask extends SourceTask { @Override public void start(SourceTaskContext sourceTaskContext) { - try { - super.start(sourceTaskContext); - initConsumer(); - consumer.start(); - } catch (Exception e) { - log.error("RocketMQSourceTask | start | error => ", e); - throw e; - } + super.start(sourceTaskContext); } - private void initConsumer() { + private void initConsumer(String tag) { try { Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, consumerGroup); @@ -143,9 +94,9 @@ public class RocketMQSourceTask extends SourceTask { properties.put(PropertyKeyConst.INSTANCE_ID, instanceId); } consumer = ONSFactory.createConsumer(properties); - // TODO TAG先忽略 - consumer.subscribe(topic, "*", (message, consumeContext) -> { + consumer.subscribe(topic, tag, (message, consumeContext) -> { try { + log.info("RocketMQSourceTask | commit | initConsumer | message : {}", message); Map<String, String> sourceRecordPartition = Maps.newHashMap(); sourceRecordPartition.put("topic", message.getTopic()); sourceRecordPartition.put("brokerName", message.getBornHost()); @@ -174,6 +125,19 @@ public class RocketMQSourceTask extends SourceTask { } } + @Override + public void commit(List<ConnectRecord> connectRecords) throws InterruptedException { + try { + if (connectRecords.isEmpty()) return; + final ConnectRecord connectRecord = connectRecords.get(0); + initConsumer(connectRecord.getExtension(RocketMQConstant.TAG)); + consumer.start(); + } catch (Exception e) { + log.error("RocketMQSourceTask | commit | error => ", e); + throw e; + } + } + @Override public void stop() { consumer.shutdown(); diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java index cb1f352..529fd0f 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java @@ -14,4 +14,8 @@ public class RocketMQConstant { public static final String CONSUMER_GROUP = "consumerGroup"; + public static final String KEY = "KEYS"; + + public static final String TAG = "TAGS"; + } diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java index 897bb09..c05b19b 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java @@ -75,7 +75,8 @@ public class RocketMQSinkConnectorTest { ConnectRecord connectRecord = new ConnectRecord(new RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), System.currentTimeMillis()); connectRecord.setData("test message"); connectRecords.add(connectRecord); - connectRecord.addExtension("key", "value"); + connectRecord.addExtension(RocketMQConstant.KEY, "value"); + connectRecord.addExtension(RocketMQConstant.TAG, "tag"); rocketMQSinkTask.put(connectRecords); } diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java index bbdac55..cd0f2e2 100644 --- a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java +++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java @@ -2,12 +2,19 @@ package org.apache.rocketmq.connect.rocketmq; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.source.SourceTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; import io.openmessaging.connector.api.storage.OffsetStorageReader; import io.openmessaging.internal.DefaultKeyValue; import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + public class RocketMQSourceConnectorTest { @Test @@ -44,6 +51,11 @@ public class RocketMQSourceConnectorTest { return null; } }); + List<ConnectRecord> connectRecords = new ArrayList<>(11); + ConnectRecord connectRecord = new ConnectRecord(new RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), System.currentTimeMillis()); + connectRecord.addExtension(RocketMQConstant.TAG, "*"); + connectRecords.add(connectRecord); + rocketMQSourceTask.commit(connectRecords); rocketMQSourceTask.poll(); Thread.sleep(50000); } diff --git a/connectors/rocketmq-connect-http/README.md b/connectors/rocketmq-connect-http/README.md index 67b4bb8..8b6d5c7 100644 --- a/connectors/rocketmq-connect-http/README.md +++ b/connectors/rocketmq-connect-http/README.md @@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true ``` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name} -?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector",“url”:"${url}"} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"} ``` 例子 ``` http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", -"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","url":"192.168.1.2"} +"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"} ``` >**注:** `rocketmq-http-connect` >的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 @@ -38,4 +38,5 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/ | KEY | TYPE | Must be filled | Description | Example |-----|---------|----------------|-------------|------------------| | url | String | YES | sink端 域名地址 | http://127.0.0.1 | +|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx | diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java index 679653e..603bafa 100644 --- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java +++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java @@ -1,13 +1,12 @@ package org.apache.rocketmq.connect.http.sink; -import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils; -import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.errors.ConnectException; -import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils; +import org.apache.rocketmq.connect.http.sink.constant.HttpConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +42,6 @@ public class HttpSinkTask extends SinkTask { @Override public void validate(KeyValue config) { - if (StringUtils.isBlank(config.getString(HttpConstant.URL_CONSTANT))) { - throw new RuntimeException("http required parameter is null !"); - } } @Override
