Repository: nifi
Updated Branches:
  refs/heads/master 97461657b -> 24bb8cf95


NIFI-3735: Replaced 'Schema Change Event' references to 'DDL Event'

This closes #1703.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/24bb8cf9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/24bb8cf9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/24bb8cf9

Branch: refs/heads/master
Commit: 24bb8cf95d7228555215aadb1e46c62e1f0bff91
Parents: 9746165
Author: Matt Burgess <mattyb...@apache.org>
Authored: Wed Apr 26 15:36:00 2017 -0400
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Thu Apr 27 08:26:33 2017 +0900

----------------------------------------------------------------------
 .../org/apache/nifi/cdc/event/EventInfo.java    |  2 +-
 .../mysql/event/BaseBinlogTableEventInfo.java   |  2 +-
 .../nifi/cdc/mysql/event/DDLEventInfo.java      | 38 +++++++++++++++++
 .../cdc/mysql/event/SchemaChangeEventInfo.java  | 38 -----------------
 .../nifi/cdc/mysql/event/io/DDLEventWriter.java | 43 ++++++++++++++++++++
 .../mysql/event/io/SchemaChangeEventWriter.java | 43 --------------------
 .../mysql/processors/CaptureChangeMySQL.java    | 12 +++---
 .../processors/CaptureChangeMySQLTest.groovy    |  4 +-
 8 files changed, 91 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
