[
https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321686#comment-15321686
]
ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/282#discussion_r66364498
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcMetaDataUtility.java ---
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * A utility class used to retrieve the metadata for a given unique key of a SQL
+ * table. This class would emit range queries based on a primary index given
+ *
+ * @Input - dbName,tableName, primaryKey
+ * @Output - map<operatorId,prepared statement>
+ *
+ */
+public class JdbcMetaDataUtility
+{
+ private static String DB_DRIVER = "com.mysql.jdbc.Driver";
+ private static String DB_CONNECTION = "";
+ private static String DB_USER = "";
+ private static String DB_PASSWORD = "";
+ private static String TABLE_NAME = "";
+ private static String KEY_COLUMN = "";
+ private static String WHERE_CLAUSE = null;
+ private static String COLUMN_LIST = null;
+
+ private static Logger LOG = LoggerFactory.getLogger(JdbcMetaDataUtility.class);
+
+ public JdbcMetaDataUtility()
+ {
+
+ }
+
+ public JdbcMetaDataUtility(String dbConnection, String tableName, String key, String userName, String password)
+ {
+ DB_CONNECTION = dbConnection;
+ DB_USER = userName;
+ DB_PASSWORD = password;
+ TABLE_NAME = tableName;
+ KEY_COLUMN = key;
+ }
+
+ private static Connection getDBConnection()
+ {
+
+ Connection dbConnection = null;
+
+ try {
+ Class.forName(DB_DRIVER);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Driver not found", e);
+ }
+
+ try {
+ dbConnection = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+ return dbConnection;
+ } catch (SQLException e) {
+ LOG.error("Exception in getting connection handle", e);
+ }
+
+ return dbConnection;
+
+ }
+
+ private static String generateQueryString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT COUNT(*) as RowCount from " + TABLE_NAME);
+
+ if (WHERE_CLAUSE != null) {
+ sb.append(" WHERE " + WHERE_CLAUSE);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Finds the total number of rows in the table
+ */
+ private static long getRecordRange(String query) throws SQLException
+ {
+ long rowCount = 0;
+ Connection dbConnection = null;
+ PreparedStatement preparedStatement = null;
+
+ try {
+ dbConnection = getDBConnection();
+ preparedStatement = dbConnection.prepareStatement(query);
+
+ ResultSet rs = preparedStatement.executeQuery();
+
+ while (rs.next()) {
+ rowCount = Long.parseLong(rs.getString("RowCount"));
--- End diff --
I suggest reading the count by column index instead of by the column alias.
Also, this seems to be a simple count utility; can you rename it to something
like "getTupleCount()"?
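A minimal sketch of both suggestions, assuming the count method is refactored to
take an already-open `Connection` (a hypothetical signature; in the diff the
connection settings live in static fields). The count is read by column index
with `ResultSet.getLong(1)` instead of parsing the `RowCount` alias as a string,
and try-with-resources closes the statement and result set even if the query
fails. `TupleCounter` and `buildCountQuery` are illustrative names only.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class TupleCounter
{
  /** Builds the count query; tableName and whereClause are assumed to be trusted configuration values. */
  static String buildCountQuery(String tableName, String whereClause)
  {
    StringBuilder sb = new StringBuilder("SELECT COUNT(*) FROM ").append(tableName);
    if (whereClause != null) {
      sb.append(" WHERE ").append(whereClause);
    }
    return sb.toString();
  }

  /** Hypothetical replacement for getRecordRange(); the caller owns and closes the connection. */
  static long getTupleCount(Connection connection, String tableName, String whereClause) throws SQLException
  {
    String query = buildCountQuery(tableName, whereClause);
    try (PreparedStatement ps = connection.prepareStatement(query);
        ResultSet rs = ps.executeQuery()) {
      // COUNT(*) yields a single row; read column 1 by index, no alias needed
      return rs.next() ? rs.getLong(1) : 0L;
    }
  }
}
```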
> Add jdbc poller input operator
> ------------------------------
>
> Key: APEXMALHAR-2066
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Ashwin Chandra Putta
> Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. Poll from an external JDBC store asynchronously in the input operator.
> 2. Polling frequency and batch size should be configurable.
> 3. Should be idempotent.
> 4. Should be partitionable.
> 5. Should support both batch and polling modes.
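Features 1 and 2 could look roughly like the sketch below: a background
`ScheduledExecutorService` fetches at most `batchSize` rows every poll interval
and hands them to the operator thread through a thread-safe queue. This is an
illustration under assumptions, not the operator's design: `BatchPoller`,
`nextTuple()` and the `fetchBatch` callback (standing in for a range query
against the JDBC store) are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

/** Hypothetical poller: fetches up to batchSize rows per poll on a background thread. */
class BatchPoller<T>
{
  // (offset, batchSize) -> rows; stands in for a query against the JDBC store
  private final BiFunction<Long, Integer, List<T>> fetchBatch;
  private final int batchSize;
  private final long pollIntervalMillis;
  // Thread-safe hand-off from the poller thread to the operator thread
  private final ConcurrentLinkedQueue<T> emitQueue = new ConcurrentLinkedQueue<>();
  private ScheduledExecutorService scheduler;
  private long offset = 0; // next row to fetch; only updated inside pollOnce()

  BatchPoller(BiFunction<Long, Integer, List<T>> fetchBatch, int batchSize, long pollIntervalMillis)
  {
    this.fetchBatch = fetchBatch;
    this.batchSize = batchSize;
    this.pollIntervalMillis = pollIntervalMillis;
  }

  /** Fetches at most batchSize rows starting at the current offset and queues them. */
  void pollOnce()
  {
    List<T> rows = fetchBatch.apply(offset, batchSize);
    emitQueue.addAll(rows);
    offset += rows.size();
  }

  /** Starts polling asynchronously; the delay is measured from the end of one poll to the start of the next. */
  void start()
  {
    scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(this::pollOnce, 0, pollIntervalMillis, TimeUnit.MILLISECONDS);
  }

  void stop()
  {
    if (scheduler != null) {
      scheduler.shutdownNow();
    }
  }

  /** Called on the operator thread (e.g. from emitTuples()); returns null when nothing is queued. */
  T nextTuple()
  {
    return emitQueue.poll();
  }
}
```

`scheduleWithFixedDelay` waits for each poll to finish before timing the next
one, so slow queries do not pile up. A production version should catch
exceptions inside `pollOnce()`, because an exception thrown by a periodic task
suppresses all of its later executions.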
> Assumptions for idempotency and partitioning:
> 1. The user needs to provide tableName, dbConnection, setEmitColumnList, and
> a look-up key.
> 2. Optionally, batchSize, pollInterval, a look-up key, and a WHERE clause can
> be given.
> 3. This operator uses static partitioning to derive range queries for
> exactly-once reads.
> 4. It is assumed that there is an ordered column with which range queries can
> be formed.
> 5. If an emitColumnList is provided, ensure that the keyColumn is the first
> column in the list.
> 6. Range queries are formed using the JdbcMetaDataUtility.
> Output: a comma-separated list of the emit columns, e.g. columnA,columnB,columnC.
> Per window, the first and last keys processed are saved using the
> FSWindowDataManager as (<lowerBound, upperBound>, operatorId, windowId). This
> (lowerBound, upperBound) pair is then used for recovery. The queries are
> constructed using the JdbcMetaDataUtility.
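A minimal sketch of the replay side of item 6, assuming a numeric, unique key
column and inclusive bounds; `RecoveryQueries` and its methods are hypothetical,
and the FSWindowDataManager calls that save and load the bounds are not shown.
The first query re-reads exactly the rows a recovered window emitted; the
second resumes polling after the last completed window.

```java
/** Hypothetical helpers; table and column names are assumed to be trusted configuration. */
class RecoveryQueries
{
  /**
   * Replays the rows emitted in a recovered window. Bind the saved lowerBound
   * and upperBound (both inclusive) with setLong(1, ...) and setLong(2, ...).
   */
  static String replayQuery(String columns, String table, String keyColumn)
  {
    return "SELECT " + columns + " FROM " + table
        + " WHERE " + keyColumn + " >= ? AND " + keyColumn + " <= ?"
        + " ORDER BY " + keyColumn;
  }

  /**
   * Resumes strictly after the last completed window's upperBound (setLong(1, ...))
   * and fetches at most batchSize rows (setInt(2, ...)).
   * Assumption: MySQL/PostgreSQL-style LIMIT; other databases use FETCH FIRST or TOP.
   */
  static String resumeQuery(String columns, String table, String keyColumn)
  {
    return "SELECT " + columns + " FROM " + table
        + " WHERE " + keyColumn + " > ?"
        + " ORDER BY " + keyColumn + " LIMIT ?";
  }
}
```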
> JdbcMetaDataUtility
> A utility class used to retrieve the metadata for a given unique key of a SQL
> table. This class emits range queries based on a given primary index.
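One way to produce per-operator range queries for static partitioning (item 3),
sketched under assumptions: the total row count comes from a count query like
the one reviewed above, ranges are expressed with MySQL/PostgreSQL-style
`LIMIT ... OFFSET ...` over the ordered key column rather than with explicit key
bounds, and the map keys are partition indexes standing in for operator ids.
`PartitionRangeQueries` is a hypothetical name, not the utility's actual API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionRangeQueries
{
  /**
   * Splits rowCount rows ordered by keyColumn into partitionCount contiguous ranges.
   * The first (rowCount % partitionCount) partitions get one extra row, so every
   * row is assigned to exactly one partition.
   */
  static Map<Integer, String> rangeQueries(String columns, String table, String keyColumn,
      long rowCount, int partitionCount)
  {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("partitionCount must be positive");
    }
    Map<Integer, String> queries = new LinkedHashMap<>();
    long base = rowCount / partitionCount;
    long remainder = rowCount % partitionCount;
    long offset = 0;
    for (int id = 0; id < partitionCount; id++) {
      long size = base + (id < remainder ? 1 : 0);
      queries.put(id, "SELECT " + columns + " FROM " + table + " ORDER BY " + keyColumn
          + " LIMIT " + size + " OFFSET " + offset);
      offset += size;
    }
    return queries;
  }
}
```

Offset-based ranges only give exactly-once reads if the counted rows do not
change between the partitions' reads; computing the boundary key values once and
querying with `WHERE key > ? AND key <= ?` avoids that dependency.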
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)