[
https://issues.apache.org/jira/browse/APEXMALHAR-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15255972#comment-15255972
]
ASF GitHub Bot commented on APEXMALHAR-2023:
--------------------------------------------
Github user sandeepdeshmukh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60868038
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java ---
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and
implements BackendLoaders interface.</p> <br/>
+ * <p>
+ * Properties:<br>
+ * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
+ * <b>tableName</b>: JDBC table name<br>
+ * <br>
+ */
[email protected]
+public class JDBCLoader extends JdbcStore implements BackendLoader
+{
+ protected String queryStmt;
+
+ protected String tableName;
+
+ protected transient List<FieldInfo> includeFieldInfo;
+ protected transient List<FieldInfo> lookupFieldInfo;
+
+ protected Object getQueryResult(Object key)
+ {
+ try {
+ PreparedStatement getStatement =
getConnection().prepareStatement(queryStmt);
+ ArrayList<Object> keys = (ArrayList<Object>)key;
+ for (int i = 0; i < keys.size(); i++) {
+ getStatement.setObject(i + 1, keys.get(i));
+ }
+ return getStatement.executeQuery();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected ArrayList<Object> getDataFrmResult(Object result) throws
RuntimeException
+ {
+ try {
+ ResultSet resultSet = (ResultSet)result;
+ if (resultSet.next()) {
+ ResultSetMetaData rsdata = resultSet.getMetaData();
+ // If the includefields is empty, populate it from
ResultSetMetaData
+ if (CollectionUtils.isEmpty(includeFieldInfo)) {
+ if (includeFieldInfo == null) {
+ includeFieldInfo = new ArrayList<>();
+ }
+ for (int i = 1; i <= rsdata.getColumnCount(); i++) {
+ String columnName = rsdata.getColumnName(i);
+ // TODO: Take care of type conversion.
+ includeFieldInfo.add(new FieldInfo(columnName, columnName,
FieldInfo.SupportType.OBJECT));
+ }
+ }
+
+ ArrayList<Object> res = new ArrayList<Object>();
+ for (FieldInfo f : includeFieldInfo) {
+ res.add(getConvertedData(resultSet.getObject(f.getColumnName()),
f));
+ }
+ return res;
+ } else {
+ return null;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Object getConvertedData(Object object, FieldInfo f)
+ {
+ if (f.getType().getJavaType() == object.getClass()) {
+ return object;
+ } else {
+ logger.warn("Type mismatch seen for field {}, returning as it is",
f.getColumnName());
+ return null;
+ }
+ }
+
+ private String generateQueryStmt()
+ {
+ String stmt = "select * from " + tableName + " where ";
+ boolean first = true;
+ for (FieldInfo fieldInfo : lookupFieldInfo) {
+ if (first) {
+ first = false;
+ } else {
+ stmt += " and ";
+ }
+ stmt += fieldInfo.getColumnName() + " = ?";
+ }
+
+ logger.info("generateQueryStmt: {}", stmt);
+ return stmt;
+ }
+
+ public String getQueryStmt()
+ {
+ return queryStmt;
+ }
+
+ /**
+ * Set the sql Prepared Statement if the enrichment mechanism is query
based.
+ */
+ public void setQueryStmt(String queryStmt)
+ {
+ this.queryStmt = queryStmt;
+ }
+
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ /**
+ * Set the table name.
+ */
+ public void setTableName(String tableName)
+ {
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void setFieldInfo(List<FieldInfo> lookupFieldInfo,
List<FieldInfo> includeFieldInfo)
+ {
+ this.lookupFieldInfo = lookupFieldInfo;
+ this.includeFieldInfo = includeFieldInfo;
+ if (queryStmt == null) {
+ queryStmt = generateQueryStmt();
+ }
+ }
+
+ @Override
+ public Map<Object, Object> loadInitialData()
+ {
+ return null;
+ }
+
+ @Override
+ public Object get(Object key)
+ {
+ return getDataFrmResult(getQueryResult(key));
+ }
+
+ @Override
+ public List<Object> getAll(List<Object> keys)
+ {
+ List<Object> values = Lists.newArrayList();
+ for (Object key : keys) {
+ values.add(get(key));
+ }
+ return values;
+ }
+
+ @Override
+ public void put(Object key, Object value)
+ {
+ throw new RuntimeException("Not supported operation");
--- End diff --
UnsupportedOperationException ?
> Adding Enrichment Operator to Malhar
> ------------------------------------
>
> Key: APEXMALHAR-2023
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2023
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Components: adapters database
> Affects Versions: 3.3.1
> Reporter: Chinmay Kolhatkar
> Assignee: Chinmay Kolhatkar
>
> Add Enrichment Operator to Apex Malhar.
> Discussion is happening in mailing list here:
> http://mail-archives.apache.org/mod_mbox/incubator-apex-dev/201603.mbox/%3CCAKJfLDMo24-Gcvum2ZL8-0JOnE8QLryAy0Zu_R5zhMd_bsJyHw%40mail.gmail.com%3E
> Ponymail permalink:
> https://pony-poc.apache.org/thread.html/Z8t5ut5pu5vprgt
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)