This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 63690ce [feature](spark load) support s3 (#266)
63690ce is described below
commit 63690ce3c3cdb39ae201a9be6076e58dbda406b5
Author: gnehil <[email protected]>
AuthorDate: Wed Feb 19 15:49:45 2025 +0800
[feature](spark load) support s3 (#266)
---
spark-load/pom.xml | 2 +-
.../java/org/apache/doris/common/DppResult.java | 54 ++++++++++++---------
.../java/org/apache/doris/SparkLoadRunner.java | 55 ++++++++++++++++++++++
.../java/org/apache/doris/common/Constants.java | 6 +++
.../{Constants.java => enums/StorageType.java} | 15 ++----
.../java/org/apache/doris/config/JobConfig.java | 47 +++++++++---------
.../java/org/apache/doris/load/job/PullLoader.java | 28 ++++++++++-
spark-load/spark-load-dpp/pom.xml | 24 +---------
.../apache/doris/load/loadv2/etl/SparkEtlJob.java | 2 +-
9 files changed, 151 insertions(+), 82 deletions(-)
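
The core of the change is SparkLoadRunner.handleS3Config: when the working directory uses an s3:// scheme, the user-facing s3.* keys (declared in Constants.java below) are remapped onto Hadoop S3A properties before the job starts. A minimal standalone sketch of that remapping, with illustrative key values that are not part of the commit:

import java.util.HashMap;
import java.util.Map;

public class S3PropertySketch {
    public static void main(String[] args) {
        // user-supplied keys, as named in Constants.java (values here are made up)
        Map<String, String> props = new HashMap<>();
        props.put("s3.endpoint", "http://s3.example.com");
        props.put("s3.region", "us-east-1");
        props.put("s3.access_key", "AK_EXAMPLE");
        props.put("s3.secret_key", "SK_EXAMPLE");

        // remap to the Hadoop S3A property names, as handleS3Config does
        Map<String, String> hadoop = new HashMap<>(props);
        hadoop.put("fs.s3a.endpoint", hadoop.remove("s3.endpoint"));
        hadoop.put("fs.s3a.endpoint.region", hadoop.remove("s3.region"));
        hadoop.put("fs.s3a.access.key", hadoop.remove("s3.access_key"));
        hadoop.put("fs.s3a.secret.key", hadoop.remove("s3.secret_key"));
        // with s3.session_token present the patch selects TemporaryAWSCredentialsProvider instead
        hadoop.put("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
        hadoop.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        hadoop.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
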
diff --git a/spark-load/pom.xml b/spark-load/pom.xml
index 9a3279f..6b044b3 100644
--- a/spark-load/pom.xml
+++ b/spark-load/pom.xml
@@ -40,7 +40,7 @@
<revision>25.0.0-SNAPSHOT</revision>
<commons-codec.version>1.13</commons-codec.version>
<commons-lang3.version>3.9</commons-lang3.version>
- <hadoop.version>3.3.6</hadoop.version>
+ <hadoop.version>3.3.4</hadoop.version>
<netty-all.version>4.1.104.Final</netty-all.version>
<parquet.version>1.13.1</parquet.version>
<commons-collections.version>3.2.2</commons-collections.version>
diff --git a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
index 7a2a9cb..f839c70 100644
--- a/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
+++ b/spark-load/spark-load-common/src/main/java/org/apache/doris/common/DppResult.java
@@ -27,25 +27,35 @@ import java.io.Serializable;
*/
public class DppResult implements Serializable {
+ @JsonProperty(value = "is_success", required = true)
public boolean isSuccess;
+ @JsonProperty(value = "failed_reason", required = true)
public String failedReason;
+ @JsonProperty(value = "scanned_rows", required = true)
public long scannedRows;
+ @JsonProperty(value = "file_number", required = true)
public long fileNumber;
+ @JsonProperty(value = "file_size", required = true)
public long fileSize;
+ @JsonProperty(value = "normal_rows", required = true)
public long normalRows;
+ @JsonProperty(value = "abnormal_rows", required = true)
public long abnormalRows;
+ @JsonProperty(value = "unselect_rows", required = true)
public long unselectRows;
// only part of abnormal rows will be returned
+ @JsonProperty("partial_abnormal_rows")
public String partialAbnormalRows;
+ @JsonProperty("scanned_bytes")
public long scannedBytes;
public DppResult() {
@@ -61,27 +71,27 @@ public class DppResult implements Serializable {
scannedBytes = 0;
}
- @JsonCreator
- public DppResult(@JsonProperty(value = "is_success", required = true) boolean isSuccess,
- @JsonProperty(value = "failed_reason", required = true) String failedReason,
- @JsonProperty(value = "scanned_rows", required = true) long scannedRows,
- @JsonProperty(value = "file_number", required = true) long fileNumber,
- @JsonProperty(value = "file_size", required = true) long fileSize,
- @JsonProperty(value = "normal_rows", required = true) long normalRows,
- @JsonProperty(value = "abnormal_rows", required = true) long abnormalRows,
- @JsonProperty(value = "unselect_rows", required = true) long unselectRows,
- @JsonProperty("partial_abnormal_rows") String partialAbnormalRows,
- @JsonProperty("scanned_bytes") long scannedBytes) {
- this.isSuccess = isSuccess;
- this.failedReason = failedReason;
- this.scannedRows = scannedRows;
- this.fileNumber = fileNumber;
- this.fileSize = fileSize;
- this.normalRows = normalRows;
- this.abnormalRows = abnormalRows;
- this.unselectRows = unselectRows;
- this.partialAbnormalRows = partialAbnormalRows;
- this.scannedBytes = scannedBytes;
- }
+ // @JsonCreator
+ // public DppResult(@JsonProperty(value = "is_success", required = true) boolean isSuccess,
+ // @JsonProperty(value = "failed_reason", required = true) String failedReason,
+ // @JsonProperty(value = "scanned_rows", required = true) long scannedRows,
+ // @JsonProperty(value = "file_number", required = true) long fileNumber,
+ // @JsonProperty(value = "file_size", required = true) long fileSize,
+ // @JsonProperty(value = "normal_rows", required = true) long normalRows,
+ // @JsonProperty(value = "abnormal_rows", required = true) long abnormalRows,
+ // @JsonProperty(value = "unselect_rows", required = true) long unselectRows,
+ // @JsonProperty("partial_abnormal_rows") String partialAbnormalRows,
+ // @JsonProperty("scanned_bytes") long scannedBytes) {
+ // this.isSuccess = isSuccess;
+ // this.failedReason = failedReason;
+ // this.scannedRows = scannedRows;
+ // this.fileNumber = fileNumber;
+ // this.fileSize = fileSize;
+ // this.normalRows = normalRows;
+ // this.abnormalRows = abnormalRows;
+ // this.unselectRows = unselectRows;
+ // this.partialAbnormalRows = partialAbnormalRows;
+ // this.scannedBytes = scannedBytes;
+ // }
}
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
index f792087..5c7329f 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/SparkLoadRunner.java
@@ -19,6 +19,8 @@ package org.apache.doris;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.doris.common.CommandLineOptions;
+import org.apache.doris.common.Constants;
+import org.apache.doris.common.enums.StorageType;
import org.apache.doris.config.JobConfig;
import org.apache.doris.load.LoaderFactory;
import org.apache.doris.load.job.Loader;
@@ -40,6 +42,12 @@ import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
public class SparkLoadRunner {
@@ -66,6 +74,7 @@ public class SparkLoadRunner {
JobConfig jobConfig = readConfig(cmdOptions.getConfigPath());
try {
+ handleS3Config(jobConfig);
checkConfig(jobConfig);
} catch (IllegalArgumentException e) {
System.err.println("check config failed, msg: " +
ExceptionUtils.getStackTrace(e));
@@ -92,6 +101,7 @@ public class SparkLoadRunner {
} while (false);
loader.afterFinished();
+ // loader.cancel();
} catch (Exception e) {
loader.afterFailed(e);
@@ -146,4 +156,49 @@ public class SparkLoadRunner {
jobConfig.checkHadoopProperties();
}
+ private static void handleS3Config(JobConfig jobConfig) {
+ URI uri = URI.create(jobConfig.getWorkingDir());
+ if (uri.getScheme().equalsIgnoreCase("s3")) {
+
+ Map<String, String> hadoopProperties = new
HashMap<>(jobConfig.getHadoopProperties());
+
Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_ENDPOINT),
"s3.endpoint is empty");
+
Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_REGION),
"s3.region is empty");
+
Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_ACCESS_KEY),
"s3.access_key is empty");
+
Preconditions.checkArgument(hadoopProperties.containsKey(Constants.S3_SECRET_KEY),
"s3.secret_key is empty");
+
+ hadoopProperties.put("fs.s3a.endpoint",
hadoopProperties.get(Constants.S3_ENDPOINT));
+ hadoopProperties.remove(Constants.S3_ENDPOINT);
+ hadoopProperties.put("fs.s3a.endpoint.region",
hadoopProperties.get(Constants.S3_REGION));
+ hadoopProperties.remove(Constants.S3_REGION);
+ hadoopProperties.put("fs.s3a.access.key",
hadoopProperties.get(Constants.S3_ACCESS_KEY));
+ hadoopProperties.remove(Constants.S3_ACCESS_KEY);
+ hadoopProperties.put("fs.s3a.secret.key",
hadoopProperties.get(Constants.S3_SECRET_KEY));
+ hadoopProperties.remove(Constants.S3_SECRET_KEY);
+ if (hadoopProperties.containsKey(Constants.S3_TOKEN)) {
+ hadoopProperties.put("fs.s3a.session.token",
hadoopProperties.get(Constants.S3_TOKEN));
+ hadoopProperties.remove(Constants.S3_TOKEN);
+ hadoopProperties.put("fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
+ } else {
+ hadoopProperties.put("fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ }
+ hadoopProperties.put("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
+ jobConfig.setHadoopProperties(hadoopProperties);
+
+ // working dir path replace s3:// with s3a://
+ String resolvedWorkingDir = "s3a:" + uri.getSchemeSpecificPart();
+ jobConfig.setWorkingDir(resolvedWorkingDir);
+
+ // load task path replace s3:// with s3a://
+ for (String s : jobConfig.getLoadTasks().keySet()) {
+ JobConfig.TaskInfo taskInfo = jobConfig.getLoadTasks().get(s);
+ List<String> resolvedPaths = taskInfo.getPaths().stream()
+ .map(path -> "s3a:" +
URI.create(path).getSchemeSpecificPart())
+ .collect(Collectors.toList());
+ taskInfo.setPaths(resolvedPaths);
+ }
+ jobConfig.setStorageType(StorageType.S3);
+
+ }
+ }
+
}
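
A note on the path handling above: rather than registering a new filesystem for the s3:// scheme, handleS3Config rewrites the scheme to s3a:// and keeps the scheme-specific part of the URI, so the same bucket and key are addressed through S3AFileSystem. A tiny sketch of that rewrite (the path is illustrative, not from the commit):

import java.net.URI;

public class SchemeRewriteSketch {
    public static void main(String[] args) {
        // same transformation is applied to workingDir and to every load task path
        String workingDir = "s3://my-bucket/spark-load/output";
        String resolved = "s3a:" + URI.create(workingDir).getSchemeSpecificPart();
        System.out.println(resolved); // s3a://my-bucket/spark-load/output
    }
}
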
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
index a3e4803..56991ca 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
@@ -28,4 +28,10 @@ public interface Constants {
String DEFAULT_CATALOG = "internal";
+ String S3_ENDPOINT = "s3.endpoint";
+ String S3_REGION = "s3.region";
+ String S3_ACCESS_KEY = "s3.access_key";
+ String S3_SECRET_KEY = "s3.secret_key";
+ String S3_TOKEN = "s3.session_token";
+
}
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
similarity index 63%
copy from spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
copy to spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
index a3e4803..761afb1 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/Constants.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/enums/StorageType.java
@@ -15,17 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common;
-
-public interface Constants {
-
- String HIVE_METASTORE_URIS = "hive.metastore.uris";
- String SPARK_STANDALONE_SCHEME = "spark";
- String HADOOP_AUTH_KERBEROS = "kerberos";
- String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
- String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
- String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
-
- String DEFAULT_CATALOG = "internal";
+package org.apache.doris.common.enums;
+public enum StorageType {
+ HDFS,S3;
}
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
index fb2f5cc..831e8ac 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/config/JobConfig.java
@@ -21,6 +21,7 @@ import org.apache.doris.SparkLoadRunner;
import org.apache.doris.client.DorisClient;
import org.apache.doris.common.Constants;
import org.apache.doris.common.enums.LoadMode;
+import org.apache.doris.common.enums.StorageType;
import org.apache.doris.common.enums.TaskType;
import org.apache.doris.exception.SparkLoadException;
@@ -73,6 +74,8 @@ public class JobConfig {
private Map<String, String> env = Collections.emptyMap();
+ private StorageType storageType = StorageType.HDFS;
+
@Data
public static class TaskInfo {
@@ -237,29 +240,31 @@ public class JobConfig {
if (hadoopProperties == null || hadoopProperties.isEmpty()) {
return;
}
- if (!hadoopProperties.containsKey("fs.defaultFS")) {
- throw new IllegalArgumentException("fs.defaultFS is empty");
- }
- // check auth
- if (hadoopProperties.containsKey("hadoop.security.authentication")
- &&
StringUtils.equalsIgnoreCase(hadoopProperties.get("hadoop.security.authentication"),
"kerberos")) {
- if (hadoopProperties.containsKey("hadoop.kerberos.principal")) {
- if
(StringUtils.isBlank(hadoopProperties.get("hadoop.kerberos.principal"))) {
- throw new IllegalArgumentException("hadoop kerberos
principal is empty");
- }
- if (hadoopProperties.containsKey("hadoop.kerberos.keytab")) {
- if
(!FileUtils.getFile(hadoopProperties.get("hadoop.kerberos.keytab")).exists()) {
- throw new IllegalArgumentException("hadoop kerberos
keytab file is not exists, path: "
- +
hadoopProperties.get("hadoop.kerberos.keytab"));
+ if (!workingDir.startsWith("s3")) {
+ if (!hadoopProperties.containsKey("fs.defaultFS")) {
+ throw new IllegalArgumentException("fs.defaultFS is empty");
+ }
+ // check auth
+ if (hadoopProperties.containsKey("hadoop.security.authentication")
+ &&
StringUtils.equalsIgnoreCase(hadoopProperties.get("hadoop.security.authentication"),
"kerberos")) {
+ if (hadoopProperties.containsKey("hadoop.kerberos.principal"))
{
+ if
(StringUtils.isBlank(hadoopProperties.get("hadoop.kerberos.principal"))) {
+ throw new IllegalArgumentException("hadoop kerberos
principal is empty");
}
- return;
+ if
(hadoopProperties.containsKey("hadoop.kerberos.keytab")) {
+ if
(!FileUtils.getFile(hadoopProperties.get("hadoop.kerberos.keytab")).exists()) {
+ throw new IllegalArgumentException("hadoop
kerberos keytab file is not exists, path: "
+ +
hadoopProperties.get("hadoop.kerberos.keytab"));
+ }
+ return;
+ }
+ throw new IllegalArgumentException("hadoop.kerberos.keytab
is not set");
+ }
+ throw new IllegalArgumentException("hadoop.kerberos.principal
is not set");
+ } else {
+ if (!hadoopProperties.containsKey("hadoop.username")) {
+ throw new IllegalArgumentException("hadoop username is
empty");
}
- throw new IllegalArgumentException("hadoop.kerberos.keytab is
not set");
- }
- throw new IllegalArgumentException("hadoop.kerberos.principal is
not set");
- } else {
- if (!hadoopProperties.containsKey("hadoop.username")) {
- throw new IllegalArgumentException("hadoop username is empty");
}
}
}
diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
index 80491bf..40459a1 100644
--- a/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
+++ b/spark-load/spark-load-core/src/main/java/org/apache/doris/load/job/PullLoader.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Constants;
import org.apache.doris.common.DppResult;
import org.apache.doris.common.LoadInfo;
import org.apache.doris.common.enums.JobStatus;
+import org.apache.doris.common.enums.StorageType;
import org.apache.doris.common.meta.LoadMeta;
import org.apache.doris.common.meta.TableMeta;
import org.apache.doris.config.EtlJobConfig;
@@ -39,6 +40,7 @@ import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
@@ -123,6 +125,7 @@ public class PullLoader extends Loader implements Recoverable {
statusInfo.put("status", jobStatus.name());
statusInfo.put("msg", "");
statusInfo.put("appId", appHandle == null ? null :
appHandle.getAppId());
+ statusInfo.put("storageType", jobConfig.getStorageType().name());
try {
String dppResultStr = null;
int checkCnt = 0;
@@ -143,7 +146,7 @@ public class PullLoader extends Loader implements Recoverable {
}
statusInfo.put("dppResult", dppResultStr);
statusInfo.put("filePathToSize",
JsonUtils.writeValueAsString(getFilePathToSize()));
- statusInfo.put("hadoopProperties",
JsonUtils.writeValueAsString(jobConfig.getHadoopProperties()));
+ statusInfo.put("hadoopProperties",
JsonUtils.writeValueAsString(getHadoopProperties()));
} catch (IOException e) {
throw new SparkLoadException("update job status failed", e);
}
@@ -169,6 +172,20 @@ public class PullLoader extends Loader implements Recoverable {
} while (true);
}
+ private Map<String, String> getHadoopProperties() {
+ Map<String, String> hadoopProperties = new
HashMap<>(jobConfig.getHadoopProperties());
+ if (jobConfig.getStorageType() == StorageType.S3) {
+ hadoopProperties.put("AWS_ENDPOINT",
hadoopProperties.get("fs.s3a.endpoint"));
+ hadoopProperties.put("AWS_ACCESS_KEY",
hadoopProperties.get("fs.s3a.access.key"));
+ hadoopProperties.put("AWS_SECRET_KEY",
hadoopProperties.get("fs.s3a.secret.key"));
+ hadoopProperties.put("AWS_REGION",
hadoopProperties.get("fs.s3a.endpoint.region"));
+ if (hadoopProperties.containsKey("fs.s3a.session.token")) {
+ hadoopProperties.put("AWS_TOKEN",
hadoopProperties.get("fs.s3a.session.token"));
+ }
+ }
+ return hadoopProperties;
+ }
+
@Override
public void afterFailed(Exception e) {
if (loadMeta == null) {
@@ -359,7 +376,14 @@ public class PullLoader extends Loader implements Recoverable {
if (fileStatus.isDirectory()) {
continue;
}
- filePathToSize.put(fileStatus.getPath().toString(), fileStatus.getLen());
+ String filePath = fileStatus.getPath().toString();
+ if (jobConfig.getStorageType() == StorageType.S3) {
+ URI uri = fileStatus.getPath().toUri();
+ if (uri.getScheme() != null &&
uri.getScheme().startsWith("s3")) {
+ filePath = "s3:" + uri.getSchemeSpecificPart();
+ }
+ }
+ filePathToSize.put(filePath, fileStatus.getLen());
}
} catch (IOException e) {
throw new SparkLoadException("get dpp result failed", e);
diff --git a/spark-load/spark-load-dpp/pom.xml b/spark-load/spark-load-dpp/pom.xml
index 5c99e96..7276c54 100644
--- a/spark-load/spark-load-dpp/pom.xml
+++ b/spark-load/spark-load-dpp/pom.xml
@@ -146,29 +146,7 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- <!-- https://github.com/aws/aws-sdk-java/issues/1032 -->
- <exclusion>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-s3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-bundle</artifactId>
- </exclusion>
- </exclusions>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
diff --git a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 0330001..068535e 100644
--- a/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ b/spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -281,7 +281,7 @@ public class SparkEtlJob {
new SparkEtlJob(args[0]).run();
} catch (Exception e) {
System.err.println("spark etl job run failed");
- LOG.warn("", e);
+ LOG.error("spark etl job run failed", e);
System.exit(-1);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]