This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.19
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 0ed8114dcb05eefecb8f6c1234449f6f1a362d47
Author: Vasily Makarov <[email protected]>
AuthorDate: Fri Jun 26 10:44:29 2020 +0300

    NIFI-7190 CaptureChangeMySQL processor remove comments from normalized query
    
    This closes #6711
    
    Co-authored-by: Vasily Makarov <[email protected]>
    Co-authored-by: Matt Burgess <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../cdc/mysql/processors/CaptureChangeMySQL.java   |  16 +-
 .../mysql/processors/CaptureChangeMySQLTest.groovy |  35 ++
 .../nifi/cdc/mysql/CaptureChangeMySQLTest.java     | 393 ---------------------
 .../nifi/cdc/mysql/MockBinlogClientJava.java       | 108 ------
 4 files changed, 50 insertions(+), 502 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 3e245c7ab0..08dad6dc40 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -157,6 +157,9 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
     // Random invalid constant used as an indicator to not set the binlog 
position on the client (thereby using the latest available)
     private static final int DO_NOT_SET = -1000;
 
+    // A regular expression matching multiline comments, used when parsing DDL 
statements
+    private static final Pattern MULTI_COMMENT_PATTERN = 
Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -968,7 +971,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                         currentTable = null;
                     } else {
                         // Check for DDL events (alter table, e.g.). Normalize 
the query to do string matching on the type of change
-                        String normalizedQuery = 
sql.toLowerCase().trim().replaceAll(" {2,}", " ");
+                        String normalizedQuery = normalizeQuery(sql);
+
                         if (normalizedQuery.startsWith("alter table")
                                 || normalizedQuery.startsWith("alter ignore 
table")
                                 || normalizedQuery.startsWith("create table")
@@ -1111,6 +1115,16 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         currentSession.clearState(Scope.CLUSTER);
     }
 
+    /**
+     * Normalizes a SQL statement so that the type of change (e.g. "alter table") can be
+     * detected with simple prefix matching.
+     * <p>
+     * The statement is lower-cased, comments are removed (block comments as well as
+     * {@code #} and {@code --} comments running to the end of a line), every run of
+     * whitespace is collapsed to a single space, and the result is trimmed.
+     *
+     * @param sql the raw SQL statement; must not be null
+     * @return the normalized statement
+     */
+    protected String normalizeQuery(String sql) {
+        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
+        // (a Turkish default locale would otherwise turn 'I' into a dotless i)
+        String normalizedQuery = sql.toLowerCase(java.util.Locale.ROOT);
+
+        // Remove comments from the query. A block comment separates tokens just like
+        // whitespace does, so it is replaced by a space rather than by nothing.
+        normalizedQuery = MULTI_COMMENT_PATTERN.matcher(normalizedQuery).replaceAll(" ");
+        normalizedQuery = normalizedQuery.replaceAll("#.*", "");
+        normalizedQuery = normalizedQuery.replaceAll("-{2}.*", "");
+
+        // Collapse whitespace only after the comments are gone, otherwise the gaps left
+        // behind by removed comments would survive in the result
+        return normalizedQuery.replaceAll("\\s+", " ").trim();
+    }
+
     protected void stop() throws CDCException {
         try {
             if (binlogClient != null) {
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 37231e533e..cc03e2aaa6 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -1185,6 +1185,40 @@ class CaptureChangeMySQLTest {
         )
     }
 
+    /**
+     * Sends a single XID event with no preceding BEGIN event and expects the
+     * following run of the processor to fail with an AssertionError.
+     */
+    @Test
+    void testGetXIDEvents() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306")
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root")
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds")
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true")
+
+        // Register and enable the cache client service referenced by the processor
+        final DistributedMapCacheClientImpl cacheService = createCacheClient()
+        Map<String, String> cacheServiceProperties = new HashMap<>()
+        cacheServiceProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost")
+        testRunner.addControllerService("client", cacheService, cacheServiceProperties)
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client")
+        testRunner.enableControllerService(cacheService)
+
+        // First run initializes the processor and leaves it scheduled
+        testRunner.run(1, false, true)
+
+        // COMMIT: build an XID event carrying an empty payload
+        EventHeaderV4 xidHeader = new EventHeaderV4()
+        xidHeader.setEventType(EventType.XID)
+        xidHeader.setNextPosition(12)
+        xidHeader.setTimestamp(new Date().getTime())
+        EventData emptyPayload = new EventData() {
+        }
+        Event xidEvent = new Event(xidHeader, emptyPayload)
+        client.sendEvent(xidEvent)
+
+        // An XID event that arrives without a prior 'begin' event must raise an error
+        assertThrows(AssertionError.class, () -> testRunner.run(1, false, false))
+    }
+
+    /**
+     * Checks that normalizeQuery trims surrounding whitespace and strips a
+     * block comment that spans a line break.
+     */
+    @Test
+    void testNormalizeQuery() throws Exception {
+        final String expected = "alter table"
+
+        // Leading whitespace only
+        assertEquals(expected, processor.normalizeQuery(" alter table"))
+
+        // Block comment containing a newline ahead of the statement
+        final String withBlockComment = " /* This is a \n multiline comment test */ alter table"
+        assertEquals(expected, processor.normalizeQuery(withBlockComment))
+    }
+
     /********************************
      * Mock and helper classes below
      ********************************/
@@ -1224,6 +1258,7 @@ class CaptureChangeMySQLTest {
             
when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet)
             return mockConnection
         }
+
     }
 
 
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java
deleted file mode 100644
index 0d3fd0f257..0000000000
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cdc.mysql;
-
-
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.Event;
-import com.github.shyiko.mysql.binlog.event.EventData;
-import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
-import com.github.shyiko.mysql.binlog.event.EventType;
-import com.github.shyiko.mysql.binlog.event.QueryEventData;
-import com.github.shyiko.mysql.binlog.event.RotateEventData;
-import com.github.shyiko.mysql.binlog.network.SSLMode;
-import org.apache.commons.io.output.WriterOutputStream;
-import org.apache.nifi.cdc.event.ColumnDefinition;
-import org.apache.nifi.cdc.event.TableInfo;
-import org.apache.nifi.cdc.event.TableInfoCacheKey;
-import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import 
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.state.MockStateManager;
-import org.apache.nifi.util.MockComponentLog;
-import org.apache.nifi.util.MockControllerServiceInitializationContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class CaptureChangeMySQLTest {
-
-    private static final String DRIVER_LOCATION = 
"http://mysql-driver.com/driver.jar";;
-    CaptureChangeMySQL processor;
-    TestRunner testRunner;
-    MockBinlogClientJava client = new MockBinlogClientJava("localhost", 3306, 
"root", "password");
-
-    @BeforeEach
-    void setUp() throws Exception {
-        processor = new MockCaptureChangeMySQL();
-        testRunner = TestRunners.newTestRunner(processor);
-    }
-
-    @Test
-    void testConnectionFailures() throws Exception {
-        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
DRIVER_LOCATION);
-        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
-        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
-        testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, "1");
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient();
-        Map<String, String> clientProperties = new HashMap<>();
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
"localhost");
-        testRunner.addControllerService("client", cacheClient, 
clientProperties);
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
-        testRunner.enableControllerService(cacheClient);
-        client.connectionError = true;
-        try {
-            testRunner.run();
-        } catch (AssertionError ae) {
-            Throwable pe = ae.getCause();
-            assertTrue(pe instanceof ProcessException);
-            Throwable ioe = pe.getCause();
-            assertTrue(ioe instanceof IOException);
-            assertEquals("Could not connect binlog client to any of the 
specified hosts due to: Error during connect", ioe.getMessage());
-            assertTrue(ioe.getCause() instanceof IOException);
-        }
-        client.connectionError = false;
-
-        client.connectionTimeout = true;
-        try {
-            testRunner.run();
-        } catch (AssertionError ae) {
-            Throwable pe = ae.getCause();
-            assertTrue(pe instanceof ProcessException);
-            Throwable ioe = pe.getCause();
-            assertTrue(ioe instanceof IOException);
-            assertEquals("Could not connect binlog client to any of the 
specified hosts due to: Connection timed out", ioe.getMessage());
-            assertTrue(ioe.getCause() instanceof TimeoutException);
-        }
-        client.connectionTimeout = false;
-    }
-
-    @Test
-    void testSslModeDisabledSslContextServiceNotRequired() {
-        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
-        testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, 
SSLMode.DISABLED.toString());
-        testRunner.assertValid();
-    }
-
-    @Test
-    void testGetXIDEvents() throws Exception {
-        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
DRIVER_LOCATION);
-        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
-        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
-        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 
seconds");
-        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 
"true");
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient();
-        Map<String, String> clientProperties = new HashMap<>();
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
"localhost");
-        testRunner.addControllerService("client", cacheClient, 
clientProperties);
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
-        testRunner.enableControllerService(cacheClient);
-
-        testRunner.run(1, false, true);
-        // COMMIT
-        EventHeaderV4 header2 = new EventHeaderV4();
-        header2.setEventType(EventType.XID);
-        header2.setNextPosition(12);
-        header2.setTimestamp(new Date().getTime());
-        EventData eventData = new EventData() {
-        };
-        client.sendEvent(new Event(header2, eventData));
-
-        // when we ge a xid event without having got a 'begin' event ,throw an 
exception
-        assertThrows(AssertionError.class, () -> testRunner.run(1, false, 
false));
-    }
-
-    @Test
-    void testBeginCommitTransaction() throws Exception {
-        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
DRIVER_LOCATION);
-        testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306");
-        testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root");
-        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 
seconds");
-        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 
"true");
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient();
-        Map<String, String> clientProperties = new HashMap<>();
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
"localhost");
-        testRunner.addControllerService("client", cacheClient, 
clientProperties);
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client");
-        testRunner.enableControllerService(cacheClient);
-
-
-        testRunner.run(1, false, true);
-
-        EventHeaderV4 header = new EventHeaderV4();
-        header.setEventType(EventType.ROTATE);
-        header.setNextPosition(2);
-        header.setTimestamp(new Date().getTime());
-        RotateEventData rotateEventData = new RotateEventData();
-        rotateEventData.setBinlogFilename("mysql-bin.000001");
-        rotateEventData.setBinlogPosition(4L);
-        client.sendEvent(new Event(header, rotateEventData));
-
-        // BEGIN
-        EventHeaderV4 header1 = new EventHeaderV4();
-        header1.setEventType(EventType.QUERY);
-        header1.setNextPosition(6);
-        header1.setTimestamp(new Date().getTime());
-        QueryEventData rotateEventData1 = new QueryEventData();
-        rotateEventData1.setDatabase("mysql-bin.000001");
-        rotateEventData1.setDatabase("myDB");
-        rotateEventData1.setSql("BEGIN");
-        client.sendEvent(new Event(header1, rotateEventData1));
-
-        // COMMIT
-        EventHeaderV4 header2 = new EventHeaderV4();
-        header2.setEventType(EventType.XID);
-        header2.setNextPosition(12);
-        header2.setTimestamp(new Date().getTime());
-        EventData eventData2 = new EventData() {
-        };
-        client.sendEvent(new Event(header2, eventData2));
-
-        //when get a xid event,stop and restart the processor
-        //here we used to get an exception
-        testRunner.run(1, true, false);
-        testRunner.run(1, false, false);
-
-        // next transaction
-        // BEGIN
-        EventHeaderV4 header3 = new EventHeaderV4();
-        header3.setEventType(EventType.QUERY);
-        header3.setNextPosition(16);
-        header3.setTimestamp(new Date().getTime());
-        QueryEventData rotateEventData3 = new QueryEventData();
-        rotateEventData3.setDatabase("mysql-bin.000001");
-        rotateEventData3.setDatabase("myDB");
-        rotateEventData3.setSql("BEGIN");
-        client.sendEvent(new Event(header3, rotateEventData3));
-
-
-        testRunner.run(1, true, false);
-    }
-
-    /********************************
-     * Mock and helper classes below
-     ********************************/
-
-    static DistributedMapCacheClientImpl createCacheClient() throws 
InitializationException {
-
-        final DistributedMapCacheClientImpl client = new 
DistributedMapCacheClientImpl();
-        final ComponentLog logger = new MockComponentLog("client", client);
-        final MockControllerServiceInitializationContext clientInitContext = 
new MockControllerServiceInitializationContext(client, "client", logger, new 
MockStateManager(client));
-
-        client.initialize(clientInitContext);
-
-        return client;
-    }
-
-    static final class DistributedMapCacheClientImpl extends 
AbstractControllerService implements DistributedMapCacheClient {
-
-        private final Map<String, String> cacheMap = new HashMap<>();
-
-
-        @Override
-        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-            List<PropertyDescriptor> descriptors = new ArrayList<>();
-            descriptors.add(DistributedMapCacheClientService.HOSTNAME);
-            
descriptors.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
-            descriptors.add(DistributedMapCacheClientService.PORT);
-            
descriptors.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE);
-            return descriptors;
-        }
-
-        @Override
-        public <K, V> boolean putIfAbsent(
-                final K key,
-                final V value,
-                final Serializer<K> keySerializer, final Serializer<V> 
valueSerializer) throws IOException {
-
-            StringWriter keyWriter = new StringWriter();
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
-            String keyString = keyWriter.toString();
-
-            if (cacheMap.containsKey(keyString)) return false;
-
-            StringWriter valueWriter = new StringWriter();
-            valueSerializer.serialize(value, new 
WriterOutputStream(valueWriter));
-            return true;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public <K, V> V getAndPutIfAbsent(
-                final K key, final V value, final Serializer<K> keySerializer, 
final Serializer<V> valueSerializer,
-                final Deserializer<V> valueDeserializer) throws IOException {
-            StringWriter keyWriter = new StringWriter();
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
-            String keyString = keyWriter.toString();
-
-            if (cacheMap.containsKey(keyString))
-                return 
valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8));
-
-            StringWriter valueWriter = new StringWriter();
-            valueSerializer.serialize(value, new 
WriterOutputStream(valueWriter));
-            return null;
-        }
-
-        @Override
-        public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
-            StringWriter keyWriter = new StringWriter();
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
-            String keyString = keyWriter.toString();
-
-            return cacheMap.containsKey(keyString);
-        }
-
-        @Override
-        public <K, V> V get(
-                final K key,
-                final Serializer<K> keySerializer, final Deserializer<V> 
valueDeserializer) throws IOException {
-            StringWriter keyWriter = new StringWriter();
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
-            String keyString = keyWriter.toString();
-
-            return (cacheMap.containsKey(keyString)) ? 
valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8))
 : null;
