Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213476734
  
    --- Diff: 
pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java 
---
    @@ -0,0 +1,364 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadResolver;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.WriteResolver;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +import java.text.ParseException;
    +import java.text.SimpleDateFormat;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables resolver
    + */
    +public class JdbcResolver extends JdbcPlugin implements ReadResolver, 
WriteResolver {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcResolver(InputData input) throws UserDataException {
    +        super(input);
    +    }
    +
    +    /**
    +     * getFields() implementation
    +     *
    +     * @throws SQLException if the provided {@link OneRow} object is 
invalid
    +     */
    +    @Override
    +    public List<OneField> getFields(OneRow row) throws SQLException {
    +        ResultSet result = (ResultSet) row.getData();
    +        LinkedList<OneField> fields = new LinkedList<>();
    +
    +        for (ColumnDescriptor column : columns) {
    +            String colName = column.columnName();
    +            Object value = null;
    +
    +            OneField oneField = new OneField();
    +            oneField.type = column.columnTypeCode();
    +
    +            switch (DataType.get(oneField.type)) {
    +                case INTEGER:
    +                    value = result.getInt(colName);
    +                    break;
    +                case FLOAT8:
    +                    value = result.getDouble(colName);
    +                    break;
    +                case REAL:
    +                    value = result.getFloat(colName);
    +                    break;
    +                case BIGINT:
    +                    value = result.getLong(colName);
    +                    break;
    +                case SMALLINT:
    +                    value = result.getShort(colName);
    +                    break;
    +                case BOOLEAN:
    +                    value = result.getBoolean(colName);
    +                    break;
    +                case BYTEA:
    +                    value = result.getBytes(colName);
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case NUMERIC:
    +                    value = result.getString(colName);
    +                    break;
    +                case DATE:
    +                    value = result.getDate(colName);
    +                    break;
    +                case TIMESTAMP:
    +                    value = result.getTimestamp(colName);
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" 
+ DataType.get(oneField.type).toString() + "' (column '" + column.toString() + 
"') is not supported");
    +            }
    +
    +            oneField.val = value;
    +            fields.add(oneField);
    +        }
    +        return fields;
    +    }
    +
    +    /**
    +     * setFields() implementation
    +     *
    +     * @return OneRow with the data field containing a List<OneField>
    +     * OneFields are not reordered before being passed to Accessor; at the
    +     * moment, there is no way to correct the order of the fields if it is 
not.
    +     * In practice, the 'record' provided is always ordered the right way.
    +     *
    +     * @throws UnsupportedOperationException if field of some type is not 
supported
    +     */
    +    @Override
    +    public OneRow setFields(List<OneField> record) throws 
