This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f49b263e65 [Improve][Connector-V2] Remove hard code iceberg table format version (#7500)
f49b263e65 is described below
commit f49b263e6517cee13a7c853a83472ce863f4b9d1
Author: Jia Fan <[email protected]>
AuthorDate: Thu Aug 29 22:41:19 2024 +0800
[Improve][Connector-V2] Remove hard code iceberg table format version (#7500)
---
docs/en/faq.md | 21 -----------
docs/zh/faq.md | 21 -----------
.../seatunnel/iceberg/IcebergCatalogLoader.java | 11 ++----
.../seatunnel/iceberg/catalog/IcebergCatalog.java | 37 +++++++++----------
.../seatunnel/iceberg/data/RowConverter.java | 22 ++++-------
.../seatunnel/iceberg/sink/IcebergSink.java | 6 +--
.../seatunnel/iceberg/sink/IcebergSinkWriter.java | 10 ++---
.../iceberg/sink/writer/IcebergRecordWriter.java | 12 +-----
.../iceberg/sink/writer/IcebergWriterFactory.java | 4 --
.../seatunnel/iceberg/source/IcebergSource.java | 10 ++---
.../seatunnel/iceberg/utils/SchemaUtils.java | 43 +++++-----------------
11 files changed, 51 insertions(+), 146 deletions(-)
diff --git a/docs/en/faq.md b/docs/en/faq.md
index 2e50c9d461..02c125ad4f 100644
--- a/docs/en/faq.md
+++ b/docs/en/faq.md
@@ -203,23 +203,6 @@ spark {
}
```
-## How do I specify a different JDK version for SeaTunnel on YARN?
-
-For example, if you want to set the JDK version to JDK8, there are two cases:
-
-- The YARN cluster has deployed JDK8, but the default JDK is not JDK8. Add two configurations to the SeaTunnel config file:
-
- ```
- env {
- ...
- spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
- spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
- ...
- }
- ```
-- YARN cluster does not deploy JDK8. At this time, start SeaTunnel attached with JDK8. For detailed operations, see:
- https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
-
## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?
If you run in local mode, you need to modify the `start-seatunnel.sh` startup script. After `spark-submit`, add the parameter `--driver-memory 4g`. Under normal circumstances, local mode is not used in production, so this parameter generally does not need to be set when running on YARN. See [Application Properties](https://spark.apache.org/docs/latest/configuration.html#application-properties) for details.
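The edit described above can be sketched as follows, assuming the stock `start-seatunnel.sh` assembles its `spark-submit` command from a variable (the variable and jar names here are illustrative, not the script's actual contents):

```shell
# Illustrative fragment for start-seatunnel.sh: inject --driver-memory 4g
# ahead of any existing spark-submit arguments when running in local[*] mode.
SPARK_SUBMIT_ARGS="--driver-memory 4g ${SPARK_SUBMIT_ARGS:-}"
CMD="spark-submit ${SPARK_SUBMIT_ARGS} --master local[*] --class org.apache.seatunnel.SeaTunnel seatunnel-core.jar"
echo "${CMD}"
```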
@@ -334,10 +317,6 @@ spark-submit --verbose
...
```
-## How do I use SeaTunnel to synchronize data across HDFS clusters?
-
-Just configure hdfs-site.xml properly. Refer to: https://www.cnblogs.com/suanec/p/7828139.html.
-
## I want to learn the source code of SeaTunnel. Where should I start?
SeaTunnel has a completely abstract and structured code implementation, and many people have chosen SeaTunnel as a way to learn Spark. You can learn the source code from the main program entry: SeaTunnel.java
diff --git a/docs/zh/faq.md b/docs/zh/faq.md
index 3be6ce38e5..4fc24e6a3a 100644
--- a/docs/zh/faq.md
+++ b/docs/zh/faq.md
@@ -204,23 +204,6 @@ spark {
}
```
-## How do I specify a different JDK version for SeaTunnel on YARN?
-
-For example, if you want to set the JDK version to JDK8, there are two cases:
-
-- The YARN cluster has deployed JDK8, but the default JDK is not JDK8. Add two configurations to the SeaTunnel config file:
-
- ```
- env {
- ...
- spark.executorEnv.JAVA_HOME="/your/java_8_home/directory"
- spark.yarn.appMasterEnv.JAVA_HOME="/your/java_8_home/directory"
- ...
- }
- ```
-- The YARN cluster does not deploy JDK8. In this case, start SeaTunnel with JDK8 attached. For detailed operations, see:
- https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
-
## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?
If you run in local mode, you need to modify the `start-seatunnel.sh` startup script. After `spark-submit`, add the parameter `--driver-memory 4g`. Under normal circumstances, local mode is not used in production, so this parameter generally does not need to be set when running on YARN. See [Application Properties](https://spark.apache.org/docs/latest/configuration.html#application-properties) for details.
@@ -335,10 +318,6 @@ spark-submit --verbose
...
```
-## How do I use SeaTunnel to synchronize data across HDFS clusters?
-
-Just configure hdfs-site.xml properly. Refer to: https://www.cnblogs.com/suanec/p/7828139.html.
-
## I want to learn the source code of SeaTunnel. Where should I start?
SeaTunnel has a completely abstract and structured code implementation, and many people have chosen SeaTunnel as a way to learn Spark. You can learn the source code from the main program entry: SeaTunnel.java
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
index 0f4610783a..bbb590502c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
@@ -50,25 +50,20 @@ public class IcebergCatalogLoader implements Serializable {
private static final long serialVersionUID = -6003040601422350869L;
private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
- private CommonConfig config;
+ private final CommonConfig config;
public IcebergCatalogLoader(CommonConfig config) {
this.config = config;
}
public Catalog loadCatalog() {
- // When using the seatunel engine, set the current class loader to prevent loading failures
+ // When using the SeaTunnel engine, set the current class loader to prevent loading failures
Thread.currentThread().setContextClassLoader(IcebergCatalogLoader.class.getClassLoader());
return CatalogUtil.buildIcebergCatalog(
config.getCatalogName(), config.getCatalogProps(), loadHadoopConfig(config));
}
- /**
- * Loading Hadoop configuration through reflection
- *
- * @param config
- * @return
- */
+ /** Loading Hadoop configuration through reflection */
public Object loadHadoopConfig(CommonConfig config) {
Class<?> configClass =
DynClasses.builder()
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index 520f9bdbac..fc28001b2c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -58,9 +58,9 @@ import static org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtil
@Slf4j
public class IcebergCatalog implements Catalog {
- private String catalogName;
- private ReadonlyConfig readonlyConfig;
- private IcebergCatalogLoader icebergCatalogLoader;
+ private final String catalogName;
+ private final ReadonlyConfig readonlyConfig;
+ private final IcebergCatalogLoader icebergCatalogLoader;
private org.apache.iceberg.catalog.Catalog catalog;
public IcebergCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
@@ -224,22 +224,21 @@ public class IcebergCatalog implements Catalog {
public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) {
List<Types.NestedField> columns = icebergTable.schema().columns();
TableSchema.Builder builder = TableSchema.builder();
- columns.stream()
- .forEach(
- nestedField -> {
- String name = nestedField.name();
- SeaTunnelDataType<?> seaTunnelType =
- SchemaUtils.toSeaTunnelType(name, nestedField.type());
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- name,
- seaTunnelType,
- (Long) null,
- true,
- null,
- nestedField.doc());
- builder.column(physicalColumn);
- });
+ columns.forEach(
+ nestedField -> {
+ String name = nestedField.name();
+ SeaTunnelDataType<?> seaTunnelType =
+ SchemaUtils.toSeaTunnelType(name, nestedField.type());
+ PhysicalColumn physicalColumn =
+ PhysicalColumn.of(
+ name,
+ seaTunnelType,
+ (Long) null,
+ true,
+ null,
+ nestedField.doc());
+ builder.column(physicalColumn);
+ });
List<String> partitionKeys =
icebergTable.spec().fields().stream()
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
index 8c699b3440..f46928456f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java
@@ -92,17 +92,17 @@ public class RowConverter {
return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
}
- public Record convert(Object row, SeaTunnelDataType rowType) {
+ public Record convert(Object row, SeaTunnelDataType<?> rowType) {
return convertStructValue(row, rowType, tableSchema.asStruct(), -1, null);
}
- public Record convert(Object row, SeaTunnelDataType rowType, SchemaChangeWrapper wrapper) {
+ public Record convert(Object row, SeaTunnelDataType<?> rowType, SchemaChangeWrapper wrapper) {
return convertStructValue(row, rowType, tableSchema.asStruct(), -1, wrapper);
}
protected GenericRecord convertStructValue(
Object value,
- SeaTunnelDataType fromType,
+ SeaTunnelDataType<?> fromType,
Types.StructType schema,
int parentFieldId,
SchemaChangeWrapper wrapper) {
@@ -120,15 +120,7 @@ public class RowConverter {
}
}
- /**
- * Convert RowType
- *
- * @param row
- * @param fromType
- * @param schema
- * @param structFieldId
- * @return
- */
+ /** Convert RowType */
private GenericRecord convertToStruct(
SeaTunnelRow row,
SeaTunnelRowType fromType,
@@ -179,7 +171,7 @@ public class RowConverter {
public Object convertValue(
Object value,
- SeaTunnelDataType fromType,
+ SeaTunnelDataType<?> fromType,
Type type,
int fieldId,
SchemaChangeWrapper wrapper) {
@@ -252,7 +244,7 @@ public class RowConverter {
protected List<Object> convertListValue(
Object value,
- SeaTunnelDataType fromType,
+ SeaTunnelDataType<?> fromType,
Types.ListType type,
SchemaChangeWrapper wrapper) {
Preconditions.checkArgument(value.getClass().isArray());
@@ -269,7 +261,7 @@ public class RowConverter {
protected Map<Object, Object> convertMapValue(
Object value,
- SeaTunnelDataType fromType,
+ SeaTunnelDataType<?> fromType,
Types.MapType type,
SchemaChangeWrapper wrapper) {
Preconditions.checkArgument(value instanceof Map);
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index 65bccbdb89..a1d43d6acf 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -61,9 +61,9 @@ public class IcebergSink
SupportSaveMode,
SupportMultiTableSink {
private static String PLUGIN_NAME = "Iceberg";
- private SinkConfig config;
- private ReadonlyConfig readonlyConfig;
- private CatalogTable catalogTable;
+ private final SinkConfig config;
+ private final ReadonlyConfig readonlyConfig;
+ private final CatalogTable catalogTable;
public IcebergSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.readonlyConfig = pluginConfig;
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
index aed6522ca8..3a5e22b93b 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkWriter.java
@@ -54,13 +54,12 @@ public class IcebergSinkWriter
implements SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState>,
SupportMultiTableSinkWriter<Void> {
private SeaTunnelRowType rowType;
- private SinkConfig config;
- private IcebergTableLoader icebergTableLoader;
+ private final SinkConfig config;
+ private final IcebergTableLoader icebergTableLoader;
private RecordWriter writer;
- private IcebergFilesCommitter filesCommitter;
- private List<WriteResult> results = Lists.newArrayList();
+ private final IcebergFilesCommitter filesCommitter;
+ private final List<WriteResult> results = Lists.newArrayList();
private String commitUser = UUID.randomUUID().toString();
- private long checkpointId;
private final DataTypeChangeEventHandler dataTypeChangeEventHandler;
@@ -77,7 +76,6 @@ public class IcebergSinkWriter
tryCreateRecordWriter();
if (Objects.nonNull(states) && !states.isEmpty()) {
this.commitUser = states.get(0).getCommitUser();
- this.checkpointId = states.get(0).getCheckpointId();
preCommit(states);
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 2be206ebb6..06b48591df 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -54,7 +54,7 @@ public class IcebergRecordWriter implements RecordWriter {
private final List<WriteResult> writerResults;
private TaskWriter<Record> writer;
private RowConverter recordConverter;
- private IcebergWriterFactory writerFactory;
+ private final IcebergWriterFactory writerFactory;
public IcebergRecordWriter(Table table, IcebergWriterFactory writerFactory, SinkConfig config) {
this.config = config;
@@ -122,12 +122,7 @@ public class IcebergRecordWriter implements RecordWriter {
}
}
- /**
- * apply schema update
- *
- * @param updates
- * @return
- */
+ /** apply schema update */
private void applySchemaUpdate(SchemaChangeWrapper updates) {
// complete the current file
flush();
@@ -169,7 +164,4 @@ public class IcebergRecordWriter implements RecordWriter {
table.spec().partitionType()));
writer = null;
}
-
- @Override
- public void close() {}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
index 67809088ef..2ee7c3d6d7 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
@@ -40,9 +40,6 @@ import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@@ -58,7 +55,6 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
@Slf4j
public class IcebergWriterFactory {
- private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class);
private final IcebergTableLoader tableLoader;
private final SinkConfig config;
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
index 7a2fdf9d4f..c56f3f2f00 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSource.java
@@ -65,12 +65,12 @@ public class IcebergSource
private static final long serialVersionUID = 4343414808223919870L;
- private SourceConfig sourceConfig;
- private Schema tableSchema;
- private Schema projectedSchema;
- private SeaTunnelRowType seaTunnelRowType;
+ private final SourceConfig sourceConfig;
+ private final Schema tableSchema;
+ private final Schema projectedSchema;
+ private final SeaTunnelRowType seaTunnelRowType;
private JobContext jobContext;
- private CatalogTable catalogTable;
+ private final CatalogTable catalogTable;
public IcebergSource(ReadonlyConfig config, CatalogTable catalogTable) {
this.sourceConfig = SourceConfig.loadConfig(config);
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 6c99eb409c..01343a119f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -40,7 +40,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -106,21 +105,11 @@ public class SchemaUtils {
SinkConfig config = new SinkConfig(readonlyConfig);
// build auto create table
Map<String, String> options = new HashMap<>(table.getOptions());
- options.put(TableProperties.FORMAT_VERSION, "2");
// override
options.putAll(config.getAutoCreateProps());
return createTable(catalog, toIcebergTableIdentifier(tablePath), config, schema, options);
}
- /**
- * For local test
- *
- * @param catalog
- * @param tableIdentifier
- * @param config
- * @param rowType
- * @return
- */
public static Table autoCreateTable(
Catalog catalog,
TableIdentifier tableIdentifier,
@@ -180,7 +169,7 @@ public class SchemaUtils {
Optional<Integer> pkId =
structType.fields().stream()
.filter(nestedField -> nestedField.name().equals(pk))
- .map(nestedField -> nestedField.fieldId())
+ .map(Types.NestedField::fieldId)
.findFirst();
if (!pkId.isPresent()) {
throw new IllegalArgumentException(
@@ -196,23 +185,14 @@ public class SchemaUtils {
structType
.fields()
.forEach(
- field -> {
- fields.add(
- identifierFieldIds.contains(field.fieldId())
- ? field.asRequired()
- : field.asOptional());
- });
+ field ->
+ fields.add(
+ identifierFieldIds.contains(field.fieldId())
+ ? field.asRequired()
+ : field.asOptional()));
return new Schema(fields, identifierFieldIds);
}
- public static TableIdentifier toIcebergTableIdentifierFromCatalogTable(
- CatalogTable catalogTable) {
- org.apache.seatunnel.api.table.catalog.TableIdentifier tableIdentifier =
- catalogTable.getTableId();
- return TableIdentifier.of(
- tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
- }
-
public static TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
}
@@ -221,12 +201,7 @@ public class SchemaUtils {
return TablePath.of(tableIdentifier.namespace().toString(), tableIdentifier.name());
}
- /**
- * Commit table schema updates
- *
- * @param table
- * @param wrapper
- */
+ /** Commit table schema updates */
private static void commitSchemaUpdates(Table table, SchemaChangeWrapper wrapper) {
// get the latest schema in case another process updated it
table.refresh();
@@ -249,7 +224,7 @@ public class SchemaUtils {
.collect(toList());
// Rename column name
- List<SchemaChangeColumn> changeColumns = wrapper.changeColumns().stream().collect(toList());
+ List<SchemaChangeColumn> changeColumns = new ArrayList<>(wrapper.changeColumns());
if (addColumns.isEmpty()
&& modifyColumns.isEmpty()
@@ -294,7 +269,7 @@ public class SchemaUtils {
return IcebergTypeMapper.mapping(fieldName, type);
}
- public static Type toIcebergType(SeaTunnelDataType rowType) {
+ public static Type toIcebergType(SeaTunnelDataType<?> rowType) {
return IcebergTypeMapper.toIcebergType(rowType);
}