This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bf2cbc3 Flink: Rename FlinkTableOptions to more generic FlinkConfigOptions
bf2cbc3 is described below
commit bf2cbc3d6cc4d8fbb261cc48de75592f1caa807a
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Mon Jun 28 06:00:35 2021 -0700
Flink: Rename FlinkTableOptions to more generic FlinkConfigOptions
---
.../{FlinkTableOptions.java => FlinkConfigOptions.java} | 4 ++--
.../org/apache/iceberg/flink/source/FlinkSource.java | 17 +++++++++--------
.../java/org/apache/iceberg/flink/FlinkTestBase.java | 2 +-
.../apache/iceberg/flink/source/TestFlinkScanSql.java | 6 +++---
4 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
similarity index 96%
rename from flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
rename to flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
index 145190c..067abe8 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
@@ -23,9 +23,9 @@ package org.apache.iceberg.flink;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-public class FlinkTableOptions {
+public class FlinkConfigOptions {
- private FlinkTableOptions() {
+ private FlinkConfigOptions() {
}
public static final ConfigOption<Boolean>
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 84507c4..a3263d2 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -36,8 +36,8 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.FlinkTableOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
@@ -48,11 +48,11 @@ public class FlinkSource {
}
  /**
-  * Initialize a {@link Builder} to read the data from iceberg table. Equivalent to {@link TableScan}.
-  * See more options in {@link ScanContext}.
+  * Initialize a {@link Builder} to read the data from iceberg table. Equivalent to {@link TableScan}. See more options
+  * in {@link ScanContext}.
   * <p>
-  * The Source can be read static data in bounded mode. It can also continuously check the arrival of new data and
-  * read records incrementally.
+  * The Source can be read static data in bounded mode. It can also continuously check the arrival of new data and read
+  * records incrementally.
* <ul>
* <li>Without startSnapshotId: Bounded</li>
* <li>With startSnapshotId and with endSnapshotId: Bounded</li>
@@ -222,10 +222,11 @@ public class FlinkSource {
int inferParallelism(FlinkInputFormat format, ScanContext context) {
    int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
-    if (readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
-      int maxInferParallelism = readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
+    if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
+      int maxInferParallelism = readableConfig.get(FlinkConfigOptions
+          .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
       Preconditions.checkState(maxInferParallelism >= 1,
-          FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
+          FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
int splitNum;
try {
FlinkInputSplit[] splits = format.createInputSplits(0);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 2810326..1ee000c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -79,7 +79,7 @@ public abstract class FlinkTestBase extends TestBaseUtils {
.build();
TableEnvironment env = TableEnvironment.create(settings);
-    env.getConfig().getConfiguration().set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+    env.getConfig().getConfiguration().set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
tEnv = env;
}
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index f99e4b6..9af1b7c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -38,7 +38,7 @@ import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.FlinkTableOptions;
+import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -158,7 +158,7 @@ public class TestFlinkScanSql extends TestFlinkSource {
     // 2 splits and max infer parallelism is 1 (max < splits num), the parallelism is 1
     Configuration configuration = new Configuration();
-    configuration.setInteger(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 1);
+    configuration.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 1);
parallelism = FlinkSource.forRowData()
.flinkConf(configuration)
.inferParallelism(flinkInputFormat, ScanContext.builder().build());
@@ -171,7 +171,7 @@ public class TestFlinkScanSql extends TestFlinkSource {
     Assert.assertEquals("Should produce the expected parallelism.", 1, parallelism);
     // 2 splits, infer parallelism is disabled, the parallelism is flink default parallelism 1
-    configuration.setBoolean(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+    configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
parallelism = FlinkSource.forRowData()
.flinkConf(configuration)
.inferParallelism(flinkInputFormat,
ScanContext.builder().limit(3).build());