This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f22d4ebd4d [Improve][Connector-v2] Support checkpoint in batch mode
for paimon sink (#8333)
f22d4ebd4d is described below
commit f22d4ebd4da87dfb5572122f12cfe02b8f2f0a8d
Author: dailai <[email protected]>
AuthorDate: Mon Dec 23 20:23:26 2024 +0800
[Improve][Connector-v2] Support checkpoint in batch mode for paimon sink
(#8333)
---
docs/en/connector-v2/sink/Paimon.md | 5 ++
docs/zh/connector-v2/sink/Paimon.md | 5 ++
.../seatunnel/paimon/sink/PaimonSink.java | 11 ++-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 41 +++-------
.../sink/commit/PaimonAggregatedCommitter.java | 51 +++---------
.../seatunnel/paimon/utils/JobContextUtil.java | 32 --------
.../e2e/connector/paimon/PaimonWithS3IT.java | 12 ++-
.../resources/fake_2_paimon_with_s3_to_assert.conf | 91 ++++++++++++++++++++++
.../fake_to_paimon_with_s3_with_checkpoint.conf | 63 +++++++++++++++
9 files changed, 203 insertions(+), 108 deletions(-)
diff --git a/docs/en/connector-v2/sink/Paimon.md
b/docs/en/connector-v2/sink/Paimon.md
index f2a68ae3b8..68aa63ad03 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -47,6 +47,11 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | No | -
| Properties in hadoop conf
|
| paimon.hadoop.conf-path | String | No | -
| The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
|
+## Checkpoint in batch mode
+
+When you set `checkpoint.interval` to a value greater than 0 in batch mode,
the paimon connector will commit the data to the paimon table when the
checkpoint triggers after a certain number of records have been written. At
this moment, the written data in paimon is visible.
+However, if you do not set `checkpoint.interval` in batch mode, the paimon
sink connector will commit the data after all records are written. The written
data in paimon is not visible until the batch task completes.
+
## Changelog
You must configure the `changelog-producer=input` option to enable the
changelog producer mode of the paimon table. If you use the auto-create table
function of paimon sink, you can configure this property in
`paimon.table.write-props`.
diff --git a/docs/zh/connector-v2/sink/Paimon.md
b/docs/zh/connector-v2/sink/Paimon.md
index 1faa5dc9b0..157c1fa5e8 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -46,6 +46,11 @@ libfb303-xxx.jar
| paimon.hadoop.conf | Map | 否 | - |
Hadoop配置文件属性信息
|
| paimon.hadoop.conf-path | 字符串 | 否 | - |
Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置
|
+## 批模式下的checkpoint
+
+当您在批处理模式下将`checkpoint.interval`设置为大于0的值时,在写入一定数量的记录后checkpoint触发时,paimon连接器将把数据提交到paimon表。此时,写入的数据是可见的。
+但是,如果您没有在批处理模式下设置`checkpoint.interval`,则在写入所有记录之后,paimon
sink连接器将提交数据。到批任务完成之前,写入的数据都是不可见的。
+
## 更新日志
你必须配置`changelog-producer=input`来启用paimon表的changelog产生模式。如果你使用了paimon
sink的自动建表功能,你可以在`paimon.table.write-props`中指定这个属性。
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index d657810c95..0129438c83 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -67,13 +67,13 @@ public class PaimonSink
private JobContext jobContext;
- private ReadonlyConfig readonlyConfig;
+ private final ReadonlyConfig readonlyConfig;
- private PaimonSinkConfig paimonSinkConfig;
+ private final PaimonSinkConfig paimonSinkConfig;
- private CatalogTable catalogTable;
+ private final CatalogTable catalogTable;
- private PaimonHadoopConfiguration paimonHadoopConfiguration;
+ private final PaimonHadoopConfiguration paimonHadoopConfiguration;
public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
this.readonlyConfig = readonlyConfig;
@@ -102,8 +102,7 @@ public class PaimonSink
@Override
public Optional<SinkAggregatedCommitter<PaimonCommitInfo,
PaimonAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
- return Optional.of(
- new PaimonAggregatedCommitter(paimonTable, jobContext,
paimonHadoopConfiguration));
+ return Optional.of(new PaimonAggregatedCommitter(paimonTable,
paimonHadoopConfiguration));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index b208a916bb..bc02cc4bf3 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -39,7 +39,6 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucket
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.handler.AlterPaimonTableSchemaEventHandler;
import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
-import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
import org.apache.paimon.CoreOptions;
@@ -49,8 +48,6 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
@@ -89,10 +86,6 @@ public class PaimonSinkWriter
private SeaTunnelRowType seaTunnelRowType;
- private final SinkWriter.Context context;
-
- private final JobContext jobContext;
-
private org.apache.seatunnel.api.table.catalog.TableSchema
sourceTableSchema;
private TableSchema sinkPaimonTableSchema;
@@ -133,8 +126,6 @@ public class PaimonSinkWriter
}
this.paimonSinkConfig = paimonSinkConfig;
this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
- this.context = context;
- this.jobContext = jobContext;
this.newTableWrite();
BucketMode bucketMode = this.paimonFileStoretable.bucketMode();
this.dynamicBucket =
@@ -147,8 +138,8 @@ public class PaimonSinkWriter
this.bucketAssigner =
new PaimonBucketAssigner(
paimonFileStoretable,
- this.context.getNumberOfParallelSubtasks(),
- this.context.getIndexOfSubtask());
+ context.getNumberOfParallelSubtasks(),
+ context.getIndexOfSubtask());
}
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
}
@@ -181,14 +172,13 @@ public class PaimonSinkWriter
.map(PaimonSinkState::getCommittables)
.flatMap(List::stream)
.collect(Collectors.toList());
- log.info("Trying to recommit states {}", commitables);
- if (JobContextUtil.isBatchJob(jobContext)) {
- log.debug("Trying to recommit states batch mode");
- ((BatchTableCommit) tableCommit).commit(commitables);
- } else {
- log.debug("Trying to recommit states streaming mode");
- ((StreamTableCommit) tableCommit).commit(checkpointId,
commitables);
+ // batch mode without checkpoint has no state to commit
+ if (commitables.isEmpty()) {
+ return;
}
+ // streaming mode or batch mode with checkpoint need to recommit
by stream api
+ log.info("Trying to recommit states {}", commitables);
+ ((StreamTableCommit) tableCommit).commit(checkpointId,
commitables);
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -238,10 +228,7 @@ public class PaimonSinkWriter
}
private void newTableWrite() {
- this.tableWriteBuilder =
- JobContextUtil.isBatchJob(jobContext)
- ? this.paimonFileStoretable.newBatchWriteBuilder()
- : this.paimonFileStoretable.newStreamWriteBuilder();
+ this.tableWriteBuilder =
this.paimonFileStoretable.newStreamWriteBuilder();
TableWrite oldTableWrite = this.tableWrite;
this.tableWrite =
tableWriteBuilder
@@ -260,14 +247,8 @@ public class PaimonSinkWriter
@Override
public Optional<PaimonCommitInfo> prepareCommit(long checkpointId) throws
IOException {
try {
- List<CommitMessage> fileCommittables;
- if (JobContextUtil.isBatchJob(jobContext)) {
- fileCommittables = ((BatchTableWrite)
tableWrite).prepareCommit();
- } else {
- fileCommittables =
- ((StreamTableWrite) tableWrite)
- .prepareCommit(waitCompaction(), checkpointId);
- }
+ List<CommitMessage> fileCommittables =
+ ((StreamTableWrite)
tableWrite).prepareCommit(waitCompaction(), checkpointId);
committables.addAll(fileCommittables);
return Optional.of(new PaimonCommitInfo(fileCommittables,
checkpointId));
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
index 8009135346..e452b6cf03 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java
@@ -17,17 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit;
-import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
-import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.TableCommit;
@@ -41,7 +38,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.stream.Collectors;
/** Paimon connector aggregated committer class */
@Slf4j
@@ -53,17 +49,9 @@ public class PaimonAggregatedCommitter
private final WriteBuilder tableWriteBuilder;
- private final JobContext jobContext;
-
public PaimonAggregatedCommitter(
- Table table,
- JobContext jobContext,
- PaimonHadoopConfiguration paimonHadoopConfiguration) {
- this.jobContext = jobContext;
- this.tableWriteBuilder =
- JobContextUtil.isBatchJob(jobContext)
- ? table.newBatchWriteBuilder()
- : table.newStreamWriteBuilder();
+ Table table, PaimonHadoopConfiguration paimonHadoopConfiguration) {
+ this.tableWriteBuilder = table.newStreamWriteBuilder();
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
}
@@ -73,31 +61,16 @@ public class PaimonAggregatedCommitter
try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
PaimonSecurityContext.runSecured(
() -> {
- if (JobContextUtil.isBatchJob(jobContext)) {
- log.debug("Trying to commit states batch mode");
- List<CommitMessage> fileCommittables =
- aggregatedCommitInfo.stream()
- .flatMap(
- info ->
-
info.getCommittablesMap().values()
- .stream())
- .flatMap(List::stream)
- .collect(Collectors.toList());
- ((BatchTableCommit)
tableCommit).commit(fileCommittables);
- } else {
- log.debug("Trying to commit states streaming
mode");
- aggregatedCommitInfo.stream()
- .flatMap(
- paimonAggregatedCommitInfo ->
-
paimonAggregatedCommitInfo.getCommittablesMap()
-
.entrySet().stream())
- .forEach(
- entry ->
- ((StreamTableCommit)
tableCommit)
- .commit(
-
entry.getKey(),
-
entry.getValue()));
- }
+ log.debug("Trying to commit states streaming mode");
+ aggregatedCommitInfo.stream()
+ .flatMap(
+ paimonAggregatedCommitInfo ->
+
paimonAggregatedCommitInfo.getCommittablesMap()
+ .entrySet().stream())
+ .forEach(
+ entry ->
+ ((StreamTableCommit)
tableCommit)
+
.commit(entry.getKey(), entry.getValue()));
return null;
});
} catch (Exception e) {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
deleted file mode 100644
index 3a4d9b72d4..0000000000
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/JobContextUtil.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.paimon.utils;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.constants.JobMode;
-
-import lombok.extern.slf4j.Slf4j;
-
-/** Job env util */
-@Slf4j
-public class JobContextUtil {
-
- public static boolean isBatchJob(JobContext jobContext) {
- return jobContext.getJobMode().equals(JobMode.BATCH);
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
index a618aad8b3..a939955cc0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
@@ -122,11 +122,21 @@ public class PaimonWithS3IT extends SeaTunnelContainer {
}
@Test
- public void testFaceCDCSinkPaimonWithS3Filesystem() throws Exception {
+ public void testFakeCDCSinkPaimonWithS3Filesystem() throws Exception {
Container.ExecResult execResult =
executeJob("/fake_to_paimon_with_s3.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Container.ExecResult readResult =
executeJob("/paimon_with_s3_to_assert.conf");
Assertions.assertEquals(0, readResult.getExitCode());
}
+
+ @Test
+ public void
testFakeCDCSinkPaimonWithCheckpointInBatchModeWithS3Filesystem() throws
Exception {
+ Container.ExecResult execResult =
+ executeJob("/fake_to_paimon_with_s3_with_checkpoint.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Container.ExecResult readResult =
executeJob("/fake_2_paimon_with_s3_to_assert.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_2_paimon_with_s3_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_2_paimon_with_s3_to_assert.conf
new file mode 100644
index 0000000000..e046f100e0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_2_paimon_with_s3_to_assert.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ Paimon {
+ warehouse = "s3a://test/"
+ database = "seatunnel_namespace12"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.s3a.access-key=minio
+ fs.s3a.secret-key=miniominio
+ fs.s3a.endpoint="http://minio:9000"
+ fs.s3a.path.style.access=true
+
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5000
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 100000
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
new file mode 100644
index 0000000000..dc2585abc9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+}
+
+source {
+ FakeSource {
+ row.num = 5000
+ split.num = 10
+ split.read-interval = 1000
+ bigint.min = 1
+ bigint.max = 100000
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "s3a://test/"
+ database = "seatunnel_namespace12"
+ table = "st_test"
+ paimon.hadoop.conf = {
+ fs.s3a.access-key=minio
+ fs.s3a.secret-key=miniominio
+ fs.s3a.endpoint="http://minio:9000"
+ fs.s3a.path.style.access=true
+
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ }
+ }
+}