Github user leskin-in commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1353#discussion_r183218994 --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java --- @@ -0,0 +1,469 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; + +import java.util.List; +import java.io.IOException; +import java.text.ParseException; +import java.math.BigDecimal; +import java.sql.Types; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLTimeoutException; +import java.sql.Statement; +import java.sql.Timestamp; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * JDBC tables accessor + * + * The SELECT queries are processed by {@link java.sql.Statement} + * + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and + * built-in JDBC batches of arbitrary size + */ +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor { + /** + * Class constructor + */ + public JdbcAccessor(InputData inputData) throws UserDataException { + super(inputData); + } + + /** + * openForRead() implementation + * Create query, open JDBC connection, execute query and store the result into resultSet + * + * @throws SQLException if a database access error occurs + * @throws SQLTimeoutException if a problem with the connection occurs + * @throws ParseException if the SQL statement provided in PXF InputData is incorrect + * @throws ClassNotFoundException if the superclass implementation has disappeared + */ + @Override + public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException { + if (statementRead != null && !statementRead.isClosed()) { + return true; + } + + super.openConnection(); + + queryRead = buildSelectQuery(); + 
statementRead = dbConn.createStatement(); + resultSetRead = statementRead.executeQuery(queryRead); + + return true; + } + + /** + * readNextObject() implementation + * Retrieve the next tuple from resultSet and return it + * + * @throws SQLException if a problem in resultSet occurs + */ + @Override + public OneRow readNextObject() throws SQLException { + if (resultSetRead.next()) { + return new OneRow(resultSetRead); + } + return null; + } + + /** + * closeForRead() implementation + * + * @throws SQLException if a database access error occurs + */ + @Override + public void closeForRead() throws SQLException { + if (statementRead != null && !statementRead.isClosed()) { + statementRead.close(); + statementRead = null; + } + super.closeConnection(); + } + + /** + * openForWrite() implementation + * Create query template and open JDBC connection + * + * @throws SQLException if a database access error occurs + * @throws SQLTimeoutException if a problem with the connection occurs + * @throws ParseException if the SQL statement provided in PXF InputData is incorrect + * @throws ClassNotFoundException if the superclass implementation has disappeared + */ + @Override + public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException, ClassNotFoundException { + if (statementWrite != null && !statementWrite.isClosed()) { + return true; + } + + super.openConnection(); + if (dbMeta.supportsTransactions()) { + dbConn.setAutoCommit(false); + } + + queryWrite = buildInsertQuery(); + statementWrite = dbConn.prepareStatement(queryWrite); + + if ((batchSize != 0) && (!dbMeta.supportsBatchUpdates())) { + LOG.info( + "The database '" + + dbMeta.getDatabaseProductName() + + "' does not support batch updates. 
The current request will be handled without batching" + ); + batchSize = 0; + } + + return true; + } + + /** + * writeNextObject() implementation + * + * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite + * Otherwise, execute an INSERT query immediately + * + * In both cases, a {@link java.sql.PreparedStatement} is used + * + * @throws SQLException if a database access error occurs + * @throws IOException if the data provided by {@link JdbcResolver} is corrupted + */ + @Override + @SuppressWarnings("unchecked") + public boolean writeNextObject(OneRow row) throws SQLException, IOException { + // This cast is safe because the data in the row is formed by JdbcPlugin + List<OneField> tuple = (List<OneField>) row.getData(); + + if (LOG.isDebugEnabled()) { + LOG.debug("writeNextObject() called"); + } + + for (int i = 1; i <= tuple.size(); i++) { + OneField field = tuple.get(i - 1); + if (LOG.isDebugEnabled()) { + LOG.debug("Field " + i + ": " + DataType.get(field.type).toString()); + } + switch (DataType.get(field.type)) { + case INTEGER: + if (field.val == null) { + statementWrite.setNull(i, Types.INTEGER); + } + else { + statementWrite.setInt(i, (int)field.val); + } + break; + case BIGINT: + if (field.val == null) { + statementWrite.setNull(i, Types.INTEGER); + } + else { + statementWrite.setLong(i, (long)field.val); + } + break; + case SMALLINT: + if (field.val == null) { + statementWrite.setNull(i, Types.INTEGER); + } + else { + statementWrite.setShort(i, (short)field.val); + } + break; + case REAL: + if (field.val == null) { + statementWrite.setNull(i, Types.FLOAT); + } + else { + statementWrite.setFloat(i, (float)field.val); + } + break; + case FLOAT8: + if (field.val == null) { + statementWrite.setNull(i, Types.DOUBLE); + } + else { + statementWrite.setDouble(i, (double)field.val); + } + break; + case BOOLEAN: + if (field.val == null) { + statementWrite.setNull(i, Types.BOOLEAN); + } + else { + statementWrite.setBoolean(i, 
(boolean)field.val); + } + break; + case NUMERIC: + if (field.val == null) { + statementWrite.setNull(i, Types.NUMERIC); + } + else { + statementWrite.setBigDecimal(i, (BigDecimal)field.val); + } + break; + case VARCHAR: + case BPCHAR: + case TEXT: + if (field.val == null) { + statementWrite.setNull(i, Types.VARCHAR); + } + else { + statementWrite.setString(i, (String)field.val); + } + break; + case BYTEA: + if (field.val == null) { + statementWrite.setNull(i, Types.BINARY); + } + else { + statementWrite.setBytes(i, (byte[])field.val); + } + break; + case TIMESTAMP: + if (field.val == null) { + statementWrite.setNull(i, Types.TIMESTAMP); + } + else { + statementWrite.setTimestamp(i, (Timestamp)field.val); + } + break; + case DATE: + if (field.val == null) { + statementWrite.setNull(i, Types.DATE); + } + else { + statementWrite.setDate(i, (Date)field.val); + } + break; + default: + throw new IOException("The data tuple from JdbcResolver is corrupted"); --- End diff -- This message is correct. If a certain data type is not supported, the exception must be thrown [in `JdbcResolver`](https://github.com/arenadata/incubator-hawq/blob/pxf_jdbc_writeAndFix/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java#L223).
---