This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f1c2cde14 [INLONG-2432][Manager] Supports encryption of passwords for StreamSinks (#4410)
f1c2cde14 is described below
commit f1c2cde14e2c54a41167b68328e862d04eb5ff7f
Author: Lucas <[email protected]>
AuthorDate: Thu Jul 14 18:00:28 2022 +0800
[INLONG-2432][Manager] Supports encryption of passwords for StreamSinks (#4410)
---
.../common/pojo/sink/ck/ClickHouseSinkDTO.java | 27 ++++++++++++++++--
.../common/pojo/sink/es/ElasticsearchSinkDTO.java | 33 ++++++++++++++++++++--
.../manager/common/pojo/sink/hive/HiveSinkDTO.java | 27 ++++++++++++++++--
.../pojo/sink/postgresql/PostgreSQLSinkDTO.java | 27 ++++++++++++++++--
.../manager-common/src/test/resources/application.properties | 2 --
.../manager-service/src/test/resources/application.properties | 2 --
6 files changed, 102 insertions(+), 16 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
index a258f817c..a7e6c3f8e 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -24,10 +24,13 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@@ -91,17 +94,26 @@ public class ClickHouseSinkDTO {
@ApiModelProperty("Table primary key")
private String primaryKey;
+ @ApiModelProperty("Password encrypt version")
+ private Integer encryptVersion;
+
@ApiModelProperty("Properties for clickhouse")
private Map<String, Object> properties;
/**
* Get the dto instance from the request
*/
- public static ClickHouseSinkDTO getFromRequest(ClickHouseSinkRequest
request) {
+ public static ClickHouseSinkDTO getFromRequest(ClickHouseSinkRequest
request) throws Exception {
+ Integer encryptVersion = AESUtils.getCurrentVersion(null);
+ String passwd = null;
+ if (StringUtils.isNotEmpty(request.getPassword())) {
+ passwd =
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+ encryptVersion);
+ }
return ClickHouseSinkDTO.builder()
.jdbcUrl(request.getJdbcUrl())
.username(request.getUsername())
- .password(request.getPassword())
+ .password(passwd)
.dbName(request.getDbName())
.tableName(request.getTableName())
.flushInterval(request.getFlushInterval())
@@ -115,6 +127,7 @@ public class ClickHouseSinkDTO {
.partitionBy(request.getPartitionBy())
.primaryKey(request.getPrimaryKey())
.orderBy(request.getOrderBy())
+ .encryptVersion(encryptVersion)
.properties(request.getProperties())
.build();
}
@@ -122,7 +135,7 @@ public class ClickHouseSinkDTO {
public static ClickHouseSinkDTO getFromJson(@NotNull String extParams) {
try {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- return OBJECT_MAPPER.readValue(extParams, ClickHouseSinkDTO.class);
+ return OBJECT_MAPPER.readValue(extParams,
ClickHouseSinkDTO.class).decryptPassword();
} catch (Exception e) {
throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
}
@@ -142,4 +155,12 @@ public class ClickHouseSinkDTO {
return tableInfo;
}
+ private ClickHouseSinkDTO decryptPassword() throws Exception {
+ if (StringUtils.isNotEmpty(this.password)) {
+ byte[] passwordBytes = AESUtils.decryptAsString(this.password,
this.encryptVersion);
+ this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+ }
+ return this;
+ }
+
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
index 750e868a3..0efb32ce6 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -24,10 +24,14 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
/**
@@ -77,17 +81,26 @@ public class ElasticsearchSinkDTO {
@ApiModelProperty("version")
private Integer version;
+ @ApiModelProperty("Password encrypt version")
+ private Integer encryptVersion;
+
@ApiModelProperty("Properties for elasticsearch")
private Map<String, Object> properties;
/**
* Get the dto instance from the request
*/
- public static ElasticsearchSinkDTO getFromRequest(ElasticsearchSinkRequest
request) {
+ public static ElasticsearchSinkDTO getFromRequest(ElasticsearchSinkRequest
request) throws Exception {
+ Integer encryptVersion = AESUtils.getCurrentVersion(null);
+ String passwd = null;
+ if (StringUtils.isNotEmpty(request.getPassword())) {
+ passwd =
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+ encryptVersion);
+ }
return ElasticsearchSinkDTO.builder()
.host(request.getHost())
.username(request.getUsername())
- .password(request.getPassword())
+ .password(passwd)
.indexName(request.getIndexName())
.flushInterval(request.getFlushInterval())
.flushRecord(request.getFlushRecord())
@@ -95,6 +108,7 @@ public class ElasticsearchSinkDTO {
.documentType(request.getDocumentType())
.primaryKey(request.getPrimaryKey())
.version(request.getVersion())
+ .encryptVersion(encryptVersion)
.properties(request.getProperties())
.build();
}
@@ -105,10 +119,23 @@ public class ElasticsearchSinkDTO {
public static ElasticsearchSinkDTO getFromJson(@NotNull String extParams) {
try {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- return OBJECT_MAPPER.readValue(extParams,
ElasticsearchSinkDTO.class);
+ return OBJECT_MAPPER.readValue(extParams,
ElasticsearchSinkDTO.class).decryptPassword();
} catch (Exception e) {
throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
}
}
+ public static String getElasticSearchIndexName(ElasticsearchSinkDTO esInfo,
+ List<ElasticsearchFieldInfo> fieldList) {
+ return esInfo.getIndexName();
+ }
+
+ private ElasticsearchSinkDTO decryptPassword() throws Exception {
+ if (StringUtils.isNotEmpty(this.password)) {
+ byte[] passwordBytes = AESUtils.decryptAsString(this.password,
this.encryptVersion);
+ this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+ }
+ return this;
+ }
+
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index eb3ccb608..d40a4f4cf 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -25,10 +25,13 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@@ -85,17 +88,26 @@ public class HiveSinkDTO {
@ApiModelProperty("Version for Hive, such as: 3.2.1")
private String hiveVersion;
+ @ApiModelProperty("Password encrypt version")
+ private Integer encryptVersion;
+
@ApiModelProperty("Config directory of Hive on HDFS, needed by sort in
light mode, must include hive-site.xml")
private String hiveConfDir;
/**
* Get the dto instance from the request
*/
- public static HiveSinkDTO getFromRequest(HiveSinkRequest request) {
+ public static HiveSinkDTO getFromRequest(HiveSinkRequest request) throws
Exception {
+ Integer encryptVersion = AESUtils.getCurrentVersion(null);
+ String passwd = null;
+ if (StringUtils.isNotEmpty(request.getPassword())) {
+ passwd =
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+ encryptVersion);
+ }
return HiveSinkDTO.builder()
.jdbcUrl(request.getJdbcUrl())
.username(request.getUsername())
- .password(request.getPassword())
+ .password(passwd)
.dbName(request.getDbName())
.tableName(request.getTableName())
.dataPath(request.getDataPath())
@@ -107,6 +119,7 @@ public class HiveSinkDTO {
.dataSeparator(request.getDataSeparator())
.hiveVersion(request.getHiveVersion())
.hiveConfDir(request.getHiveConfDir())
+ .encryptVersion(encryptVersion)
.properties(request.getProperties())
.build();
}
@@ -117,7 +130,7 @@ public class HiveSinkDTO {
public static HiveSinkDTO getFromJson(@NotNull String extParams) {
try {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- return OBJECT_MAPPER.readValue(extParams, HiveSinkDTO.class);
+ return OBJECT_MAPPER.readValue(extParams,
HiveSinkDTO.class).decryptPassword();
} catch (Exception e) {
throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
}
@@ -152,4 +165,12 @@ public class HiveSinkDTO {
return tableInfo;
}
+ private HiveSinkDTO decryptPassword() throws Exception {
+ if (StringUtils.isNotEmpty(this.password)) {
+ byte[] passwordBytes = AESUtils.decryptAsString(this.password,
this.encryptVersion);
+ this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+ }
+ return this;
+ }
+
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index a792ff728..f59e6ca42 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -24,10 +24,13 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@@ -60,20 +63,30 @@ public class PostgreSQLSinkDTO {
@ApiModelProperty("Primary key")
private String primaryKey;
+ @ApiModelProperty("Password encrypt version")
+ private Integer encryptVersion;
+
@ApiModelProperty("Properties for PostgreSQL")
private Map<String, Object> properties;
/**
* Get the dto instance from the request
*/
- public static PostgreSQLSinkDTO getFromRequest(PostgreSQLSinkRequest
request) {
+ public static PostgreSQLSinkDTO getFromRequest(PostgreSQLSinkRequest
request) throws Exception {
+ Integer encryptVersion = AESUtils.getCurrentVersion(null);
+ String passwd = null;
+ if (StringUtils.isNotEmpty(request.getPassword())) {
+ passwd =
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+ encryptVersion);
+ }
return PostgreSQLSinkDTO.builder()
.jdbcUrl(request.getJdbcUrl())
.username(request.getUsername())
- .password(request.getPassword())
+ .password(passwd)
.dbName(request.getDbName())
.primaryKey(request.getPrimaryKey())
.tableName(request.getTableName())
+ .encryptVersion(encryptVersion)
.properties(request.getProperties())
.build();
}
@@ -87,7 +100,7 @@ public class PostgreSQLSinkDTO {
public static PostgreSQLSinkDTO getFromJson(@NotNull String extParams) {
try {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- return OBJECT_MAPPER.readValue(extParams, PostgreSQLSinkDTO.class);
+ return OBJECT_MAPPER.readValue(extParams,
PostgreSQLSinkDTO.class).decryptPassword();
} catch (Exception e) {
throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
}
@@ -105,4 +118,12 @@ public class PostgreSQLSinkDTO {
return tableInfo;
}
+ private PostgreSQLSinkDTO decryptPassword() throws Exception {
+ if (StringUtils.isNotEmpty(this.password)) {
+ byte[] passwordBytes = AESUtils.decryptAsString(this.password,
this.encryptVersion);
+ this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+ }
+ return this;
+ }
+
}
diff --git
a/inlong-manager/manager-common/src/test/resources/application.properties
b/inlong-manager/manager-common/src/test/resources/application.properties
index 0481c7a54..1512ce7e8 100644
--- a/inlong-manager/manager-common/src/test/resources/application.properties
+++ b/inlong-manager/manager-common/src/test/resources/application.properties
@@ -17,8 +17,6 @@
# under the License.
#
-# Configure auth plugin
-#inlong.auth.type=default
# Encryption config, the suffix of value must be the same as the version.
inlong.encrypt.version=1
inlong.encrypt.key.value1="I!N@L#O$N%G^"
diff --git
a/inlong-manager/manager-common/src/test/resources/application.properties
b/inlong-manager/manager-service/src/test/resources/application.properties
similarity index 94%
copy from
inlong-manager/manager-common/src/test/resources/application.properties
copy to inlong-manager/manager-service/src/test/resources/application.properties
index 0481c7a54..1512ce7e8 100644
--- a/inlong-manager/manager-common/src/test/resources/application.properties
+++ b/inlong-manager/manager-service/src/test/resources/application.properties
@@ -17,8 +17,6 @@
# under the License.
#
-# Configure auth plugin
-#inlong.auth.type=default
# Encryption config, the suffix of value must be the same as the version.
inlong.encrypt.version=1
inlong.encrypt.key.value1="I!N@L#O$N%G^"