exceptionfactory commented on a change in pull request #5710: URL: https://github.com/apache/nifi/pull/5710#discussion_r795073834
########## File path: nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java ########## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cdc.postgresql.processors; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.postgresql.event.Reader; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; + +/** + * A processor to Capture Data Change (CDC) events from a PostgreSQL database + * and return them as flow files. + */ +@TriggerSerially +@PrimaryNodeOnly +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) + +@Tags({ "sql", "jdbc", "cdc", "postgresql" }) + +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. Works for PostgreSQL version 10+. " + + "Events include INSERT, UPDATE, and DELETE operations and are output as individual flow files ordered by the time at which the operation occurred. " + + "This processor uses Replication Connection to stream data. By default, an existing Logical Replication Slot with the specified name will be used or, " + + "if none exists, a new one will be created. In the case of an existing slot, make sure that pgoutput is the output plugin. " + + "This processor also uses SQL Connection to query system views. " + + "Furthermore, a Publication in PostgreSQL database should already exist.") + +@Stateful(scopes = Scope.CLUSTER, description = "The last received Log Sequence Number (LSN) from Replication Slot is stored by this processor, " + + "such that it can continue from the same location if restarted.") + +@WritesAttributes({ + @WritesAttribute(attribute = "cdc.type", description = "The CDC event type, as begin, commit, insert, update, delete, etc."), + @WritesAttribute(attribute = "cdc.lsn", description = "The Log Sequence Number (i.e. strictly increasing integer value) specifying the order " + + "of the CDC event flow file relative to the other event flow files."), + @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to application/json.") }) + +public class CaptureChangePostgreSQL extends AbstractProcessor { + // Constants + private static final String MIME_TYPE_DEFAULT = "application/json"; + private static final String LAST_LSN_STATE_MAP_KEY = "last.received.lsn"; + private static final String POSTGRESQL_MIN_VERSION = "10"; + private static final String POSTGRESQL_PORT_DEFAULT = "5432"; + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully created FlowFile from CDC event.").build(); + + private Set<Relationship> relationships; + + // Properties + public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder() + .name("cdc-postgresql-driver-class") + .displayName("PostgreSQL Driver Class Name") + .description("The class name of the PostgreSQL database driver class") + .defaultValue("org.postgresql.Driver") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder() + .name("cdc-postgresql-driver-locations") + .displayName("PostgreSQL Driver Location(s)") + .description( + "Comma-separated list of files/folders containing the PostgreSQL driver JAR and its dependencies (if any). For example '/var/tmp/postgresql-42.3.1.jar'") + .defaultValue(null) + .required(false) + .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("cdc-postgresql-host") + .displayName("PostgreSQL Hostname") + .description("The hostname for PostgreSQL connections.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("cdc-postgresql-port") + .displayName("PostgreSQL Port") + .description("The default port for PostgreSQL connections.") + .required(true) + .defaultValue(POSTGRESQL_PORT_DEFAULT) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder() + .name("cdc-postgresql-database") + .displayName("Database") + .description("Specifies the name of the database to connect to.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("cdc-postgresql-user") + .displayName("Username") + .description("Username to access the PostgreSQL database. Must be a superuser.") + .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("cdc-postgresql-password") + .displayName("Password") + .description("Password to access the PostgreSQL database.") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("cdc-postgresql-max-wait-time").displayName("Max Wait Time") + .description( + "The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.") + .defaultValue("30 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder() + .name("cdc-postgresql-publication") + .displayName("Publication Name") + .description( + "A group of tables whose data changes are intended to be replicated through Logical Replication. " + + "It should be created in the database before the processor starts.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor REPLICATION_SLOT = new PropertyDescriptor.Builder() + .name("cdc-postgresql-slot-name") + .displayName("Replication Slot Name") + .description( + "A unique, cluster-wide identifier for the PostgreSQL Replication Slot. " + + "If it already exists, make sure that pgoutput is the output plugin.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DROP_SLOT_IF_EXISTS = new PropertyDescriptor.Builder() + .name("cdc-postgresql-drop-slot-if-exists") + .displayName("Drop if exists replication slot?") + .description( + "Drop the Replication Slot in PostgreSQL database if it already exists every processor starts.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor INCLUDE_BEGIN_COMMIT = new PropertyDescriptor.Builder() + .name("cdc-postgresql-include-begin-commit") + .displayName("Include BEGIN and COMMIT statements?") + .description( + "Specifies whether to emit events including BEGIN and COMMIT statements. Set to true if the BEGIN and COMMIT statements are necessary in the downstream flow, " + + "otherwise set to false, which suppresses these statements and can increase flow performance.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor INCLUDE_ALL_METADATA = new PropertyDescriptor.Builder() + .name("cdc-postgresql-include-all-metadata") + .displayName("Include all metadata?") + .description( + "Specifies whether to emit events including all message types (BEGIN, COMMIT, RELATION, TYPE, etc) and all metadata (relation id, tuple type, tuple data before update, etc). " + + "Set to true if all metadata are necessary in the downstream flow, otherwise set to false, which suppresses these metadata and can increase flow performance.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor START_LSN = new PropertyDescriptor.Builder() + .name("cdc-postgresql-start-lsn") + .displayName("Start Log Sequence Number (LSN)") + .description( + "Specifies a start Log Sequence Number (LSN) to use if this processor's state does not have a current " + + "sequence identifier. If a LSN is present in the processor's state, this property is ignored.") + .required(false) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .build(); + + // Key Attributes + private List<PropertyDescriptor> descriptors; + private JDBCConnectionHolder queryConnHolder = null; + private JDBCConnectionHolder replicationConnHolder = null; + private volatile Long lastLSN = null; + protected Reader replicationReader = null; + protected Long maxFlowFileListSize = 100L; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.add(DRIVER_NAME); + descriptors.add(DRIVER_LOCATION); + descriptors.add(HOST); + descriptors.add(PORT); + descriptors.add(DATABASE); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(CONNECTION_TIMEOUT); + descriptors.add(PUBLICATION); + descriptors.add(REPLICATION_SLOT); + descriptors.add(DROP_SLOT_IF_EXISTS); + descriptors.add(INCLUDE_BEGIN_COMMIT); + descriptors.add(INCLUDE_ALL_METADATA); + descriptors.add(START_LSN); + + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<Relationship>(); + relationships.add(REL_SUCCESS); + + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + // Runs the initial configuration of processor. + public void setup(ProcessContext context) { + final ComponentLog logger = getLogger(); + + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + + try { + stateMap = stateManager.getState(Scope.CLUSTER); + } catch (final IOException ioe) { + logger.error( + "Failed to retrieve observed maximum values from the State Manager. Will not attempt connection until this is accomplished.", + ioe); + context.yield(); + return; + } + + final String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue(); + final String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue(); + + final String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue(); + final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); + final String database = context.getProperty(DATABASE).evaluateAttributeExpressions().getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + final Long connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions() + .asTimePeriod(TimeUnit.MILLISECONDS); + + final String publication = context.getProperty(PUBLICATION).evaluateAttributeExpressions().getValue(); + final String slot = context.getProperty(REPLICATION_SLOT).evaluateAttributeExpressions().getValue(); + final Boolean dropSlotIfExists = context.getProperty(DROP_SLOT_IF_EXISTS).asBoolean(); + final Boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean(); + final Boolean includeAllMetadata = context.getProperty(INCLUDE_ALL_METADATA).asBoolean(); + final Long startLSN = context.getProperty(START_LSN).asLong(); + + if (stateMap.get(LAST_LSN_STATE_MAP_KEY) != null && Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY)) > 0L) { + this.lastLSN = Long.parseLong(stateMap.get(LAST_LSN_STATE_MAP_KEY)); + } else { + if (startLSN != null) + this.lastLSN = startLSN; + } + + try { + this.connect(host, port, database, username, password, driverLocation, driverName, connectionTimeout); + this.createReplicationReader(slot, dropSlotIfExists, publication, lastLSN, includeBeginCommit, + includeAllMetadata, this.getReplicationConnection(), this.getQueryConnection()); + + } catch (final IOException | TimeoutException | SQLException | RuntimeException e) { + context.yield(); + this.closeQueryConnection(); + this.closeReplicationConnection(); + throw new ProcessException(e.getMessage(), e); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + // final ComponentLog logger = getLogger(); + + if (this.replicationReader == null || this.replicationReader.getReplicationStream() == null) { + setup(context); + } + + try { + List<FlowFile> listFlowFiles = new ArrayList<>(); + + while (listFlowFiles.size() < this.getMaxFlowFileListSize()) { + HashMap<String, Object> message = this.replicationReader.readMessage(); + + if (message == null) // No more messages. + break; + + if (message.isEmpty()) // Skip empty messages. + continue; + + String data = this.replicationReader.convertMessageToJSON(message); + + FlowFile flowFile = null; + flowFile = session.create(); + + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(data.getBytes(StandardCharsets.UTF_8)); + } + }); + + flowFile = session.putAttribute(flowFile, "cdc.type", message.get("type").toString()); + + // Some messagens don't have LSN (e.g. Relation). + if (message.containsKey("lsn")) // Relation messagens don't have LSN. + flowFile = session.putAttribute(flowFile, "cdc.lsn", message.get("lsn").toString()); + + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_DEFAULT); + + listFlowFiles.add(flowFile); + } + + session.transfer(listFlowFiles, REL_SUCCESS); + + this.lastLSN = this.replicationReader.getLastReceiveLSN(); + + // Feedback is sent after the flowfiles transfer completes. + this.replicationReader.sendFeedback(this.lastLSN); + + updateState(context.getStateManager()); + + } catch (SQLException | IOException e) { + context.yield(); + this.closeQueryConnection(); + this.closeReplicationConnection(); + throw new ProcessException(e.getMessage(), e); + } + } + + // Update last LSN received on processor state. + private void updateState(StateManager stateManager) throws IOException { + if (stateManager != null) { + Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap()); + + newStateMap.put(LAST_LSN_STATE_MAP_KEY, String.valueOf(this.lastLSN)); + stateManager.setState(newStateMap, Scope.CLUSTER); + } + } + + protected void stop(StateManager stateManager) throws CDCException { + try { + this.replicationReader = null; + + if (lastLSN != null) + updateState(stateManager); + + if (queryConnHolder != null) + queryConnHolder.close(); + + if (replicationConnHolder != null) + replicationConnHolder.close(); + + } catch (Exception e) { + throw new CDCException("Error closing CDC connections!", e); + } + } + + @OnStopped + public void onStopped(ProcessContext context) { + try { + stop(context.getStateManager()); + } catch (CDCException ioe) { + throw new ProcessException(ioe); + } + } + + @OnShutdown + public void onShutdown(ProcessContext context) { + try { + // In case we get shutdown while still running, save off the current state, + // disconnect, and shut down gracefully. + stop(context.getStateManager()); + } catch (CDCException ioe) { + throw new ProcessException(ioe); + } + } + + protected void createReplicationReader(String slot, boolean dropSlotIfExists, String publication, Long lsn, + boolean includeBeginCommit, boolean includeAllMetadata, Connection replicationConn, + Connection queryConn) throws SQLException { + this.replicationReader = new Reader(slot, dropSlotIfExists, publication, lsn, includeBeginCommit, + includeAllMetadata, replicationConn, queryConn); + } + + protected Long getMaxFlowFileListSize() { + return this.maxFlowFileListSize; + } + + protected void connect(String host, String port, String database, String username, String password, + String driverLocation, String driverName, long connectionTimeout) + throws IOException, TimeoutException { + try { + // Ensure driverLocation and driverName are correct + // before establishing connection. + registerDriver(driverLocation, driverName); + } catch (InitializationException e) { + throw new RuntimeException( + "Failed to register JDBC driver. Ensure PostgreSQL Driver Location(s) and PostgreSQL Driver Class Name " + + "are configured correctly. " + e, + e); + } + + // Connection expects a non-null password. + if (password == null) { + password = ""; + } + + // Connection expects a timeout. + if (connectionTimeout == 0) { + connectionTimeout = Long.MAX_VALUE; + } + + InetSocketAddress address = getAddress(host, port); + + queryConnHolder = new JDBCConnectionHolder(address, database, username, password, false, connectionTimeout); + try { + // Ensure Query connection can be created. + this.getQueryConnection(); + } catch (SQLException e) { + throw new IOException("Error creating SQL connection to specified host and port", e); + } + + replicationConnHolder = new JDBCConnectionHolder(address, database, username, password, true, + connectionTimeout); + try { + // Ensure Replication connection can be created. + this.getReplicationConnection(); + } catch (SQLException e) { + throw new IOException("Error creating Replication connection to specified host and port", e); + } + } + + protected Connection getQueryConnection() throws SQLException { + return queryConnHolder.getConnection(); + } + + protected Connection getReplicationConnection() throws SQLException { + return replicationConnHolder.getConnection(); + } + + protected void closeQueryConnection() { + if (queryConnHolder != null) + queryConnHolder.close(); + } + + protected void closeReplicationConnection() { + if (replicationConnHolder != null) + replicationConnHolder.close(); Review comment: All blocks should include standard brackets: ```suggestion if (replicationConnHolder != null) { replicationConnHolder.close(); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org