[ 
https://issues.apache.org/jira/browse/NIFI-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941190#comment-15941190
 ] 

ASF GitHub Bot commented on NIFI-3413:
--------------------------------------

Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1618#discussion_r107920928
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetChangeDataCaptureMySQL.java
 ---
    @@ -0,0 +1,879 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import com.github.shyiko.mysql.binlog.BinaryLogClient;
    +import com.github.shyiko.mysql.binlog.event.Event;
    +import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
    +import com.github.shyiko.mysql.binlog.event.EventType;
    +import com.github.shyiko.mysql.binlog.event.QueryEventData;
    +import com.github.shyiko.mysql.binlog.event.RotateEventData;
    +import com.github.shyiko.mysql.binlog.event.TableMapEventData;
    +import org.apache.nifi.annotation.behavior.DynamicProperties;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.standard.db.CDCException;
    +import org.apache.nifi.processors.standard.db.event.ColumnDefinition;
    +import org.apache.nifi.processors.standard.db.event.RowEventException;
    +import org.apache.nifi.processors.standard.db.event.TableInfo;
    +import org.apache.nifi.processors.standard.db.event.TableInfoCacheKey;
    +import org.apache.nifi.processors.standard.db.event.io.EventWriter;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.BeginTransactionEventInfo;
    +import org.apache.nifi.processors.standard.db.impl.mysql.RawBinlogEvent;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.BinlogEventListener;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.BinlogEventInfo;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.CommitTransactionEventInfo;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.DeleteRowsEventInfo;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.SchemaChangeEventInfo;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.UpdateRowsEventInfo;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.InsertRowsEventInfo;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.io.BeginTransactionEventWriter;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.io.CommitTransactionEventWriter;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.io.DeleteRowsWriter;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.io.InsertRowsWriter;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.io.SchemaChangeEventWriter;
    +import 
org.apache.nifi.processors.standard.db.impl.mysql.event.io.UpdateRowsWriter;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.net.MalformedURLException;
    +import java.sql.Connection;
    +import java.sql.Driver;
    +import java.sql.DriverManager;
    +import java.sql.DriverPropertyInfo;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.sql.SQLFeatureNotSupportedException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.logging.Logger;
    +import java.util.regex.Pattern;
    +
    +/**
    + * A processor to retrieve Change Data Capture (CDC) events and send them 
as flow files.
    + */
    +@TriggerSerially
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"sql", "jdbc", "cdc", "mysql"})
    +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a 
MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
    +        + "are output as individual flow files ordered by the time at 
which the operation occurred.")
    +@Stateful(scopes = Scope.LOCAL, description = "Information such as a 
'pointer' to the current CDC event in the database is stored by this processor, 
such "
    +        + "that it can continue from the same location if restarted.")
    +@DynamicProperties({
    +        @DynamicProperty(
    +                name = "init.binlog.filename",
    +                value = "The binlog filename to start from",
    +                description = "Specifies the name of the binlog file from 
which to begin retrieving CDC records."
    +        ),
    +        @DynamicProperty(
    +                name = "init.binlog.position",
    +                value = "The offset into the initial binlog file to start 
from",
    +                description = "Specifies the offset into the initial 
binlog file from which to begin retrieving CDC records."
    +        )
    +})
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "cdc.sequence.id", description = "A 
sequence identifier (i.e. strictly increasing integer value) specifying the 
order "
    +                + "of the CDC event flow file relative to the other event 
flow file(s)."),
    +        @WritesAttribute(attribute = "cdc.event.type", description = "A 
string indicating the type of CDC event that occurred, including (but not 
limited to) "
    +                + "'begin', 'write', 'update', 'delete', 'schema_change' 
and 'commit'."),
    +        @WritesAttribute(attribute = "mime.type", description = "The 
processor outputs flow file content in JSON format, and sets the mime.type 
attribute to "
    +                + "application/json")
    +})
    +public class GetChangeDataCaptureMySQL extends 