UnsupportedOperationException, ParseException {
    +        int column_index = 0;
    +        for (OneField oneField : record) {
    +            ColumnDescriptor column = columns.get(column_index);
    +            if (
    +                LOG.isDebugEnabled() &&
    +                DataType.get(column.columnTypeCode()) != 
DataType.get(oneField.type)
    +            ) {
    +                LOG.warn("The provided tuple of data may be disordered. 
Datatype of column with descriptor '" + column.toString() + "' must be '" + 
DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + 
DataType.get(oneField.type).toString() + "'");
    +            }
    +
    +            // Check that data type is supported
    +            switch (DataType.get(oneField.type)) {
    +                case BOOLEAN:
    +                case INTEGER:
    +                case FLOAT8:
    +                case REAL:
    +                case BIGINT:
    +                case SMALLINT:
    +                case NUMERIC:
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                case BYTEA:
    +                case TIMESTAMP:
    +                case DATE:
    +                    break;
    +                default:
    +                    throw new UnsupportedOperationException("Field type '" 
+ DataType.get(oneField.type).toString() + "' (column '" + column.toString() + 
"') is not supported");
    +            }
    +
    +            if (LOG.isDebugEnabled()) {
    +                if (DataType.get(oneField.type) == DataType.BYTEA) {
    +                    LOG.debug("OneField content (conversion from BYTEA): 
'" + new String((byte[])oneField.val) + "'");
    +                }
    +            }
    +
    +            // Convert TEXT columns into native data types
    +            if ((oneField.val != null) && (DataType.get(oneField.type) == 
DataType.TEXT) && (DataType.get(column.columnTypeCode()) != DataType.TEXT)) {
    +                String rawVal = (String)oneField.val;
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("OneField content (conversion from TEXT): '" 
+ rawVal + "'");
    +                }
    +                switch (DataType.get(column.columnTypeCode())) {
    +                    case VARCHAR:
    +                    case BPCHAR:
    +                    case TEXT:
    +                    case BYTEA:
    +                        break;
    +                    case BOOLEAN:
    +                        oneField.val = 
(Object)Boolean.parseBoolean(rawVal);
    +                        break;
    +                    case INTEGER:
    +                        oneField.val = (Object)Integer.parseInt(rawVal);
    +                        break;
    +                    case FLOAT8:
    +                        oneField.val = (Object)Double.parseDouble(rawVal);
    +                        break;
    +                    case REAL:
    +                        oneField.val = (Object)Float.parseFloat(rawVal);
    +                        break;
    +                    case BIGINT:
    +                        oneField.val = (Object)Long.parseLong(rawVal);
    +                        break;
    +                    case SMALLINT:
    +                        oneField.val = (Object)Short.parseShort(rawVal);
    +                        break;
    +                    case NUMERIC:
    +                        oneField.val = (Object)new BigDecimal(rawVal);
    +                        break;
    +                    case TIMESTAMP:
    +                        boolean isConversionSuccessful = false;
    +                        for (SimpleDateFormat sdf : timestampSDFs.get()) {
    +                            try {
    +                                java.util.Date parsedTimestamp = 
sdf.parse(rawVal);
    +                                oneField.val = (Object)new 
Timestamp(parsedTimestamp.getTime());
    +                                isConversionSuccessful = true;
    +                                break;
    +                            }
    +                            catch (ParseException e) {
    +                                // pass
    +                            }
    +                        }
    +                        if (!isConversionSuccessful) {
    +                            throw new ParseException(rawVal, 0);
    +                        }
    +                        break;
    +                    case DATE:
    +                        oneField.val = (Object)new 
Date(dateSDF.get().parse(rawVal).getTime());
    +                        break;
    +                    default:
    +                        throw new UnsupportedOperationException("Field 
type '" + DataType.get(oneField.type).toString() + "' (column '" + 
column.toString() + "') is not supported");
    +                }
    +                oneField.type = column.columnTypeCode();
    +            }
    +
    +            column_index += 1;
    +        }
    +        return new OneRow(new LinkedList<OneField>(record));
    +    }
    +
    +    /**
    +     * Decode OneRow object and pass all its contents to a 
PreparedStatement
    +     *
    +     * @throws IOException if data in a OneRow is corrupted
    +     * @throws SQLException if the given statement is broken
    +     */
    +    @SuppressWarnings("unchecked")
    +    public static void decodeOneRowToPreparedStatement(OneRow row, 
PreparedStatement statement) throws IOException, SQLException {
    +        // This is safe: OneRow comes from JdbcResolver
    +        List<OneField> tuple = (List<OneField>)row.getData();
    +        for (int i = 1; i <= tuple.size(); i++) {
    +            OneField field = tuple.get(i - 1);
    +            switch (DataType.get(field.type)) {
    +                case INTEGER:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setInt(i, (int)field.val);
    +                    }
    +                    break;
    +                case BIGINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setLong(i, (long)field.val);
    +                    }
    +                    break;
    +                case SMALLINT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.INTEGER);
    +                    }
    +                    else {
    +                        statement.setShort(i, (short)field.val);
    +                    }
    +                    break;
    +                case REAL:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.FLOAT);
    +                    }
    +                    else {
    +                        statement.setFloat(i, (float)field.val);
    +                    }
    +                    break;
    +                case FLOAT8:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DOUBLE);
    +                    }
    +                    else {
    +                        statement.setDouble(i, (double)field.val);
    +                    }
    +                    break;
    +                case BOOLEAN:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BOOLEAN);
    +                    }
    +                    else {
    +                        statement.setBoolean(i, (boolean)field.val);
    +                    }
    +                    break;
    +                case NUMERIC:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.NUMERIC);
    +                    }
    +                    else {
    +                        statement.setBigDecimal(i, (BigDecimal)field.val);
    +                    }
    +                    break;
    +                case VARCHAR:
    +                case BPCHAR:
    +                case TEXT:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.VARCHAR);
    +                    }
    +                    else {
    +                        statement.setString(i, (String)field.val);
    +                    }
    +                    break;
    +                case BYTEA:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.BINARY);
    +                    }
    +                    else {
    +                        statement.setBytes(i, (byte[])field.val);
    +                    }
    +                    break;
    +                case TIMESTAMP:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.TIMESTAMP);
    +                    }
    +                    else {
    +                        statement.setTimestamp(i, (Timestamp)field.val);
    +                    }
    +                    break;
    +                case DATE:
    +                    if (field.val == null) {
    +                        statement.setNull(i, Types.DATE);
    +                    }
    +                    else {
    +                        statement.setDate(i, (Date)field.val);
    +                    }
    +                    break;
    +                default:
    +                    throw new IOException("The data tuple from 
JdbcResolver is corrupted");
    +            }
    +        }
    +    }
    +
    +    private static final Log LOG = LogFactory.getLog(JdbcResolver.class);
    +
    +    // SimpleDateFormat to parse TEXT into DATE
    +    private static ThreadLocal<SimpleDateFormat> dateSDF = new 
ThreadLocal<SimpleDateFormat>() {
    --- End diff --
    
    how are these threadlocals are getting cleaned ? if this logic executes on 
Tomcat thread pool, those threads are created and discarded as load changes, so 
when a thread is discarded, the thread local needs to be cleaned, which is the 
usual problem with threadlocals. Maybe use some thread-safe library for 
formatting, like joda-time ?


---

Reply via email to