Repository: phoenix Updated Branches: refs/heads/master 7332fc30c -> a67521190
PHOENIX-3134 varbinary fields bulk load difference between MR/psql and upserts Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a6752119 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a6752119 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a6752119 Branch: refs/heads/master Commit: a675211909415ca376e432d25f8a8822aadf5712 Parents: 7332fc3 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Wed Jan 18 13:27:17 2017 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Wed Jan 18 13:27:17 2017 +0530 ---------------------------------------------------------------------- .../expression/function/EncodeFormat.java | 4 +- .../phoenix/mapreduce/CsvBulkImportUtil.java | 7 +- .../phoenix/mapreduce/CsvBulkLoadTool.java | 12 ++- .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 9 ++- .../apache/phoenix/schema/types/PBinary.java | 6 +- .../apache/phoenix/schema/types/PVarbinary.java | 5 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 11 +++ .../phoenix/util/csv/CsvUpsertExecutor.java | 25 ++++++ .../phoenix/util/json/JsonUpsertExecutor.java | 44 +++++++++++ .../mapreduce/CsvBulkImportUtilTest.java | 8 +- .../util/AbstractUpsertExecutorTest.java | 82 +++++++++++++++++--- .../phoenix/util/csv/CsvUpsertExecutorTest.java | 26 +++---- .../util/json/JsonUpsertExecutorTest.java | 6 ++ 14 files changed, 207 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java index 
ca6cb66..8130228 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java @@ -20,5 +20,7 @@ package org.apache.phoenix.expression.function; public enum EncodeFormat { HEX, //format for encoding HEX value to bytes - BASE62 //format for encoding a base 10 long value to base 62 string + BASE62, //format for encoding a base 10 long value to base 62 string + BASE64, //format for binary data represented as a base 64 encoded string + ASCII // Plain Text }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java index 9289dbf..ff9ff72 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Base64; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import com.google.common.annotations.VisibleForTesting; @@ -38,15 +39,19 @@ public class CsvBulkImportUtil { * @param quoteChar quote character for the CSV input * @param escapeChar escape character for the CSV input * @param arrayDelimiter array delimiter character, can be null + * @param binaryEncoding encoding of binary column values in the input (BASE64 or ASCII), can be null */ public static void initCsvImportJob(Configuration conf, char fieldDelimiter, char quoteChar, - char escapeChar, String arrayDelimiter) { + char escapeChar, String arrayDelimiter, String binaryEncoding) { setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, 
fieldDelimiter); setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar); setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar); if (arrayDelimiter != null) { conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter); } + if(binaryEncoding!=null){ + conf.set(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, binaryEncoding); + } } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java index 8ed66b8..14b8c34 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -35,6 +35,7 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool { static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase delimiter, defaults to double quote character"); static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape character, default is a backslash"); static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)"); + static final Option binaryEncodingOption = new Option("b", "binaryEncoding", true, "Specifies binary encoding"); @Override protected Options getOptions() { @@ -43,6 +44,7 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool { options.addOption(QUOTE_OPT); options.addOption(ESCAPE_OPT); options.addOption(ARRAY_DELIMITER_OPT); + options.addOption(binaryEncodingOption); return options; } @@ -79,13 +81,19 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool { } escapeChar = escapeString.charAt(0); } - + + String binaryEncoding = null; + if 
(cmdLine.hasOption(binaryEncodingOption.getOpt())) { + binaryEncoding = cmdLine.getOptionValue(binaryEncodingOption.getOpt()); + } + CsvBulkImportUtil.initCsvImportJob( conf, delimiterChar, quoteChar, escapeChar, - cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt())); + cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()), + binaryEncoding); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 1e002d2..14d9887 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -225,6 +225,9 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time"; public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade"; public static final String LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.limited.query.serial.threshold"; + + //currently BASE64 and ASCII are supported + public static final String UPLOAD_BINARY_DATA_TYPE_ENCODING = "phoenix.upload.binaryDataType.encoding"; public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index ef3ac39..0e8b9d5 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -79,6 +79,7 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.TRANSACTIONS_ENABLED; +import static org.apache.phoenix.query.QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING; import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; @@ -98,6 +99,7 @@ import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.ReadOnlyProps; + /** * Options for {@link QueryServices}. * @@ -271,6 +273,10 @@ public class QueryServicesOptions { } }; public static final String DEFAULT_SCHEMA = null; + public static final String DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING = "BASE64"; // for backward compatibility, till + // 4.10, psql and CSVBulkLoad + // expects binary data to be base 64 + // encoded private final Configuration config; @@ -338,7 +344,8 @@ public class QueryServicesOptions { .setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED) .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE) .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE) - .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED); + .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED) + .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set // it to 1, so we'll change it. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java index 43906f0..3a9dcc7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java @@ -22,7 +22,6 @@ import java.text.Format; import java.util.Arrays; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.DataExceedsCapacityException; import org.apache.phoenix.query.QueryConstants; @@ -182,10 +181,7 @@ public class PBinary extends PBinaryBase { @Override public Object toObject(String value) { - if (value == null || value.length() == 0) { - return null; - } - return Base64.decode(value); + return PVarbinary.INSTANCE.toObject(value); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java index d96650d..b3ce57a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java @@ -131,7 +131,10 @@ public class PVarbinary extends PBinaryBase { if (value == null || value.length() == 0) { return null; } - return Base64.decode(value); + Object object = Base64.decode(value); + if (object == null) { throw newIllegalDataException( + "Input: [" + value + "] is not base64 
encoded"); } + return object; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index dbac76f..3c16d00 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -213,6 +213,9 @@ public class PhoenixRuntime { if (execCmd.isLocalIndexUpgrade()) { props.setProperty(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, "false"); } + if (execCmd.binaryEncoding != null) { + props.setProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, execCmd.binaryEncoding); + } conn = DriverManager.getConnection(jdbcUrl, props).unwrap(PhoenixConnection.class); conn.setRunningUpgrade(true); if (execCmd.isMapNamespace()) { @@ -532,6 +535,7 @@ public class PhoenixRuntime { private boolean mapNamespace; private String srcTable; private boolean localIndexUpgrade; + private String binaryEncoding; /** * Factory method to build up an {@code ExecutionCommand} based on supplied parameters. @@ -539,6 +543,8 @@ public class PhoenixRuntime { public static ExecutionCommand parseArgs(String[] args) { Option tableOption = new Option("t", "table", true, "Overrides the table into which the CSV data is loaded and is case sensitive"); + Option binaryEncodingOption = new Option("b", "binaryEncoding", true, + "Specifies binary encoding"); Option headerOption = new Option("h", "header", true, "Overrides the column names to" + " which the CSV data maps and is case sensitive. 
A special value of " + "in-line indicating that the first line of the CSV file determines the " + @@ -588,6 +594,7 @@ public class PhoenixRuntime { options.addOption(bypassUpgradeOption); options.addOption(mapNamespaceOption); options.addOption(localIndexUpgradeOption); + options.addOption(binaryEncodingOption); CommandLineParser parser = new PosixParser(); CommandLine cmdLine = null; @@ -606,6 +613,10 @@ public class PhoenixRuntime { if (cmdLine.hasOption(tableOption.getOpt())) { execCmd.tableName = cmdLine.getOptionValue(tableOption.getOpt()); } + + if (cmdLine.hasOption(binaryEncodingOption.getOpt())) { + execCmd.binaryEncoding = cmdLine.getOptionValue(binaryEncodingOption.getOpt()); + } if (cmdLine.hasOption(headerOption.getOpt())) { String columnString = cmdLine.getOptionValue(headerOption.getOpt()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java index 0d3e17d..cd40b44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java @@ -27,12 +27,18 @@ import java.util.Properties; import javax.annotation.Nullable; import org.apache.commons.csv.CSVRecord; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.function.EncodeFormat; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.IllegalDataException; +import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import 
org.apache.phoenix.schema.types.PDataType.PDataCodec; import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.UpsertExecutor; @@ -116,6 +122,7 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> { private final PDataType dataType; private final PDataCodec codec; private final DateUtil.DateTimeParser dateTimeParser; + private final String binaryEncoding; SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) { Properties props; @@ -148,6 +155,8 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> { this.dateTimeParser = null; } this.codec = codec; + this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, + QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING); } @Nullable @@ -175,6 +184,22 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> { throw new RuntimeException("Invalid boolean value: '" + input + "', must be one of ['true','t','1','false','f','0']"); } + }else if (dataType == PVarbinary.INSTANCE || dataType == PBinary.INSTANCE){ + EncodeFormat format = EncodeFormat.valueOf(binaryEncoding.toUpperCase()); + Object object = null; + switch (format) { + case BASE64: + object = Base64.decode(input); + if (object == null) { throw new IllegalDataException( + "Input: [" + input + "] is not base64 encoded"); } + break; + case ASCII: + object = Bytes.toBytes(input); + break; + default: + throw new IllegalDataException("Unsupported encoding \"" + binaryEncoding + "\""); + } + return object; } return dataType.toObject(input); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java index bbe0e30..ffa797d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java @@ -25,12 +25,20 @@ import java.sql.Types; import java.util.List; import java.util.Map; import java.util.Properties; + import javax.annotation.Nullable; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.function.EncodeFormat; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.IllegalDataException; +import org.apache.phoenix.schema.types.PBinary; +import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.UpsertExecutor; @@ -137,6 +145,7 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> { private final PDataType dataType; private final DateUtil.DateTimeParser dateTimeParser; + private final String binaryEncoding; SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) { Properties props; @@ -166,6 +175,8 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> { } else { this.dateTimeParser = null; } + this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, + QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING); } @Nullable @@ -180,7 +191,40 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> { byte[] byteValue = new byte[dataType.getByteSize()]; 
dataType.getCodec().encodeLong(epochTime, byteValue, 0); return dataType.toObject(byteValue); + }else if (dataType == PBoolean.INSTANCE) { + switch (input.toString()) { + case "true": + case "t": + case "T": + case "1": + return Boolean.TRUE; + case "false": + case "f": + case "F": + case "0": + return Boolean.FALSE; + default: + throw new RuntimeException("Invalid boolean value: '" + input + + "', must be one of ['true','t','1','false','f','0']"); + } + }else if (dataType == PVarbinary.INSTANCE || dataType == PBinary.INSTANCE){ + EncodeFormat format = EncodeFormat.valueOf(binaryEncoding.toUpperCase()); + Object object = null; + switch (format) { + case BASE64: + object = Base64.decode(input.toString()); + if (object == null) { throw new IllegalDataException( + "Input: [" + input + "] is not base64 encoded"); } + break; + case ASCII: + object = Bytes.toBytes(input.toString()); + break; + default: + throw new IllegalDataException("Unsupported encoding \"" + binaryEncoding + "\""); } + return object; + } + return dataType.toObject(input, dataType); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java index 3c6271a..33c72a8 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.mapreduce; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -28,9 +31,6 @@ import org.apache.hadoop.hbase.KeyValue; import 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - public class CsvBulkImportUtilTest { @Test @@ -41,7 +41,7 @@ public class CsvBulkImportUtilTest { char quote = '\002'; char escape = '!'; - CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, null); + CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, null, null); // Serialize and deserialize the config to ensure that there aren't any issues // with non-printable characters as delimiters http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java index 61b03fb..2b2544d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java @@ -17,6 +17,12 @@ */ package org.apache.phoenix.util; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; @@ -25,9 +31,14 @@ import java.sql.SQLException; import java.sql.Types; import java.util.Arrays; import java.util.List; +import java.util.Properties; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.types.PArrayDataType; +import 
org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PIntegerArray; import org.junit.After; @@ -36,12 +47,6 @@ import org.junit.Test; import com.google.common.collect.ImmutableList; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionlessQueryTest { protected Connection conn; @@ -51,6 +56,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles protected abstract UpsertExecutor<R, F> getUpsertExecutor(); protected abstract R createRecord(Object... columnValues) throws IOException; + protected abstract UpsertExecutor<R, F> getUpsertExecutor(Connection conn); @Before public void setUp() throws SQLException { @@ -59,7 +65,8 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles new ColumnInfo("NAME", Types.VARCHAR), new ColumnInfo("AGE", Types.INTEGER), new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType()), - new ColumnInfo("BEARD", Types.BOOLEAN)); + new ColumnInfo("BEARD", Types.BOOLEAN), + new ColumnInfo("PIC", Types.BINARY)); preparedStatement = mock(PreparedStatement.class); upsertListener = mock(UpsertExecutor.UpsertListener.class); @@ -73,8 +80,10 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles @Test public void testExecute() throws Exception { + byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); + String encodedBinaryData = Base64.encodeBytes(binaryData); getUpsertExecutor().execute(createRecord(123L, "NameValue", 42, - Arrays.asList(1, 2, 3), true)); + Arrays.asList(1, 2, 3), true, encodedBinaryData)); verify(upsertListener).upsertDone(1L); verifyNoMoreInteractions(upsertListener); @@ -84,6 +93,7 @@ public abstract class 
AbstractUpsertExecutorTest<R, F> extends BaseConnectionles verify(preparedStatement).setObject(3, Integer.valueOf(42)); verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); verify(preparedStatement).setObject(5, Boolean.TRUE); + verify(preparedStatement).setObject(6, binaryData); verify(preparedStatement).execute(); verifyNoMoreInteractions(preparedStatement); } @@ -99,8 +109,10 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles @Test public void testExecute_TooManyFields() throws Exception { + byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); + String encodedBinaryData = Base64.encodeBytes(binaryData); R recordWithTooManyFields = createRecord(123L, "NameValue", 42, Arrays.asList(1, 2, 3), - true, "Garbage"); + true, encodedBinaryData, "garbage"); getUpsertExecutor().execute(recordWithTooManyFields); verify(upsertListener).upsertDone(1L); @@ -111,14 +123,17 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles verify(preparedStatement).setObject(3, Integer.valueOf(42)); verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); verify(preparedStatement).setObject(5, Boolean.TRUE); + verify(preparedStatement).setObject(6, binaryData); verify(preparedStatement).execute(); verifyNoMoreInteractions(preparedStatement); } @Test public void testExecute_NullField() throws Exception { + byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); + String encodedBinaryData = Base64.encodeBytes(binaryData); getUpsertExecutor().execute(createRecord(123L, "NameValue", null, - Arrays.asList(1, 2, 3), false)); + Arrays.asList(1, 2, 3), false, encodedBinaryData)); verify(upsertListener).upsertDone(1L); verifyNoMoreInteractions(upsertListener); @@ -128,17 +143,62 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles verify(preparedStatement).setNull(3, 
columnInfoList.get(2).getSqlType()); verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); verify(preparedStatement).setObject(5, Boolean.FALSE); + verify(preparedStatement).setObject(6, binaryData); verify(preparedStatement).execute(); verifyNoMoreInteractions(preparedStatement); } @Test public void testExecute_InvalidType() throws Exception { + byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); + String encodedBinaryData = Base64.encodeBytes(binaryData); R recordWithInvalidType = createRecord(123L, "NameValue", "ThisIsNotANumber", - Arrays.asList(1, 2, 3), true); + Arrays.asList(1, 2, 3), true, encodedBinaryData); getUpsertExecutor().execute(recordWithInvalidType); verify(upsertListener).errorOnRecord(eq(recordWithInvalidType), any(Throwable.class)); verifyNoMoreInteractions(upsertListener); } + + @Test + public void testExecute_InvalidBoolean() throws Exception { + byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue(); + String encodedBinaryData = Base64.encodeBytes(binaryData); + R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,NotABoolean,"+encodedBinaryData); + getUpsertExecutor().execute(csvRecordWithInvalidType); + + verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class)); + } + + @Test + public void testExecute_InvalidBinary() throws Exception { + String notBase64Encoded="#@$df"; + R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,true,"+notBase64Encoded); + getUpsertExecutor().execute(csvRecordWithInvalidType); + + verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class)); + } + + @Test + public void testExecute_AsciiEncoded() throws Exception { + String asciiValue="#@$df"; + Properties info=new Properties(); + info.setProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,"ASCII"); + getUpsertExecutor(DriverManager.getConnection(getUrl(),info)).execute(createRecord(123L, 
"NameValue", 42, + Arrays.asList(1, 2, 3), true, asciiValue)); + + verify(upsertListener).upsertDone(1L); + verifyNoMoreInteractions(upsertListener); + + verify(preparedStatement).setObject(1, Long.valueOf(123L)); + verify(preparedStatement).setObject(2, "NameValue"); + verify(preparedStatement).setObject(3, Integer.valueOf(42)); + verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3})); + verify(preparedStatement).setObject(5, Boolean.TRUE); + verify(preparedStatement).setObject(6, Bytes.toBytes(asciiValue)); + verify(preparedStatement).execute(); + verifyNoMoreInteractions(preparedStatement); + } + + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java index c887ff7..a5ec4fa 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java @@ -18,23 +18,19 @@ package org.apache.phoenix.util.csv; import java.io.IOException; +import java.sql.Connection; import java.sql.SQLException; import java.util.List; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.apache.phoenix.util.AbstractUpsertExecutorTest; import org.apache.phoenix.util.UpsertExecutor; import org.junit.Before; -import org.junit.Test; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.verify; +import 
com.google.common.base.Joiner; +import com.google.common.collect.Iterables; public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord, String> { @@ -46,7 +42,13 @@ public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord, public UpsertExecutor<CSVRecord, String> getUpsertExecutor() { return upsertExecutor; } - + + @Override + public UpsertExecutor<CSVRecord, String> getUpsertExecutor(Connection conn) { + return new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, + upsertListener, ARRAY_SEP); + } + @Override public CSVRecord createRecord(Object... columnValues) throws IOException { for (int i = 0; i < columnValues.length; i++) { @@ -69,11 +71,5 @@ public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord, upsertListener, ARRAY_SEP); } - @Test - public void testExecute_InvalidBoolean() throws Exception { - CSVRecord csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,NotABoolean"); - upsertExecutor.execute(ImmutableList.of(csvRecordWithInvalidType)); - - verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class)); - } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a6752119/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java index c042dd4..6ac9cf9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.phoenix.util.json; import java.io.IOException; +import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; @@ -50,4 +51,9 @@ public class 
JsonUpsertExecutorTest extends AbstractUpsertExecutorTest<Map<?, ?> super.setUp(); upsertExecutor = new JsonUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener); } + + @Override + protected UpsertExecutor<Map<?, ?>, Object> getUpsertExecutor(Connection conn) { + return new JsonUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener); + } }