[ https://issues.apache.org/jira/browse/APEXMALHAR-1953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213891#comment-15213891 ]
ASF GitHub Bot commented on APEXMALHAR-1953:
--------------------------------------------
Github user sandeepdeshmukh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/215#discussion_r57549460
--- Diff: library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---
@@ -332,7 +400,154 @@ public void testJdbcPojoOutputOperator()
}
outputOperator.endWindow();
- Assert.assertEquals("rows in db", 10,
outputOperator.getNumOfEventsInStore());
+ Assert.assertEquals("rows in db", 10,
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+ }
+
+ /**
+ * This test will assume direct mapping for POJO fields to DB columns
+ */
+ @Test
+ public void testJdbcPojoInsertOutputOperator()
+ {
+ JdbcTransactionalStore transactionalStore = new
JdbcTransactionalStore();
+ transactionalStore.setDatabaseDriver(DB_DRIVER);
+ transactionalStore.setDatabaseUrl(URL);
+
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap
attributeMap =
+ new
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new
OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
+
+ TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+ outputOperator.setBatchSize(3);
+ outputOperator.setTablename(TABLE_POJO_NAME);
+
+ outputOperator.setStore(transactionalStore);
+
+ outputOperator.setup(context);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new
Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS,
TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+ outputOperator.input.setup(tpc);
+
+ outputOperator.activate(context);
+
+ List<TestPOJOEvent> events = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ events.add(new TestPOJOEvent(i, "test" + i));
+ }
+
+ outputOperator.beginWindow(0);
+ for (TestPOJOEvent event : events) {
+ outputOperator.input.process(event);
+ }
+ outputOperator.endWindow();
+
+ Assert.assertEquals("rows in db", 10,
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+ }
+
+ /**
+ * This test will assume direct mapping for POJO fields to DB columns
+ */
+ @Test
+ public void testJdbcPojoInsertOutputOperatorNullName()
+ {
+ JdbcTransactionalStore transactionalStore = new
JdbcTransactionalStore();
+ transactionalStore.setDatabaseDriver(DB_DRIVER);
+ transactionalStore.setDatabaseUrl(URL);
+
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap
attributeMap =
+ new
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new
OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
+
+ TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+ outputOperator.setBatchSize(3);
+ outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF);
+
+ outputOperator.setStore(transactionalStore);
+
+ outputOperator.setup(context);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new
Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS,
TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+ outputOperator.input.setup(tpc);
+
+ outputOperator.activate(context);
+
+ List<TestPOJOEvent> events = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ events.add(new TestPOJOEvent(i, "test" + i));
+ }
+
+ outputOperator.beginWindow(0);
+ for (TestPOJOEvent event : events) {
+ outputOperator.input.process(event);
+ }
+ outputOperator.endWindow();
+
+ Assert.assertEquals("rows in db", 10,
outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+ Assert.assertEquals("null name rows in db", 10,
+
outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+ }
+
+ @Test
+ public void testJdbcPojoOutputOperatorMerge()
+ {
+ JdbcTransactionalStore transactionalStore = new
JdbcTransactionalStore();
+ transactionalStore.setDatabaseDriver(DB_DRIVER);
+ transactionalStore.setDatabaseUrl(URL);
+
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap
attributeMap =
+ new
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new
OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
+
+ TestPOJONonInsertOutputOperator updateOperator = new
TestPOJONonInsertOutputOperator();
+ updateOperator.setBatchSize(3);
+
+ updateOperator.setStore(transactionalStore);
+
+ updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS
T USING (VALUES (?, ?)) AS FOO(id, name) "
+ + "ON T.id = FOO.id "
+ + "WHEN MATCHED THEN UPDATE SET name = FOO.name "
+ + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id,
FOO.name);");
+
+ List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new JdbcFieldInfo("id", "id", null, "INTEGER"));
+ fieldInfos.add(new JdbcFieldInfo("name", "name", null, "VARCHAR"));
+ updateOperator.setFieldInfos(fieldInfos);
+ updateOperator.setup(context);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new
Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS,
TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+ updateOperator.input.setup(tpc);
+
+ updateOperator.activate(context);
+
+ List<TestPOJOEvent> events = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ events.add(new TestPOJOEvent(i, "test" + i));
+ }
+ for (int i = 0; i < 5; i++) {
+ events.add(new TestPOJOEvent(i, "test" + 100));
+ }
+
+ updateOperator.getDistinctNonUnique();
+ updateOperator.beginWindow(0);
+ for (TestPOJOEvent event : events) {
+ updateOperator.input.process(event);
+ }
+ updateOperator.endWindow();
+
+ Assert.assertEquals("rows in db", 10,
updateOperator.getNumOfEventsInStore());
--- End diff --
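Please add a comment explaining how this assertion verifies the merge: 15 events are processed, but 5 of them reuse ids 0-4, so only 10 rows are expected in the table.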
> Add generic (insert, update, delete) support to JDBC Output Operator
> --------------------------------------------------------------------
>
> Key: APEXMALHAR-1953
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1953
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Bhupesh Chawda
> Assignee: Bhupesh Chawda
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)