This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d9d8d8370 [avro] Writer throw more clear exception when NPE with aggregation merge function (#4547)
d9d8d8370 is described below

commit d9d8d837028c63666ccdec4d22677a8d869a3d77
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 19 15:29:18 2024 +0800

    [avro] Writer throw more clear exception when NPE with aggregation merge function (#4547)
---
 .../org/apache/paimon/io/SingleFileWriter.java     | 23 ++++++++++++++----
 .../paimon/flink/ContinuousFileStoreITCase.java    | 27 ++++++++++++++++++++++
 .../paimon/format/avro/AvroRowDatumWriter.java     | 11 ++++++++-
 3 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index d41040e05..f303e8597 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -49,7 +49,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
     protected final Path path;
     private final Function<T, InternalRow> converter;
 
-    private final FormatWriter writer;
+    private FormatWriter writer;
     private PositionOutputStream out;
 
     private long recordCount;
@@ -144,7 +144,14 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
 
     @Override
     public void abort() {
-        IOUtils.closeQuietly(out);
+        if (writer != null) {
+            IOUtils.closeQuietly(writer);
+            writer = null;
+        }
+        if (out != null) {
+            IOUtils.closeQuietly(out);
+            out = null;
+        }
         fileIO.deleteQuietly(path);
     }
 
@@ -167,9 +174,15 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
         }
 
         try {
-            writer.close();
-            out.flush();
-            out.close();
+            if (writer != null) {
+                writer.close();
+                writer = null;
+            }
+            if (out != null) {
+                out.flush();
+                out.close();
+                out = null;
+            }
         } catch (IOException e) {
             LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e);
             abort();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index cf97f7b67..2e1569751 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -629,4 +631,29 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("10", "11", "12"));
         iterator.close();
     }
+
+    @Test
+    public void testAvroRetractNotNullField() {
+        List<Row> input =
+                Arrays.asList(
+                        Row.ofKind(RowKind.INSERT, 1, "A"), Row.ofKind(RowKind.DELETE, 1, "A"));
+        String id = TestValuesTableFactory.registerData(input);
+        sEnv.executeSql(
+                String.format(
+                        "CREATE TEMPORARY TABLE source (pk INT PRIMARY KEY NOT ENFORCED, a STRING) "
+                                + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', "
+                                + "'changelog-mode' = 'I,D,UA,UB')",
+                        id));
+
+        sql(
+                "CREATE TABLE avro_sink (pk INT PRIMARY KEY NOT ENFORCED, a STRING NOT NULL) "
+                        + " WITH ('file.format' = 'avro', 'merge-engine' = 'aggregation')");
+
+        assertThatThrownBy(
+                        () -> sEnv.executeSql("INSERT INTO avro_sink select * from source").await())
+                .satisfies(
+                        anyCauseMatches(
+                                RuntimeException.class,
+                                "Caught NullPointerException, the possible reason is you have set following options together"));
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
index c2bd81d00..d30245162 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
@@ -56,6 +56,15 @@ public class AvroRowDatumWriter implements DatumWriter<InternalRow> {
             // top Row is a UNION type
             out.writeIndex(1);
         }
-        this.writer.writeRow(datum, out);
+        try {
+            this.writer.writeRow(datum, out);
+        } catch (NullPointerException npe) {
+            throw new RuntimeException(
+                    "Caught NullPointerException, the possible reason is you have set following options together:\n"
+                            + "  1. file.format = avro;\n"
+                            + "  2. merge-function = aggregation/partial-update;\n"
+                            + "  3. some fields are not null.",
+                    npe);
+        }
     }
 }

Reply via email to