Repository: nifi Updated Branches: refs/heads/master 97461657b -> 24bb8cf95
NIFI-3735: Replaced 'Schema Change Event' references to 'DDL Event' This closes #1703. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/24bb8cf9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/24bb8cf9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/24bb8cf9 Branch: refs/heads/master Commit: 24bb8cf95d7228555215aadb1e46c62e1f0bff91 Parents: 9746165 Author: Matt Burgess <mattyb...@apache.org> Authored: Wed Apr 26 15:36:00 2017 -0400 Committer: Koji Kawamura <ijokaruma...@apache.org> Committed: Thu Apr 27 08:26:33 2017 +0900 ---------------------------------------------------------------------- .../org/apache/nifi/cdc/event/EventInfo.java | 2 +- .../mysql/event/BaseBinlogTableEventInfo.java | 2 +- .../nifi/cdc/mysql/event/DDLEventInfo.java | 38 +++++++++++++++++ .../cdc/mysql/event/SchemaChangeEventInfo.java | 38 ----------------- .../nifi/cdc/mysql/event/io/DDLEventWriter.java | 43 ++++++++++++++++++++ .../mysql/event/io/SchemaChangeEventWriter.java | 43 -------------------- .../mysql/processors/CaptureChangeMySQL.java | 12 +++--- .../processors/CaptureChangeMySQLTest.groovy | 4 +- 8 files changed, 91 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java index 9ad8d0e..64500b9 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java @@ -28,7 +28,7 @@ public interface EventInfo { String INSERT_EVENT = "insert"; String DELETE_EVENT = "delete"; String UPDATE_EVENT = "update"; - String SCHEMA_CHANGE = "schema_change"; + String DDL_EVENT = "ddl"; String getEventType(); http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java index 763d695..c5d3ddf 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java @@ -32,7 +32,7 @@ public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements Bin public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogFilename, Long binlogPosition) { super(eventType, timestamp, binlogFilename, binlogPosition); - this.delegate = new BaseTableEventInfo(tableInfo, SCHEMA_CHANGE, timestamp); + this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, timestamp); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java new file mode 100644 index 0000000..bc2c871 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import org.apache.nifi.cdc.event.TableEventInfo; +import org.apache.nifi.cdc.event.TableInfo; + + +/** + * An event class corresponding to Data Definition Language (DDL) events, such as schema changes (add/drop column, add/drop table, etc.) and others (truncate table, e.g.) + */ +public class DDLEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo { + + private String query; + + public DDLEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) { + super(tableInfo, DDL_EVENT, timestamp, binlogFilename, binlogPosition); + this.query = query; + } + + public String getQuery() { + return query; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java deleted file mode 100644 index a385b11..0000000 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cdc.mysql.event; - -import org.apache.nifi.cdc.event.TableEventInfo; -import org.apache.nifi.cdc.event.TableInfo; - - -/** - * An event class corresponding to table schema changes (add/drop column, add/drop table, etc.) - */ -public class SchemaChangeEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo { - - private String query; - - public SchemaChangeEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) { - super(tableInfo, SCHEMA_CHANGE, timestamp, binlogFilename, binlogPosition); - this.query = query; - } - - public String getQuery() { - return query; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java new file mode 100644 index 0000000..0064c29 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.io; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.cdc.mysql.event.DDLEventInfo; + +/** + * A writer class to output MySQL binlog Data Definition Language (DDL) events to flow file(s). + */ +public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> { + + @Override + public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) { + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, (outputStream) -> { + super.startJson(outputStream, eventInfo); + super.writeJson(eventInfo); + jsonGenerator.writeStringField("query", eventInfo.getQuery()); + super.endJson(); + }); + flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo)); + session.transfer(flowFile, relationship); + session.getProvenanceReporter().receive(flowFile, transitUri); + return currentSequenceId + 1; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java deleted file mode 100644 index fe31c07..0000000 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cdc.mysql.event.io; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo; - -/** - * A writer class to output MySQL binlog "schema change" (ALTER TABLE, e.g.) events to flow file(s). - */ -public class SchemaChangeEventWriter extends AbstractBinlogTableEventWriter<SchemaChangeEventInfo> { - - @Override - public long writeEvent(ProcessSession session, String transitUri, SchemaChangeEventInfo eventInfo, long currentSequenceId, Relationship relationship) { - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, (outputStream) -> { - super.startJson(outputStream, eventInfo); - super.writeJson(eventInfo); - jsonGenerator.writeStringField("query", eventInfo.getQuery()); - super.endJson(); - }); - flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo)); - session.transfer(flowFile, relationship); - session.getProvenanceReporter().receive(flowFile, transitUri); - return currentSequenceId + 1; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 688dff1..a8c3336 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -47,13 +47,13 @@ import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo; import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo; import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo; import org.apache.nifi.cdc.mysql.event.RawBinlogEvent; -import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo; +import org.apache.nifi.cdc.mysql.event.DDLEventInfo; import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo; import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter; import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter; import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter; import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter; -import org.apache.nifi.cdc.mysql.event.io.SchemaChangeEventWriter; +import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter; import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -128,7 +128,7 @@ import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS; @WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order " + "of the CDC event flow file relative to the other event flow file(s)."), @WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) " - + "'begin', 'insert', 'update', 'delete', 'schema_change' and 'commit'."), + + "'begin', 'insert', 'update', 'delete', 'ddl' and 'commit'."), @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to " + "application/json") }) @@ -377,7 +377,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter(); private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter(); - private final SchemaChangeEventWriter schemaChangeEventWriter = new SchemaChangeEventWriter(); + private final DDLEventWriter ddlEventWriter = new DDLEventWriter(); private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter(); private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter(); private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter(); @@ -801,8 +801,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { || normalizedQuery.startsWith("drop database")) { if (includeDDLEvents) { - SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery); - currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS)); + DDLEventInfo ddlEvent = new DDLEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery); + currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS)); } // Remove all the keys from the cache that this processor added if (cacheClient != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index f82028e..eb1f32b 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -403,7 +403,7 @@ class CaptureChangeMySQLTest { def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit' - + 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') + 'commit') + + 'begin' + 'ddl' + Collections.nCopies(2, 'delete') + 'commit') resultFiles.eachWithIndex {e, i -> assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY))) @@ -478,7 +478,7 @@ class CaptureChangeMySQLTest { testRunner.run(1, true, false) def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) - // No 'schema_change' events expected + // No DDL events expected List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit') resultFiles.eachWithIndex {e, i ->