Reo-LEI commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r814680116



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure that records with the same
+ * equality field values are emitted to the same writer. This prevents creating duplicate records when
+ * a row with the same equality field values is inserted and deleted by different writers in one
+ * transaction, and guarantees that pos-deletes take effect.
+ */
+class EqualityFieldKeySelector extends BaseKeySelector<RowData, Integer> {
+
+  private final Schema schema;
+  private final Schema deleteSchema;
+
+  private transient StructProjection structProjection;
+  private transient StructLikeWrapper structLikeWrapper;
+
+  EqualityFieldKeySelector(List<Integer> equalityFieldIds, Schema schema, RowType flinkSchema) {
+    super(schema, flinkSchema);
+    this.schema = schema;

Review comment:
       OK, I will remove `BaseKeySelector` until we have more key selectors and actually need it.
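   
   For reference, a minimal sketch of what inlining could look like, reusing the existing `RowDataWrapper` helper (this is illustrative, not the final PR code):
   
   ```java
   // Sketch: fold BaseKeySelector's lazily-created wrapper into EqualityFieldKeySelector itself.
   private final RowType flinkSchema;          // kept from the old base class
   private transient RowDataWrapper rowDataWrapper;
   
   // RowDataWrapper adapts Flink's RowData to Iceberg's StructLike view;
   // it is constructed lazily because it is not serializable.
   private RowDataWrapper lazyRowDataWrapper() {
     if (rowDataWrapper == null) {
       rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
     }
     return rowDataWrapper;
   }
   ```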

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure that records with the same
+ * equality field values are emitted to the same writer. This prevents creating duplicate records when
+ * a row with the same equality field values is inserted and deleted by different writers in one

Review comment:
       done

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure that records with the same
+ * equality field values are emitted to the same writer. This prevents creating duplicate records when
+ * a row with the same equality field values is inserted and deleted by different writers in one
+ * transaction, and guarantees that pos-deletes take effect.
+ */
+class EqualityFieldKeySelector extends BaseKeySelector<RowData, Integer> {
+
+  private final Schema schema;
+  private final Schema deleteSchema;
+
+  private transient StructProjection structProjection;
+  private transient StructLikeWrapper structLikeWrapper;
+
+  EqualityFieldKeySelector(List<Integer> equalityFieldIds, Schema schema, RowType flinkSchema) {
+    super(schema, flinkSchema);
+    this.schema = schema;
+    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+  }
+
+  /**
+   * Construct the {@link StructProjection} lazily because it is not serializable.
+   */
+  protected StructProjection lazyStructProjection() {
+    if (structProjection == null) {
+      structProjection = StructProjection.create(schema, deleteSchema);
+    }
+    return structProjection;
+  }
+
+  /**
+   * Construct the {@link StructLikeWrapper} lazily because it is not serializable.
+   */
+  protected StructLikeWrapper lazyStructLikeWrapper() {
+    if (structLikeWrapper == null) {
+      structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
+    }
+    return structLikeWrapper;
+  }
+
+  @Override
+  public Integer getKey(RowData row) {
+    return lazyStructLikeWrapper().set(lazyStructProjection().wrap(lazyRowDataWrapper().wrap(row))).hashCode();

Review comment:
       > Nit: It more clear for me to understand the unfolded code like the following
   
   Done.
   
   > For the keyBy, we are essentially doing two layers of hashCode: equalityKeys.hashCode().hashCode(). wondering if it can cause any skewed distribution.
   
   I think we should not worry about the second `hashCode()`, because `Integer`'s `hashCode` is the value itself.
   
   However, `keyBy` on equality fields can still produce skewed data in some cases. We can avoid that by choosing the equality fields deliberately: use the equality fields as the partition source fields, and then turn on the `HASH` distribution mode (as Ryan said, as long as all of the partition source fields are identifier fields, we can distribute data by partition key).
   
   For example, if the equality fields of a table are `id` and `type`, we can partition on `id`, such as `bucket(id, 16)`, and then turn on the `HASH` distribution mode; the data will then be distributed by the partition key `bucket(id, 16)`, which avoids the skew.
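   
   For illustration, a minimal sketch of that table layout using the Iceberg Java API (catalog wiring is omitted; the column names and `SkewFreeLayoutSketch` class are made up for this example):
   
   ```java
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
   import org.apache.iceberg.types.Types;
   
   public class SkewFreeLayoutSketch {
     public static void main(String[] args) {
       // Identifier (equality) fields: id and type.
       Schema schema = new Schema(
           ImmutableList.of(
               Types.NestedField.required(1, "id", Types.LongType.get()),
               Types.NestedField.required(2, "type", Types.StringType.get()),
               Types.NestedField.optional(3, "payload", Types.StringType.get())),
           ImmutableSet.of(1, 2));
   
       // Partition on one identifier field so HASH distribution mode can shuffle
       // by partition key, e.g. catalog.createTable(name, schema, spec,
       //     ImmutableMap.of("write.distribution-mode", "hash")).
       PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("id", 16).build();
       System.out.println(spec);
   
       // On the double hashCode question: Integer.hashCode() is the value itself,
       // so keyBy hashing the selector's Integer key again changes nothing.
       Integer key = 42;
       System.out.println(key.hashCode() == 42);  // true
     }
   }
   ```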
   

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -146,13 +148,14 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
                                             MapFunction<T, RowData> mapper,
                                             TypeInformation<RowData> outputType) {
       this.inputCreator = newUidPrefix -> {
+        // Input stream order is crucial for some situations (e.g. the CDC case). Therefore, we need to set the
+        // parallelism of the map operator to be the same as its input, to keep the map operator chained to its
+        // input and avoid the default rebalance.
+        SingleOutputStreamOperator<RowData> inputStream = input.map(mapper, outputType)

Review comment:
       Currently, many test cases in `TestFlinkIcebergSinkV2` depend on this config. If we removed it, those test cases would fail; I think they already cover this change.
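   
   As a rough sketch of the chaining behavior being relied on (not the PR's exact code; `toRowData` and `outputType` are stand-ins):
   
   ```java
   // Operators with the same parallelism get chained with forward partitioning,
   // so records keep their arrival order. With different parallelism, Flink
   // inserts a rebalance by default, which can reorder a CDC changelog.
   SingleOutputStreamOperator<RowData> inputStream = input
       .map(toRowData, outputType)
       .setParallelism(input.getParallelism());  // match the input => chain, no rebalance
   ```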

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -295,15 +298,34 @@ public Builder setSnapshotProperty(String property, String value) {
         }
       }
 
+      // Find out the equality field id list based on the user-provided equality field column names.
+      List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+        Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+              column, table.schema());
+          equalityFieldSet.add(field.fieldId());
+        }
+
+        if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+          LOG.warn("The configured equality field columns do not match the identifier fields of the schema, " +

Review comment:
       done

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -295,15 +298,34 @@ public Builder setSnapshotProperty(String property, String value) {
         }
       }
 
+      // Find out the equality field id list based on the user-provided equality field column names.
+      List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+        Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+              column, table.schema());
+          equalityFieldSet.add(field.fieldId());
+        }
+
+        if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+          LOG.warn("The configured equality field columns do not match the identifier fields of the schema, " +
+              "use job specified equality field columns as the equality fields by default.");
+        }
+        equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+      }

Review comment:
       done
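   
   For context, a short sketch of how a job reaches this code path by passing explicit equality field columns to the sink builder (the stream and loader variables are hypothetical):
   
   ```java
   // Assumes a DataStream<RowData> rowDataStream and a TableLoader tableLoader already exist.
   FlinkSink.forRowData(rowDataStream)
       .tableLoader(tableLoader)
       .equalityFieldColumns(Arrays.asList("id", "type"))  // overrides the schema's identifier fields
       .append();
   ```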

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -422,21 +436,50 @@ private String operatorName(String suffix) {
         writeMode = distributionMode;
       }
 
+      LOG.info("Write distribution mode is '{}'", writeMode.modeName());
       switch (writeMode) {
         case NONE:
-          return input;
+          if (equalityFieldIds.isEmpty()) {
+            return input;
+          } else {
+            LOG.info("Distribute rows by equality fields, because there are equality fields set");
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+          }
 
         case HASH:
-          if (partitionSpec.isUnpartitioned()) {
-            return input;
+          if (equalityFieldIds.isEmpty()) {
+            if (partitionSpec.isUnpartitioned()) {
+              LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                  "and table is unpartitioned");
+              return input;
+            } else {
+              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            }
           } else {
-            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            if (partitionSpec.isUnpartitioned()) {
+              LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+                  "and table is unpartitioned");
+              return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+            } else {
+              for (PartitionField partitionField : partitionSpec.fields()) {
+                Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                    "In 'hash' distribution mode with equality fields set, partition field '%s' " +
+                        "should be included in equality fields: '%s'", partitionField, equalityFieldColumns);
+              }
+              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            }
           }
 
         case RANGE:
-          LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-              WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-          return input;
+          if (equalityFieldIds.isEmpty()) {
+            LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                "and write.distribution-mode=range is not supported yet in flink");

Review comment:
       done

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -422,21 +436,50 @@ private String operatorName(String suffix) {
         writeMode = distributionMode;
       }
 
+      LOG.info("Write distribution mode is '{}'", writeMode.modeName());
       switch (writeMode) {
         case NONE:
-          return input;
+          if (equalityFieldIds.isEmpty()) {
+            return input;
+          } else {
+            LOG.info("Distribute rows by equality fields, because there are equality fields set");
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+          }
 
         case HASH:
-          if (partitionSpec.isUnpartitioned()) {
-            return input;
+          if (equalityFieldIds.isEmpty()) {
+            if (partitionSpec.isUnpartitioned()) {
+              LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                  "and table is unpartitioned");
+              return input;
+            } else {
+              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            }
           } else {
-            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            if (partitionSpec.isUnpartitioned()) {
+              LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+                  "and table is unpartitioned");
+              return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+            } else {
+              for (PartitionField partitionField : partitionSpec.fields()) {
+                Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                    "In 'hash' distribution mode with equality fields set, partition field '%s' " +
+                        "should be included in equality fields: '%s'", partitionField, equalityFieldColumns);
+              }
+              return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+            }
           }
 
         case RANGE:
-          LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-              WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-          return input;
+          if (equalityFieldIds.isEmpty()) {
+            LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                "and write.distribution-mode=range is not supported yet in flink");
+            return input;
+          } else {
+            LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+                "and write.distribution-mode=range is not supported yet in flink");

Review comment:
       done

##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -84,36 +84,39 @@
   private final FileFormat format;
   private final int parallelism;
   private final boolean partitioned;
+  private final String distributionMode;
 
   private StreamExecutionEnvironment env;
   private TestTableLoader tableLoader;
 
-  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, Distribution={3}")

Review comment:
       done

##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -155,10 +159,6 @@ private void testChangeLogs(List<String> equalityFieldColumns,
                               List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
     DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
 
-    // Shuffle by the equality key, so that different operations of the same key could be written in order when
-    // executing tasks in parallel.
-    dataStream = dataStream.keyBy(keySelector);

Review comment:
       Yes, because we ensure the input will not be disorder in 
`FlinkSink.forMapperOutputType` and data will be correctly  distributed by 
equaily fields or partition fields in different distribution mode. So we can 
remove this in unittest
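   
   To make the ordering concern concrete, a small sketch of the kind of per-checkpoint changelog `testChangeLogs` consumes (the values are made up):
   
   ```java
   // One checkpoint: insert key id=1, delete it, then re-insert it. If the sink
   // reordered these same-key operations, the final table contents would be wrong,
   // which is why the sink itself must preserve per-key order.
   List<Row> checkpointElements = ImmutableList.of(
       Row.ofKind(RowKind.INSERT, 1, "aaa"),
       Row.ofKind(RowKind.DELETE, 1, "aaa"),
       Row.ofKind(RowKind.INSERT, 1, "bbb"));
   ```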



