This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
     new 20bc741  PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
20bc741 is described below

commit 20bc74145762d2b19e80b609bec901489accd5cb
Author: Josh Elser <els...@apache.org>
AuthorDate: Fri Feb 8 11:13:01 2019 -0500

    PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
---
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  | 33 ++++++++
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    |  9 +-
 .../phoenix/mapreduce/PhoenixTextInputFormat.java  | 97 ++++++++++++++++++++++
 3 files changed, 137 insertions(+), 2 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 7e4226d..699b469 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -497,4 +497,37 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
         stmt.close();
     }
 
+    @Test
+    public void testIgnoreCsvHeader() throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE S.TABLE13 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+
+            final Configuration conf = new Configuration(getUtility().getConfiguration());
+            FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input13.csv"));
+            try (PrintWriter printWriter = new PrintWriter(outputStream)) {
+                printWriter.println("id,name");
+                printWriter.println("1,Name 1");
+                printWriter.println("2,Name 2");
+                printWriter.println("3,Name 3");
+            }
+
+            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+            csvBulkLoadTool.setConf(conf);
+            int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input13.csv",
+                "--table", "table13",
+                "--schema", "s",
+                "--zookeeper", zkQuorum,
+                "--skip-header"});
+            assertEquals(0, exitCode);
+
+            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(1) FROM S.TABLE13")) {
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertFalse(rs.next());
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 13c7ab6..e321361 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -88,6 +87,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
     static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
     static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+    static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
 
     /**
      * Set configuration values based on parsed command line options.
@@ -111,6 +111,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         options.addOption(IMPORT_COLUMNS_OPT);
         options.addOption(IGNORE_ERRORS_OPT);
         options.addOption(HELP_OPT);
+        options.addOption(SKIP_HEADER_OPT);
         return options;
     }
 
@@ -202,6 +203,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
                 conf.set(entry.getKey(), entry.getValue());
             }
         }
+        // Skip the first line of the CSV file(s)?
+        if (cmdLine.hasOption(SKIP_HEADER_OPT.getOpt())) {
+            PhoenixTextInputFormat.setSkipHeader(conf);
+        }
 
         final Connection conn = QueryUtil.getConnection(conf);
         if (LOG.isDebugEnabled()) {
@@ -279,7 +284,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         FileInputFormat.addInputPaths(job, inputPaths);
         FileOutputFormat.setOutputPath(job, outputPath);
 
-        job.setInputFormatClass(TextInputFormat.class);
+        job.setInputFormatClass(PhoenixTextInputFormat.class);
         job.setMapOutputKeyClass(TableRowkeyPair.class);
         job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setOutputKeyClass(TableRowkeyPair.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java
new file mode 100644
index 0000000..cc170f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around TextInputFormat which can ignore the first line in the first InputSplit
+ * for a file.
+ */
+public class PhoenixTextInputFormat extends TextInputFormat {
+    public static final String SKIP_HEADER_KEY = "phoenix.input.format.skip.header";
+
+    public static void setSkipHeader(Configuration conf) {
+        conf.setBoolean(SKIP_HEADER_KEY, true);
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+        RecordReader<LongWritable,Text> rr = super.createRecordReader(split, context);
+
+        return new PhoenixLineRecordReader((LineRecordReader) rr);
+    }
+
+    public static class PhoenixLineRecordReader extends RecordReader<LongWritable,Text> {
+        private static final Logger LOG = LoggerFactory.getLogger(PhoenixLineRecordReader.class);
+        private final LineRecordReader rr;
+
+        private PhoenixLineRecordReader(LineRecordReader rr) {
+            this.rr = rr;
+        }
+
+        @Override
+        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+            rr.initialize(genericSplit, context);
+            final Configuration conf = context.getConfiguration();
+            final FileSplit split = (FileSplit) genericSplit;
+            if (conf.getBoolean(SKIP_HEADER_KEY, false) && split.getStart() == 0) {
+                LOG.trace("Consuming first key-value from {}", genericSplit);
+                nextKeyValue();
+            } else {
+                LOG.trace("Not configured to skip header or not the first input split: {}", split);
+            }
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return rr.nextKeyValue();
+        }
+
+        @Override
+        public LongWritable getCurrentKey() throws IOException {
+            return rr.getCurrentKey();
+        }
+
+        @Override
+        public Text getCurrentValue() throws IOException {
+            return rr.getCurrentValue();
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            return rr.getProgress();
+        }
+
+        @Override
+        public void close() throws IOException {
+            rr.close();
+        }
+    }
+}
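
For anyone wiring PhoenixTextInputFormat into a custom MapReduce job rather than going through CsvBulkLoadTool, here is a minimal sketch of the equivalent configuration. The SkipHeaderExample class and job name are invented for illustration; setSkipHeader and setInputFormatClass are the same calls the patch itself uses.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.phoenix.mapreduce.PhoenixTextInputFormat;

// Hypothetical example class, not part of the patch.
public class SkipHeaderExample {
    public static Job createJob(Configuration conf, String inputPaths) throws Exception {
        // Same effect as passing -k/--skip-header to the bulk load tool:
        // sets phoenix.input.format.skip.header=true in the job configuration.
        PhoenixTextInputFormat.setSkipHeader(conf);

        Job job = Job.getInstance(conf, "skip-header-example");
        // The wrapped LineRecordReader consumes the first line only in the split
        // whose start offset is 0, so each file's header is dropped exactly once
        // no matter how many splits the file is broken into.
        job.setInputFormatClass(PhoenixTextInputFormat.class);
        FileInputFormat.addInputPaths(job, inputPaths);
        return job;
    }
}

From the command line, the same behavior comes from the new -k/--skip-header flag, as exercised in the integration test above.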