This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f1c2cde14 [INLONG-2432][Manager] Supports encryption of passwords for StreamSinks (#4410)
f1c2cde14 is described below

commit f1c2cde14e2c54a41167b68328e862d04eb5ff7f
Author: Lucas <[email protected]>
AuthorDate: Thu Jul 14 18:00:28 2022 +0800

    [INLONG-2432][Manager] Supports encryption of passwords for StreamSinks (#4410)
---
 .../common/pojo/sink/ck/ClickHouseSinkDTO.java     | 27 ++++++++++++++++--
 .../common/pojo/sink/es/ElasticsearchSinkDTO.java  | 33 ++++++++++++++++++++--
 .../manager/common/pojo/sink/hive/HiveSinkDTO.java | 27 ++++++++++++++++--
 .../pojo/sink/postgresql/PostgreSQLSinkDTO.java    | 27 ++++++++++++++++--
 .../src/test/resources/application.properties      |  2 --
 .../src/test/resources/application.properties      |  2 --
 6 files changed, 102 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
index a258f817c..a7e6c3f8e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -24,10 +24,13 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
 
 import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 
@@ -91,17 +94,26 @@ public class ClickHouseSinkDTO {
     @ApiModelProperty("Table primary key")
     private String primaryKey;
 
+    @ApiModelProperty("Password encrypt version")
+    private Integer encryptVersion;
+
     @ApiModelProperty("Properties for clickhouse")
     private Map<String, Object> properties;
 
     /**
      * Get the dto instance from the request
      */
-    public static ClickHouseSinkDTO getFromRequest(ClickHouseSinkRequest 
request) {
+    public static ClickHouseSinkDTO getFromRequest(ClickHouseSinkRequest 
request) throws Exception {
+        Integer encryptVersion = AESUtils.getCurrentVersion(null);
+        String passwd = null;
+        if (StringUtils.isNotEmpty(request.getPassword())) {
+            passwd = 
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+                    encryptVersion);
+        }
         return ClickHouseSinkDTO.builder()
                 .jdbcUrl(request.getJdbcUrl())
                 .username(request.getUsername())
-                .password(request.getPassword())
+                .password(passwd)
                 .dbName(request.getDbName())
                 .tableName(request.getTableName())
                 .flushInterval(request.getFlushInterval())
@@ -115,6 +127,7 @@ public class ClickHouseSinkDTO {
                 .partitionBy(request.getPartitionBy())
                 .primaryKey(request.getPrimaryKey())
                 .orderBy(request.getOrderBy())
+                .encryptVersion(encryptVersion)
                 .properties(request.getProperties())
                 .build();
     }
@@ -122,7 +135,7 @@ public class ClickHouseSinkDTO {
     public static ClickHouseSinkDTO getFromJson(@NotNull String extParams) {
         try {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-            return OBJECT_MAPPER.readValue(extParams, ClickHouseSinkDTO.class);
+            return OBJECT_MAPPER.readValue(extParams, 
ClickHouseSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
         }
@@ -142,4 +155,12 @@ public class ClickHouseSinkDTO {
         return tableInfo;
     }
 
+    private ClickHouseSinkDTO decryptPassword() throws Exception {
+        if (StringUtils.isNotEmpty(this.password)) {
+            byte[] passwordBytes = AESUtils.decryptAsString(this.password, 
this.encryptVersion);
+            this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+        }
+        return this;
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
index 750e868a3..0efb32ce6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -24,10 +24,14 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
 
 import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -77,17 +81,26 @@ public class ElasticsearchSinkDTO {
     @ApiModelProperty("version")
     private Integer version;
 
+    @ApiModelProperty("Password encrypt version")
+    private Integer encryptVersion;
+
     @ApiModelProperty("Properties for elasticsearch")
     private Map<String, Object> properties;
 
     /**
      * Get the dto instance from the request
      */
-    public static ElasticsearchSinkDTO getFromRequest(ElasticsearchSinkRequest 
request) {
+    public static ElasticsearchSinkDTO getFromRequest(ElasticsearchSinkRequest 
request) throws Exception {
+        Integer encryptVersion = AESUtils.getCurrentVersion(null);
+        String passwd = null;
+        if (StringUtils.isNotEmpty(request.getPassword())) {
+            passwd = 
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+                    encryptVersion);
+        }
         return ElasticsearchSinkDTO.builder()
                 .host(request.getHost())
                 .username(request.getUsername())
-                .password(request.getPassword())
+                .password(passwd)
                 .indexName(request.getIndexName())
                 .flushInterval(request.getFlushInterval())
                 .flushRecord(request.getFlushRecord())
@@ -95,6 +108,7 @@ public class ElasticsearchSinkDTO {
                 .documentType(request.getDocumentType())
                 .primaryKey(request.getPrimaryKey())
                 .version(request.getVersion())
+                .encryptVersion(encryptVersion)
                 .properties(request.getProperties())
                 .build();
     }
@@ -105,10 +119,23 @@ public class ElasticsearchSinkDTO {
     public static ElasticsearchSinkDTO getFromJson(@NotNull String extParams) {
         try {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-            return OBJECT_MAPPER.readValue(extParams, 
ElasticsearchSinkDTO.class);
+            return OBJECT_MAPPER.readValue(extParams, 
ElasticsearchSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
         }
     }
 
+    public static String getElasticSearchIndexName(ElasticsearchSinkDTO esInfo,
+            List<ElasticsearchFieldInfo> fieldList) {
+        return esInfo.getIndexName();
+    }
+
+    private ElasticsearchSinkDTO decryptPassword() throws Exception {
+        if (StringUtils.isNotEmpty(this.password)) {
+            byte[] passwordBytes = AESUtils.decryptAsString(this.password, 
this.encryptVersion);
+            this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+        }
+        return this;
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index eb3ccb608..d40a4f4cf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -25,10 +25,13 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
 
 import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 
@@ -85,17 +88,26 @@ public class HiveSinkDTO {
     @ApiModelProperty("Version for Hive, such as: 3.2.1")
     private String hiveVersion;
 
+    @ApiModelProperty("Password encrypt version")
+    private Integer encryptVersion;
+
     @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in 
light mode, must include hive-site.xml")
     private String hiveConfDir;
 
     /**
      * Get the dto instance from the request
      */
-    public static HiveSinkDTO getFromRequest(HiveSinkRequest request) {
+    public static HiveSinkDTO getFromRequest(HiveSinkRequest request) throws 
Exception {
+        Integer encryptVersion = AESUtils.getCurrentVersion(null);
+        String passwd = null;
+        if (StringUtils.isNotEmpty(request.getPassword())) {
+            passwd = 
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+                    encryptVersion);
+        }
         return HiveSinkDTO.builder()
                 .jdbcUrl(request.getJdbcUrl())
                 .username(request.getUsername())
-                .password(request.getPassword())
+                .password(passwd)
                 .dbName(request.getDbName())
                 .tableName(request.getTableName())
                 .dataPath(request.getDataPath())
@@ -107,6 +119,7 @@ public class HiveSinkDTO {
                 .dataSeparator(request.getDataSeparator())
                 .hiveVersion(request.getHiveVersion())
                 .hiveConfDir(request.getHiveConfDir())
+                .encryptVersion(encryptVersion)
                 .properties(request.getProperties())
                 .build();
     }
@@ -117,7 +130,7 @@ public class HiveSinkDTO {
     public static HiveSinkDTO getFromJson(@NotNull String extParams) {
         try {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-            return OBJECT_MAPPER.readValue(extParams, HiveSinkDTO.class);
+            return OBJECT_MAPPER.readValue(extParams, 
HiveSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
         }
@@ -152,4 +165,12 @@ public class HiveSinkDTO {
         return tableInfo;
     }
 
+    private HiveSinkDTO decryptPassword() throws Exception {
+        if (StringUtils.isNotEmpty(this.password)) {
+            byte[] passwordBytes = AESUtils.decryptAsString(this.password, 
this.encryptVersion);
+            this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+        }
+        return this;
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index a792ff728..f59e6ca42 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -24,10 +24,13 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
 
 import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 
@@ -60,20 +63,30 @@ public class PostgreSQLSinkDTO {
     @ApiModelProperty("Primary key")
     private String primaryKey;
 
+    @ApiModelProperty("Password encrypt version")
+    private Integer encryptVersion;
+
     @ApiModelProperty("Properties for PostgreSQL")
     private Map<String, Object> properties;
 
     /**
      * Get the dto instance from the request
      */
-    public static PostgreSQLSinkDTO getFromRequest(PostgreSQLSinkRequest 
request) {
+    public static PostgreSQLSinkDTO getFromRequest(PostgreSQLSinkRequest 
request) throws Exception {
+        Integer encryptVersion = AESUtils.getCurrentVersion(null);
+        String passwd = null;
+        if (StringUtils.isNotEmpty(request.getPassword())) {
+            passwd = 
AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+                    encryptVersion);
+        }
         return PostgreSQLSinkDTO.builder()
                 .jdbcUrl(request.getJdbcUrl())
                 .username(request.getUsername())
-                .password(request.getPassword())
+                .password(passwd)
                 .dbName(request.getDbName())
                 .primaryKey(request.getPrimaryKey())
                 .tableName(request.getTableName())
+                .encryptVersion(encryptVersion)
                 .properties(request.getProperties())
                 .build();
     }
@@ -87,7 +100,7 @@ public class PostgreSQLSinkDTO {
     public static PostgreSQLSinkDTO getFromJson(@NotNull String extParams) {
         try {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-            return OBJECT_MAPPER.readValue(extParams, PostgreSQLSinkDTO.class);
+            return OBJECT_MAPPER.readValue(extParams, 
PostgreSQLSinkDTO.class).decryptPassword();
         } catch (Exception e) {
             throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
         }
@@ -105,4 +118,12 @@ public class PostgreSQLSinkDTO {
         return tableInfo;
     }
 
+    private PostgreSQLSinkDTO decryptPassword() throws Exception {
+        if (StringUtils.isNotEmpty(this.password)) {
+            byte[] passwordBytes = AESUtils.decryptAsString(this.password, 
this.encryptVersion);
+            this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+        }
+        return this;
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/test/resources/application.properties b/inlong-manager/manager-common/src/test/resources/application.properties
index 0481c7a54..1512ce7e8 100644
--- a/inlong-manager/manager-common/src/test/resources/application.properties
+++ b/inlong-manager/manager-common/src/test/resources/application.properties
@@ -17,8 +17,6 @@
 # under the License.
 #
 
-# Configure auth plugin
-#inlong.auth.type=default
 # Encryption config, the suffix of value must be the same as the version.
 inlong.encrypt.version=1
 inlong.encrypt.key.value1="I!N@L#O$N%G^"
diff --git 
a/inlong-manager/manager-common/src/test/resources/application.properties b/inlong-manager/manager-service/src/test/resources/application.properties
similarity index 94%
copy from inlong-manager/manager-common/src/test/resources/application.properties
copy to inlong-manager/manager-service/src/test/resources/application.properties
index 0481c7a54..1512ce7e8 100644
--- a/inlong-manager/manager-common/src/test/resources/application.properties
+++ b/inlong-manager/manager-service/src/test/resources/application.properties
@@ -17,8 +17,6 @@
 # under the License.
 #
 
-# Configure auth plugin
-#inlong.auth.type=default
 # Encryption config, the suffix of value must be the same as the version.
 inlong.encrypt.version=1
 inlong.encrypt.key.value1="I!N@L#O$N%G^"

Reply via email to