AbstractSessionFactoryProcessor {
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Successfully created FlowFile from SQL query 
result set.")
    +            .build();
    +
    +    protected static Set<Relationship> relationships;
    +
    +    // Properties
    +    public static final PropertyDescriptor DATABASE_NAME_PATTERN = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-db-name-pattern")
    +            .displayName("Database/Schema Name Pattern")
    +            .description("A regular expression (regex) for matching 
databases or schemas (depending on your RDBMS' terminology) against the list of 
CDC events. The regex must match "
    +                    + "the schema name as it is stored in the database. If 
the property is not set, the schema name will not be used to filter the CDC 
events.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME_PATTERN = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-name-pattern")
    +            .displayName("Table Name Pattern")
    +            .description("A regular expression (regex) for matching CDC 
events affecting matching tables. The regex must match the table name as it is 
stored in the database. "
    +                    + "If the property is not set, no events will be 
filtered based on table name.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECT_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-max-wait-time")
    +            .displayName("Max Wait Time")
    +            .description("The maximum amount of time allowed for a 
connection to be established, "
    +                    + "zero means there is effectively no limit. Max time 
less than 1 second will be equal to zero.")
    +            .defaultValue("0 seconds")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor HOSTS = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-hosts")
    +            .displayName("MySQL Hosts")
    +            .description("A list of hostname/port entries corresponding to 
nodes in a MySQL cluster. The entries should be comma separated "
    +                    + "using a colon such as host1:port,host2:port,....  
For example mysql.myhost.com:3306. This processor will attempt to connect to "
    +                    + "the hosts in the list in order. If one node goes 
down and failover is enabled for the cluster, then the processor will connect "
    +                    + "to the active node (assuming its host entry is 
specified in this property.  The default port for MySQL connections is 3306.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor DRIVER_NAME = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-driver-class")
    +            .displayName("MySQL Driver Class Name")
    +            .description("The class name of the MySQL database driver 
class")
    +            .defaultValue("com.mysql.jdbc.Driver")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor DRIVER_LOCATION = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-driver-locations")
    +            .displayName("MySQL Driver Location(s)")
    +            .description("Comma-separated list of files/folders and/or 
URLs containing the MySQL driver JAR and its dependencies (if any). "
    +                    + "For example 
'/var/tmp/mysql-connector-java-5.1.38-bin.jar'")
    +            .defaultValue(null)
    +            .required(false)
    +            .addValidator(StandardValidators.createListValidator(true, 
true, StandardValidators.createURLorFileValidator()))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-username")
    +            .displayName("Username")
    +            .description("Username to access the MySQL cluster")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-password")
    +            .displayName("Password")
    +            .description("Password to access the MySQL cluster")
    +            .required(false)
    +            .sensitive(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor DIST_CACHE_CLIENT = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-dist-map-cache-client")
    +            .displayName("Distributed Map Cache Client")
    +            .description("Identifies a Distributed Map Cache Client 
controller service to be used for keeping information about the various tables, 
columns, etc. "
    +                    + "needed by the processor. If a client is not 
specified, the generated events will not include column type or name 
information.")
    +            .identifiesControllerService(DistributedMapCacheClient.class)
    +            .required(false)
    +            .build();
    +
    +    public static final PropertyDescriptor RETRIEVE_ALL_RECORDS = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-retrieve-all-records")
    +            .displayName("Retrieve All Records")
    +            .description("Specifies whether to get all available CDC 
events, regardless of the current binlog filename and/or position. If this 
property is set to true, "
    +                    + "any init.binlog.filename and init.binlog.position 
dynamic properties are ignored. NOTE: If binlog filename and position values 
are present in the processor's "
    +                    + "State Map, this property's value is ignored. To 
reset the behavior, clear the processor state (refer to the State Management 
section of the processor's documentation.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor INIT_SEQUENCE_ID = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-init-seq-id")
    +            .displayName("Initial Sequence ID")
    +            .description("Specifies an initial sequence identifier to use 
if this processor's State does not have a current "
    +                    + "sequence identifier. If a sequence identifier is 
present in the processor's State, this property is ignored. Sequence 
identifiers are "
    +                    + "monotonically increasing integers that record the 
order of flow files generated by the processor. They can be used with the 
EnforceOrder "
    +                    + "processor to guarantee ordered delivery of CDC 
events.")
    +            .required(false)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor INIT_BINLOG_FILENAME = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-init-binlog-filename")
    +            .displayName("Initial Binlog Filename")
    +            .description("Specifies an initial binlog filename to use if 
this processor's State does not have a current binlog filename. If a filename 
is present "
    +                    + "in the processor's State, this property is ignored. 
This can be used along with Initial Binlog Position to \"skip ahead\" if 
previous events are not desired. "
    +                    + "Note that NiFi Expression Language is supported, 
but this property is evaluated when the processor is configured, so FlowFile 
attributes may not be used. Expression "
    +                    + "Language is supported to enable the use of the 
Variable Registry and/or environment properties.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor INIT_BINLOG_POSITION = new 
PropertyDescriptor.Builder()
    +            .name("get-cdc-mysql-init-binlog-position")
    +            .displayName("Initial Binlog Position")
    +            .description("Specifies an initial offset into a binlog 
(specified by Initial Binlog Filename) to use if this processor's State does 
not have a current "
    +                    + "binlog filename. If a filename is present in the 
processor's State, this property is ignored. This can be used along with 
Initial Binlog Filename "
    +                    + "to \"skip ahead\" if previous events are not 
desired. Note that NiFi Expression Language is supported, but this property is 
evaluated when the "
    +                    + "processor is configured, so FlowFile attributes may 
not be used. Expression Language is supported to enable the use of the Variable 
Registry "
    +                    + "and/or environment properties.")
    +            .required(false)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    private static List<PropertyDescriptor> propDescriptors;
    +
    +    private volatile ProcessSession currentSession;
    +    private BinaryLogClient binlogClient;
    +    private volatile LinkedBlockingQueue<RawBinlogEvent> queue = new 
LinkedBlockingQueue<>();
    +    private volatile String currentBinlogFile = null;
    +    private volatile long currentBinlogPosition = 4;
    +
    +    // The following variables save the value of the binlog filename and 
position (and sequence id) at the beginning of a transaction. Used for rollback
    +    private volatile String xactBinlogFile = null;
    +    private volatile long xactBinlogPosition = 4;
    +    private volatile long xactSequenceId = 0;
    +
    +    private volatile TableInfo currentTable = null;
    +    private volatile Pattern databaseNamePattern;
    +    private volatile Pattern tableNamePattern;
    +
    +    private volatile boolean inTransaction = false;
    +    private volatile boolean skipTable = false;
    +    private AtomicBoolean doStop = new AtomicBoolean(false);
    +
    +    private int currentHost = 0;
    +
    +    private AtomicLong currentSequenceId = new AtomicLong(0);
    +
    +    private volatile DistributedMapCacheClient cacheClient = null;
    +    private final Serializer<TableInfoCacheKey> cacheKeySerializer = new 
TableInfoCacheKey.Serializer();
    +    private final Serializer<TableInfo> cacheValueSerializer = new 
TableInfo.Serializer();
    +    private final Deserializer<TableInfo> cacheValueDeserializer = new 
TableInfo.Deserializer();
    +
    +    private Connection jdbcConnection = null;
    +
    +    private static final BeginTransactionEventWriter beginEventWriter = 
new BeginTransactionEventWriter();
    +    private static final CommitTransactionEventWriter commitEventWriter = 
new CommitTransactionEventWriter();
    +    private static final SchemaChangeEventWriter schemaChangeEventWriter = 
new SchemaChangeEventWriter();
    +    private static final InsertRowsWriter insertRowsWriter = new 
InsertRowsWriter();
    +    private static final DeleteRowsWriter deleteRowsWriter = new 
DeleteRowsWriter();
    +    private static final UpdateRowsWriter updateRowsWriter = new 
UpdateRowsWriter();
    +
    +    static {
    +
    +        final Set<Relationship> r = new HashSet<>();
    +        r.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(r);
    +
    +        final List<PropertyDescriptor> pds = new ArrayList<>();
    +        pds.add(HOSTS);
    +        pds.add(DRIVER_NAME);
    +        pds.add(DRIVER_LOCATION);
    +        pds.add(USERNAME);
    +        pds.add(PASSWORD);
    +        pds.add(DATABASE_NAME_PATTERN);
    +        pds.add(TABLE_NAME_PATTERN);
    +        pds.add(CONNECT_TIMEOUT);
    +        pds.add(DIST_CACHE_CLIENT);
    +        pds.add(RETRIEVE_ALL_RECORDS);
    +        pds.add(INIT_SEQUENCE_ID);
    +        propDescriptors = Collections.unmodifiableList(pds);
    +    }
    +
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        if (INIT_BINLOG_FILENAME.getName().equals(propertyDescriptorName)) 
{
    +            return INIT_BINLOG_FILENAME;
    +        }
    +        if (INIT_BINLOG_POSITION.getName().equals(propertyDescriptorName)) 
{
    +            return INIT_BINLOG_POSITION;
    +        }
    +        return null;
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +
    +        final ComponentLog logger = getLogger();
    +
    +        final StateManager stateManager = context.getStateManager();
    +        final StateMap stateMap;
    +
    +        try {
    +            stateMap = stateManager.getState(Scope.LOCAL);
    +        } catch (final IOException ioe) {
    +            logger.error("Failed to retrieve observed maximum values from 
the State Manager. Will not attempt "
    +                    + "connection until this is accomplished.", ioe);
    +            context.yield();
    +            return;
    +        }
    +
    +        Map<String, String> statePropertiesMap = new 
HashMap<>(stateMap.toMap());
    +
    +        // Pre-store initial sequence ID if none has been set in the state 
map
    +        final PropertyValue initSequenceId = 
context.getProperty(INIT_SEQUENCE_ID);
    +        statePropertiesMap.computeIfAbsent(EventWriter.SEQUENCE_ID_KEY, k 
-> initSequenceId.isSet() ? 
initSequenceId.evaluateAttributeExpressions().getValue() : "0");
    +
    +
    +        boolean getAllRecords = 
context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
    +
    +        // Set current binlog filename to whatever is in State, falling 
back to the Retrieve All Records then Initial Binlog Filename, if no State 
variable is present
    +        currentBinlogFile = 
stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
    +        if (StringUtils.isEmpty(currentBinlogFile)) {
    +            if (!getAllRecords && 
context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
    +                currentBinlogFile = 
context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
    +            } else {
    +                // If we're starting from the beginning of all binlogs, 
the binlog filename must be the empty string (not null)
    +                currentBinlogFile = "";
    +            }
    +        }
    +
    +        // Set current binlog position to whatever is in State, falling 
back to the Retrieve All Records then Initial Binlog Filename, if no State 
variable is present
    +        String binlogPosition = 
stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
    +        if (!StringUtils.isEmpty(binlogPosition)) {
    +            currentBinlogPosition = Long.valueOf(binlogPosition);
    +        } else if (!getAllRecords && 
context.getProperty(INIT_BINLOG_POSITION).isSet()) {
    +            currentBinlogPosition = 
context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
    +        }
    +
    +        // Get current sequence ID from state
    +        String seqIdString = stateMap.get(EventWriter.SEQUENCE_ID_KEY);
    +        if (StringUtils.isEmpty(seqIdString)) {
    +            // Use Initial Sequence ID property if none is found in state
    +            PropertyValue seqIdProp = 
context.getProperty(GetChangeDataCaptureMySQL.INIT_SEQUENCE_ID);
    +            if (seqIdProp.isSet()) {
    +                
currentSequenceId.set(seqIdProp.evaluateAttributeExpressions().asInteger());
    +            }
    +        } else {
    +            currentSequenceId.set(Integer.parseInt(seqIdString));
    +        }
    +
    +        // Get reference to Distributed Cache if one exists. If it does 
not, no enrichment (resolution of column names, e.g.) will be performed
    +        boolean createEnrichmentConnection = false;
    +        if 
(context.getProperty(GetChangeDataCaptureMySQL.DIST_CACHE_CLIENT).isSet()) {
    +            cacheClient = 
context.getProperty(GetChangeDataCaptureMySQL.DIST_CACHE_CLIENT).asControllerService(DistributedMapCacheClient.class);
    +            createEnrichmentConnection = true;
    +        } else {
    +            logger.warn("No Distributed Map Cache Client is specified, so 
no event enrichment (resolution of column names, e.g.) will be performed.");
    +        }
    +
    +
    +        // Save off MySQL cluster and JDBC driver information, will be 
used to connect for event enrichment as well as for the binlog connector
    +        try {
    +            List<InetSocketAddress> hosts = 
getHosts(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue());
    +
    +            String username = 
context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
    +            String password = 
context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
    +
    +            long connectTimeout = 
context.getProperty(GetChangeDataCaptureMySQL.CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
    +
    +            String driverLocation = 
context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
    +            String driverName = 
context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
    +
    +            connect(hosts, username, password, createEnrichmentConnection, 
driverLocation, driverName, connectTimeout);
    +        } catch (IOException ioe) {
    +            context.yield();
    +            throw new ProcessException(ioe);
    +        }
    +
    +        PropertyValue dbNameValue = 
context.getProperty(DATABASE_NAME_PATTERN);
    +        databaseNamePattern = dbNameValue.isSet() ? 
Pattern.compile(dbNameValue.getValue()) : null;
    +
    +        PropertyValue tableNameValue = 
context.getProperty(TABLE_NAME_PATTERN);
    +        tableNamePattern = tableNameValue.isSet() ? 
Pattern.compile(tableNameValue.getValue()) : null;
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
    +        if (currentSession == null) {
    +            currentSession = sessionFactory.createSession();
    +        }
    +
    +        ComponentLog log = getLogger();
    +
    +        try {
    +            outputEvents(currentSession, log);
    +        } catch (IOException ioe) {
    +            try {
    +                // Perform some processor-level "rollback"
    +                currentBinlogFile = xactBinlogFile == null ? "" : 
xactBinlogFile;
    +                currentBinlogPosition = xactBinlogPosition;
    +                currentSequenceId.set(xactSequenceId);
    +                inTransaction = false;
    +                stop(context.getStateManager());
    +                queue.clear();
    +            } catch (Exception e) {
    +                // Not much we can recover from here
    +            }
    +            throw new ProcessException(ioe);
    +        }
    +    }
    +
    +    @OnStopped
    +    public void onStopped(ProcessContext context) {
    +        try {
    +            stop(context.getStateManager());
    +        } catch (CDCException ioe) {
    +            throw new ProcessException(ioe);
    +        }
    +    }
    +
    +    /**
    +     * Get a list of hosts from a NiFi property, e.g.
    +     *
    +     * @param hostsString A comma-separated list of hosts 
(host:port,host2:port2, etc.)
    +     * @return List of InetSocketAddresses for the hosts
    +     */
    +    private List<InetSocketAddress> getHosts(String hostsString) {
    +
    +        if (hostsString == null) {
    +            return null;
    +        }
    +        final List<String> hostsSplit = 
Arrays.asList(hostsString.split(","));
    +        List<InetSocketAddress> hostsList = new ArrayList<>();
    +
    +        for (String item : hostsSplit) {
    +            String[] addresses = item.split(":");
    +            if (addresses.length != 2) {
    +                throw new ArrayIndexOutOfBoundsException("Not in host:port 
format");
    +            }
    +
    +            hostsList.add(new InetSocketAddress(addresses[0].trim(), 
Integer.parseInt(addresses[1].trim())));
    +        }
    +        return hostsList;
    +    }
    +
    +    protected void connect(List<InetSocketAddress> hosts, String username, 
String password, boolean createEnrichmentConnection,
    +                           String driverLocation, String driverName, long 
connectTimeout) throws IOException {
    +
    +        int connectionAttempts = 0;
    +        final int numHosts = hosts.size();
    +        InetSocketAddress connectedHost = null;
    +        ComponentLog log = getLogger();
    +
    +        while (connectedHost == null && connectionAttempts < numHosts) {
    +            if (binlogClient == null) {
    +
    +                connectedHost = hosts.get(currentHost);
    +                binlogClient = 
createBinlogClient(connectedHost.getHostString(), connectedHost.getPort(), 
username, password);
    +            }
    +
    +            BinlogEventListener eventListener = 
createBinlogEventListener(binlogClient, queue);
    +            binlogClient.registerEventListener(eventListener);
    +
    +            binlogClient.setBinlogFilename(currentBinlogFile);
    +            binlogClient.setBinlogPosition(currentBinlogPosition);
    +
    +            try {
    +                if (connectTimeout == 0) {
    +                    connectTimeout = Long.MAX_VALUE;
    +                }
    +                binlogClient.connect(connectTimeout);
    +
    +            } catch (IOException | TimeoutException te) {
    +                // Try the next host
    +                connectedHost = null;
    +                currentHost = (currentHost + 1) % numHosts;
    +                connectionAttempts++;
    +            }
    +        }
    +        if (!binlogClient.isConnected()) {
    +            throw new IOException("Could not connect to any of the 
specified hosts");
    +        }
    +
    +        if (createEnrichmentConnection) {
    +            try {
    +                jdbcConnection = getJdbcConnection(driverLocation, 
driverName, connectedHost, username, password, null);
    +            } catch (InitializationException | SQLException e) {
    +                throw new ProcessException(e);
    +            }
    +        }
    +
    +        doStop.set(false);
    +    }
    +
    +
    +    public void outputEvents(ProcessSession session, ComponentLog log) 
throws IOException {
    +        RawBinlogEvent rawBinlogEvent;
    +
    +        // Drain the queue
    +        ;
    +        while ((rawBinlogEvent = queue.poll()) != null && !doStop.get()) {
    --- End diff --
    
    Will binlogClient.disconnect() call stop on the listeners? If so, and we're 
restarted, it seems like the queue will retain the previous binlog events but 
shouldn't grow, correct? 


> Implement a GetChangeDataCapture processor
> ------------------------------------------
>
>                 Key: NIFI-3413
>                 URL: https://issues.apache.org/jira/browse/NIFI-3413
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> Database systems such as MySQL, Oracle, and SQL Server allow access to their 
> transactional logs and such, in order for external clients to have a "change 
> data capture" (CDC) capability. I propose a GetChangeDataCapture processor to 
> enable this in NiFi.
> The processor would be configured with a DBCPConnectionPool controller 
> service, as well as a Database Type property (similar to the one in 
> QueryDatabaseTable) for database-specific handling. Additional properties 
> might include the CDC table name, etc.  Additional database-specific 
> properties could be handled using dynamic properties (and the documentation 
> should reflect this).
> The processor would accept no incoming connections (it is a "Get" or source 
> processor), would be intended to run on the primary node only as a single 
> threaded processor, and would generate a flow file for each operation 
> (INSERT, UPDATE, DELETE, e.g.) in one or some number of formats (JSON, e.g.). 
> The flow files would be transferred in time order (to enable a replication 
> solution, for example), perhaps with some auto-incrementing attribute to also 
> indicate order if need be.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to