[
https://issues.apache.org/jira/browse/APEXMALHAR-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15321677#comment-15321677
]
ASF GitHub Bot commented on APEXMALHAR-2066:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/282#discussion_r66363085
--- Diff:
library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * A concrete implementation of {@link AbstractJdbcPollInputOperator} for
+ * consuming data from MySQL using the JDBC interface.<br>
+ * The user needs to provide tableName, dbConnection, setEmitColumnList and a look-up key.
+ * <br>
+ * Optionally, batchSize, pollInterval, the look-up key and a where clause can be given.
+ * <br>
+ * This operator uses static partitioning to arrive at range queries for exactly-once
+ * reads.<br>
+ * The assumption is that there is an ordered column using which range queries can
+ * be formed.<br>
+ * If an emitColumnList is provided, please ensure that the keyColumn is the
+ * first column in the list.<br>
+ * Range queries are formed using the {@link JdbcMetaDataUtility}.<br>
+ * Output: a comma-separated list of the emit columns, e.g. columnA,columnB,columnC
+ *
+ * @displayName Jdbc Polling Input Operator
+ * @category Input
+ * @tags database, sql, jdbc
+ */
+public class JdbcPollInputOperator extends
AbstractJdbcPollInputOperator<Object>
+{
+ private long lastBatchWindowId;
+ private transient long currentWindowId;
+ private long lastCreationTsMillis;
+ private long fetchBackMillis = 0L;
+ private ArrayList<String> emitColumns;
+ private transient int count = 0;
+
+  /**
+   * Gets the names of the columns whose values are emitted for each polled row.
+   *
+   * @return the emit columns; this is the operator's own list, not a copy
+   */
+  public List<String> getEmitColumns()
+  {
+    return this.emitColumns;
+  }
+
+  /**
+   * Replaces the names of the columns whose values are emitted for each polled row.
+   * The given list is stored as-is; no defensive copy is made.
+   *
+   * @param emitColumns column names to read from each result row
+   */
+  public void setEmitColumns(ArrayList<String> emitColumns)
+  {
+    this.emitColumns = emitColumns;
+  }
+
+  /**
+   * Gets fetchBackMillis: the number of milliseconds subtracted from the current
+   * time in setup() to initialise lastCreationTsMillis.
+   *
+   * @return the fetch-back interval in milliseconds
+   */
+  public long getFetchBackMillis()
+  {
+    return this.fetchBackMillis;
+  }
+
+  /**
+   * Sets fetchBackMillis, which is used in polling: setup() subtracts it from the
+   * current time to initialise lastCreationTsMillis.
+   *
+   * @param fetchBackMillis the fetch-back interval in milliseconds
+   */
+  public void setFetchBackMillis(long fetchBackMillis)
+  {
+    this.fetchBackMillis = fetchBackMillis;
+  }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ parseEmitColumnList(getEmitColumnList());
+ lastCreationTsMillis = System.currentTimeMillis() - fetchBackMillis;
+ }
+
+  /**
+   * Splits a comma-separated column list into individual column names and stores
+   * them via {@link #setEmitColumns(ArrayList)}.
+   * Surrounding whitespace is trimmed from each name and empty entries (e.g. from
+   * "a,,b") are dropped, so "columnA, columnB" yields the same columns as
+   * "columnA,columnB".
+   *
+   * @param columnList comma-separated column names, e.g. columnA,columnB,columnC
+   * @throws IllegalArgumentException if columnList is null or contains no column names
+   */
+  private void parseEmitColumnList(String columnList)
+  {
+    if (columnList == null) {
+      throw new IllegalArgumentException("emitColumnList must be set");
+    }
+    ArrayList<String> columns = Lists.newArrayList();
+    for (String column : columnList.split(",")) {
+      // Untrimmed names such as " columnB" would be passed verbatim to ResultSet.getObject.
+      String name = column.trim();
+      if (!name.isEmpty()) {
+        columns.add(name);
+      }
+    }
+    // With no columns, the emitted tuple would be empty and stripping its trailing
+    // comma (substring(0, length - 1)) would fail at poll time; fail fast instead.
+    if (columns.isEmpty()) {
+      throw new IllegalArgumentException("emitColumnList contains no column names: '" + columnList + "'");
+    }
+    setEmitColumns(columns);
+  }
+
+  /**
+   * Delegates to the superclass, then records the id of the window that is starting.
+   *
+   * @param windowId id of the window being started
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    this.currentWindowId = windowId;
+  }
+
+ @Override
+ protected void pollRecords(PreparedStatement ps)
+ {
+ ResultSet rs = null;
+ List<Object> metaList = new ArrayList<>();
+
+ if (isReplayed) {
+ return;
+ }
+
+ try {
+ if (ps.isClosed()) {
+ LOG.debug("Returning due to closed ps for non-pollable
partitions");
+ return;
+ }
+ } catch (SQLException e) {
+ LOG.error("Prepared statement is closed", e);
+ throw new RuntimeException(e);
+ }
+
+ try (PreparedStatement pStat = ps;) {
+ pStat.setFetchSize(getFetchSize());
+ LOG.debug("sql query = {}", pStat);
+ rs = pStat.executeQuery();
+ boolean hasNext = false;
+
+ if (rs == null || rs.isClosed()) {
+ return;
+ }
+
+ while ((hasNext = rs.next())) {
+ Object key = null;
+ StringBuilder resultTuple = new StringBuilder();
+ try {
+ if (count < getBatchSize()) {
+ key = rs.getObject(getKey());
+ for (String obj : emitColumns) {
+ resultTuple.append(rs.getObject(obj) + ",");
+ }
+ metaList.add(resultTuple.substring(0, resultTuple.length() -
1));
+ count++;
+ } else {
+ emitQueue.add(metaList);
+ metaList = new ArrayList<>();
+ key = rs.getObject(getKey());
+ for (String obj : emitColumns) {
+ resultTuple.append(rs.getObject(obj) + ",");
+ }
+ metaList.add(resultTuple.substring(0, resultTuple.length() -
1));
+ count = 0;
+ }
+ } catch (NullPointerException npe) {
+ LOG.error("Key not found" + npe);
+ throw new RuntimeException(npe);
+ }
+ if (isPollable) {
+ highestPolled = key.toString();
+ isPolled = true;
+ }
+ }
+ /*Flush the remaining records once the result set is over and
batch-size is not reached,
+ * Dont flush if its pollable*/
+ if (!hasNext) {
+ if ((isPollable && isPolled) || !isPollable) {
+ emitQueue.add(metaList);
--- End diff --
This might throw an exception if the queue is full. Use offer() instead?
> Add jdbc poller input operator
> ------------------------------
>
> Key: APEXMALHAR-2066
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2066
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Ashwin Chandra Putta
> Assignee: devendra tagare
>
> Create a JDBC poller input operator that has the following features.
> 1. Poll from an external JDBC store asynchronously in the input operator.
> 2. Polling frequency and batch size should be configurable.
> 3. Should be idempotent.
> 4. Should be partitionable.
> 5. Should be capable of both batch and polling modes.
> Assumptions for idempotency & partitioning:
> 1. The user needs to provide tableName, dbConnection, setEmitColumnList and a look-up key.
> 2. Optionally, batchSize, pollInterval, the look-up key and a where clause can be
> given.
> 3. This operator uses static partitioning to arrive at range queries for
> exactly-once reads.
> 4. The assumption is that there is an ordered column using which range queries can
> be formed.
> 5. If an emitColumnList is provided, please ensure that the keyColumn is the
> first column in the list.
> 6. Range queries are formed using the JdbcMetaDataUtility. Output: a comma-separated
> list of the emit columns, e.g. columnA,columnB,columnC.
> Per window, the first and the last key processed are saved using the
> FSWindowDataManager as (<lowerBound, upperBound>, operatorId, windowId). This
> (lowerBound, upperBound) pair is then used for recovery. The queries are
> constructed using the JdbcMetaDataUtility.
> JdbcMetaDataUtility
> A utility class used to retrieve the metadata for a given unique key of a SQL
> table. This class would emit range queries based on a given primary index.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)