index 9ad8d0e..64500b9 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
@@ -28,7 +28,7 @@ public interface EventInfo {
     String INSERT_EVENT = "insert";
     String DELETE_EVENT = "delete";
     String UPDATE_EVENT = "update";
-    String SCHEMA_CHANGE = "schema_change";
+    String DDL_EVENT = "ddl";
 
     String getEventType();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
index 763d695..c5d3ddf 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
@@ -32,7 +32,7 @@ public class BaseBinlogTableEventInfo extends 
BaseBinlogEventInfo implements Bin
 
     public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, 
Long timestamp, String binlogFilename, Long binlogPosition) {
         super(eventType, timestamp, binlogFilename, binlogPosition);
-        this.delegate = new BaseTableEventInfo(tableInfo, SCHEMA_CHANGE, 
timestamp);
+        this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, 
timestamp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
new file mode 100644
index 0000000..bc2c871
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.apache.nifi.cdc.event.TableEventInfo;
+import org.apache.nifi.cdc.event.TableInfo;
+
+
+/**
+ * An event class corresponding to Data Definition Language (DDL) events, such 
as schema changes (add/drop column, add/drop table, etc.) and others (truncate 
table, e.g.)
+ */
+public class DDLEventInfo extends BaseBinlogTableEventInfo implements 
TableEventInfo {
+
+    private String query;
+
+    public DDLEventInfo(TableInfo tableInfo, Long timestamp, String 
binlogFilename, long binlogPosition, String query) {
+        super(tableInfo, DDL_EVENT, timestamp, binlogFilename, binlogPosition);
+        this.query = query;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
deleted file mode 100644
index a385b11..0000000
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cdc.mysql.event;
-
-import org.apache.nifi.cdc.event.TableEventInfo;
-import org.apache.nifi.cdc.event.TableInfo;
-
-
-/**
- * An event class corresponding to table schema changes (add/drop column, 
add/drop table, etc.)
- */
-public class SchemaChangeEventInfo extends BaseBinlogTableEventInfo implements 
TableEventInfo {
-
-    private String query;
-
-    public SchemaChangeEventInfo(TableInfo tableInfo, Long timestamp, String 
binlogFilename, long binlogPosition, String query) {
-        super(tableInfo, SCHEMA_CHANGE, timestamp, binlogFilename, 
binlogPosition);
-        this.query = query;
-    }
-
-    public String getQuery() {
-        return query;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
new file mode 100644
index 0000000..0064c29
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+
+/**
+ * A writer class to output MySQL binlog Data Definition Language (DDL) events 
to flow file(s).
+ */
+public class DDLEventWriter extends 
AbstractBinlogTableEventWriter<DDLEventInfo> {
+
+    @Override
+    public long writeEvent(ProcessSession session, String transitUri, 
DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, (outputStream) -> {
+            super.startJson(outputStream, eventInfo);
+            super.writeJson(eventInfo);
+            jsonGenerator.writeStringField("query", eventInfo.getQuery());
+            super.endJson();
+        });
+        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
+        session.transfer(flowFile, relationship);
+        session.getProvenanceReporter().receive(flowFile, transitUri);
+        return currentSequenceId + 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java
deleted file mode 100644
index fe31c07..0000000
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cdc.mysql.event.io;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo;
-
-/**
- * A writer class to output MySQL binlog "schema change" (ALTER TABLE, e.g.) 
events to flow file(s).
- */
-public class SchemaChangeEventWriter extends 
AbstractBinlogTableEventWriter<SchemaChangeEventInfo> {
-
-    @Override
-    public long writeEvent(ProcessSession session, String transitUri, 
SchemaChangeEventInfo eventInfo, long currentSequenceId, Relationship 
relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
-            super.startJson(outputStream, eventInfo);
-            super.writeJson(eventInfo);
-            jsonGenerator.writeStringField("query", eventInfo.getQuery());
-            super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
-        session.transfer(flowFile, relationship);
-        session.getProvenanceReporter().receive(flowFile, transitUri);
-        return currentSequenceId + 1;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 688dff1..a8c3336 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -47,13 +47,13 @@ import 
org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
 import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
 import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
 import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
-import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
 import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
 import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
 import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
 import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
 import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
-import org.apache.nifi.cdc.mysql.event.io.SchemaChangeEventWriter;
+import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
 import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -128,7 +128,7 @@ import static 
com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
         @WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description 
= "A sequence identifier (i.e. strictly increasing integer value) specifying 
the order "
                 + "of the CDC event flow file relative to the other event flow 
file(s)."),
         @WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, 
description = "A string indicating the type of CDC event that occurred, 
including (but not limited to) "
-                + "'begin', 'insert', 'update', 'delete', 'schema_change' and 
'commit'."),
+                + "'begin', 'insert', 'update', 'delete', 'ddl' and 
'commit'."),
         @WritesAttribute(attribute = "mime.type", description = "The processor 
outputs flow file content in JSON format, and sets the mime.type attribute to "
                 + "application/json")
 })
@@ -377,7 +377,7 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     private final BeginTransactionEventWriter beginEventWriter = new 
BeginTransactionEventWriter();
     private final CommitTransactionEventWriter commitEventWriter = new 
CommitTransactionEventWriter();
-    private final SchemaChangeEventWriter schemaChangeEventWriter = new 
SchemaChangeEventWriter();
+    private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
     private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
     private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
     private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
@@ -801,8 +801,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                                 || normalizedQuery.startsWith("drop 
database")) {
 
                             if (includeDDLEvents) {
-                                SchemaChangeEventInfo schemaChangeEvent = new 
SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, 
currentBinlogPosition, normalizedQuery);
-                                
currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, 
transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS));
+                                DDLEventInfo ddlEvent = new 
DDLEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, 
normalizedQuery);
+                                
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, 
ddlEvent, currentSequenceId.get(), REL_SUCCESS));
                             }
                             // Remove all the keys from the cache that this 
processor added
                             if (cacheClient != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/24bb8cf9/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index f82028e..eb1f32b 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -403,7 +403,7 @@ class CaptureChangeMySQLTest {
 
         def resultFiles = 
testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
         List<String> expectedEventTypes = ([] + 'begin' + 
Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit'
-                + 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') 
+ 'commit')
+                + 'begin' + 'ddl' + Collections.nCopies(2, 'delete') + 
'commit')
 
         resultFiles.eachWithIndex {e, i ->
             assertEquals(i, 
Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
@@ -478,7 +478,7 @@ class CaptureChangeMySQLTest {
         testRunner.run(1, true, false)
 
         def resultFiles = 
testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
-        // No 'schema_change' events expected
+        // No DDL events expected
         List<String> expectedEventTypes = ([] + 'begin' + 
Collections.nCopies(3, 'insert') + 'commit')
 
         resultFiles.eachWithIndex {e, i ->

Reply via email to