PHOENIX-2733 Minor cleanup for improvements to CSV Bulk Loader performance (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/952a01cc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/952a01cc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/952a01cc Branch: refs/heads/calcite Commit: 952a01cce6d7d314ca26e89a398e75f38690cdad Parents: 1d03e02 Author: James Taylor <jtay...@salesforce.com> Authored: Wed Mar 2 13:54:05 2016 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Mar 2 13:54:24 2016 -0800 ---------------------------------------------------------------------- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 12 +- .../phoenix/mapreduce/CsvToKeyValueMapper.java | 2 +- .../mapreduce/FormatToBytesWritableMapper.java | 385 +++++++++++++++++++ .../mapreduce/FormatToKeyValueMapper.java | 378 ------------------ .../mapreduce/FormatToKeyValueReducer.java | 28 +- .../phoenix/mapreduce/JsonToKeyValueMapper.java | 2 +- .../bulkload/TargetTableRefFunctions.java | 2 +- .../util/PhoenixConfigurationUtil.java | 4 +- .../FormatToBytesWritableMapperTest.java | 102 +++++ .../mapreduce/FormatToKeyValueMapperTest.java | 102 ----- 10 files changed, 516 insertions(+), 501 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index ab2848f..ff73530 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -209,10 +209,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName); Preconditions.checkNotNull(importColumns); Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty"); - FormatToKeyValueMapper.configureColumnInfoList(conf, importColumns); + FormatToBytesWritableMapper.configureColumnInfoList(conf, importColumns); boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt()); - conf.setBoolean(FormatToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows); - conf.set(FormatToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName); + conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows); + conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY, qualifiedTableName); // give subclasses their hook configureOptions(cmdLine, importColumns, conf); @@ -277,10 +277,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded); final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded); - final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded); + final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded); - job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson); - job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson); + 
job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson); + job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson); // give subclasses their hook setupJob(job); http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index 2cb1ac7..e5a6e5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -41,7 +41,7 @@ import com.google.common.collect.Iterables; * extracting the created KeyValues and rolling back the statement execution before it is * committed to HBase. */ -public class CsvToKeyValueMapper extends FormatToKeyValueMapper<CSVRecord> { +public class CsvToKeyValueMapper extends FormatToBytesWritableMapper<CSVRecord> { /** Configuration key for the field delimiter for input csv records */ public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java new file mode 100644 index 0000000..2c9b6d9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; +import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; +import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.UpsertExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** + * Base class for converting some input source format into {@link ImmutableBytesWritable}s that + * contains packed in a single byte array values for all columns. + * Assumes input format is text-based, with one row per line. Depends on an online cluster + * to retrieve {@link ColumnInfo} from the target table. + */ +public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair, + ImmutableBytesWritable> { + + protected static final Logger LOG = LoggerFactory.getLogger(FormatToBytesWritableMapper.class); + + protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import"; + + /** Configuration key for the name of the output table */ + public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename"; + + /** Configuration key for the columns to be imported */ + public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos"; + + /** Configuration key for the flag to ignore invalid rows */ + public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow"; + + /** Configuration key for the table names */ + public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames"; + + /** Configuration key for the table logical names */ + public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames"; + + /** + * Parses a single input line, returning a {@code T}. 
+ */ + public interface LineParser<T> { + T parse(String input) throws IOException; + } + + protected PhoenixConnection conn; + protected UpsertExecutor<RECORD, ?> upsertExecutor; + protected ImportPreUpsertKeyValueProcessor preUpdateProcessor; + protected List<String> tableNames; + protected List<String> logicalNames; + protected MapperUpsertListener<RECORD> upsertListener; + + /* + lookup table for column index. Index in the List matches to the index in tableNames List + */ + protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes; + + protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf); + protected abstract LineParser<RECORD> getLineParser(); + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + + Configuration conf = context.getConfiguration(); + + // pass client configuration into driver + Properties clientInfos = new Properties(); + for (Map.Entry<String, String> entry : conf) { + clientInfos.setProperty(entry.getKey(), entry.getValue()); + } + + try { + conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); + + final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY); + final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY); + tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); + logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf); + + columnIndexes = initColumnIndexes(); + } catch (SQLException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + + upsertListener = new MapperUpsertListener<RECORD>( + context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); + upsertExecutor = buildUpsertExecutor(conf); + preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + } + + @SuppressWarnings("deprecation") + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, + InterruptedException { + if (conn == null) { + throw new RuntimeException("Connection not initialized."); + } + try { + RECORD record = null; + try { + record = getLineParser().parse(value.toString()); + } catch (IOException e) { + context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L); + return; + } + + if (record == null) { + context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L); + return; + } + upsertExecutor.execute(ImmutableList.<RECORD>of(record)); + Map<Integer, List<KeyValue>> map = new HashMap<>(); + Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator + = PhoenixRuntime.getUncommittedDataIterator(conn, true); + while (uncommittedDataIterator.hasNext()) { + Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next(); + List<KeyValue> keyValueList = kvPair.getSecond(); + keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); + byte[] first = kvPair.getFirst(); + // Create a list of KV for each table + for (int i = 0; i < tableNames.size(); i++) { + if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) { + if (!map.containsKey(i)) { + map.put(i, new ArrayList<KeyValue>()); + } + List<KeyValue> list = map.get(i); + for (KeyValue kv : keyValueList) { + list.add(kv); + } + break; + } + } + } + for (Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) { + int tableIndex = rowEntry.getKey(); + List<KeyValue> lkv = rowEntry.getValue(); + // All KV values combines to a single byte array + writeAggregatedRow(context, tableIndex, lkv); + } + conn.rollback(); + } catch (Exception e) { + throw new 
RuntimeException(e); + } + } + + private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException { + List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>(); + int tableIndex; + for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) { + PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex)); + Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List<PColumn> cls = table.getColumns(); + for (int i = 0; i < cls.size(); i++) { + PColumn c = cls.get(i); + if (c.getFamilyName() == null) continue; // Skip PK column + byte[] family = c.getFamilyName().getBytes(); + byte[] name = c.getName().getBytes(); + if (!columnMap.containsKey(family)) { + columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR)); + } + Map<byte[], Integer> qualifier = columnMap.get(family); + qualifier.put(name, i); + } + tableMap.add(columnMap); + } + return tableMap; + } + + /** + * Find the column index which will replace the column name in + * the aggregated array and will be restored in Reducer + * + * @param tableIndex Table index in tableNames list + * @param cell KeyValue for the column + * @return column index for the specified cell or -1 if was not found + */ + private int findIndex(int tableIndex, Cell cell) { + Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex); + Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(), + cell.getFamilyOffset(), cell.getFamilyLength())); + if (qualifiers != null) { + Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength())); + if (result != null) { + return result; + } + } + return -1; + } + + /** + * Collect all column values for the same rowKey + * + * @param context Current mapper context + * @param tableIndex Table index in tableNames list + * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable + * @throws IOException + * @throws InterruptedException + */ + + private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv) + throws IOException, InterruptedException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); + DataOutputStream outputStream = new DataOutputStream(bos); + ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + if (!lkv.isEmpty()) { + // All Key Values for the same row are supposed to be the same, so init rowKey only once + Cell first = lkv.get(0); + outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength()); + for (KeyValue cell : lkv) { + if (isEmptyCell(cell)) { + continue; + } + int i = findIndex(tableIndex, cell); + if (i == -1) { + throw new IOException("No column found for KeyValue"); + } + WritableUtils.writeVInt(outputStream, i); + outputStream.writeByte(cell.getTypeByte()); + WritableUtils.writeVInt(outputStream, cell.getValueLength()); + outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + } + } + ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray()); + outputStream.close(); + context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray); + } + + protected boolean isEmptyCell(KeyValue cell) { + if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0, + QueryConstants.EMPTY_COLUMN_BYTES.length) != 0) + return false; + else + return 
true; + } + + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + if (conn != null) { + conn.close(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + /** + * Write the list of to-import columns to a job configuration. + * + * @param conf configuration to be written to + * @param columnInfoList list of ColumnInfo objects to be configured for import + */ + @VisibleForTesting + static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) { + conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList)); + } + + /** + * Build the list of ColumnInfos for the import based on information in the configuration. + */ + @VisibleForTesting + static List<ColumnInfo> buildColumnInfoList(Configuration conf) { + + return Lists.newArrayList( + Iterables.transform( + Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)), + new Function<String, ColumnInfo>() { + @Nullable + @Override + public ColumnInfo apply(@Nullable String input) { + if (input == null || input.isEmpty()) { + // An empty string represents a null that was passed in to + // the configuration, which corresponds to an input column + // which is to be skipped + return null; + } + return ColumnInfo.fromString(input); + } + })); + } + + /** + * Listener that logs successful upserts and errors to job counters. + */ + @VisibleForTesting + static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> { + + private final Mapper<LongWritable, Text, + TableRowkeyPair, ImmutableBytesWritable>.Context context; + private final boolean ignoreRecordErrors; + + private MapperUpsertListener( + Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context, + boolean ignoreRecordErrors) { + this.context = context; + this.ignoreRecordErrors = ignoreRecordErrors; + } + + @Override + public void upsertDone(long upsertCount) { + context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L); + } + + @Override + public void errorOnRecord(T record, Throwable throwable) { + LOG.error("Error on record " + record, throwable); + context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L); + if (!ignoreRecordErrors) { + throw Throwables.propagate(throwable); + } + } + } + + /** + * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no + * specific class is configured. This implementation simply passes through the KeyValue + * list that is passed in. + */ + public static class DefaultImportPreUpsertKeyValueProcessor implements + ImportPreUpsertKeyValueProcessor { + + @Override + public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) { + return keyValues; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java deleted file mode 100644 index 8dbb4aa..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.mapreduce; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.sql.SQLException; -import java.util.*; -import javax.annotation.Nullable; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; -import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; -import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -/** - * Base class for converting some input source format into {@link KeyValue}s of a target - * schema. Assumes input format is text-based, with one row per line. Depends on an online cluster - * to retrieve {@link ColumnInfo} from the target table. 
- */ -public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair, - ImmutableBytesWritable> { - - protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class); - - protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import"; - - /** Configuration key for the name of the output table */ - public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename"; - - /** Configuration key for the name of the output index table */ - public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename"; - - /** Configuration key for the columns to be imported */ - public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos"; - - /** Configuration key for the flag to ignore invalid rows */ - public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow"; - - /** Configuration key for the table names */ - public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames"; - public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames"; - - /** Configuration key for the table configurations */ - public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config"; - - /** - * Parses a single input line, returning a {@code T}. - */ - public interface LineParser<T> { - T parse(String input) throws IOException; - } - - protected PhoenixConnection conn; - protected UpsertExecutor<RECORD, ?> upsertExecutor; - protected ImportPreUpsertKeyValueProcessor preUpdateProcessor; - protected List<String> tableNames; - protected List<String> logicalNames; - protected MapperUpsertListener<RECORD> upsertListener; - - /* - lookup table for column index. 
Index in the List matches to the index in tableNames List - */ - protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes; - - protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf); - protected abstract LineParser<RECORD> getLineParser(); - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - - Configuration conf = context.getConfiguration(); - - // pass client configuration into driver - Properties clientInfos = new Properties(); - for (Map.Entry<String, String> entry : conf) { - clientInfos.setProperty(entry.getKey(), entry.getValue()); - } - - try { - conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); - - final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY); - final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY); - tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); - logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf); - - columnIndexes = initColumnIndexes(); - } catch (SQLException | ClassNotFoundException e) { - throw new RuntimeException(e); - } - - upsertListener = new MapperUpsertListener<RECORD>( - context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true)); - upsertExecutor = buildUpsertExecutor(conf); - preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - } - - @SuppressWarnings("deprecation") - @Override - protected void map(LongWritable key, Text value, Context context) throws IOException, - InterruptedException { - if (conn == null) { - throw new RuntimeException("Connection not initialized."); - } - try { - RECORD record = null; - try { - record = getLineParser().parse(value.toString()); - } catch (IOException e) { - context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L); - return; - } - - if (record == null) { - context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L); - return; - } - upsertExecutor.execute(ImmutableList.<RECORD>of(record)); - Map<Integer, List<KeyValue>> map = new HashMap<>(); - Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator - = PhoenixRuntime.getUncommittedDataIterator(conn, true); - while (uncommittedDataIterator.hasNext()) { - Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next(); - List<KeyValue> keyValueList = kvPair.getSecond(); - keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); - byte[] first = kvPair.getFirst(); - // Create a list of KV for each table - for (int i = 0; i < tableNames.size(); i++) { - if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) { - if (!map.containsKey(i)) { - map.put(i, new ArrayList<KeyValue>()); - } - List<KeyValue> list = map.get(i); - for (KeyValue kv : keyValueList) { - list.add(kv); - } - break; - } - } - } - for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) { - int tableIndex = rowEntry.getKey(); - List<KeyValue> lkv = rowEntry.getValue(); - // All KV values combines to a single byte array - writeAggregatedRow(context, tableIndex, lkv); - } - conn.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException { - List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>(); - int tableIndex; - for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) { - PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex)); - Map<byte[], Map<byte[], Integer>> columnMap = new 
TreeMap<>(Bytes.BYTES_COMPARATOR); - List<PColumn> cls = table.getColumns(); - for(int i = 0; i < cls.size(); i++) { - PColumn c = cls.get(i); - if(c.getFamilyName() == null) continue; // Skip PK column - byte[] family = c.getFamilyName().getBytes(); - byte[] name = c.getName().getBytes(); - if(!columnMap.containsKey(family)) { - columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR)); - } - Map<byte[], Integer> qualifier = columnMap.get(family); - qualifier.put(name, i); - } - tableMap.add(columnMap); - } - return tableMap; - } - - /** - * Find the column index which will replace the column name in - * the aggregated array and will be restored in Reducer - * - * @param tableIndex Table index in tableNames list - * @param cell KeyValue for the column - * @return column index for the specified cell or -1 if was not found - */ - private int findIndex(int tableIndex, Cell cell) { - Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex); - Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(), - cell.getFamilyOffset(), cell.getFamilyLength())); - if(qualifiers!= null) { - Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength())); - if(result!=null) { - return result; - } - } - return -1; - } - - /** - * Collect all column values for the same rowKey - * - * @param context Current mapper context - * @param tableIndex Table index in tableNames list - * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable - * @throws IOException - * @throws InterruptedException - */ - - private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv) - throws IOException, InterruptedException { - TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024); - DataOutputStream outputStream = new DataOutputStream(bos); - ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); - if (!lkv.isEmpty()) { - // All Key Values for the same row are supposed to be the same, so init rowKey only once - Cell first = lkv.get(0); - outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength()); - for (KeyValue cell : lkv) { - if(isEmptyCell(cell)) { - continue; - } - int i = findIndex(tableIndex, cell); - if (i == -1) { - throw new IOException("No column found for KeyValue"); - } - WritableUtils.writeVInt(outputStream, i); - outputStream.writeByte(cell.getTypeByte()); - WritableUtils.writeVInt(outputStream, cell.getValueLength()); - outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - } - } - ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray()); - outputStream.close(); - context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray); - } - - protected boolean isEmptyCell(KeyValue cell) { - if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0, - QueryConstants.EMPTY_COLUMN_BYTES.length) != 0) - return false; - else - return true; - } - - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - if (conn != null) { - conn.close(); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - /** - * Write the list of to-import columns to a job configuration. 
- * - * @param conf configuration to be written to - * @param columnInfoList list of ColumnInfo objects to be configured for import - */ - @VisibleForTesting - static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) { - conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList)); - } - - /** - * Build the list of ColumnInfos for the import based on information in the configuration. - */ - @VisibleForTesting - static List<ColumnInfo> buildColumnInfoList(Configuration conf) { - - return Lists.newArrayList( - Iterables.transform( - Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)), - new Function<String, ColumnInfo>() { - @Nullable - @Override - public ColumnInfo apply(@Nullable String input) { - if (input == null || input.isEmpty()) { - // An empty string represents a null that was passed in to - // the configuration, which corresponds to an input column - // which is to be skipped - return null; - } - return ColumnInfo.fromString(input); - } - })); - } - - /** - * Listener that logs successful upserts and errors to job counters. - */ - @VisibleForTesting - static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> { - - private final Mapper<LongWritable, Text, - TableRowkeyPair, ImmutableBytesWritable>.Context context; - private final boolean ignoreRecordErrors; - - private MapperUpsertListener( - Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context, - boolean ignoreRecordErrors) { - this.context = context; - this.ignoreRecordErrors = ignoreRecordErrors; - } - - @Override - public void upsertDone(long upsertCount) { - context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L); - } - - @Override - public void errorOnRecord(T record, Throwable throwable) { - LOG.error("Error on record " + record, throwable); - context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L); - if (!ignoreRecordErrors) { - throw Throwables.propagate(throwable); - } - } - } - - /** - * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no - * specific class is configured. This implementation simply passes through the KeyValue - * list that is passed in. 
- */ - public static class DefaultImportPreUpsertKeyValueProcessor implements - ImportPreUpsertKeyValueProcessor { - - @Override - public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) { - return keyValues; - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index fb61855..e906431 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -21,7 +21,11 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; @@ -38,7 +42,11 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.*; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +55,7 @@ import org.slf4j.LoggerFactory; * Performs similar functionality to {@link KeyValueSortReducer} */ public class FormatToKeyValueReducer - extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> { + extends Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue> { protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class); @@ -72,8 +80,8 @@ public class FormatToKeyValueReducer try { PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf); builder = conn.getKeyValueBuilder(); - final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY); - final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY); + final String tableNamesConf = conf.get(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY); + final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY); tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf); @@ -91,9 +99,9 @@ public class FormatToKeyValueReducer emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table)); List<PColumn> cls = table.getColumns(); List<Pair<byte[], byte[]>> list = new ArrayList(cls.size()); - for(int i = 0; i < cls.size(); i++) { + for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); - if(c.getFamilyName() == null) { + if (c.getFamilyName() == null) { list.add(null); // Skip PK column continue; } @@ -108,14 +116,14 @@ public class FormatToKeyValueReducer @Override protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values, - Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, 
KeyValue>.Context context) - throws IOException, InterruptedException { + Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context) + throws IOException, InterruptedException { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); int tableIndex = tableNames.indexOf(key.getTableName()); List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex); for (ImmutableBytesWritable aggregatedArray : values) { DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get())); - while (input.available()!= 0) { + while (input.available() != 0) { int index = WritableUtils.readVInt(input); Pair<byte[], byte[]> pair = columns.get(index); byte type = input.readByte(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java index 5173a0e..2a51b2c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java @@ -37,7 +37,7 @@ import com.google.common.base.Preconditions; * extracting the created KeyValues and rolling back the statement execution before it is * committed to HBase. */ -public class JsonToKeyValueMapper extends FormatToKeyValueMapper<Map<?, ?>> { +public class JsonToKeyValueMapper extends FormatToBytesWritableMapper<Map<?, ?>> { private LineParser<Map<?, ?>> lineParser; http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java index e02065f..58725c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java @@ -78,7 +78,7 @@ public class TargetTableRefFunctions { } }; - public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() { + public static Function<List<TargetTableRef>,String> LOGICAL_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() { @Override public String apply(List<TargetTableRef> input) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 280daa2..b1879d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import 
org.apache.hadoop.util.ReflectionUtils; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.mapreduce.FormatToKeyValueMapper; +import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper; import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; import org.apache.phoenix.mapreduce.PhoenixInputFormat; import org.apache.phoenix.util.ColumnInfo; @@ -418,7 +418,7 @@ public final class PhoenixConfigurationUtil { Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null; try { processorClass = conf.getClass( - UPSERT_HOOK_CLASS_CONFKEY, FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class, + UPSERT_HOOK_CLASS_CONFKEY, FormatToBytesWritableMapper.DefaultImportPreUpsertKeyValueProcessor.class, ImportPreUpsertKeyValueProcessor.class); } catch (Exception e) { throw new IllegalStateException("Couldn't load upsert hook class", e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java new file mode 100644 index 0000000..6424976 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PIntegerArray; +import org.apache.phoenix.schema.types.PUnsignedInt; +import org.apache.phoenix.util.ColumnInfo; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import static org.junit.Assert.assertEquals; + +public class FormatToBytesWritableMapperTest { + + @Test + public void testBuildColumnInfoList() { + List<ColumnInfo> columnInfoList = ImmutableList.of( + new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), + new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), + new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); + + Configuration conf = new Configuration(); + FormatToBytesWritableMapper.configureColumnInfoList(conf, columnInfoList); + List<ColumnInfo> fromConfig = FormatToBytesWritableMapper.buildColumnInfoList(conf); + + assertEquals(columnInfoList, fromConfig); + } + + @Test + public void testBuildColumnInfoList_ContainingNulls() { + // A null value in the column info list means "skip that column in the input" + List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList( + new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), + null, + new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), + new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); + + Configuration conf = new Configuration(); + FormatToBytesWritableMapper.configureColumnInfoList(conf, columnInfoListWithNull); + List<ColumnInfo> fromConfig = FormatToBytesWritableMapper.buildColumnInfoList(conf); + + assertEquals(columnInfoListWithNull, fromConfig); + } + + @Test + public void testLoadPreUpdateProcessor() { + Configuration conf = new Configuration(); + conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class, + ImportPreUpsertKeyValueProcessor.class); + + ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + assertEquals(MockUpsertProcessor.class, processor.getClass()); + } + + @Test + public void testLoadPreUpdateProcessor_NotConfigured() { + + Configuration conf = new Configuration(); + ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + + assertEquals(FormatToBytesWritableMapper.DefaultImportPreUpsertKeyValueProcessor.class, + processor.getClass()); + } + + @Test(expected=IllegalStateException.class) + public void testLoadPreUpdateProcessor_ClassNotFound() { + Configuration conf = new Configuration(); + conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass"); + + PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); + } + + static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor { + @Override + public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) { + throw new UnsupportedOperationException("Not yet implemented"); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java deleted file mode 100644 index 3455616..0000000 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.mapreduce; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.schema.types.PIntegerArray; -import org.apache.phoenix.schema.types.PUnsignedInt; -import org.apache.phoenix.util.ColumnInfo; -import org.junit.Test; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import static org.junit.Assert.assertEquals; - -public class FormatToKeyValueMapperTest { - - @Test - public void testBuildColumnInfoList() { - List<ColumnInfo> columnInfoList = ImmutableList.of( - new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), - new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), - new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); - - Configuration conf = new Configuration(); - FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoList); - List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf); - - assertEquals(columnInfoList, fromConfig); - } - - @Test - public void testBuildColumnInfoList_ContainingNulls() { - // A null value in the column info list means "skip that column in the input" - List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList( - new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()), - null, - new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()), - new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType())); - - Configuration conf = new Configuration(); - FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoListWithNull); - List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf); - - assertEquals(columnInfoListWithNull, fromConfig); - } - - @Test - public void testLoadPreUpdateProcessor() { - Configuration conf = new Configuration(); - conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class, - ImportPreUpsertKeyValueProcessor.class); - - ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - assertEquals(MockUpsertProcessor.class, processor.getClass()); - } - - @Test - public void testLoadPreUpdateProcessor_NotConfigured() { - - Configuration conf = new Configuration(); - ImportPreUpsertKeyValueProcessor processor = 
PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - - assertEquals(FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class, - processor.getClass()); - } - - @Test(expected=IllegalStateException.class) - public void testLoadPreUpdateProcessor_ClassNotFound() { - Configuration conf = new Configuration(); - conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass"); - - PhoenixConfigurationUtil.loadPreUpsertProcessor(conf); - } - - static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor { - @Override - public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) { - throw new UnsupportedOperationException("Not yet implemented"); - } - } -}
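
For reference, the per-row wire format that FormatToBytesWritableMapper.writeAggregatedRow emits in this patch is: for each non-empty cell, a vint column index (looked up via findIndex in place of the family/qualifier bytes), the cell's type byte, a vint value length, and the raw value bytes, all concatenated into a single ImmutableBytesWritable; FormatToKeyValueReducer reads the stream back in the same order and rebuilds full KeyValues from its own family/qualifier lookup table. The standalone sketch below round-trips that encoding with the same Hadoop WritableUtils calls. The class name, the sample column indexes and values, and the hard-coded Put type byte are illustrative assumptions, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.WritableUtils;

public class AggregatedRowFormatDemo {

    // Mirrors the per-cell encoding in writeAggregatedRow:
    // vint column index, one type byte, vint value length, raw value bytes.
    private static void writeCell(DataOutputStream out, int columnIndex, byte type,
            byte[] value) throws IOException {
        WritableUtils.writeVInt(out, columnIndex);
        out.writeByte(type);
        WritableUtils.writeVInt(out, value.length);
        out.write(value, 0, value.length);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
        DataOutputStream out = new DataOutputStream(bos);

        // KeyValue.Type.Put's code (4) is hard-coded here so the demo only needs hadoop-common.
        byte putType = 4;
        writeCell(out, 1, putType, "value-a".getBytes(StandardCharsets.UTF_8));
        writeCell(out, 3, putType, "value-b".getBytes(StandardCharsets.UTF_8));
        out.close();

        // Decode in the same order FormatToKeyValueReducer.reduce consumes the stream.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        while (in.available() != 0) {
            int index = WritableUtils.readVInt(in);
            byte type = in.readByte();
            int valueLength = WritableUtils.readVInt(in);
            byte[] value = new byte[valueLength];
            in.readFully(value);
            System.out.println("column index " + index + ", type " + type + ", value "
                    + new String(value, StandardCharsets.UTF_8));
        }
    }
}

Replacing repeated family/qualifier byte arrays with a vint index, and shipping one packed value blob per row instead of one full KeyValue per cell, is what shrinks the map-output volume here; since the reducer restores the column names from the table metadata, the generated HFiles are unchanged.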