[CARBONDATA-1827] S3 Carbon Implementation 1.Provide support for s3 in carbondata. 2.Added S3Example to create carbon table on s3. 3.Added S3CSVExample to load carbon table using csv from s3.
This closes #1805 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/65679b8e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/65679b8e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/65679b8e Branch: refs/heads/carbonstore-rebase Commit: 65679b8e1d7f5e4eb77b512f9dd79c369eab554b Parents: b3f9f84 Author: SangeetaGulia <sangeeta.gu...@knoldus.in> Authored: Thu Sep 21 14:56:26 2017 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sat Feb 10 02:20:09 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 21 +++ .../filesystem/AbstractDFSCarbonFile.java | 20 ++- .../datastore/filesystem/HDFSCarbonFile.java | 5 +- .../core/datastore/impl/FileFactory.java | 11 +- .../core/locks/CarbonLockFactory.java | 28 ++-- .../carbondata/core/locks/S3FileLock.java | 111 +++++++++++++ .../carbondata/core/util/CarbonProperties.java | 3 +- .../filesystem/HDFSCarbonFileTest.java | 8 +- examples/spark2/pom.xml | 5 + examples/spark2/src/main/resources/data1.csv | 11 ++ .../carbondata/examples/S3CsvExample.scala | 99 +++++++++++ .../apache/carbondata/examples/S3Example.scala | 164 +++++++++++++++++++ .../spark/rdd/NewCarbonDataLoadRDD.scala | 42 ++++- integration/spark2/pom.xml | 43 +++++ .../spark/rdd/CarbonDataRDDFactory.scala | 3 +- .../org/apache/spark/sql/CarbonSession.scala | 3 + 16 files changed, 554 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 
6e6482d..2e169c2 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -167,6 +167,22 @@ public final class CarbonCommonConstants { public static final String S3N_PREFIX = "s3n://"; public static final String S3A_PREFIX = "s3a://"; + /** + * Access Key for s3n + */ + public static final String S3N_ACCESS_KEY = "fs.s3n.awsAccessKeyId"; + /** + * Secret Key for s3n + */ + public static final String S3N_SECRET_KEY = "fs.s3n.awsSecretAccessKey"; + /** + * Access Key for s3 + */ + public static final String S3_ACCESS_KEY = "fs.s3.awsAccessKeyId"; + /** + * Secret Key for s3 + */ + public static final String S3_SECRET_KEY = "fs.s3.awsSecretAccessKey"; /** * FS_DEFAULT_FS @@ -941,6 +957,11 @@ public final class CarbonCommonConstants { public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK"; /** + * S3LOCK TYPE + */ + public static final String CARBON_LOCK_TYPE_S3 = "S3LOCK"; + + /** * Invalid filter member log string */ public static final String FILTER_INVALID_MEMBER = http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 7b634d2..e1a34fa 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -50,7 +50,7 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.SnappyCodec; -public abstract class AbstractDFSCarbonFile implements 
CarbonFile { +public abstract class AbstractDFSCarbonFile implements CarbonFile { /** * LOGGER */ @@ -261,18 +261,28 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize, boolean append) throws IOException { Path pt = new Path(path); - FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration()); + FileSystem fileSystem = pt.getFileSystem(FileFactory.getConfiguration()); FSDataOutputStream stream = null; if (append) { // append to a file only if file already exists else file not found // exception will be thrown by hdfs if (CarbonUtil.isFileExists(path)) { - stream = fs.append(pt, bufferSize); + if (FileFactory.FileType.S3 == fileType) { + DataInputStream dataInputStream = fileSystem.open(pt); + int count = dataInputStream.available(); + // create buffer + byte[] byteStreamBuffer = new byte[count]; + dataInputStream.read(byteStreamBuffer); + stream = fileSystem.create(pt, true, bufferSize); + stream.write(byteStreamBuffer); + } else { + stream = fileSystem.append(pt, bufferSize); + } } else { - stream = fs.create(pt, true, bufferSize); + stream = fileSystem.create(pt, true, bufferSize); } } else { - stream = fs.create(pt, true, bufferSize); + stream = fileSystem.create(pt, true, bufferSize); } return stream; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java index d470b47..892a556 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java @@ 
-107,8 +107,11 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile { ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName), org.apache.hadoop.fs.Options.Rename.OVERWRITE); return true; + } else if (fileStatus.getPath().toString().startsWith("s3n")) { + fs.delete(new Path(changetoName), true); + return fs.rename(fileStatus.getPath(), new Path(changetoName)); } else { - return false; + return fs.rename(fileStatus.getPath(), new Path(changetoName)); } } catch (IOException e) { LOGGER.error("Exception occured: " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index e6fbd04..daf6d93 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -246,7 +246,15 @@ public final class FileFactory { */ public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType) throws IOException { - return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType); + if (FileType.S3 == fileType) { + CarbonFile carbonFile = getCarbonFile(path); + if (carbonFile.exists()) { + carbonFile.delete(); + } + return carbonFile.getDataOutputStream(path,fileType); + } else { + return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType); + } } /** @@ -423,6 +431,7 @@ public final class FileFactory { throws IOException { FileFactory.FileType fileType = FileFactory.getFileType(directoryPath); switch (fileType) { + case S3: case HDFS: case VIEWFS: try { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java index e70e655..3226a63 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java @@ -52,18 +52,21 @@ public class CarbonLockFactory { */ public static ICarbonLock getCarbonLockObj(AbsoluteTableIdentifier absoluteTableIdentifier, String lockFile) { - switch (lockTypeConfigured) { - case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL: - return new LocalFileLock(absoluteTableIdentifier, lockFile); - case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER: - return new ZooKeeperLocking(absoluteTableIdentifier, lockFile); - - case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS: - return new HdfsFileLock(absoluteTableIdentifier, lockFile); - - default: - throw new UnsupportedOperationException("Not supported the lock type"); + String tablePath = absoluteTableIdentifier.getTablePath(); + if (lockTypeConfigured.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) { + return new ZooKeeperLocking(absoluteTableIdentifier, lockFile); + } else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) || + tablePath.startsWith(CarbonCommonConstants.S3N_PREFIX) || + tablePath.startsWith(CarbonCommonConstants.S3_PREFIX)) { + lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_S3; + return new S3FileLock(absoluteTableIdentifier, lockFile); + } else if (tablePath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { + lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS; + return new HdfsFileLock(absoluteTableIdentifier, lockFile); + } else { + lockTypeConfigured = 
CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL; + return new LocalFileLock(absoluteTableIdentifier, lockFile); } } @@ -84,6 +87,9 @@ public class CarbonLockFactory { case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS: return new HdfsFileLock(locFileLocation, lockFile); + case CarbonCommonConstants.CARBON_LOCK_TYPE_S3: + return new S3FileLock(locFileLocation, lockFile); + default: throw new UnsupportedOperationException("Not supported the lock type"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java new file mode 100644 index 0000000..8836960 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.locks; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + +/** + * This class is used to handle the S3 File locking. + * This is achieved using the concept of acquiring the data out stream using Append option. + */ +public class S3FileLock extends AbstractCarbonLock { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(S3FileLock.class.getName()); + /** + * location s3 file location + */ + private String location; + + private DataOutputStream dataOutputStream; + + /** + * @param tableIdentifier + * @param lockFile + */ + public S3FileLock(AbsoluteTableIdentifier tableIdentifier, String lockFile) { + this(tableIdentifier.getTablePath(), lockFile); + } + + /** + * @param lockFileLocation + * @param lockFile + */ + public S3FileLock(String lockFileLocation, String lockFile) { + this.location = lockFileLocation + CarbonCommonConstants.FILE_SEPARATOR + lockFile; + LOGGER.info("S3 lock path:" + this.location); + initRetry(); + } + + /* (non-Javadoc) + * @see org.apache.carbondata.core.locks.ICarbonLock#unlock() + */ + @Override public boolean unlock() { + boolean status = false; + if (null != dataOutputStream) { + try { + dataOutputStream.close(); + status = true; + } catch (IOException e) { + status = false; + } finally { + CarbonFile carbonFile = + FileFactory.getCarbonFile(location, FileFactory.getFileType(location)); + if (carbonFile.exists()) { + if (carbonFile.delete()) { + LOGGER.info("Deleted the lock file " + location); + } else { + LOGGER.error("Not able to delete the lock file " + location); 
+ status = false; + } + } else { + LOGGER.error( + "Not able to delete the lock file because it is not existed in location " + location); + status = false; + } + } + } + return status; + } + + /* (non-Javadoc) + * @see org.apache.carbondata.core.locks.ICarbonLock#lock() + */ + @Override public boolean lock() { + try { + if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) { + FileFactory.createNewLockFile(location, FileFactory.getFileType(location)); + } + dataOutputStream = + FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location)); + return true; + } catch (IOException e) { + LOGGER.error(e, e.getMessage()); + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 3dc7b8f..9d52669 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -369,7 +369,8 @@ public final class CarbonProperties { String defaultFs = configuration.get("fs.defaultFS"); if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs - .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) + .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX) || defaultFs + .startsWith(CarbonCommonConstants.S3A_PREFIX)) && !CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS.equalsIgnoreCase(lockTypeConfigured)) { LOGGER.warn("The value \"" + lockTypeConfigured + "\" configured for key " + LOCK_TYPE + " is invalid for current file system. 
" http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java index 7726693..4018123 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java @@ -369,7 +369,13 @@ public class HDFSCarbonFileTest { } }; - assertEquals(hdfsCarbonFile.renameForce(fileName), false); + new MockUp<WebHdfsFileSystem>(){ + @Mock + public boolean rename(final Path src, final Path dst) throws IOException { + return true; + } + }; + assertEquals(hdfsCarbonFile.renameForce(fileName), true); } @Test http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/examples/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml index c17f0ee..f64dc9f 100644 --- a/examples/spark2/pom.xml +++ b/examples/spark2/pom.xml @@ -62,6 +62,11 @@ <version>${spark.version}</version> <scope>${spark.deps.scope}</scope> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.2</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/examples/spark2/src/main/resources/data1.csv ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/resources/data1.csv b/examples/spark2/src/main/resources/data1.csv new file mode 100644 index 0000000..cf732eb --- /dev/null +++ b/examples/spark2/src/main/resources/data1.csv @@ -0,0 +1,11 @@ 
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField +1,10,1100,48.4,spark,2015-4-23 12:01:01,1.23,2015-4-23,aaa,2.5 +5,17,1140,43.4,spark,2015-7-27 12:01:02,3.45,2015-7-27,bbb,2.5 +1,11,1100,44.4,flink,2015-5-23 12:01:03,23.23,2015-5-23,ccc,2.5 +1,10,1150,43.4,spark,2015-7-24 12:01:04,254.12,2015-7-24,ddd,2.5 +1,10,1100,47.4,spark,2015-7-23 12:01:05,876.14,2015-7-23,eeee,3.5 +3,14,1160,43.4,hive,2015-7-26 12:01:06,3454.32,2015-7-26,ff,2.5 +2,10,1100,43.4,impala,2015-7-23 12:01:07,456.98,2015-7-23,ggg,2.5 +1,10,1100,43.4,spark,2015-5-23 12:01:08,32.53,2015-5-23,hhh,2.5 +4,16,1130,42.4,impala,2015-7-23 12:01:09,67.23,2015-7-23,iii,2.5 +1,10,1100,43.4,spark,2015-7-23 12:01:10,832.23,2015-7-23,jjj,2.5 http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala new file mode 100644 index 0000000..b37fba8 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3CsvExample.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.examples + +import java.io.File + +import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, SECRET_KEY} +import org.apache.spark.sql.SparkSession +import org.slf4j.{Logger, LoggerFactory} + +object S3CsvExample { + + /** + * This example demonstrate to create local store and load data from CSV files on S3 + * + * @param args require three parameters "Access-key" "Secret-key" + * "s3 path to csv" "spark-master" + */ + def main(args: Array[String]) { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val logger: Logger = LoggerFactory.getLogger(this.getClass) + + import org.apache.spark.sql.CarbonSession._ + if (args.length != 4) { + logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" + + "<s3.csv.location> <spark-master>") + System.exit(0) + } + + val spark = SparkSession + .builder() + .master(args(3)) + .appName("S3CsvExample") + .config("spark.driver.host", "localhost") + .config("spark.hadoop." + ACCESS_KEY, args(0)) + .config("spark.hadoop." 
+ SECRET_KEY, args(1)) + .getOrCreateCarbonSession() + + spark.sparkContext.setLogLevel("INFO") + + spark.sql( + s""" + | CREATE TABLE if not exists carbon_table1( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | dateField DATE, + | charField CHAR(5), + | floatField FLOAT + | ) + | STORED BY 'carbondata' + | LOCATION '$rootPath/examples/spark2/target/store' + | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField') + """.stripMargin) + + spark.sql( + s""" + | LOAD DATA LOCAL INPATH '${ args(2) }' + | INTO TABLE carbon_table1 + | OPTIONS('HEADER'='true') + """.stripMargin) + + spark.sql( + s""" + | LOAD DATA LOCAL INPATH '${ args(2) }' + | INTO TABLE carbon_table1 + | OPTIONS('HEADER'='true') + """.stripMargin) + + spark.sql( + s""" + | SELECT * + | FROM carbon_table1 + """.stripMargin).show() + + spark.sql("Drop table if exists carbon_table1") + + spark.stop() + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala new file mode 100644 index 0000000..d3d0a37 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.examples + +import java.io.File + +import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY} +import org.apache.spark.sql.{Row, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +import org.apache.carbondata.core.constants.CarbonCommonConstants + +object S3Example { + + /** + * This example demonstrates usage of + * 1. create carbon table with storage location on object based storage + * like AWS S3, Huawei OBS, etc + * 2. load data into carbon table, the generated file will be stored on object based storage + * 3. query the table. 
+ * + * @param args require three parameters "Access-key" "Secret-key" + * "table-path on s3" "s3-endpoint" "spark-master" + */ + def main(args: Array[String]) { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val path = s"$rootPath/examples/spark2/src/main/resources/data1.csv" + val logger: Logger = LoggerFactory.getLogger(this.getClass) + + import org.apache.spark.sql.CarbonSession._ + if (args.length < 3 || args.length > 5) { + logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" + + "<table-path-on-s3> [s3-endpoint] [spark-master]") + System.exit(0) + } + + val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(2)) + val spark = SparkSession + .builder() + .master(getSparkMaster(args)) + .appName("S3Example") + .config("spark.driver.host", "localhost") + .config(accessKey, args(0)) + .config(secretKey, args(1)) + .config(endpoint, getS3EndPoint(args)) + .getOrCreateCarbonSession() + + spark.sparkContext.setLogLevel("WARN") + + spark.sql("Drop table if exists carbon_table") + + spark.sql( + s""" + | CREATE TABLE if not exists carbon_table( + | shortField SHORT, + | intField INT, + | bigintField LONG, + | doubleField DOUBLE, + | stringField STRING, + | timestampField TIMESTAMP, + | decimalField DECIMAL(18,2), + | dateField DATE, + | charField CHAR(5), + | floatField FLOAT + | ) + | STORED BY 'carbondata' + | LOCATION '${ args(2) }' + | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField') + """.stripMargin) + + spark.sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE carbon_table + | OPTIONS('HEADER'='true') + """.stripMargin) + + spark.sql( + s""" + | SELECT * + | FROM carbon_table + """.stripMargin).show() + + spark.sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE carbon_table + | OPTIONS('HEADER'='true') + """.stripMargin) + + spark.sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE carbon_table + | OPTIONS('HEADER'='true') + 
""".stripMargin) + + val countSegment: Array[Row] = + spark.sql( + s""" + | SHOW SEGMENTS FOR TABLE carbon_table + """.stripMargin).collect() + + while (countSegment.length != 3) { + this.wait(2000) + } + + // Use compaction command to merge segments or small files in object based storage, + // this can be done periodically. + spark.sql("ALTER table carbon_table compact 'MAJOR'") + spark.sql("show segments for table carbon_table").show() + + spark.sql( + s""" + | SELECT * + | FROM carbon_table + """.stripMargin).show() + + spark.sql("Drop table if exists carbon_table") + + spark.stop() + } + + def getKeyOnPrefix(path: String): (String, String, String) = { + val endPoint = "spark.hadoop." + ENDPOINT + if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) { + ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint) + } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) { + ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY, + "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint) + } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) { + ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY, + "spark.hadoop." 
+ CarbonCommonConstants.S3_SECRET_KEY, endPoint) + } else { + throw new Exception("Incorrect Store Path") + } + } + + def getS3EndPoint(args: Array[String]): String = { + if (args.length >= 4 && args(3).contains(".com")) args(3) + else "" + } + + def getSparkMaster(args: Array[String]): String = { + if (args.length == 5) args(4) + else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3) + else "local" + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 1fa1689..917fc88 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -41,6 +41,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.common.logging.impl.StandardLogService import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo} import org.apache.carbondata.core.util.path.CarbonTablePath @@ -346,11 +347,31 @@ class NewDataFrameLoaderRDD[K, V]( sc: SparkContext, result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, - prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) { + prev: DataLoadCoalescedRDD[Row], + @transient hadoopConf: 
Configuration) extends CarbonRDD[(K, V)](prev) { - override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { + private val confBytes = { + val bao = new ByteArrayOutputStream() + val oos = new ObjectOutputStream(bao) + hadoopConf.write(oos) + oos.close() + CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray) + } + private def getConf = { + val configuration = new Configuration(false) + val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor + .unCompressByte(confBytes)) + val ois = new ObjectInputStream(bai) + configuration.readFields(ois) + ois.close() + configuration + } + + override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val hadoopConf = getConf + setS3Configurations(hadoopConf) val iter = new Iterator[(K, V)] { val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") @@ -420,6 +441,23 @@ class NewDataFrameLoaderRDD[K, V]( iter } override protected def getPartitions: Array[Partition] = firstParent[Row].partitions + + private def setS3Configurations(hadoopConf: Configuration): Unit = { + FileFactory.getConfiguration + .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", "")) + FileFactory.getConfiguration + .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", "")) + FileFactory.getConfiguration + .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", "")) + FileFactory.getConfiguration.set(CarbonCommonConstants.S3_ACCESS_KEY, + hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, "")) + FileFactory.getConfiguration.set(CarbonCommonConstants.S3_SECRET_KEY, + hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, "")) + FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_ACCESS_KEY, + hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, "")) + 
FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_SECRET_KEY, + hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, "")) + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index b68a55d..aac1ff6 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -57,6 +57,49 @@ <version>2.2.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> + <version>1.7.4</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + <version>0.9.0</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 3980c11..833318d 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -995,7 +995,8 @@ object CarbonDataRDDFactory { sqlContext.sparkContext, new DataLoadResultImpl(), carbonLoadModel, - newRdd + newRdd, + sqlContext.sparkContext.hadoopConfiguration ).collect() } catch { case ex: Exception => http://git-wip-us.apache.org/repos/asf/carbondata/blob/65679b8e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index e95b8db..ded8f35 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -21,6 +21,7 @@ import java.io.File import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder @@ -30,6 +31,7 @@ import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.util.{CarbonReflectionUtils, Utils} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} /** @@ -152,6 +154,7 @@ 
object CarbonSession { sparkConf.setAppName(randomAppName) } val sc = SparkContext.getOrCreate(sparkConf) + setS3Configurations(sc) // maybe this is an existing SparkContext, update its SparkConf which maybe used // by SparkSession options.foreach { case (k, v) => sc.conf.set(k, v) }