This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 835c95fca0 NIFI-11204: Add configurable retry logic for table commits in PutIceberg processor
835c95fca0 is described below

commit 835c95fca0d4d4aff69096e9683f73a05dc4c196
Author: Mark Bathori <bathori.m...@gmail.com>
AuthorDate: Tue Feb 21 15:11:19 2023 +0100

    NIFI-11204: Add configurable retry logic for table commits in PutIceberg processor
    
    This closes #6976.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../iceberg/AbstractIcebergProcessor.java          |   2 +-
 .../apache/nifi/processors/iceberg/PutIceberg.java |  68 ++++++-
 .../additionalDetails.html                         |  58 ++++++
 .../processors/iceberg/TestDataFileActions.java    | 195 +++++++++++++++++++++
 .../nifi/processors/iceberg/TestFileAbort.java     | 108 ------------
 5 files changed, 314 insertions(+), 117 deletions(-)

diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
index 7dc53eefd7..a4ec2ccf07 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
@@ -107,7 +107,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
 
             } catch (Exception e) {
                 getLogger().error("Privileged action failed with kerberos user 
" + kerberosUser, e);
-                session.transfer(flowFile, REL_FAILURE);
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
             }
         }
     }
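
The change in this hunk penalizes the FlowFile before routing it to failure, so a failing FlowFile backs off for the processor's configured Penalty Duration instead of being requeued immediately. A minimal sketch of that pattern (a hypothetical processor using only the standard ProcessSession API; not part of this patch):

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;

// Hypothetical processor illustrating the penalize-then-transfer failure pattern.
public class PenalizeOnFailureExample extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            // ... perform the actual work on the FlowFile here ...
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final Exception e) {
            getLogger().error("Processing failed", e);
            // session.penalize() delays requeueing of this FlowFile for the
            // processor's configured Penalty Duration, avoiding a hot retry
            // loop against a destination that keeps failing.
            session.transfer(session.penalize(flowFile), REL_FAILURE);
        }
    }
}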
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index cdd2997a58..1af97768f3 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -19,10 +19,12 @@ package org.apache.nifi.processors.iceberg;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PendingUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.util.Tasks;
@@ -54,6 +56,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
@@ -114,6 +117,42 @@ public class PutIceberg extends AbstractIcebergProcessor {
             .addValidator(StandardValidators.LONG_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor NUMBER_OF_COMMIT_RETRIES = new PropertyDescriptor.Builder()
+            .name("number-of-commit-retries")
+            .displayName("Number of Commit Retries")
+            .description("Number of times to retry a commit before failing.")
+            .required(true)
+            .defaultValue("10")
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MINIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name("minimum-commit-wait-time")
+            .displayName("Minimum Commit Wait Time")
+            .description("Minimum time to wait before retrying a commit.")
+            .required(true)
+            .defaultValue("100 ms")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAXIMUM_COMMIT_WAIT_TIME = new PropertyDescriptor.Builder()
+            .name("maximum-commit-wait-time")
+            .displayName("Maximum Commit Wait Time")
+            .description("Maximum time to wait before retrying a commit.")
+            .required(true)
+            .defaultValue("2 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAXIMUM_COMMIT_DURATION = new PropertyDescriptor.Builder()
+            .name("maximum-commit-duration")
+            .displayName("Maximum Commit Duration")
+            .description("Total retry timeout period for a commit.")
+            .required(true)
+            .defaultValue("30 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is routed to this relationship after the 
data ingestion was successful.")
@@ -131,7 +170,11 @@ public class PutIceberg extends AbstractIcebergProcessor {
             TABLE_NAME,
             FILE_FORMAT,
             MAXIMUM_FILE_SIZE,
-            KERBEROS_USER_SERVICE
+            KERBEROS_USER_SERVICE,
+            NUMBER_OF_COMMIT_RETRIES,
+            MINIMUM_COMMIT_WAIT_TIME,
+            MAXIMUM_COMMIT_WAIT_TIME,
+            MAXIMUM_COMMIT_DURATION
     ));
 
    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
@@ -161,7 +204,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             table = loadTable(context);
         } catch (Exception e) {
             getLogger().error("Failed to load table from catalog", e);
-            session.transfer(flowFile, REL_FAILURE);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
             return;
         }
 
@@ -182,7 +225,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
             }
 
             final WriteResult result = taskWriter.complete();
-            appendDataFiles(table, result);
+            appendDataFiles(context, table, result);
         } catch (Exception e) {
             getLogger().error("Exception occurred while writing iceberg 
records. Removing uncommitted data files", e);
             try {
@@ -193,7 +236,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
                 getLogger().error("Failed to abort uncommitted data files", 
ex);
             }
 
-            session.transfer(flowFile, REL_FAILURE);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
             return;
         }
 
@@ -223,14 +266,24 @@ public class PutIceberg extends AbstractIcebergProcessor {
     /**
      * Appends the pending data files to the given {@link Table}.
      *
+     * @param context processor context
      * @param table  table to append
      * @param result datafiles created by the {@link TaskWriter}
      */
-    private void appendDataFiles(Table table, WriteResult result) {
+    void appendDataFiles(ProcessContext context, Table table, WriteResult result) {
+        final int numberOfCommitRetries = context.getProperty(NUMBER_OF_COMMIT_RETRIES).evaluateAttributeExpressions().asInteger();
+        final long minimumCommitWaitTime = context.getProperty(MINIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumCommitWaitTime = context.getProperty(MAXIMUM_COMMIT_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumCommitDuration = context.getProperty(MAXIMUM_COMMIT_DURATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+
         final AppendFiles appender = table.newAppend();
         Arrays.stream(result.dataFiles()).forEach(appender::appendFile);
 
-        appender.commit();
+        Tasks.foreach(appender)
+                .exponentialBackoff(minimumCommitWaitTime, maximumCommitWaitTime, maximumCommitDuration, 2.0)
+                .retry(numberOfCommitRetries)
+                .onlyRetryOn(CommitFailedException.class)
+                .run(PendingUpdate::commit);
     }
 
     /**
@@ -253,8 +306,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
      */
     void abort(DataFile[] dataFiles, Table table) {
         Tasks.foreach(dataFiles)
-                .throwFailureWhenFinished()
-                .noRetry()
+                .retry(3)
                 .run(file -> table.io().deleteFile(file.path().toString()));
     }
 
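For context, the Tasks retry chain introduced in appendDataFiles above can be exercised on its own. A minimal standalone sketch (not part of this patch; the class name is illustrative and the literal values mirror the processor defaults) of how the chain behaves against a commit that conflicts twice and then succeeds:

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.util.Tasks;

// Illustrative sketch of the Iceberg Tasks retry utility used by appendDataFiles.
public class CommitRetrySketch {

    public static void main(String[] args) {
        final AtomicInteger attempts = new AtomicInteger();

        Tasks.foreach("commit")                              // a single item to run the action on
                .exponentialBackoff(100, 2_000, 30_000, 2.0) // min wait, max wait, total duration (ms), scale factor
                .retry(10)                                   // up to 10 retries after the initial attempt
                .onlyRetryOn(CommitFailedException.class)    // any other exception fails immediately
                .run(item -> {
                    // Simulate two concurrent-commit conflicts before success.
                    if (attempts.incrementAndGet() <= 2) {
                        throw new CommitFailedException("simulated concurrent table commit");
                    }
                    System.out.println("committed after " + attempts.get() + " attempts");
                });
    }
}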
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
new file mode 100644
index 0000000000..06844910f4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/resources/docs/org.apache.nifi.processors.iceberg.PutIceberg/additionalDetails.html
@@ -0,0 +1,58 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+    <head>
+        <meta charset="utf-8"/>
+        <title>PutIceberg</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    </head>
+
+    <body>
+
+        <h1>PutIceberg</h1>
+
+        <h3>Description</h3>
+        <p>
+            Iceberg is a high-performance format for huge analytic tables.
+            The PutIceberg processor is capable of pushing data into Iceberg tables using different types of Iceberg catalog implementations.
+        </p>
+
+        <h3>Commit retry properties</h3>
+        <p>
+            Iceberg supports multiple concurrent writes using optimistic concurrency.
+            The processor's commit retry implementation uses <b>exponential backoff</b> with <b>jitter</b> and a <b>scale factor of 2</b>, and provides the following properties to tune the retry behaviour.
+
+        <ul>
+            <li>
+                Number Of Commit Retries (default: 10) - Number of times the processor retries the commit of the new data files before failing.
+            </li>
+            <li>
+                Minimum Commit Wait Time (default: 100 ms) - Minimum time the processor waits before each commit attempt.
+            </li>
+            <li>
+                Maximum Commit Wait Time (default: 2 sec) - Maximum time the processor waits before each commit attempt.
+            </li>
+            <li>
+                Maximum Commit Duration (default: 30 sec) - Maximum total duration the processor spends retrying before failing the commit for the current processor event.
+            </li>
+        </ul>
+
+            The NiFi-side retry logic is built on top of the Iceberg commit retry logic, which can be configured through table properties. See more: <a href="https://iceberg.apache.org/docs/latest/configuration/#table-behavior-properties">Table behavior properties</a>
+        </p>
+
+    </body>
+</html>
\ No newline at end of file
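To make the four properties documented above concrete, here is a small sketch (not part of this patch; the class name is illustrative) of the nominal wait schedule they imply with the default values. Iceberg adds random jitter on top of these waits, so actual delays vary:

// Nominal (pre-jitter) backoff schedule for the default property values:
// each wait doubles from the minimum, is capped at the maximum wait time,
// and retrying stops once the retry count or total duration is exhausted.
public class BackoffScheduleSketch {

    public static void main(String[] args) {
        final long minWaitMs = 100;      // Minimum Commit Wait Time
        final long maxWaitMs = 2_000;    // Maximum Commit Wait Time
        final long maxTotalMs = 30_000;  // Maximum Commit Duration
        final int maxRetries = 10;       // Number of Commit Retries
        final double scaleFactor = 2.0;

        long total = 0;
        for (int retry = 1; retry <= maxRetries && total < maxTotalMs; retry++) {
            final long wait = (long) Math.min(maxWaitMs, minWaitMs * Math.pow(scaleFactor, retry - 1));
            total += wait;
            System.out.printf("retry %d: wait ~%d ms (cumulative ~%d ms)%n", retry, wait, total);
        }
        // Prints 100, 200, 400, 800, 1600 ms, then 2000 ms for each remaining retry.
    }
}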
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
new file mode 100644
index 0000000000..dc001799f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
+import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
+import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.nifi.processors.iceberg.PutIceberg.MAXIMUM_COMMIT_DURATION;
+import static org.apache.nifi.processors.iceberg.PutIceberg.MAXIMUM_COMMIT_WAIT_TIME;
+import static org.apache.nifi.processors.iceberg.PutIceberg.MINIMUM_COMMIT_WAIT_TIME;
+import static org.apache.nifi.processors.iceberg.PutIceberg.NUMBER_OF_COMMIT_RETRIES;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestDataFileActions {
+
+    private static final Namespace NAMESPACE = Namespace.of("default");
+    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort");
+
+    private static final Schema ABORT_SCHEMA = new Schema(
+            Types.NestedField.required(0, "id", Types.IntegerType.get())
+    );
+
+    private PutIceberg icebergProcessor;
+
+    @BeforeEach
+    public void setUp() {
+        icebergProcessor = new PutIceberg();
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testAbortUncommittedFiles() throws IOException {
+        Table table = initCatalog();
+
+        List<RecordField> recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType()));
+        RecordSchema abortSchema = new SimpleRecordSchema(recordFields);
+
+        List<MapRecord> recordList = new ArrayList<>();
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4)));
+        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5)));
+
+        IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null);
+        TaskWriter<Record> taskWriter = taskWriterFactory.create();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
+
+        for (MapRecord record : recordList) {
+            taskWriter.write(recordConverter.convert(record));
+        }
+
+        DataFile[] dataFiles = taskWriter.dataFiles();
+
+        // DataFiles written by the taskWriter should exist
+        for (DataFile dataFile : dataFiles) {
+            Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString())));
+        }
+
+        icebergProcessor.abort(taskWriter.dataFiles(), table);
+
+        // DataFiles shouldn't exist after aborting them
+        for (DataFile dataFile : dataFiles) {
+            Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString())));
+        }
+    }
+
+    @Test
+    public void testAppenderCommitRetryExceeded() {
+        ProcessContext context = Mockito.mock(ProcessContext.class);
+        when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("3", null));
+        when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
+        when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
+        when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
+
+        AppendFiles appender = Mockito.mock(AppendFiles.class);
+        doThrow(CommitFailedException.class).when(appender).commit();
+
+        Table table = Mockito.mock(Table.class);
+        when(table.newAppend()).thenReturn(appender);
+
+        // assert the commit action eventually fails after exceeding the number of retries
+        assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
+
+        // verify the commit action was called the configured number of times
+        verify(appender, times(4)).commit();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAppenderCommitSucceeded() {
+        ProcessContext context = Mockito.mock(ProcessContext.class);
+        when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("3", null));
+        when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
+        when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("1 ms", null));
+        when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 min", null));
+
+        AppendFiles appender = Mockito.mock(AppendFiles.class);
+        // the commit action should throw exception 2 times before succeeding
+        doThrow(CommitFailedException.class, CommitFailedException.class).doNothing().when(appender).commit();
+
+        Table table = Mockito.mock(Table.class);
+        when(table.newAppend()).thenReturn(appender);
+
+        // the method call shouldn't throw an exception since the configured number of retries is higher than the number of failed commit actions
+        icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build());
+
+        // verify the commit action was called the expected number of times
+        verify(appender, times(3)).commit();
+    }
+
+    @Test
+    public void testMaxCommitDurationExceeded() {
+        ProcessContext context = Mockito.mock(ProcessContext.class);
+        when(context.getProperty(NUMBER_OF_COMMIT_RETRIES)).thenReturn(new MockPropertyValue("5", null));
+        when(context.getProperty(MINIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null));
+        when(context.getProperty(MAXIMUM_COMMIT_WAIT_TIME)).thenReturn(new MockPropertyValue("2 ms", null));
+        when(context.getProperty(MAXIMUM_COMMIT_DURATION)).thenReturn(new MockPropertyValue("1 ms", null));
+
+        AppendFiles appender = Mockito.mock(AppendFiles.class);
+        doThrow(CommitFailedException.class).when(appender).commit();
+
+        Table table = Mockito.mock(Table.class);
+        when(table.newAppend()).thenReturn(appender);
+
+        // assert the commit action eventually fails after exceeding the maximum commit duration
+        assertThrows(CommitFailedException.class, () -> icebergProcessor.appendDataFiles(context, table, WriteResult.builder().build()));
+
+        // verify the commit action was called only 2 times instead of the configured 5
+        verify(appender, times(2)).commit();
+    }
+
+    private Table initCatalog() throws IOException {
+        TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
+        Catalog catalog = catalogService.getCatalog();
+
+        return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
+    }
+}
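
A note on the attempt arithmetic these tests assert: retry(3) permits three retries after the initial attempt, so a commit that always fails runs four times in total, which is what verify(appender, times(4)) checks. A standalone sketch (not part of this patch; names are illustrative) showing the same count with a plain counter instead of a mock:

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.util.Tasks;

// Illustrative sketch: counts how often a permanently failing action runs under retry(3).
public class RetryCountSketch {

    public static void main(String[] args) {
        final AtomicInteger attempts = new AtomicInteger();
        try {
            Tasks.foreach("commit")
                    .exponentialBackoff(1, 1, 60_000, 2.0) // 1 ms waits, generous total duration
                    .retry(3)
                    .onlyRetryOn(CommitFailedException.class)
                    .run(item -> {
                        attempts.incrementAndGet();
                        throw new CommitFailedException("always fails");
                    });
        } catch (CommitFailedException e) {
            // 1 initial attempt + 3 retries = 4 total attempts.
            System.out.println("attempts: " + attempts.get());
        }
    }
}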
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java
deleted file mode 100644
index 0b5403986b..0000000000
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestFileAbort.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.iceberg;
-
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.types.Types;
-import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
-import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
-import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.jupiter.api.condition.OS.WINDOWS;
-
-public class TestFileAbort {
-
-    private static final Namespace NAMESPACE = Namespace.of("default");
-    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "abort");
-
-    private static final Schema ABORT_SCHEMA = new Schema(
-            Types.NestedField.required(0, "id", Types.IntegerType.get())
-    );
-
-    @DisabledOnOs(WINDOWS)
-    @Test
-    public void abortUncommittedFiles() throws IOException {
-        Table table = initCatalog();
-
-        List<RecordField> recordFields = Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType()));
-        RecordSchema abortSchema = new SimpleRecordSchema(recordFields);
-
-        List<MapRecord> recordList = new ArrayList<>();
-        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 1)));
-        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 2)));
-        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 3)));
-        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 4)));
-        recordList.add(new MapRecord(abortSchema, Collections.singletonMap("id", 5)));
-
-        IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null);
-        TaskWriter<Record> taskWriter = taskWriterFactory.create();
-
-        IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
-
-        for (MapRecord record : recordList) {
-            taskWriter.write(recordConverter.convert(record));
-        }
-
-        DataFile[] dataFiles = taskWriter.dataFiles();
-
-        // DataFiles written by the taskWriter should exist
-        for (DataFile dataFile : dataFiles) {
-            Assertions.assertTrue(Files.exists(Paths.get(dataFile.path().toString())));
-        }
-
-        PutIceberg icebergProcessor = new PutIceberg();
-        icebergProcessor.abort(taskWriter.dataFiles(), table);
-
-        // DataFiles shouldn't exist after aborting them
-        for (DataFile dataFile : dataFiles) {
-            Assertions.assertFalse(Files.exists(Paths.get(dataFile.path().toString())));
-        }
-    }
-
-    private Table initCatalog() throws IOException {
-        TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
-        Catalog catalog = catalogService.getCatalog();
-
-        return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
-    }
-}
