[ https://issues.apache.org/jira/browse/NIFI-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941190#comment-15941190 ]
ASF GitHub Bot commented on NIFI-3413: -------------------------------------- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi/pull/1618#discussion_r107920928 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetChangeDataCaptureMySQL.java --- @@ -0,0 +1,879 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import org.apache.nifi.annotation.behavior.DynamicProperties; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.db.CDCException; +import org.apache.nifi.processors.standard.db.event.ColumnDefinition; +import org.apache.nifi.processors.standard.db.event.RowEventException; +import org.apache.nifi.processors.standard.db.event.TableInfo; +import org.apache.nifi.processors.standard.db.event.TableInfoCacheKey; +import org.apache.nifi.processors.standard.db.event.io.EventWriter; +import org.apache.nifi.processors.standard.db.impl.mysql.event.BeginTransactionEventInfo; +import org.apache.nifi.processors.standard.db.impl.mysql.RawBinlogEvent; +import org.apache.nifi.processors.standard.db.impl.mysql.BinlogEventListener; +import org.apache.nifi.processors.standard.db.impl.mysql.event.BinlogEventInfo; +import org.apache.nifi.processors.standard.db.impl.mysql.event.CommitTransactionEventInfo; +import org.apache.nifi.processors.standard.db.impl.mysql.event.DeleteRowsEventInfo; +import org.apache.nifi.processors.standard.db.impl.mysql.event.SchemaChangeEventInfo; +import org.apache.nifi.processors.standard.db.impl.mysql.event.UpdateRowsEventInfo; +import org.apache.nifi.processors.standard.db.impl.mysql.event.InsertRowsEventInfo; +import org.apache.nifi.processors.standard.db.impl.mysql.event.io.BeginTransactionEventWriter; +import org.apache.nifi.processors.standard.db.impl.mysql.event.io.CommitTransactionEventWriter; +import org.apache.nifi.processors.standard.db.impl.mysql.event.io.DeleteRowsWriter; +import org.apache.nifi.processors.standard.db.impl.mysql.event.io.InsertRowsWriter; +import org.apache.nifi.processors.standard.db.impl.mysql.event.io.SchemaChangeEventWriter; +import org.apache.nifi.processors.standard.db.impl.mysql.event.io.UpdateRowsWriter; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import 
java.net.MalformedURLException; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +/** + * A processor to retrieve Change Data Capture (CDC) events and send them as flow files. + */ +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "jdbc", "cdc", "mysql"}) +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events " + + "are output as individual flow files ordered by the time at which the operation occurred.") +@Stateful(scopes = Scope.LOCAL, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, such " + + "that it can continue from the same location if restarted.") +@DynamicProperties({ + @DynamicProperty( + name = "init.binlog.filename", + value = "The binlog filename to start from", + description = "Specifies the name of the binlog file from which to begin retrieving CDC records." 
+ ), + @DynamicProperty( + name = "init.binlog.position", + value = "The offset into the initial binlog file to start from", + description = "Specifies the offset into the initial binlog file from which to begin retrieving CDC records." + ) +}) +@WritesAttributes({ + @WritesAttribute(attribute = "cdc.sequence.id", description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order " + + "of the CDC event flow file relative to the other event flow file(s)."), + @WritesAttribute(attribute = "cdc.event.type", description = "A string indicating the type of CDC event that occurred, including (but not limited to) " + + "'begin', 'write', 'update', 'delete', 'schema_change' and 'commit'."), + @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to " + + "application/json") +}) +public class GetChangeDataCaptureMySQL extends AbstractSessionFactoryProcessor { + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); + + protected static Set<Relationship> relationships; + + // Properties + public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-db-name-pattern") + .displayName("Database/Schema Name Pattern") + .description("A regular expression (regex) for matching databases or schemas (depending on your RDBMS' terminology) against the list of CDC events. The regex must match " + + "the schema name as it is stored in the database. 
If the property is not set, the schema name will not be used to filter the CDC events.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-name-pattern") + .displayName("Table Name Pattern") + .description("A regular expression (regex) for matching CDC events affecting matching tables. The regex must match the table name as it is stored in the database. " + + "If the property is not set, no events will be filtered based on table name.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-max-wait-time") + .displayName("Max Wait Time") + .description("The maximum amount of time allowed for a connection to be established, " + + "zero means there is effectively no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-hosts") + .displayName("MySQL Hosts") + .description("A list of hostname/port entries corresponding to nodes in a MySQL cluster. The entries should be comma separated " + + "using a colon such as host1:port,host2:port,.... For example mysql.myhost.com:3306. This processor will attempt to connect to " + + "the hosts in the list in order. If one node goes down and failover is enabled for the cluster, then the processor will connect " + + "to the active node (assuming its host entry is specified in this property. 
The default port for MySQL connections is 3306.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-driver-class") + .displayName("MySQL Driver Class Name") + .description("The class name of the MySQL database driver class") + .defaultValue("com.mysql.jdbc.Driver") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-driver-locations") + .displayName("MySQL Driver Location(s)") + .description("Comma-separated list of files/folders and/or URLs containing the MySQL driver JAR and its dependencies (if any). " + + "For example '/var/tmp/mysql-connector-java-5.1.38-bin.jar'") + .defaultValue(null) + .required(false) + .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator())) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-username") + .displayName("Username") + .description("Username to access the MySQL cluster") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-password") + .displayName("Password") + .description("Password to access the MySQL cluster") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder() + 
.name("get-cdc-mysql-dist-map-cache-client") + .displayName("Distributed Map Cache Client") + .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various tables, columns, etc. " + + "needed by the processor. If a client is not specified, the generated events will not include column type or name information.") + .identifiesControllerService(DistributedMapCacheClient.class) + .required(false) + .build(); + + public static final PropertyDescriptor RETRIEVE_ALL_RECORDS = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-retrieve-all-records") + .displayName("Retrieve All Records") + .description("Specifies whether to get all available CDC events, regardless of the current binlog filename and/or position. If this property is set to true, " + + "any init.binlog.filename and init.binlog.position dynamic properties are ignored. NOTE: If binlog filename and position values are present in the processor's " + + "State Map, this property's value is ignored. To reset the behavior, clear the processor state (refer to the State Management section of the processor's documentation.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor INIT_SEQUENCE_ID = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-init-seq-id") + .displayName("Initial Sequence ID") + .description("Specifies an initial sequence identifier to use if this processor's State does not have a current " + + "sequence identifier. If a sequence identifier is present in the processor's State, this property is ignored. Sequence identifiers are " + + "monotonically increasing integers that record the order of flow files generated by the processor. 
They can be used with the EnforceOrder " + + "processor to guarantee ordered delivery of CDC events.") + .required(false) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor INIT_BINLOG_FILENAME = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-init-binlog-filename") + .displayName("Initial Binlog Filename") + .description("Specifies an initial binlog filename to use if this processor's State does not have a current binlog filename. If a filename is present " + + "in the processor's State, this property is ignored. This can be used along with Initial Binlog Position to \"skip ahead\" if previous events are not desired. " + + "Note that NiFi Expression Language is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. Expression " + + "Language is supported to enable the use of the Variable Registry and/or environment properties.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor INIT_BINLOG_POSITION = new PropertyDescriptor.Builder() + .name("get-cdc-mysql-init-binlog-position") + .displayName("Initial Binlog Position") + .description("Specifies an initial offset into a binlog (specified by Initial Binlog Filename) to use if this processor's State does not have a current " + + "binlog filename. If a filename is present in the processor's State, this property is ignored. This can be used along with Initial Binlog Filename " + + "to \"skip ahead\" if previous events are not desired. Note that NiFi Expression Language is supported, but this property is evaluated when the " + + "processor is configured, so FlowFile attributes may not be used. 
Expression Language is supported to enable the use of the Variable Registry " + + "and/or environment properties.") + .required(false) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + private static List<PropertyDescriptor> propDescriptors; + + private volatile ProcessSession currentSession; + private BinaryLogClient binlogClient; + private volatile LinkedBlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(); + private volatile String currentBinlogFile = null; + private volatile long currentBinlogPosition = 4; + + // The following variables save the value of the binlog filename and position (and sequence id) at the beginning of a transaction. Used for rollback + private volatile String xactBinlogFile = null; + private volatile long xactBinlogPosition = 4; + private volatile long xactSequenceId = 0; + + private volatile TableInfo currentTable = null; + private volatile Pattern databaseNamePattern; + private volatile Pattern tableNamePattern; + + private volatile boolean inTransaction = false; + private volatile boolean skipTable = false; + private AtomicBoolean doStop = new AtomicBoolean(false); + + private int currentHost = 0; + + private AtomicLong currentSequenceId = new AtomicLong(0); + + private volatile DistributedMapCacheClient cacheClient = null; + private final Serializer<TableInfoCacheKey> cacheKeySerializer = new TableInfoCacheKey.Serializer(); + private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer(); + private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer(); + + private Connection jdbcConnection = null; + + private static final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter(); + private static final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter(); + private static final SchemaChangeEventWriter schemaChangeEventWriter = new 
SchemaChangeEventWriter(); + private static final InsertRowsWriter insertRowsWriter = new InsertRowsWriter(); + private static final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter(); + private static final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter(); + + static { + + final Set<Relationship> r = new HashSet<>(); + r.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(r); + + final List<PropertyDescriptor> pds = new ArrayList<>(); + pds.add(HOSTS); + pds.add(DRIVER_NAME); + pds.add(DRIVER_LOCATION); + pds.add(USERNAME); + pds.add(PASSWORD); + pds.add(DATABASE_NAME_PATTERN); + pds.add(TABLE_NAME_PATTERN); + pds.add(CONNECT_TIMEOUT); + pds.add(DIST_CACHE_CLIENT); + pds.add(RETRIEVE_ALL_RECORDS); + pds.add(INIT_SEQUENCE_ID); + propDescriptors = Collections.unmodifiableList(pds); + } + + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + if (INIT_BINLOG_FILENAME.getName().equals(propertyDescriptorName)) { + return INIT_BINLOG_FILENAME; + } + if (INIT_BINLOG_POSITION.getName().equals(propertyDescriptorName)) { + return INIT_BINLOG_POSITION; + } + return null; + } + + @OnScheduled + public void setup(ProcessContext context) { + + final ComponentLog logger = getLogger(); + + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + + try { + stateMap = stateManager.getState(Scope.LOCAL); + } catch (final IOException ioe) { + logger.error("Failed to retrieve observed maximum values from the State Manager. 
Will not attempt " + + "connection until this is accomplished.", ioe); + context.yield(); + return; + } + + Map<String, String> statePropertiesMap = new HashMap<>(stateMap.toMap()); + + // Pre-store initial sequence ID if none has been set in the state map + final PropertyValue initSequenceId = context.getProperty(INIT_SEQUENCE_ID); + statePropertiesMap.computeIfAbsent(EventWriter.SEQUENCE_ID_KEY, k -> initSequenceId.isSet() ? initSequenceId.evaluateAttributeExpressions().getValue() : "0"); + + + boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean(); + + // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename, if no State variable is present + currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY); + if (StringUtils.isEmpty(currentBinlogFile)) { + if (!getAllRecords && context.getProperty(INIT_BINLOG_FILENAME).isSet()) { + currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue(); + } else { + // If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null) + currentBinlogFile = ""; + } + } + + // Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename, if no State variable is present + String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY); + if (!StringUtils.isEmpty(binlogPosition)) { + currentBinlogPosition = Long.valueOf(binlogPosition); + } else if (!getAllRecords && context.getProperty(INIT_BINLOG_POSITION).isSet()) { + currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong(); + } + + // Get current sequence ID from state + String seqIdString = stateMap.get(EventWriter.SEQUENCE_ID_KEY); + if (StringUtils.isEmpty(seqIdString)) { + // Use Initial Sequence ID property if none is found in state + PropertyValue seqIdProp = 
context.getProperty(GetChangeDataCaptureMySQL.INIT_SEQUENCE_ID); + if (seqIdProp.isSet()) { + currentSequenceId.set(seqIdProp.evaluateAttributeExpressions().asInteger()); + } + } else { + currentSequenceId.set(Integer.parseInt(seqIdString)); + } + + // Get reference to Distributed Cache if one exists. If it does not, no enrichment (resolution of column names, e.g.) will be performed + boolean createEnrichmentConnection = false; + if (context.getProperty(GetChangeDataCaptureMySQL.DIST_CACHE_CLIENT).isSet()) { + cacheClient = context.getProperty(GetChangeDataCaptureMySQL.DIST_CACHE_CLIENT).asControllerService(DistributedMapCacheClient.class); + createEnrichmentConnection = true; + } else { + logger.warn("No Distributed Map Cache Client is specified, so no event enrichment (resolution of column names, e.g.) will be performed."); + } + + + // Save off MySQL cluster and JDBC driver information, will be used to connect for event enrichment as well as for the binlog connector + try { + List<InetSocketAddress> hosts = getHosts(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue()); + + String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + + long connectTimeout = context.getProperty(GetChangeDataCaptureMySQL.CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + + String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue(); + String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue(); + + connect(hosts, username, password, createEnrichmentConnection, driverLocation, driverName, connectTimeout); + } catch (IOException ioe) { + context.yield(); + throw new ProcessException(ioe); + } + + PropertyValue dbNameValue = context.getProperty(DATABASE_NAME_PATTERN); + databaseNamePattern = dbNameValue.isSet() ? 
Pattern.compile(dbNameValue.getValue()) : null; + + PropertyValue tableNameValue = context.getProperty(TABLE_NAME_PATTERN); + tableNamePattern = tableNameValue.isSet() ? Pattern.compile(tableNameValue.getValue()) : null; + } + + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + if (currentSession == null) { + currentSession = sessionFactory.createSession(); + } + + ComponentLog log = getLogger(); + + try { + outputEvents(currentSession, log); + } catch (IOException ioe) { + try { + // Perform some processor-level "rollback" + currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile; + currentBinlogPosition = xactBinlogPosition; + currentSequenceId.set(xactSequenceId); + inTransaction = false; + stop(context.getStateManager()); + queue.clear(); + } catch (Exception e) { + // Not much we can recover from here + } + throw new ProcessException(ioe); + } + } + + @OnStopped + public void onStopped(ProcessContext context) { + try { + stop(context.getStateManager()); + } catch (CDCException ioe) { + throw new ProcessException(ioe); + } + } + + /** + * Get a list of hosts from a NiFi property, e.g. + * + * @param hostsString A comma-separated list of hosts (host:port,host2:port2, etc.) 
+ * @return List of InetSocketAddresses for the hosts + */ + private List<InetSocketAddress> getHosts(String hostsString) { + + if (hostsString == null) { + return null; + } + final List<String> hostsSplit = Arrays.asList(hostsString.split(",")); + List<InetSocketAddress> hostsList = new ArrayList<>(); + + for (String item : hostsSplit) { + String[] addresses = item.split(":"); + if (addresses.length != 2) { + throw new ArrayIndexOutOfBoundsException("Not in host:port format"); + } + + hostsList.add(new InetSocketAddress(addresses[0].trim(), Integer.parseInt(addresses[1].trim()))); + } + return hostsList; + } + + protected void connect(List<InetSocketAddress> hosts, String username, String password, boolean createEnrichmentConnection, + String driverLocation, String driverName, long connectTimeout) throws IOException { + + int connectionAttempts = 0; + final int numHosts = hosts.size(); + InetSocketAddress connectedHost = null; + ComponentLog log = getLogger(); + + while (connectedHost == null && connectionAttempts < numHosts) { + if (binlogClient == null) { + + connectedHost = hosts.get(currentHost); + binlogClient = createBinlogClient(connectedHost.getHostString(), connectedHost.getPort(), username, password); + } + + BinlogEventListener eventListener = createBinlogEventListener(binlogClient, queue); + binlogClient.registerEventListener(eventListener); + + binlogClient.setBinlogFilename(currentBinlogFile); + binlogClient.setBinlogPosition(currentBinlogPosition); + + try { + if (connectTimeout == 0) { + connectTimeout = Long.MAX_VALUE; + } + binlogClient.connect(connectTimeout); + + } catch (IOException | TimeoutException te) { + // Try the next host + connectedHost = null; + currentHost = (currentHost + 1) % numHosts; + connectionAttempts++; + } + } + if (!binlogClient.isConnected()) { + throw new IOException("Could not connect to any of the specified hosts"); + } + + if (createEnrichmentConnection) { + try { + jdbcConnection = getJdbcConnection(driverLocation, 
driverName, connectedHost, username, password, null); + } catch (InitializationException | SQLException e) { + throw new ProcessException(e); + } + } + + doStop.set(false); + } + + + public void outputEvents(ProcessSession session, ComponentLog log) throws IOException { + RawBinlogEvent rawBinlogEvent; + + // Drain the queue + ; + while ((rawBinlogEvent = queue.poll()) != null && !doStop.get()) { --- End diff -- Will binlogClient.disconnect() call stop on the listeners? If so, and we're restarted, it seems like the queue will maintain previous binlog events, but shouldn't grow, correct? > Implement a GetChangeDataCapture processor > ------------------------------------------ > > Key: NIFI-3413 > URL: https://issues.apache.org/jira/browse/NIFI-3413 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Matt Burgess > Assignee: Matt Burgess > > Database systems such as MySQL, Oracle, and SQL Server allow access to their > transactional logs and such, in order for external clients to have a "change > data capture" (CDC) capability. I propose a GetChangeDataCapture processor to > enable this in NiFi. > The processor would be configured with a DBCPConnectionPool controller > service, as well as a Database Type property (similar to the one in > QueryDatabaseTable) for database-specific handling. Additional properties > might include the CDC table name, etc. Additional database-specific > properties could be handled using dynamic properties (and the documentation > should reflect this). > The processor would accept no incoming connections (it is a "Get" or source > processor), would be intended to run on the primary node only as a single > threaded processor, and would generate a flow file for each operation > (INSERT, UPDATE, DELETE, e.g.) in one or some number of formats (JSON, e.g.).
> The flow files would be transferred in time order (to enable a replication > solution, for example), perhaps with some auto-incrementing attribute to also > indicate order if need be. -- This message was sent by Atlassian JIRA (v6.3.15#6346)