Repository: hawq Updated Branches: refs/heads/master a741655ba -> 472fa2b74
http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java deleted file mode 100644 index 1c61537..0000000 --- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java +++ /dev/null @@ -1,103 +0,0 @@ -package org.apache.hawq.pxf.plugins.jdbc; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hawq.pxf.api.*; -import org.apache.hawq.pxf.api.io.DataType; -import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; -import org.apache.hawq.pxf.api.utilities.InputData; -import org.apache.hawq.pxf.api.utilities.Plugin; - -import java.sql.ResultSet; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -/** - * Class JdbcReadResolver Read the Jdbc ResultSet, and generates the data type - List {@link OneField}. - */ -public class JdbcReadResolver extends Plugin implements ReadResolver { - private static final Log LOG = LogFactory.getLog(JdbcReadResolver.class); - //HAWQ Table column definitions - private ArrayList<ColumnDescriptor> columns = null; - - public JdbcReadResolver(InputData input) { - super(input); - columns = input.getTupleDescription(); - } - - @Override - public List<OneField> getFields(OneRow row) throws Exception { - ResultSet result = (ResultSet) row.getData(); - LinkedList<OneField> fields = new LinkedList<>(); - - for (int i = 0; i < columns.size(); i++) { - ColumnDescriptor column = columns.get(i); - String colName = column.columnName(); - Object value = null; - - OneField oneField = new OneField(); - oneField.type = column.columnTypeCode(); - - switch (DataType.get(oneField.type)) { - case INTEGER: - value = result.getInt(colName); - break; - case FLOAT8: - value = result.getDouble(colName); - break; - case REAL: - value = result.getFloat(colName); - break; - case BIGINT: - value = result.getLong(colName); - break; - case SMALLINT: - value = result.getShort(colName); - break; - case BOOLEAN: - value = result.getBoolean(colName); - break; - case BYTEA: - value = result.getBytes(colName); - break; - case VARCHAR: - case BPCHAR: - case TEXT: - case NUMERIC: - value = result.getString(colName); - break; - case TIMESTAMP: - case DATE: - value = result.getDate(colName); - break; - default: - throw new UnsupportedOperationException("Unknwon Field Type : " + DataType.get(oneField.type).toString() - + ", Column : " + column.toString()); - } - oneField.val = value; - fields.add(oneField); - } - return fields; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java new file mode 100644 index 0000000..ab88326 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcResolver.java @@ -0,0 +1,367 @@ +package org.apache.hawq.pxf.plugins.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.WriteResolver; + +import java.util.List; +import java.util.LinkedList; +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * JDBC tables resolver + */ +public class JdbcResolver extends JdbcPlugin implements ReadResolver, WriteResolver { + /** + * Class constructor + */ + public JdbcResolver(InputData input) throws UserDataException { + super(input); + } + + /** + * getFields() implementation + * + * @throws SQLException if the provided {@link OneRow} object is invalid + */ + @Override + public List<OneField> getFields(OneRow row) throws SQLException { + ResultSet result = (ResultSet) row.getData(); + LinkedList<OneField> fields = new LinkedList<>(); + + for (ColumnDescriptor column : columns) { + String colName = column.columnName(); + Object value = null; + + OneField oneField = new OneField(); + oneField.type = column.columnTypeCode(); + + switch (DataType.get(oneField.type)) { + case INTEGER: + value = result.getInt(colName); + break; + case FLOAT8: + value = result.getDouble(colName); + break; + case REAL: + value = result.getFloat(colName); + break; + case BIGINT: + value = result.getLong(colName); + break; + case SMALLINT: + value = result.getShort(colName); + break; + case BOOLEAN: + value = result.getBoolean(colName); + break; + case BYTEA: + value = result.getBytes(colName); + break; + case VARCHAR: + case BPCHAR: + case TEXT: + case NUMERIC: + value = result.getString(colName); + break; + case DATE: + value = result.getDate(colName); + break; + case TIMESTAMP: + value = result.getTimestamp(colName); + break; + default: + throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported"); + } + + oneField.val = value; + fields.add(oneField); + } + return fields; + } + + /** + * setFields() implementation + * + * @return OneRow with the data field containing a List<OneField> + * OneFields are not reordered before being passed to Accessor; at the + * moment, there is no way to correct the order of the fields if it is not. + * In practice, the 'record' provided is always ordered the right way. + * + * @throws UnsupportedOperationException if field of some type is not supported + */ + @Override + public OneRow setFields(List<OneField> record) throws UnsupportedOperationException, ParseException { + int column_index = 0; + for (OneField oneField : record) { + ColumnDescriptor column = columns.get(column_index); + if ( + LOG.isDebugEnabled() && + DataType.get(column.columnTypeCode()) != DataType.get(oneField.type) + ) { + LOG.warn("The provided tuple of data may be disordered. Datatype of column with descriptor '" + column.toString() + "' must be '" + DataType.get(column.columnTypeCode()).toString() + "', but actual is '" + DataType.get(oneField.type).toString() + "'"); + } + + // Check that data type is supported + switch (DataType.get(oneField.type)) { + case BOOLEAN: + case INTEGER: + case FLOAT8: + case REAL: + case BIGINT: + case SMALLINT: + case NUMERIC: + case VARCHAR: + case BPCHAR: + case TEXT: + case BYTEA: + case TIMESTAMP: + case DATE: + break; + default: + throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported"); + } + + if ( + LOG.isDebugEnabled() && + DataType.get(oneField.type) == DataType.BYTEA + ) { + LOG.debug("OneField content (conversion from BYTEA): '" + new String((byte[])oneField.val) + "'"); + } + + // Convert TEXT columns into native data types + if ((DataType.get(oneField.type) == DataType.TEXT) && (DataType.get(column.columnTypeCode()) != DataType.TEXT)) { + oneField.type = column.columnTypeCode(); + if (oneField.val != null) { + String rawVal = (String)oneField.val; + if (LOG.isDebugEnabled()) { + LOG.debug("OneField content (conversion from TEXT): '" + rawVal + "'"); + } + switch (DataType.get(column.columnTypeCode())) { + case VARCHAR: + case BPCHAR: + case TEXT: + case BYTEA: + break; + case BOOLEAN: + oneField.val = (Object)Boolean.parseBoolean(rawVal); + break; + case INTEGER: + oneField.val = (Object)Integer.parseInt(rawVal); + break; + case FLOAT8: + oneField.val = (Object)Double.parseDouble(rawVal); + break; + case REAL: + oneField.val = (Object)Float.parseFloat(rawVal); + break; + case BIGINT: + oneField.val = (Object)Long.parseLong(rawVal); + break; + case SMALLINT: + oneField.val = (Object)Short.parseShort(rawVal); + break; + case NUMERIC: + oneField.val = (Object)new BigDecimal(rawVal); + break; + case TIMESTAMP: + boolean isConversionSuccessful = false; + for (SimpleDateFormat sdf : timestampSDFs.get()) { + try { + java.util.Date parsedTimestamp = sdf.parse(rawVal); + oneField.val = (Object)new Timestamp(parsedTimestamp.getTime()); + isConversionSuccessful = true; + break; + } + catch (ParseException e) { + // pass + } + } + if (!isConversionSuccessful) { + throw new ParseException(rawVal, 0); + } + break; + case DATE: + oneField.val = (Object)new Date(dateSDF.get().parse(rawVal).getTime()); + break; + default: + throw new UnsupportedOperationException("Field type '" + DataType.get(oneField.type).toString() + "' (column '" + column.toString() + "') is not supported"); + } + } + } + + column_index += 1; + } + return new OneRow(new LinkedList<OneField>(record)); + } + + /** + * Decode OneRow object and pass all its contents to a PreparedStatement + * + * @throws IOException if data in a OneRow is corrupted + * @throws SQLException if the given statement is broken + */ + @SuppressWarnings("unchecked") + public static void decodeOneRowToPreparedStatement(OneRow row, PreparedStatement statement) throws IOException, SQLException { + // This is safe: OneRow comes from JdbcResolver + List<OneField> tuple = (List<OneField>)row.getData(); + for (int i = 1; i <= tuple.size(); i++) { + OneField field = tuple.get(i - 1); + switch (DataType.get(field.type)) { + case INTEGER: + if (field.val == null) { + statement.setNull(i, Types.INTEGER); + } + else { + statement.setInt(i, (int)field.val); + } + break; + case BIGINT: + if (field.val == null) { + statement.setNull(i, Types.INTEGER); + } + else { + statement.setLong(i, (long)field.val); + } + break; + case SMALLINT: + if (field.val == null) { + statement.setNull(i, Types.INTEGER); + } + else { + statement.setShort(i, (short)field.val); + } + break; + case REAL: + if (field.val == null) { + statement.setNull(i, Types.FLOAT); + } + else { + statement.setFloat(i, (float)field.val); + } + break; + case FLOAT8: + if (field.val == null) { + statement.setNull(i, Types.DOUBLE); + } + else { + statement.setDouble(i, (double)field.val); + } + break; + case BOOLEAN: + if (field.val == null) { + statement.setNull(i, Types.BOOLEAN); + } + else { + statement.setBoolean(i, (boolean)field.val); + } + break; + case NUMERIC: + if (field.val == null) { + statement.setNull(i, Types.NUMERIC); + } + else { + statement.setBigDecimal(i, (BigDecimal)field.val); + } + break; + case VARCHAR: + case BPCHAR: + case TEXT: + if (field.val == null) { + statement.setNull(i, Types.VARCHAR); + } + else { + statement.setString(i, (String)field.val); + } + break; + case BYTEA: + if (field.val == null) { + statement.setNull(i, Types.BINARY); + } + else { + statement.setBytes(i, (byte[])field.val); + } + break; + case TIMESTAMP: + if (field.val == null) { + statement.setNull(i, Types.TIMESTAMP); + } + else { + statement.setTimestamp(i, (Timestamp)field.val); + } + break; + case DATE: + if (field.val == null) { + statement.setNull(i, Types.DATE); + } + else { + statement.setDate(i, (Date)field.val); + } + break; + default: + throw new IOException("The data tuple from JdbcResolver is corrupted"); + } + } + } + + private static final Log LOG = LogFactory.getLog(JdbcResolver.class); + + // SimpleDateFormat to parse TEXT into DATE + private static ThreadLocal<SimpleDateFormat> dateSDF = new ThreadLocal<SimpleDateFormat>() { + @Override protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyyy-MM-dd"); + } + }; + // SimpleDateFormat to parse TEXT into TIMESTAMP (with microseconds) + private static ThreadLocal<SimpleDateFormat[]> timestampSDFs = new ThreadLocal<SimpleDateFormat[]>() { + @Override protected SimpleDateFormat[] initialValue() { + SimpleDateFormat[] retRes = { + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSS"), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSS"), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS"), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S"), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"), + new SimpleDateFormat("yyyy-MM-dd") + }; + return retRes; + } + }; +} http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java index 541aa86..d6c8fba 100644 --- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java @@ -18,9 +18,6 @@ package org.apache.hawq.pxf.plugins.jdbc; * under the License. */ -import java.util.ArrayList; -import java.util.List; - import org.apache.hawq.pxf.api.LogicalFilter; import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct; import org.apache.hawq.pxf.api.BasicFilter; @@ -29,81 +26,99 @@ import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.List; +import java.text.ParseException; + /** - * Parse filter object generated by parent class {@link org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder}, - * and build WHERE statement. - * For Multiple filters , currently only support HDOP_AND . - * The unsupported Filter operation and LogicalOperation ,will return null statement. + * A WHERE queries builder * + * Parses filter objects generated by {@link org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder} and builds WHERE statements + * Only HDOP_AND is supported for multiple filters */ public class WhereSQLBuilder extends JdbcFilterBuilder { - private InputData inputData; - public WhereSQLBuilder(InputData input) { inputData = input; } /** - * 1.check for LogicalOperator, Jdbc currently only support HDOP_AND. - * 2.and convert to BasicFilter List. + * Insert WHERE constraints into a given query + * Note that if filter is not supported, query is left unchanged + * + * @param dbName Database name (affects the behaviour for DATE constraints) + * @param query SQL query to insert constraints to. The query may may contain other WHERE statements + * + * @throws ParseException if an error happens when parsing the constraints (provided to class constructor) */ - private static List<BasicFilter> convertBasicFilterList(Object filter, List<BasicFilter> returnList) throws UnsupportedFilterException { - if (returnList == null) - returnList = new ArrayList<>(); - if (filter instanceof BasicFilter) { - returnList.add((BasicFilter) filter); - return returnList; + public void buildWhereSQL(String dbName, StringBuilder query) throws ParseException { + if (!inputData.hasFilter()) { + return; } - LogicalFilter lfilter = (LogicalFilter) filter; - if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND) - throw new UnsupportedFilterException("unsupported LogicalOperation : " + lfilter.getOperator()); - for (Object f : lfilter.getFilterList()) { - returnList = convertBasicFilterList(f, returnList); - } - return returnList; - } - public String buildWhereSQL(String db_product) throws Exception { - if (!inputData.hasFilter()) - return null; - List<BasicFilter> filters = null; try { + StringBuilder prepared = new StringBuilder(); + if (!query.toString().contains("WHERE")) { + prepared.append(" WHERE "); + } + else { + prepared.append(" AND "); + } + + // Get constraints and parse them String filterString = inputData.getFilterString(); Object filterObj = getFilterObject(filterString); - + List<BasicFilter> filters = null; filters = convertBasicFilterList(filterObj, filters); - StringBuffer sb = new StringBuffer("1=1"); + + String andDivisor = ""; for (Object obj : filters) { - BasicFilter filter = (BasicFilter) obj; - sb.append(" AND "); + prepared.append(andDivisor); + andDivisor = " AND "; + // Insert constraint column name + BasicFilter filter = (BasicFilter) obj; ColumnDescriptor column = inputData.getColumn(filter.getColumn().index()); - //the column name of filter - sb.append(column.columnName()); + prepared.append(column.columnName()); - //the operation of filter + // Insert constraint operator FilterParser.Operation op = filter.getOperation(); switch (op) { case HDOP_LT: - sb.append("<"); + prepared.append(" < "); break; case HDOP_GT: - sb.append(">"); + prepared.append(" > "); break; case HDOP_LE: - sb.append("<="); + prepared.append(" <= "); break; case HDOP_GE: - sb.append(">="); + prepared.append(" >= "); break; case HDOP_EQ: - sb.append("="); + prepared.append(" = "); + break; + case HDOP_LIKE: + prepared.append(" LIKE "); break; + case HDOP_NE: + prepared.append(" <> "); + break; + case HDOP_IS_NULL: + prepared.append(" IS NULL"); + continue; + case HDOP_IS_NOT_NULL: + prepared.append(" IS NOT NULL"); + continue; default: - throw new UnsupportedFilterException("unsupported Filter operation : " + op); + throw new UnsupportedFilterException("Unsupported Filter operation: " + op); } - DbProduct dbProduct = DbProduct.getDbProduct(db_product); + // Insert constraint constant + DbProduct dbProduct = DbProduct.getDbProduct(dbName); Object val = filter.getConstant().constant(); switch (DataType.get(column.columnTypeCode())) { case SMALLINT: @@ -112,29 +127,68 @@ public class WhereSQLBuilder extends JdbcFilterBuilder { case FLOAT8: case REAL: case BOOLEAN: - sb.append(val.toString()); + prepared.append(val.toString()); break; case TEXT: - sb.append("'").append(val.toString()).append("'"); + prepared.append("'").append(val.toString()).append("'"); break; case DATE: - //According to the database products, for the date field for special treatment. - sb.append(dbProduct.wrapDate(val)); + // Date field has different format in different databases + prepared.append(dbProduct.wrapDate(val)); + break; + case TIMESTAMP: + // Timestamp field has different format in different databases + prepared.append(dbProduct.wrapTimestamp(val)); break; default: - throw new UnsupportedFilterException("unsupported column type for filtering : " + column.columnTypeCode()); + throw new UnsupportedFilterException("Unsupported column type for filtering: " + column.columnTypeCode()); } - } - return sb.toString(); - } catch (UnsupportedFilterException ex) { - return null; + + // No exceptions were thrown, change the provided query + query.append(prepared); + } + catch (UnsupportedFilterException e) { + LOG.debug("WHERE clause is omitted: " + e.toString()); + // Silence the exception and do not insert constraints + } + } + + /** + * Convert filter object into a list of {@link BasicFilter} + * + * @param filter Filter object + * @param returnList A list of {@link BasicFilter} to append filters to. Must be null if the function is not called recursively + */ + private static List<BasicFilter> convertBasicFilterList(Object filter, List<BasicFilter> returnList) throws UnsupportedFilterException { + if (returnList == null) { + returnList = new ArrayList<>(); + } + + if (filter instanceof BasicFilter) { + returnList.add((BasicFilter) filter); + return returnList; + } + + LogicalFilter lfilter = (LogicalFilter) filter; + if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND) { + throw new UnsupportedFilterException("Logical operation '" + lfilter.getOperator() + "' is not supported"); + } + for (Object f : lfilter.getFilterList()) { + returnList = convertBasicFilterList(f, returnList); } + + return returnList; } - static class UnsupportedFilterException extends Exception { - UnsupportedFilterException(String message) { + private static class UnsupportedFilterException extends Exception { + UnsupportedFilterException(String message) { super(message); } } + + private static final Log LOG = LogFactory.getLog(WhereSQLBuilder.class); + + // {@link InputData} from PXF + private InputData inputData; } http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java index cdca8a6..bb79c84 100644 --- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java @@ -19,29 +19,34 @@ package org.apache.hawq.pxf.plugins.jdbc.utils; * under the License. */ - import org.apache.commons.lang.ArrayUtils; /** - * A tool class, used to deal with byte array merging, split and other methods. + * A tool class for byte array merging, splitting and conversion */ public class ByteUtil { - public static byte[] mergeBytes(byte[] b1, byte[] b2) { return ArrayUtils.addAll(b1,b2); } - public static byte[][] splitBytes(byte[] bytes, int n) { - int len = bytes.length / n; + /** + * Split a byte[] array into two arrays, each of which represents a value of type long + */ + public static byte[][] splitBytes(byte[] bytes) { + final int N = 8; + int len = bytes.length / N; byte[][] newBytes = new byte[len][]; int j = 0; for (int i = 0; i < len; i++) { - newBytes[i] = new byte[n]; - for (int k = 0; k < n; k++) newBytes[i][k] = bytes[j++]; + newBytes[i] = new byte[N]; + for (int k = 0; k < N; k++) newBytes[i][k] = bytes[j++]; } return newBytes; } + /** + * Convert a value of type long to a byte[] array + */ public static byte[] getBytes(long value) { byte[] b = new byte[8]; b[0] = (byte) ((value >> 56) & 0xFF); @@ -55,22 +60,9 @@ public class ByteUtil { return b; } - public static byte[] getBytes(int value) { - byte[] b = new byte[4]; - b[0] = (byte) ((value >> 24) & 0xFF); - b[1] = (byte) ((value >> 16) & 0xFF); - b[2] = (byte) ((value >> 8) & 0xFF); - b[3] = (byte) ((value >> 0) & 0xFF); - return b; - } - - public static int toInt(byte[] b) { - return (((((int) b[3]) & 0xFF) << 32) + - ((((int) b[2]) & 0xFF) << 40) + - ((((int) b[1]) & 0xFF) << 48) + - ((((int) b[0]) & 0xFF) << 56)); - } - + /** + * Convert a byte[] array to a value of type long + */ public static long toLong(byte[] b) { return ((((long) b[7]) & 0xFF) + ((((long) b[6]) & 0xFF) << 8) + http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java index 30ff1fe..c8b8cfb 100644 --- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java @@ -20,14 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils; */ /** - * As the syntax of different database products are not the same, such as the date type field for processing, ORACLE use to_date () function, and mysql use Date () function. - So we create this class to abstract public methods, the specific database products can implementation of these methods. + * A tool class to process data types that must have different form in different databases. + * Such processing is required to create correct constraints (WHERE statements). */ public abstract class DbProduct { - //wrap date string - public abstract String wrapDate(Object date_val); - - + /** + * Get an instance of some class - the database product + * + * @param String dbName A full name of the database + * @return a DbProduct of the required class + */ public static DbProduct getDbProduct(String dbName) { if (dbName.toUpperCase().contains("MYSQL")) return new MysqlProduct(); @@ -35,15 +37,40 @@ public abstract class DbProduct { return new OracleProduct(); else if (dbName.toUpperCase().contains("POSTGRES")) return new PostgresProduct(); + else if (dbName.toUpperCase().contains("MICROSOFT")) + return new MicrosoftProduct(); else - //Unsupported databases may execute errors return new CommonProduct(); } + + /** + * Wraps a given date value the way required by a target database + * + * @param val {@link java.sql.Date} object to wrap + * @return a string with a properly wrapped date object + */ + public abstract String wrapDate(Object val); + + /** + * Wraps a given timestamp value the way required by a target database + * + * @param val {@link java.sql.Timestamp} object to wrap + * @return a string with a properly wrapped timestamp object + */ + public abstract String wrapTimestamp(Object val); } +/** + * Common product. Used when no other products are avalibale + */ class CommonProduct extends DbProduct { @Override - public String wrapDate(Object dateVal) { - return "date'" + dateVal + "'"; + public String wrapDate(Object val) { + return "date'" + val + "'"; + } + + @Override + public String wrapTimestamp(Object val) { + return "'" + val + "'"; } } http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java new file mode 100644 index 0000000..5cec52d --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MicrosoftProduct.java @@ -0,0 +1,35 @@ +package org.apache.hawq.pxf.plugins.jdbc.utils; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Implements methods for the Microsoft SQL server database + */ +public class MicrosoftProduct extends DbProduct { + @Override + public String wrapDate(Object val){ + return "'" + val + "'"; + } + + @Override + public String wrapTimestamp(Object val) { + return "'" + val + "'"; + } +} http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java index 2e60ada..27f7605 100644 --- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java @@ -20,12 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils; */ /** - * Implements methods for MySQL Database. + * Implements methods for the MySQL Database. */ public class MysqlProduct extends DbProduct { + @Override + public String wrapDate(Object val){ + return "DATE('" + val + "')"; + } @Override - public String wrapDate(Object dateVal){ - return "DATE('" + dateVal + "')"; + public String wrapTimestamp(Object val) { + return "'" + val + "'"; } } http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java index b46c5f3..c2c656b 100644 --- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java @@ -20,11 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils; */ /** - * Implements methods for Oracle Database. + * Implements methods for the Oracle Database. */ public class OracleProduct extends DbProduct { @Override - public String wrapDate(Object dateVal) { - return "to_date('" + dateVal + "','yyyy-mm-dd')"; + public String wrapDate(Object val) { + return "to_date('" + val + "', 'YYYY-MM-DD')"; + } + + @Override + public String wrapTimestamp(Object val) { + return "to_timestamp('" + val + "', 'YYYY-MM-DD HH:MI:SS.FF')"; } } http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java index 901cf2e..c25ec96 100644 --- a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java @@ -20,11 +20,16 @@ package org.apache.hawq.pxf.plugins.jdbc.utils; */ /** - * Implements methods for Postgres Database. + * Implements methods for the PostgreSQL. */ public class PostgresProduct extends DbProduct { @Override - public String wrapDate(Object dateVal) { - return "date'" + dateVal + "'"; + public String wrapDate(Object val) { + return "date'" + val + "'"; + } + + @Override + public String wrapTimestamp(Object val) { + return "'" + val + "'"; } } http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java new file mode 100644 index 0000000..3e1404c --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java @@ -0,0 +1,109 @@ +package org.apache.hawq.pxf.plugins.jdbc.writercallable; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver; +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin; + +import java.util.LinkedList; +import java.util.List; +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * This writer makes batch INSERTs. + * + * A call() is required after a certain number of supply() calls + */ +class BatchWriterCallable implements WriterCallable { + @Override + public void supply(OneRow row) throws IllegalStateException { + if ((batchSize > 0) && (rows.size() >= batchSize)) { + throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable"); + } + if (row == null) { + throw new IllegalArgumentException("Trying to supply() a null OneRow object"); + } + rows.add(row); + } + + @Override + public boolean isCallRequired() { + return (batchSize > 0) && (rows.size() >= batchSize); + } + + @Override + public SQLException call() throws IOException, SQLException, ClassNotFoundException { + if (rows.isEmpty()) { + return null; + } + + boolean statementMustBeDeleted = false; + if (statement == null) { + statement = plugin.getPreparedStatement(plugin.getConnection(), query); + statementMustBeDeleted = true; + } + + for (OneRow row : rows) { + JdbcResolver.decodeOneRowToPreparedStatement(row, statement); + statement.addBatch(); + } + + try { + statement.executeBatch(); + } + catch (SQLException e) { + return e; + } + finally { + rows.clear(); + if (statementMustBeDeleted) { + JdbcPlugin.closeStatement(statement); + statement = null; + } + } + + return null; + } + + /** + * Construct a new batch writer + */ + BatchWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement, int batchSize) { + if (plugin == null || query == null) { + throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null"); + } + + this.plugin = plugin; + this.query = query; + this.statement = statement; + this.batchSize = batchSize; + + rows = new LinkedList<>(); + } + + private final JdbcPlugin plugin; + private final String query; + private PreparedStatement statement; + private List<OneRow> rows; + private final int batchSize; +} http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java new file mode 100644 index 0000000..63dbb29 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java @@ -0,0 +1,102 @@ +package org.apache.hawq.pxf.plugins.jdbc.writercallable; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.plugins.jdbc.JdbcResolver; +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * This writer makes simple, one-by-one INSERTs. + * + * A call() is required after every supply() + */ +class SimpleWriterCallable implements WriterCallable { + @Override + public void supply(OneRow row) throws IllegalStateException { + if (this.row != null) { + throw new IllegalStateException("Trying to supply() a OneRow object to a full WriterCallable"); + } + if (row == null) { + throw new IllegalArgumentException("Trying to supply() a null OneRow object"); + } + this.row = row; + } + + @Override + public boolean isCallRequired() { + return this.row != null; + } + + @Override + public SQLException call() throws IOException, SQLException, ClassNotFoundException { + if (row == null) { + return null; + } + + boolean statementMustBeDeleted = false; + if (statement == null) { + statement = plugin.getPreparedStatement(plugin.getConnection(), query); + statementMustBeDeleted = true; + } + + JdbcResolver.decodeOneRowToPreparedStatement(row, statement); + + try { + if (statement.executeUpdate() != 1) { + throw new SQLException("The number of rows affected by INSERT query is not equal to the number of rows provided"); + } + } + catch (SQLException e) { + return e; + } + finally { + row = null; + if (statementMustBeDeleted) { + JdbcPlugin.closeStatement(statement); + statement = null; + } + } + + return null; + } + + /** + * Construct a new simple writer + */ + SimpleWriterCallable(JdbcPlugin plugin, String query, PreparedStatement statement) { + if ((plugin == null) || (query == null)) { + throw new IllegalArgumentException("The provided JdbcPlugin or SQL query is null"); + } + this.plugin = plugin; + this.query = query; + this.statement = statement; + row = null; + } + + private final JdbcPlugin plugin; + private final String query; + private PreparedStatement statement; + private OneRow row; +} http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java new file mode 100644 index 0000000..e2a6916 --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallable.java @@ -0,0 +1,56 @@ +package org.apache.hawq.pxf.plugins.jdbc.writercallable; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.OneRow; + +import java.util.concurrent.Callable; +import java.sql.SQLException; + +/** + * An object that processes INSERT operation on {@link OneRow} objects + */ +public interface WriterCallable extends Callable<SQLException> { + /** + * Pass the next OneRow to this WriterCallable. + * + * @throws IllegalStateException if this WriterCallable must be call()ed before the next call to supply() + */ + void supply(OneRow row) throws IllegalStateException; + + /** + * Check whether this WriterCallable must be called + * + * @return true if this WriterCallable must be call()ed before the next call to supply() + * @return false otherwise + */ + boolean isCallRequired(); + + /** + * Execute an INSERT query. + * + * @return null or a SQLException that happened when executing the query + * @return null if the query was empty (nothing was there to execute) + * + * @throws Exception an exception that happened during execution, but that is not related to the execution of the query itself (for instance, it may originate from {@link java.sql.PreparedStatement} close() method) + */ + @Override + SQLException call() throws Exception; +} http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java new file mode 100644 index 0000000..aaf13bd --- /dev/null +++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/writercallable/WriterCallableFactory.java @@ -0,0 +1,97 @@ +package org.apache.hawq.pxf.plugins.jdbc.writercallable; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.plugins.jdbc.JdbcPlugin; + +import java.sql.PreparedStatement; + +/** + * An object that processes INSERT operation on {@link OneRow} objects + */ +public class WriterCallableFactory { + /** + * Create a new {@link WriterCallable} factory. + * + * Note that 'setPlugin' and 'setQuery' must be called before construction of a {@link WriterCallable}. + * + * By default, 'statement' is null + */ + public WriterCallableFactory() { + batchSize = JdbcPlugin.DEFAULT_BATCH_SIZE; + plugin = null; + query = null; + statement = null; + } + + /** + * Get an instance of WriterCallable + * + * @return an implementation of WriterCallable, chosen based on parameters that were set for this factory + */ + public WriterCallable get() { + if (batchSize > 1) { + return new BatchWriterCallable(plugin, query, statement, batchSize); + } + return new SimpleWriterCallable(plugin, query, statement); + } + + /** + * Set {@link JdbcPlugin} to use. + * REQUIRED + */ + public void setPlugin(JdbcPlugin plugin) { + this.plugin = plugin; + } + + /** + * Set SQL query to use. + * REQUIRED + */ + public void setQuery(String query) { + this.query = query; + } + + /** + * Set batch size to use + * + * @param batchSize > 1: Use batches of specified size + * @param batchSize < 1: Do not use batches + */ + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + /** + * Set statement to use. + * + * @param statement = null: Create a new connection & a new statement each time {@link WriterCallable} is called + * @param statement not null: Use the given statement and do not close or reopen it + */ + public void setStatement(PreparedStatement statement) { + this.statement = statement; + } + + private int batchSize; + private JdbcPlugin plugin; + private String query; + private PreparedStatement statement; +} http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java index 6785af6..33ae585 100644 --- a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java +++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java @@ -56,7 +56,7 @@ public class JdbcPartitionFragmenterTest { //fragment - 1 byte[] fragMeta = fragments.get(0).getMetadata(); - byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 8); + byte[][] newBytes = ByteUtil.splitBytes(fragMeta); long fragStart = ByteUtil.toLong(newBytes[0]); long fragEnd = ByteUtil.toLong(newBytes[1]); assertDateEquals(fragStart, 2008, 1, 1); @@ -64,7 +64,7 @@ public class JdbcPartitionFragmenterTest { //fragment - 12 fragMeta = fragments.get(11).getMetadata(); - newBytes = ByteUtil.splitBytes(fragMeta, 8); + newBytes = ByteUtil.splitBytes(fragMeta); fragStart = ByteUtil.toLong(newBytes[0]); fragEnd = ByteUtil.toLong(newBytes[1]); assertDateEquals(fragStart, 2008, 12, 1); @@ -102,17 +102,17 @@ public class JdbcPartitionFragmenterTest { //fragment - 1 byte[] fragMeta = fragments.get(0).getMetadata(); - byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 4); - int fragStart = ByteUtil.toInt(newBytes[0]); - int fragEnd = ByteUtil.toInt(newBytes[1]); + byte[][] newBytes = ByteUtil.splitBytes(fragMeta); + long fragStart = ByteUtil.toLong(newBytes[0]); + long fragEnd = ByteUtil.toLong(newBytes[1]); assertEquals(2001, fragStart); assertEquals(2003, fragEnd); //fragment - 6 fragMeta = fragments.get(5).getMetadata(); - newBytes = ByteUtil.splitBytes(fragMeta, 4); - fragStart = ByteUtil.toInt(newBytes[0]); - fragEnd = ByteUtil.toInt(newBytes[1]); + newBytes = ByteUtil.splitBytes(fragMeta); + fragStart = ByteUtil.toLong(newBytes[0]); + fragEnd = ByteUtil.toLong(newBytes[1]); assertEquals(2011, fragStart); assertEquals(2012, fragEnd); http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java index ebe367d..de173b8 100644 --- a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java +++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java @@ -59,44 +59,53 @@ public class SqlBuilderTest { public void testIdFilter() throws Exception { prepareConstruction(); when(inputData.hasFilter()).thenReturn(true); - when(inputData.getFilterString()).thenReturn("a0c20s1d1o5");//id=1 + // id = 1 + when(inputData.getFilterString()).thenReturn("a0c20s1d1o5"); WhereSQLBuilder builder = new WhereSQLBuilder(inputData); - assertEquals("1=1 AND id=1", builder.buildWhereSQL(DB_PRODUCT)); + StringBuilder sb = new StringBuilder(); + builder.buildWhereSQL(DB_PRODUCT, sb); + assertEquals(" WHERE id = 1", sb.toString()); } @Test public void testDateAndAmtFilter() throws Exception { prepareConstruction(); when(inputData.hasFilter()).thenReturn(true); - // cdate>'2008-02-01' and cdate<'2008-12-01' and amt > 1200 + // cdate > '2008-02-01' and cdate < '2008-12-01' and amt > 1200 when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a1c25s10d2008-12-01o1l0a2c20s4d1200o2l0"); WhereSQLBuilder builder = new WhereSQLBuilder(inputData); - assertEquals("1=1 AND cdate>DATE('2008-02-01') AND cdate<DATE('2008-12-01') AND amt>1200" - , builder.buildWhereSQL(DB_PRODUCT)); + StringBuilder sb = new StringBuilder(); + builder.buildWhereSQL(DB_PRODUCT, sb); + assertEquals(" WHERE cdate > DATE('2008-02-01') AND cdate < DATE('2008-12-01') AND amt > 1200" + , sb.toString()); } @Test public void testUnsupportedOperationFilter() throws Exception { prepareConstruction(); when(inputData.hasFilter()).thenReturn(true); - // grade like 'bad' - when(inputData.getFilterString()).thenReturn("a3c25s3dbado7"); + // IN 'bad' + when(inputData.getFilterString()).thenReturn("a3c25s3dbado10"); WhereSQLBuilder builder = new WhereSQLBuilder(inputData); - assertEquals(null, builder.buildWhereSQL(DB_PRODUCT)); + StringBuilder sb = new StringBuilder(); + builder.buildWhereSQL(DB_PRODUCT, sb); + assertEquals("", sb.toString()); } @Test public void testUnsupportedLogicalFilter() throws Exception { prepareConstruction(); when(inputData.hasFilter()).thenReturn(true); - // cdate>'2008-02-01' or amt < 1200 + // cdate > '2008-02-01' or amt < 1200 when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a2c20s4d1200o2l1"); WhereSQLBuilder builder = new WhereSQLBuilder(inputData); - assertEquals(null, builder.buildWhereSQL(DB_PRODUCT)); + StringBuilder sb = new StringBuilder(); + builder.buildWhereSQL(DB_PRODUCT, sb); + assertEquals("", sb.toString()); } @Test @@ -110,11 +119,11 @@ public class SqlBuilderTest { List<Fragment> fragments = fragment.getFragments(); assertEquals(6, fragments.size()); - //partition-1 : cdate>=2008-01-01 and cdate<2008-03-01 + // Partition: cdate >= 2008-01-01 and cdate < 2008-03-01 when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata()); - String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL); - assertEquals(ORIGINAL_SQL + " WHERE 1=1 AND " + - "cdate >= DATE('2008-01-01') AND cdate < DATE('2008-03-01')", fragmentSql); + StringBuilder sb = new StringBuilder(ORIGINAL_SQL); + JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb); + assertEquals(ORIGINAL_SQL + " WHERE cdate >= DATE('2008-01-01') AND cdate < DATE('2008-03-01')", sb.toString()); } @Test @@ -125,19 +134,19 @@ public class SqlBuilderTest { when(inputData.getUserProperty("PARTITION_BY")).thenReturn("grade:enum"); when(inputData.getUserProperty("RANGE")).thenReturn("excellent:good:general:bad"); + StringBuilder sb = new StringBuilder(ORIGINAL_SQL); WhereSQLBuilder builder = new WhereSQLBuilder(inputData); - String whereSql = builder.buildWhereSQL(DB_PRODUCT); - assertEquals("1=1 AND id>5", whereSql); + builder.buildWhereSQL(DB_PRODUCT, sb); + assertEquals(ORIGINAL_SQL + " WHERE id > 5", sb.toString()); JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData); List<Fragment> fragments = fragment.getFragments(); - //partition-1 : id>5 and grade='excellent' + // Partition: id > 5 and grade = 'excellent' when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata()); - String filterSql = ORIGINAL_SQL + " WHERE " + whereSql; - String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, filterSql); - assertEquals(filterSql + " AND grade='excellent'", fragmentSql); + JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb); + assertEquals(ORIGINAL_SQL + " WHERE id > 5 AND grade = 'excellent'", sb.toString()); } @Test @@ -150,8 +159,9 @@ public class SqlBuilderTest { when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata()); - String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL); - assertEquals(ORIGINAL_SQL, fragmentSql); + StringBuilder sb = new StringBuilder(ORIGINAL_SQL); + JdbcPartitionFragmenter.buildFragmenterSql(inputData, DB_PRODUCT, sb); + assertEquals(ORIGINAL_SQL, sb.toString()); } http://git-wip-us.apache.org/repos/asf/hawq/blob/472fa2b7/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml index 252791e..0cddad1 100644 --- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml +++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml @@ -177,11 +177,11 @@ under the License. </profile> <profile> <name>Jdbc</name> - <description>A profile for reading data into HAWQ via JDBC</description> + <description>A profile for reading and writing data via JDBC</description> <plugins> <fragmenter>org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter</fragmenter> - <accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor</accessor> - <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver> + <accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcAccessor</accessor> + <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcResolver</resolver> </plugins> </profile> <profile>
