This is an automated email from the ASF dual-hosted git repository.

zhangxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new addf510  Support canal plugin (#2035)
addf510 is described below

commit addf5101e006f73467b2c43c3e477c4c8af96cb6
Author: XiaoFu <248628...@qq.com>
AuthorDate: Thu Dec 20 21:07:31 2018 +0800

    Support canal plugin (#2035)
    
    * fix add Canal plugin
    
    * fix add UnitTest
    
    * fix   format pom.xml
    
    * Delete CanalApplication.java
    
    * fix support cluster url
    
    * fix add Licensed
    
    * fix bug
    
    * fix
---
 .../network/trace/component/ComponentsDefine.java  |   5 +-
 .../apm-sdk-plugin/canal-1.x-plugin/pom.xml        |  45 ++++++++
 .../plugin/canal/CanalConstructorInterceptor.java  |  44 ++++++++
 .../apm/plugin/canal/CanalEnhanceInfo.java         |  51 +++++++++
 .../apm/plugin/canal/CanalInterceptor.java         |  85 +++++++++++++++
 .../canal/ClusterNodeConstructInterceptor.java     |  61 +++++++++++
 .../plugin/canal/define/CanalInstrumentation.java  |  81 +++++++++++++++
 .../canal/define/ClusterNodeInstrumentation.java   |  63 +++++++++++
 .../src/main/resources/skywalking-plugin.def       |  18 ++++
 .../apm/plugin/canal/CanalInterceptorTest.java     | 115 +++++++++++++++++++++
 apm-sniffer/apm-sdk-plugin/pom.xml                 |   3 +-
 docker/config/component-libraries.yml              |   3 +
 .../src/main/resources/component-libraries.yml     |   4 +
 13 files changed, 576 insertions(+), 2 deletions(-)

diff --git 
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
 
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
index 73da6a8..fd155d1 100644
--- 
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
+++ 
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
@@ -104,6 +104,8 @@ public class ComponentsDefine {
 
     public static final OfficialComponent RABBITMQ_CONSUMER = new 
OfficialComponent(53,"rabbitmq-consumer");
 
+    public static final OfficialComponent CANAL = new 
OfficialComponent(54,"Canal");
+
     private static ComponentsDefine INSTANCE = new ComponentsDefine();
 
     private String[] components;
@@ -113,7 +115,7 @@ public class ComponentsDefine {
     }
 
     public ComponentsDefine() {
-        components = new String[54];
+        components = new String[55];
         addComponent(TOMCAT);
         addComponent(HTTPCLIENT);
         addComponent(DUBBO);
@@ -152,6 +154,7 @@ public class ComponentsDefine {
         addComponent(UNDERTOW);
         addComponent(RABBITMQ_PRODUCER);
         addComponent(RABBITMQ_CONSUMER);
+        addComponent(CANAL);
     }
 
     private void addComponent(OfficialComponent component) {
diff --git a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/pom.xml 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/pom.xml
new file mode 100644
index 0000000..3391c0a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apm-sdk-plugin</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>6.0.0-beta-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>apm-canal-1.x-plugin</artifactId>
+    <name>canal-1.x-plugin</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <canal-client.version>1.1.2</canal-client.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.client</artifactId>
+            <version>${canal-client.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalConstructorInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalConstructorInterceptor.java
new file mode 100644
index 0000000..b803ca0
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalConstructorInterceptor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.canal;
+
+import java.net.InetSocketAddress;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+
+/**
+ * Constructor interceptor for the enhanced canal connector. It records the canal-server
+ * address and the destination passed to the constructor in a {@link CanalEnhanceInfo} and
+ * stores that object in the instance's SkyWalking dynamic field.
+ *
+ * @author withlin
+ */
+public class CanalConstructorInterceptor implements InstanceConstructorInterceptor {
+    /**
+     * Captures the constructor arguments of the enhanced instance.
+     *
+     * @param objInst      the enhanced instance that was just constructed
+     * @param allArguments constructor arguments; index 0 is read as the server address and
+     *                     index 3 as the destination
+     */
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        CanalEnhanceInfo canalEnhanceInfo = new CanalEnhanceInfo();
+        // The address may be null or not an InetSocketAddress; in both cases the url is
+        // left unset instead of failing with a ClassCastException.
+        if (allArguments[0] instanceof InetSocketAddress) {
+            canalEnhanceInfo.setUrl(formatAddress((InetSocketAddress) allArguments[0]));
+        }
+        // A null destination must not prevent the url from being recorded.
+        Object destination = allArguments[3];
+        canalEnhanceInfo.setDestination(destination == null ? null : destination.toString());
+        objInst.setSkyWalkingDynamicField(canalEnhanceInfo);
+    }
+
+    /**
+     * Formats a socket address as {@code host:port}.
+     *
+     * InetAddress#toString() yields "hostname/literal-ip" (or "/literal-ip"), so turning the
+     * slash into a space left stray whitespace inside the peer name. The literal IP is used
+     * instead; for an unresolved address the configured host name is used.
+     */
+    private static String formatAddress(InetSocketAddress address) {
+        String host = address.getAddress() == null
+            ? address.getHostName()
+            : address.getAddress().getHostAddress();
+        return host + ":" + address.getPort();
+    }
+}
\ No newline at end of file
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalEnhanceInfo.java
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalEnhanceInfo.java
new file mode 100644
index 0000000..642ba38
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalEnhanceInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.canal;
+
+/**
+ * Mutable holder for the canal connection details that the interceptors attach to an
+ * enhanced instance. Both setters return {@code this} so calls can be chained.
+ *
+ * @author withlin
+ */
+public class CanalEnhanceInfo {
+
+    /** canal-server address */
+    private String url;
+
+    /** canal destination */
+    private String destination;
+
+    /**
+     * @return the canal-server address, or {@code null} if none has been set
+     */
+    public String getUrl() {
+        return this.url;
+    }
+
+    /**
+     * @param url the canal-server address to store
+     * @return this instance
+     */
+    public CanalEnhanceInfo setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    /**
+     * @return the canal destination, or {@code null} if none has been set
+     */
+    public String getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * @param destination the canal destination to store
+     * @return this instance
+     */
+    public CanalEnhanceInfo setDestination(String destination) {
+        this.destination = destination;
+        return this;
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalInterceptor.java
new file mode 100644
index 0000000..3785826
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/CanalInterceptor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.canal;
+
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * Around-interceptor that wraps the intercepted canal connector method in an exit span named
+ * {@code Canal/<destination>}, tagged with the batch size and the destination.
+ *
+ * @author withlin
+ */
+public class CanalInterceptor implements InstanceMethodsAroundInterceptor {
+    /**
+     * Creates the exit span before the intercepted method runs.
+     *
+     * The peer is the url stored in the instance's {@link CanalEnhanceInfo}; when none was
+     * stored, it is derived from the connector and the runtime context instead.
+     *
+     * @param allArguments method arguments; index 0 is recorded as the {@code batchSize} tag
+     */
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        CanalEnhanceInfo canalEnhanceInfo = (CanalEnhanceInfo)objInst.getSkyWalkingDynamicField();
+        SimpleCanalConnector connector = (SimpleCanalConnector) objInst;
+
+        String url = canalEnhanceInfo.getUrl();
+        // Compare by content: `url == ""` only tests reference identity and misses empty
+        // strings that are not the interned literal.
+        if (url == null || url.length() == 0) {
+            url = resolveClusterPeers(connector);
+        }
+        String batchSize = allArguments[0].toString();
+        String destination = canalEnhanceInfo.getDestination();
+        AbstractSpan activeSpan = ContextManager.createExitSpan("Canal/" + destination, url).start(System.currentTimeMillis());
+        activeSpan.setComponent(ComponentsDefine.CANAL);
+        activeSpan.tag("batchSize", batchSize);
+        activeSpan.tag("destination", destination);
+    }
+
+    /**
+     * Stops the span opened in {@code beforeMethod} and passes the return value through.
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    /**
+     * Marks the active span as failed and logs the throwable on it.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method,
+        Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+
+    /**
+     * Builds a comma-separated peer list: the connector's next address first, followed by
+     * every address stored under "currentAddress" in the runtime context that differs from it.
+     */
+    @SuppressWarnings("unchecked")
+    private static String resolveClusterPeers(SimpleCanalConnector connector) {
+        String runningAddress = formatAddress((InetSocketAddress)connector.getNextAddress());
+        StringBuilder peers = new StringBuilder(runningAddress);
+        // The value is stored as a List<InetSocketAddress> by ClusterNodeConstructInterceptor.
+        List<InetSocketAddress> socketAddressList =
+            (List<InetSocketAddress>)ContextManager.getRuntimeContext().get("currentAddress");
+        if (socketAddressList != null) {
+            for (InetSocketAddress socketAddress : socketAddressList) {
+                String currentAddress = formatAddress(socketAddress);
+                if (!currentAddress.equals(runningAddress)) {
+                    peers.append(',').append(currentAddress);
+                }
+            }
+        }
+        return peers.toString();
+    }
+
+    /**
+     * Formats a socket address as {@code host:port}.
+     *
+     * InetAddress#toString() yields "hostname/literal-ip" (or "/literal-ip"), so turning the
+     * slash into a space left stray whitespace inside the peer name. The literal IP is used
+     * instead; for an unresolved address the configured host name is used.
+     */
+    private static String formatAddress(InetSocketAddress address) {
+        String host = address.getAddress() == null
+            ? address.getHostName()
+            : address.getAddress().getHostAddress();
+        return host + ":" + address.getPort();
+    }
+}
\ No newline at end of file
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/ClusterNodeConstructInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/ClusterNodeConstructInterceptor.java
new file mode 100644
index 0000000..5d58492
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/ClusterNodeConstructInterceptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+ 
+package org.apache.skywalking.apm.plugin.canal;
+
+import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy;
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
+import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Constructor interceptor for {@link ClusterNodeAccessStrategy}. It lists the children of the
+ * destination's cluster root in ZooKeeper, parses them as {@code host:port} addresses and
+ * stores the resulting list in the runtime context under the key "currentAddress".
+ *
+ * @author withlin
+ */
+public class ClusterNodeConstructInterceptor implements InstanceConstructorInterceptor {
+    /**
+     * @param objInst      the enhanced {@link ClusterNodeAccessStrategy}
+     * @param allArguments constructor arguments; index 0 is used as the destination name
+     */
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(allArguments[0].toString());
+        ZkClientx zkClientx = ((ClusterNodeAccessStrategy) objInst).getZkClient();
+        ContextManager.getRuntimeContext().put("currentAddress", getCurrentAddress(zkClientx.getChildren(clusterPath)));
+    }
+
+    /**
+     * Converts {@code host:port} strings into socket addresses.
+     *
+     * Entries that do not split into exactly two parts, or whose port is not a valid port
+     * number, are skipped so that one malformed child does not discard the whole list.
+     *
+     * @param currentChilds child node names; {@code null} is treated as empty
+     * @return the parsed addresses, never {@code null}
+     */
+    private List<InetSocketAddress> getCurrentAddress(List<String> currentChilds) {
+        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+        if (currentChilds == null) {
+            return addresses;
+        }
+        for (String address : currentChilds) {
+            String[] strs = StringUtils.split(address, ":");
+            if (strs == null || strs.length != 2) {
+                continue;
+            }
+            try {
+                addresses.add(new InetSocketAddress(strs[0], Integer.parseInt(strs[1])));
+            } catch (IllegalArgumentException ignored) {
+                // Covers a non-numeric port (NumberFormatException) and a port outside the
+                // valid range; the malformed entry is skipped.
+            }
+        }
+        return addresses;
+    }
+}
+
+
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/define/CanalInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/define/CanalInstrumentation.java
new file mode 100644
index 0000000..2c8308c
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/define/CanalInstrumentation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.canal.define;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
+
+
+/**
+ * @author withlin
+ */
+public class CanalInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+    public static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.canal.CanalInterceptor";
+    public static final String ENHANCE_CLASS = 
"com.alibaba.otter.canal.client.impl.SimpleCanalConnector";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.canal.CanalConstructorInterceptor";
+    public static final String ENHANCE_METHOD_DISPATCH = "getWithoutAck";
+
+    @Override
+    protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[] {
+            new ConstructorInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getConstructorMatcher() {
+                    return takesArgument(4, int.class);
+                }
+
+                @Override public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    @Override
+    protected InstanceMethodsInterceptPoint[] 
getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                    return named(ENHANCE_METHOD_DISPATCH);
+                }
+
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS);
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/define/ClusterNodeInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/define/ClusterNodeInstrumentation.java
new file mode 100644
index 0000000..21d05d2
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/canal/define/ClusterNodeInstrumentation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+ 
+package org.apache.skywalking.apm.plugin.canal.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.any;
+
+
+/**
+ * @author withlin
+ */
+public class ClusterNodeInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+    public static final String ENHANCE_CLASS = 
"com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.canal.ClusterNodeConstructInterceptor";
+
+    @Override
+    protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[] {
+            new ConstructorInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> 
getConstructorMatcher() {
+                    return any();
+                }
+
+                @Override public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    @Override
+    protected InstanceMethodsInterceptPoint[] 
getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[0];
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS);
+    }
+}
\ No newline at end of file
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/resources/skywalking-plugin.def
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000..f01d808
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+canal-1.x=org.apache.skywalking.apm.plugin.canal.define.CanalInstrumentation
+canal-1.x=org.apache.skywalking.apm.plugin.canal.define.ClusterNodeInstrumentation
\ No newline at end of file
diff --git 
a/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/canal/CanalInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/canal/CanalInterceptorTest.java
new file mode 100644
index 0000000..c9c6fc3
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/canal-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/canal/CanalInterceptorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.canal;
+
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import com.alibaba.otter.canal.common.utils.AddressUtils;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Unit test for {@link CanalInterceptor}: runs the before/after hooks against a stub connector
+ * whose dynamic field supplies a fixed {@link CanalEnhanceInfo}, then checks that exactly one
+ * trace segment containing exactly one span was recorded.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class CanalInterceptorTest {
+
+    /**
+     * Connector subclass that also implements {@link EnhancedInstance}; by default it holds
+     * no dynamic field.
+     */
+    private class CanalConnector extends SimpleCanalConnector implements EnhancedInstance {
+
+        public CanalConnector(SocketAddress address, String username, String password, String destination) {
+            super(address, username, password, destination);
+        }
+
+        @Override
+        public Object getSkyWalkingDynamicField() {
+            return null;
+        }
+
+        @Override
+        public void setSkyWalkingDynamicField(Object value) {
+
+        }
+    }
+
+    private CanalInterceptor canalInterceptor;
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private Object[] arguments;
+
+    private Class[] argumentType;
+
+    // Stub instance whose dynamic field always reports a fixed url and destination.
+    private EnhancedInstance enhancedInstance = new CanalConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "") {
+        @Override
+        public Object getSkyWalkingDynamicField() {
+            CanalEnhanceInfo fixedInfo = new CanalEnhanceInfo();
+            fixedInfo.setUrl("localhost:11111");
+            fixedInfo.setDestination("example");
+            return fixedInfo;
+        }
+
+        @Override
+        public void setSkyWalkingDynamicField(Object value) {
+        }
+    };
+
+    @Before
+    public void setUp() {
+        arguments = new Object[] {100};
+        canalInterceptor = new CanalInterceptor();
+    }
+
+    @Test
+    public void testSendMessage() throws Throwable {
+        canalInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
+        canalInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);
+
+        List<TraceSegment> recordedSegments = segmentStorage.getTraceSegments();
+        assertThat(recordedSegments.size(), is(1));
+
+        TraceSegment onlySegment = recordedSegments.get(0);
+        List<AbstractTracingSpan> recordedSpans = SegmentHelper.getSpans(onlySegment);
+        assertThat(recordedSpans.size(), is(1));
+    }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml 
b/apm-sniffer/apm-sdk-plugin/pom.xml
index 49d687c..e3f07bd 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -60,7 +60,8 @@
         <module>activemq-5.x-plugin</module>
         <module>elasticsearch-5.x-plugin</module>
         <module>undertow-plugins</module>
-       <module>rabbitmq-5.x-plugin</module>
+        <module>rabbitmq-5.x-plugin</module>
+        <module>canal-1.x-plugin</module>
     </modules>
     <packaging>pom</packaging>
 
diff --git a/docker/config/component-libraries.yml 
b/docker/config/component-libraries.yml
index caa6bfb..0c1c670 100644
--- a/docker/config/component-libraries.yml
+++ b/docker/config/component-libraries.yml
@@ -186,6 +186,9 @@ rabbitmq-producer:
 rabbitmq-consumer:
   id: 53
   languages: Java
+Canal:
+  id: 54
+  languages: Java
 
 # .NET/.NET Core components
 # [3000, 4000) for C#/.NET only
diff --git 
a/oap-server/server-starter/src/main/resources/component-libraries.yml 
b/oap-server/server-starter/src/main/resources/component-libraries.yml
index caa6bfb..ba03e96 100644
--- a/oap-server/server-starter/src/main/resources/component-libraries.yml
+++ b/oap-server/server-starter/src/main/resources/component-libraries.yml
@@ -186,6 +186,10 @@ rabbitmq-producer:
 rabbitmq-consumer:
   id: 53
   languages: Java
+Canal:
+  id: 54
+  languages: Java
+
 
 # .NET/.NET Core components
 # [3000, 4000) for C#/.NET only

Reply via email to