-        }
-
-        @Override
-        public void close() throws IOException {
-
-        }
-
-        @Override
-        public <K> boolean remove(final K key, final Serializer<K> serializer) 
throws IOException {
-            StringWriter keyWriter = new StringWriter();
-            serializer.serialize(key, new WriterOutputStream(keyWriter));
-            String keyString = keyWriter.toString();
-
-            boolean removed = (cacheMap.containsKey(keyString));
-            cacheMap.remove(keyString);
-            return removed;
-        }
-
-        @Override
-        public long removeByPattern(String regex) throws IOException {
-            final List<String> removedRecords = new ArrayList<>();
-            Pattern p = Pattern.compile(regex);
-            for (String key : cacheMap.keySet()) {
-                // Key must be backed by something that can be converted into 
a String
-                Matcher m = p.matcher(key);
-                if (m.matches()) {
-                    removedRecords.add(cacheMap.get(key));
-                }
-            }
-            final long numRemoved = removedRecords.size();
-            for (String it : removedRecords) {
-                cacheMap.remove(it);
-            }
-            return numRemoved;
-        }
-
-        @Override
-        public <K, V> void put(
-                final K key,
-                final V value,
-                final Serializer<K> keySerializer, final Serializer<V> 
valueSerializer) throws IOException {
-            StringWriter keyWriter = new StringWriter();
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter));
-            StringWriter valueWriter = new StringWriter();
-            valueSerializer.serialize(value, new 
WriterOutputStream(valueWriter));
-        }
-    }
-
-    public class MockCaptureChangeMySQL extends CaptureChangeMySQL {
-
-        Map<TableInfoCacheKey, TableInfo> cache = new HashMap<>();
-
-        public BinaryLogClient createBinlogClient(String hostname, int port, 
String username, String password) {
-            return client;
-        }
-
-        @Override
-        public TableInfo loadTableInfo(TableInfoCacheKey key) {
-            TableInfo tableInfo = cache.get(key);
-            if (tableInfo == null) {
-                List<ColumnDefinition> column = new ArrayList<>();
-                column.add(new ColumnDefinition((byte) 4, "id"));
-                column.add(new ColumnDefinition((byte) -4, "string1"));
-
-                tableInfo = new TableInfo(key.getDatabaseName(), 
key.getTableName(), key.getTableId(), column);
-                cache.put(key, tableInfo);
-            }
-            return tableInfo;
-        }
-
-        @Override
-        protected void registerDriver(String locationString, String drvName) 
throws InitializationException {
-        }
-
-        @Override
-        protected Connection getJdbcConnection() throws SQLException {
-            Connection mockConnection = mock(Connection.class);
-            Statement mockStatement = mock(Statement.class);
-            when(mockConnection.createStatement()).thenReturn(mockStatement);
-            ResultSet mockResultSet = mock(ResultSet.class);
-            
when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet);
-            return mockConnection;
-        }
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java
deleted file mode 100644
index d23822292e..0000000000
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cdc.mysql;
-
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.Event;
-import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-public class MockBinlogClientJava extends BinaryLogClient {
-    String hostname;
-    int port;
-    String username;
-    String password;
-
-    boolean connected;
-    public boolean connectionTimeout = false;
-    public boolean connectionError = false;
-
-    List<LifecycleListener> lifecycleListeners = new ArrayList<>();
-    SSLSocketFactory sslSocketFactory;
-
-    List<EventListener> eventListeners = new ArrayList<>();
-
-
-    public MockBinlogClientJava(String hostname, int port, String username, 
String password) {
-        super(hostname, port, username, password);
-        this.hostname = hostname;
-        this.port = port;
-        this.username = username;
-        this.password = password;
-    }
-
-    @Override
-    public void connect(long timeoutInMilliseconds) throws IOException, 
TimeoutException {
-        if (connectionTimeout) {
-            throw new TimeoutException("Connection timed out");
-        }
-        if (connectionError) {
-            throw new IOException("Error during connect");
-        }
-        if (password == null) {
-            throw new NullPointerException("Password can't be null");
-        }
-        connected = true;
-    }
-
-    @Override
-    public void disconnect() throws IOException {
-        connected = false;
-    }
-
-    @Override
-    public boolean isConnected() {
-        return connected;
-    }
-
-    @Override
-    public void registerEventListener(EventListener eventListener) {
-        eventListeners.add(eventListener);
-    }
-
-    public void unregisterEventListener(EventListener eventListener) {
-        eventListeners.remove(eventListener);
-    }
-
-    @Override
-    public void registerLifecycleListener(LifecycleListener lifecycleListener) 
{
-        if (!lifecycleListeners.contains(lifecycleListener)) {
-            lifecycleListeners.add(lifecycleListener);
-        }
-    }
-
-    @Override
-    public void unregisterLifecycleListener(LifecycleListener 
lifecycleListener) {
-        lifecycleListeners.remove(lifecycleListener);
-    }
-
-    @Override
-    public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
-        super.setSslSocketFactory(sslSocketFactory);
-        this.sslSocketFactory = sslSocketFactory;
-    }
-
-    public void sendEvent(Event event) {
-        for (EventListener eventListener : eventListeners) {
-            eventListener.onEvent(event);
-        }
-    }
-}

Reply via email to