[
https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372281#comment-15372281
]
ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/282#discussion_r70385310
--- Diff:
library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPollerTest.java ---
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link AbstractJdbcPollInputOperator} and
+ * {@link JdbcPollInputOperator}
+ */
+public class JdbcPollerTest
+{
+ public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+
+ private static final String TABLE_NAME = "test_account_table";
+ private static final String APP_ID = "JdbcPollingOperatorTest";
+ public String dir = null;
+
+ @BeforeClass
+ public static void setup()
+ {
+ try {
+ cleanup();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + " (Account_No INTEGER, Name VARCHAR(255), Amount INTEGER)";
+ stmt.executeUpdate(createTable);
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ try {
+ FileUtils.deleteDirectory(new File("target/" + APP_ID));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void cleanTable()
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+ String cleanTable = "delete from " + TABLE_NAME;
+ stmt.executeUpdate(cleanTable);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void insertEventsInTable(int numEvents, int offset)
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+ PreparedStatement stmt = con.prepareStatement(insert);
+ for (int i = 0; i < numEvents; i++, offset++) {
+ stmt.setInt(1, offset);
+ stmt.setString(2, "Account_Holder-" + offset);
+ stmt.setInt(3, (offset * 1000));
+ stmt.executeUpdate();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Simulates the actual application flow: adds a batch query partition and a
+ * pollable partition. Incremental record polling is also checked.
+ */
+ @Test
+ public void testJdbcPollingInputOperatorBatch() throws InterruptedException
+ {
+ cleanTable();
+ insertEventsInTable(10, 0);
+ JdbcStore store = new JdbcStore();
+ store.setDatabaseDriver(DB_DRIVER);
+ store.setDatabaseUrl(URL);
+
+ Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ this.dir = "target/" + APP_ID + "/";
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ attributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
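+ // Configure the poller: JDBC store, batch size, poll interval, emit columns, key column, table and partition count.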
+ JdbcPollInputOperator inputOperator = new JdbcPollInputOperator();
+ inputOperator.setStore(store);
+ inputOperator.setBatchSize(100);
+ inputOperator.setPollInterval(1000);
+ inputOperator.setEmitColumnList("Account_No,Name,Amount");
+ inputOperator.setKey("Account_No");
+ inputOperator.setTableName(TABLE_NAME);
+ inputOperator.setFetchSize(100);
+ inputOperator.setPartitionCount(1);
+
+ CollectorTestSink<Object> sink = new CollectorTestSink<>();
+ inputOperator.outputPort.setSink(sink);
+
+ TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2);
+
+ DefaultPartition<AbstractJdbcPollInputOperator<Object>> apartition =
+     new DefaultPartition<AbstractJdbcPollInputOperator<Object>>(inputOperator);
+
+ TestUtils.MockPartition<AbstractJdbcPollInputOperator<Object>> pseudoPartition =
+     new TestUtils.MockPartition<>(apartition, readerStats);
+
+ List<Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newMocks = Lists.newArrayList();
+
+ newMocks.add(pseudoPartition);
+
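+ // Ask the operator to define its static partitions; the first covers a fixed key range, the last polls for new rows.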
+ Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> newPartitions =
+     inputOperator.definePartitions(newMocks, null);
+
+ Iterator<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>>> itr =
+     newPartitions.iterator();
+
+ int operatorId = 0;
+ for (com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<Object>> partition : newPartitions) {
+
+ Attribute.AttributeMap.DefaultAttributeMap partitionAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ this.dir = "target/" + APP_ID + "/";
+ partitionAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ partitionAttributeMap.put(Context.DAGContext.APPLICATION_PATH, dir);
+
+ OperatorContextTestHelper.TestIdOperatorContext partitioningContext =
+     new OperatorContextTestHelper.TestIdOperatorContext(operatorId++, partitionAttributeMap);
+
+ partition.getPartitionedInstance().setup(partitioningContext);
+ partition.getPartitionedInstance().activate(partitioningContext);
+ }
+
+ // First partition is for range queries, the last is for polling queries
+ AbstractJdbcPollInputOperator<Object> newInstance = itr.next().getPartitionedInstance();
+ CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
+ newInstance.outputPort.setSink(sink1);
+ newInstance.beginWindow(1);
+ Thread.sleep(50);
+ newInstance.emitTuples();
+ newInstance.endWindow();
+
+ Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+ int i = 0;
+ for (Object tuple : sink1.collectedTuples) {
+ String[] pojoEvent = tuple.toString().split(",");
+ Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ?
true : false);
+ i++;
+ }
+ sink1.collectedTuples.clear();
+
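+ // Insert 10 more rows; the pollable partition should pick these up incrementally.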
+ insertEventsInTable(10, 10);
+
+ AbstractJdbcPollInputOperator<Object> pollableInstance = itr.next().getPartitionedInstance();
+
+ pollableInstance.outputPort.setSink(sink1);
+
+ pollableInstance.beginWindow(1);
+ Thread.sleep(700);
+ pollableInstance.endWindow();
+ pollableInstance.emitTuples();
+
+ Assert.assertEquals("rows from db", 10, sink1.collectedTuples.size());
+ i = 10;
+ for (Object tuple : sink1.collectedTuples) {
+ String[] pojoEvent = tuple.toString().split(",");
+ Assert.assertTrue("i=" + i, Integer.parseInt(pojoEvent[0]) == i ?
true : false);
+ i++;
+ }
+
+ sink1.collectedTuples.clear();
+
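+ // A further batch of rows exercises another poll cycle in a new window.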
+ insertEventsInTable(10, 20);
+
+ pollableInstance.beginWindow(2);
+ Thread.sleep(700);
+ pollableInstance.endWindow();
+ pollableInstance.emitTuples();
--- End diff --
Same here.
> Add jdbc poller input operator
> ------------------------------
>
> Key: APEXMALHAR-2066
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Ashwin Chandra Putta
> Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. poll from external jdbc store asynchronously in the input operator.
> 2. polling frequency and batch size should be configurable.
> 3. should be idempotent.
> 4. should be partition-able.
> 5. should be batch + polling capable.
> Assumptions for idempotency & partitioning:
> 1. User needs to provide tableName, dbConnection, setEmitColumnList, look-up key.
> 2. Optionally batchSize, pollInterval, look-up key and a where clause can be given.
> 3. This operator uses static partitioning to arrive at range queries for exactly-once reads.
> 4. Assumption is that there is an ordered column using which range queries can be formed.
> 5. If an emitColumnList is provided, please ensure that the keyColumn is the first column in the list.
> 6. Range queries are formed using the JdbcMetaDataUtility. Output is a comma-separated list of the emit columns, e.g. columnA,columnB,columnC (a sketch of such a range query follows this list).
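For illustration, a minimal sketch of the kind of range query described in the assumptions above. The table, key and column names are taken from the test; the bounds and the exact SQL shape are assumptions, since the real query is built by JdbcMetaDataUtility:

    // Hypothetical range query for one static partition (not the actual
    // JdbcMetaDataUtility output; names and bounds are illustrative).
    String tableName = "test_account_table";
    String key = "Account_No";
    String emitColumns = "Account_No,Name,Amount";
    int lowerBound = 0;   // first key assigned to this partition
    int upperBound = 10;  // exclusive upper key for this partition
    String rangeQuery = "SELECT " + emitColumns + " FROM " + tableName
        + " WHERE " + key + " >= " + lowerBound
        + " AND " + key + " < " + upperBound;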
> Per window, the first and the last key processed are saved using the
> FSWindowDataManager as (<lowerBound,upperBound>, operatorId, windowId). This
> (lowerBound, upperBound) pair is then used for recovery (see the sketch below). The queries are
> constructed using the JdbcMetaDataUtility.
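A minimal sketch of that per-window bookkeeping. KeyRange is an illustrative holder, not a Malhar class, and the WindowDataManager save/load signatures are assumed from the 3.4-era API rather than confirmed:

    // Illustrative only: persist the processed key range per window so a
    // restarted operator can replay exactly the same range.
    class KeyRange implements java.io.Serializable
    {
      final int lowerBound;
      final int upperBound;

      KeyRange(int lowerBound, int upperBound)
      {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
      }
    }
    // In endWindow(), assuming windowDataManager is an FSWindowDataManager:
    //   windowDataManager.save(new KeyRange(firstKey, lastKey), operatorId, windowId);
    // On recovery, reload the bounds for the window being replayed:
    //   KeyRange replay = (KeyRange)windowDataManager.load(operatorId, windowId);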
> JdbcMetaDataUtility
> A utility class used to retrieve the metadata for a given unique key of a SQL
> table. This class would emit range queries based on a given primary index.