Repository: carbondata Updated Branches: refs/heads/master 45951c763 -> c0ba982e6
[CARBONDATA-3221] Fix the error that the SDK doesn't support reading multiple files from S3. The SDK reader is OK with a filter, but when we read data without a filter, the ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() will be 0, and the FileReader reader isn't closed after readByteBuffer in org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader#initialize, so we should invoke finish after readByteBuffer. This closes #3051 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c0ba982e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c0ba982e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c0ba982e Branch: refs/heads/master Commit: c0ba982e61ba5393168f8664b87ee27cd249d1ca Parents: 45951c7 Author: xubo245 <xub...@huawei.com> Authored: Fri Jan 4 16:53:48 2019 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jan 8 12:58:56 2019 +0530 ---------------------------------------------------------------------- .../carbondata/examples/sdk/SDKS3Example.java | 55 +++++++++++++++----- .../util/CarbonVectorizedRecordReader.java | 1 + 2 files changed, 43 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0ba982e/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java index f9eae9e..33642bf 100644 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java @@ -33,6 +33,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Constants; 
import org.apache.log4j.Logger; +import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY; +import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY; + /** * Example for testing CarbonWriter on S3 */ @@ -41,7 +45,7 @@ public class SDKS3Example { Logger logger = LogServiceFactory.getLogService(SDKS3Example.class.getName()); if (args == null || args.length < 3) { logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>" - + "<s3-endpoint> [table-path-on-s3] [rows]"); + + "<s3-endpoint> [table-path-on-s3] [rows] [Number of writes]"); System.exit(0); } @@ -56,9 +60,13 @@ public class SDKS3Example { path=args[3]; } - int num = 3; + int rows = 3; if (args.length > 4) { - num = Integer.parseInt(args[4]); + rows = Integer.parseInt(args[4]); + } + int num = 3; + if (args.length > 5) { + num = Integer.parseInt(args[5]); } Configuration conf = new Configuration(true); @@ -69,18 +77,20 @@ public class SDKS3Example { Field[] fields = new Field[2]; fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - CarbonWriter writer = CarbonWriter - .builder() - .outputPath(path) - .withHadoopConf(conf) - .withCsvInput(new Schema(fields)) - .writtenBy("SDKS3Example") - .build(); + for (int j = 0; j < num; j++) { + CarbonWriter writer = CarbonWriter + .builder() + .outputPath(path) + .withHadoopConf(conf) + .withCsvInput(new Schema(fields)) + .writtenBy("SDKS3Example") + .build(); - for (int i = 0; i < num; i++) { - writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)}); + for (int i = 0; i < rows; i++) { + writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)}); + } + writer.close(); } - writer.close(); // Read data EqualToExpression equalToExpression = new EqualToExpression( @@ -104,6 +114,25 @@ public class SDKS3Example { System.out.println("\nFinished"); reader.close(); + // Read without filter + CarbonReader reader2 = CarbonReader + .builder(path, 
"_temp") + .projection(new String[]{"name", "age"}) + .withHadoopConf(ACCESS_KEY, args[0]) + .withHadoopConf(SECRET_KEY, args[1]) + .withHadoopConf(ENDPOINT, args[2]) + .build(); + + System.out.println("\nData:"); + i = 0; + while (i < 20 && reader2.hasNext()) { + Object[] row = (Object[]) reader2.readNextRow(); + System.out.println(row[0] + " " + row[1]); + i++; + } + System.out.println("\nFinished"); + reader2.close(); + CarbonProperties.getInstance() .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, backupProperty); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0ba982e/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java index d66bdd1..e18a4d4 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java @@ -90,6 +90,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> { ((CarbonInputSplit) inputSplit).getDetailInfo().getBlockSize() - 8, 8); ((CarbonInputSplit) inputSplit).getDetailInfo().setBlockFooterOffset(buffer.getLong()); + reader.finish(); } splitList = new ArrayList<>(1); splitList.add((CarbonInputSplit) inputSplit);