[ 
https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353228#comment-16353228
 ] 

ASF GitHub Bot commented on NIFI-4521:
--------------------------------------

Github user patricker commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2231#discussion_r166169705
  
    --- Diff: 
nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java
 ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.cdc.mssql.processors;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.cdc.CDCException;
    +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
    +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
    +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.StreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.ResultSetRecordSet;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@TriggerSerially
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"sql", "jdbc", "cdc", "mssql"})
    +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a 
Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. 
Events "
    +        + "for each table are output as Record Sets, ordered by the time, 
and sequence, at which the operation occurred.")
    +@Stateful(scopes = Scope.CLUSTER, description = "Information including the 
timestamp of the last CDC event per table in the database is stored by this 
processor, so "
    +        + "that it can continue from the same point in time if restarted.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "tablename", description="Name of the 
table this changeset was captured from."),
    +        @WritesAttribute(attribute="mssqlcdc.row.count", description="The 
number of rows in this changeset"),
    +        @WritesAttribute(attribute="fullsnapshot", description="Whether 
this was a full snapshot of the base table or not..")})
    +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression 
Language", supportsExpressionLanguage = false, description = "Specifies an 
initial "
    +        + "timestamp for reading CDC data from MS SQL. Properties should 
be added in the format `initial.timestamp.{table_name}`, one for each table. "
    +        + "This property is ignored after the first successful run for a 
table writes to the state manager, and is only used again if state is cleared.")
    +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
    +    public static final String INITIAL_TIMESTAMP_PROP_START = 
"initial.timestamp.";
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for 
writing out the records")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("cdcmssql-dbcp-service")
    +            .displayName("Database Connection Pooling Service")
    +            .description("The Controller Service that is used to obtain 
connection to database")
    +            .required(true)
    +            .identifiesControllerService(DBCPService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CDC_TABLES = new 
PropertyDescriptor.Builder()
    +            .name("cdcmssql-cdc-table-list")
    +            .displayName("CDC Table List")
    +            .description("The comma delimited list of tables in the source 
database to monitor for changes. If no tables "
    +                    + "are specified the [cdc].[change_tables] table is 
queried for all of the available tables with change tracking enabled in the 
database.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new 
PropertyDescriptor.Builder()
    +            .name("cdcmssql-initial-snapshot")
    +            .displayName("Generate an Initial Source Table Snapshot")
    +            .description("Usually CDC only includes recent historic 
changes. Setting this property to true will cause a snapshot of the "
    +                + "source table to be taken using the same schema as the 
CDC extracts. The snapshot time will be used as the starting point "
    +                + "for extracting CDC changes.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new 
PropertyDescriptor
    +            .Builder().name("cdcmssql-full-snapshot-row-limit")
    +            .displayName("Change Set Row Limit")
    +            .description("If a very large change occurs on the source 
table, "
    +                    + "the generated change set may be too large too 
quickly merge into a destination system. "
    +                    + "Use this property to set a cut-off point where 
instead of returning a changeset a full snapshot will be generated instead. "
    +                    + "The fullsnapshot attribute will be set to true when 
this happens.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    --- End diff --
    
    Settled on "changeset". Will update.


> MS SQL CDC Processor
> --------------------
>
>                 Key: NIFI-4521
>                 URL: https://issues.apache.org/jira/browse/NIFI-4521
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Peter Wicks
>            Assignee: Peter Wicks
>            Priority: Major
>
> Creation of a new processor that reads Change Data Capture details from 
> Microsoft SQL Server and outputs the changes as Records.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to