This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 0ed534f4cf PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location 0ed534f4cf is described below commit 0ed534f4cfefb059f5c8633f0db9c4a188ba97df Author: Sergey Soldatov <s...@apache.org> AuthorDate: Tue May 31 13:37:20 2022 -0700 PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location Co-authored-by: Istvan Toth <st...@apache.org> --- .../phoenix/mapreduce/MultiHfileOutputFormat.java | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java index 3b2d4c47bf..b792958b7a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java @@ -68,10 +68,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; import org.apache.phoenix.compat.hbase.CompatUtil; import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; @@ -114,7 +115,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce @Override public RecordWriter<TableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context) throws 
IOException, InterruptedException { - return createRecordWriter(context); + return createRecordWriter(context, this.getOutputCommitter(context)); } /** @@ -123,11 +124,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce * @return * @throws IOException */ - static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context) + static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter( + final TaskAttemptContext context, final OutputCommitter committer) throws IOException { // Get the path of the temporary output file - final Path outputPath = FileOutputFormat.getOutputPath(context); - final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); + final Path outputdir = ((PathOutputCommitter) committer).getWorkPath(); final Configuration conf = context.getConfiguration(); final FileSystem fs = outputdir.getFileSystem(conf); @@ -336,7 +337,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf,final String tableName) { Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],Algorithm>(Bytes.BYTES_COMPARATOR); Map<String, String> tableConfigs = getTableConfigurations(conf, tableName); - if(tableConfigs == null) { + if (tableConfigs == null) { return compressionMap; } Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,COMPRESSION_FAMILIES_CONF_KEY); @@ -355,7 +356,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce */ private static Map<String, String> getTableConfigurations(Configuration conf, final String tableName) { String tableDefn = conf.get(tableName); - if(StringUtils.isEmpty(tableDefn)) { + if (StringUtils.isEmpty(tableDefn)) { return null; } TargetTableRef table = TargetTableRefFunctions.FROM_JSON.apply(tableDefn); @@ -374,7 +375,7 @@ public class MultiHfileOutputFormat extends 
FileOutputFormat<TableRowkeyPair, Ce static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf,final String tableName) { Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],BloomType>(Bytes.BYTES_COMPARATOR); Map<String, String> tableConfigs = getTableConfigurations(conf, tableName); - if(tableConfigs == null) { + if (tableConfigs == null) { return bloomTypeMap; } Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOOM_TYPE_FAMILIES_CONF_KEY); @@ -396,7 +397,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf,final String tableName) { Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],Integer>(Bytes.BYTES_COMPARATOR); Map<String, String> tableConfigs = getTableConfigurations(conf, tableName); - if(tableConfigs == null) { + if (tableConfigs == null) { return blockSizeMap; } Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOCK_SIZE_FAMILIES_CONF_KEY); @@ -420,7 +421,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],DataBlockEncoding>(Bytes.BYTES_COMPARATOR); Map<String, String> tableConfigs = getTableConfigurations(conf, tableName); - if(tableConfigs == null) { + if (tableConfigs == null) { return encoderMap; } Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,DATABLOCK_ENCODING_FAMILIES_CONF_KEY); @@ -441,7 +442,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce private static Map<byte[], String> createFamilyConfValueMap(Map<String,String> configs, String confName) { Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR); String confVal = configs.get(confName); - if(StringUtils.isEmpty(confVal)) { + if (StringUtils.isEmpty(confVal)) { return confValMap; } for (String familyConf : confVal.split("&")) { @@ -677,7 
+678,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce // tableStartKeys for all tables. Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet(); - for(TargetTableRef table : tablesToBeLoaded) { + for (TargetTableRef table : tablesToBeLoaded) { final String tableName = table.getPhysicalName(); try(Connection hbaseConn = ConnectionFactory.createConnection(conf);){ Set<TableRowkeyPair> startKeys =