This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new f4a1004d30 NIFI-12731:  Ensure state is updated in GetHBase whenever the session is committed
f4a1004d30 is described below

commit f4a1004d30b7be6edcf26781b5c39cdb395c0f81
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Thu Feb 1 19:24:34 2024 -0500

    NIFI-12731:  Ensure state is updated in GetHBase whenever the session is committed
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8347.
---
 .../main/java/org/apache/nifi/hbase/GetHBase.java  | 93 +++++++++++-----------
 .../org/apache/nifi/hbase/scan/ResultHandler.java  |  4 +-
 2 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index d11fcd5042..8aaf7849c2 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -53,13 +53,13 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -156,7 +156,7 @@ public class GetHBase extends AbstractProcessor implements 
VisibilityFetchSuppor
             .build();
 
     private final AtomicReference<ScanResult> lastResult = new 
AtomicReference<>();
-    private volatile List<Column> columns = new ArrayList<>();
+    private final List<Column> columns = new ArrayList<>();
     private volatile boolean justElectedPrimaryNode = false;
     private volatile String previousTable = null;
 
@@ -226,11 +226,11 @@ public class GetHBase extends AbstractProcessor 
implements VisibilityFetchSuppor
         for (final String column : columns) {
             if (column.contains(":"))  {
                 final String[] parts = column.split(":");
-                final byte[] cf = parts[0].getBytes(Charset.forName("UTF-8"));
-                final byte[] cq = parts[1].getBytes(Charset.forName("UTF-8"));
+                final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
+                final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
                 this.columns.add(new Column(cf, cq));
             } else {
-                final byte[] cf = column.getBytes(Charset.forName("UTF-8"));
+                final byte[] cf = column.getBytes(StandardCharsets.UTF_8);
                 this.columns.add(new Column(cf, null));
             }
         }
@@ -345,11 +345,7 @@ public class GetHBase extends AbstractProcessor implements 
VisibilityFetchSuppor
                             final byte[] cellValue = 
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength() + cell.getValueOffset());
 
                             final String rowHash = new String(rowValue, 
StandardCharsets.UTF_8);
-                            Set<String> cellHashes = 
cellsMatchingTimestamp.get(rowHash);
-                            if (cellHashes == null) {
-                                cellHashes = new HashSet<>();
-                                cellsMatchingTimestamp.put(rowHash, 
cellHashes);
-                            }
+                            Set<String> cellHashes = 
cellsMatchingTimestamp.computeIfAbsent(rowHash, k -> new HashSet<>());
                             cellHashes.add(new String(cellValue, 
StandardCharsets.UTF_8));
                         }
                     }
@@ -374,40 +370,12 @@ public class GetHBase extends AbstractProcessor 
implements VisibilityFetchSuppor
                 rowsPulledHolder.set(++rowsPulled);
 
                 if (++rowsPulled % getBatchSize() == 0) {
-                    session.commitAsync();
+                    updateStateAndCommit(session, latestTimestampHolder.get(), 
cellsMatchingTimestamp);
                 }
             });
 
-            final ScanResult scanResults = new 
ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp);
-
-            final ScanResult latestResult = lastResult.get();
-            if (latestResult == null || scanResults.getTimestamp() > 
latestResult.getTimestamp()) {
-                session.setState(scanResults.toFlatMap(), Scope.CLUSTER);
-                session.commitAsync(() -> 
updateScanResultsIfNewer(scanResults));
-            } else if (scanResults.getTimestamp() == 
latestResult.getTimestamp()) {
-                final Map<String, Set<String>> combinedResults = new 
HashMap<>(scanResults.getMatchingCells());
-
-                // copy the results of result.getMatchingCells() to 
combinedResults.
-                // do a deep copy because the Set may be modified below.
-                for (final Map.Entry<String, Set<String>> entry : 
scanResults.getMatchingCells().entrySet()) {
-                    combinedResults.put(entry.getKey(), new 
HashSet<>(entry.getValue()));
-                }
-
-                // combined the results from 'lastResult'
-                for (final Map.Entry<String, Set<String>> entry : 
latestResult.getMatchingCells().entrySet()) {
-                    final Set<String> existing = 
combinedResults.get(entry.getKey());
-                    if (existing == null) {
-                        combinedResults.put(entry.getKey(), new 
HashSet<>(entry.getValue()));
-                    } else {
-                        existing.addAll(entry.getValue());
-                    }
-                }
-
-                final ScanResult scanResult = new 
ScanResult(scanResults.getTimestamp(), combinedResults);
-                session.setState(scanResult.toFlatMap(), Scope.CLUSTER);
+            updateStateAndCommit(session, latestTimestampHolder.get(), 
cellsMatchingTimestamp);
 
-                session.commitAsync(() -> 
updateScanResultsIfNewer(scanResult));
-            }
         } catch (final IOException e) {
             getLogger().error("Failed to receive data from HBase due to {}", 
e);
             session.rollback();
@@ -418,6 +386,39 @@ public class GetHBase extends AbstractProcessor implements 
VisibilityFetchSuppor
         }
     }
 
+    private void updateStateAndCommit(final ProcessSession session, final long 
latestTimestamp, final Map<String, Set<String>> cellsMatchingTimestamp) throws 
IOException {
+        final ScanResult scanResults = new ScanResult(latestTimestamp, 
cellsMatchingTimestamp);
+
+        final ScanResult latestResult = lastResult.get();
+        if (latestResult == null || scanResults.getTimestamp() > 
latestResult.getTimestamp()) {
+            session.setState(scanResults.toFlatMap(), Scope.CLUSTER);
+            session.commitAsync(() -> updateScanResultsIfNewer(scanResults));
+        } else if (scanResults.getTimestamp() == latestResult.getTimestamp()) {
+            final Map<String, Set<String>> combinedResults = new 
HashMap<>(scanResults.getMatchingCells());
+
+            // copy the results of result.getMatchingCells() to 
combinedResults.
+            // do a deep copy because the Set may be modified below.
+            for (final Map.Entry<String, Set<String>> entry : 
scanResults.getMatchingCells().entrySet()) {
+                combinedResults.put(entry.getKey(), new 
HashSet<>(entry.getValue()));
+            }
+
+            // combined the results from 'lastResult'
+            for (final Map.Entry<String, Set<String>> entry : 
latestResult.getMatchingCells().entrySet()) {
+                final Set<String> existing = 
combinedResults.get(entry.getKey());
+                if (existing == null) {
+                    combinedResults.put(entry.getKey(), new 
HashSet<>(entry.getValue()));
+                } else {
+                    existing.addAll(entry.getValue());
+                }
+            }
+
+            final ScanResult scanResult = new 
ScanResult(scanResults.getTimestamp(), combinedResults);
+            session.setState(scanResult.toFlatMap(), Scope.CLUSTER);
+
+            session.commitAsync(() -> updateScanResultsIfNewer(scanResult));
+        }
+    }
+
     private void updateScanResultsIfNewer(final ScanResult scanResult) {
         lastResult.getAndUpdate(current -> (current == null || 
scanResult.getTimestamp() > current.getTimestamp()) ? scanResult : current);
     }
@@ -477,7 +478,7 @@ public class GetHBase extends AbstractProcessor implements 
VisibilityFetchSuppor
         if (scanResult == null || justElectedPrimaryNode) {
             if (client != null) {
                 final Object obj = client.get(getKey(), stringSerDe, 
objectSerDe);
-                if (obj == null || !(obj instanceof ScanResult)) {
+                if (!(obj instanceof ScanResult)) {
                     scanResult = null;
                 } else {
                     scanResult = (ScanResult) obj;
@@ -493,11 +494,11 @@ public class GetHBase extends AbstractProcessor 
implements VisibilityFetchSuppor
         // we don't duplicate data.
         final File file = getStateFile();
         if (file.exists()) {
-            try (final InputStream fis = new FileInputStream(file);
+            try (final InputStream fis = Files.newInputStream(file.toPath());
                  final ObjectInputStream ois = new ObjectInputStream(fis)) {
 
                 final Object obj = ois.readObject();
-                if (obj != null && (obj instanceof ScanResult)) {
+                if (obj instanceof ScanResult) {
                     final ScanResult localScanResult = (ScanResult) obj;
                     if (scanResult == null || localScanResult.getTimestamp() > 
scanResult.getTimestamp()) {
                         scanResult = localScanResult;
@@ -606,11 +607,7 @@ public class GetHBase extends AbstractProcessor implements 
VisibilityFetchSuppor
                 final String rowIndex = matcher.group(1);
                 final String cellIndex = matcher.group(3);
 
-                Set<String> cellHashes = 
rowIndexToMatchingCellHashes.get(rowIndex);
-                if (cellHashes == null) {
-                    cellHashes = new HashSet<>();
-                    rowIndexToMatchingCellHashes.put(rowIndex, cellHashes);
-                }
+                Set<String> cellHashes = 
rowIndexToMatchingCellHashes.computeIfAbsent(rowIndex, k -> new HashSet<>());
 
                 if (cellIndex == null) {
                     // this provides a Row ID.
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java
index d0f1eab9ca..9d5e744d12 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.hbase.scan;
 
+import java.io.IOException;
+
 /**
  * Handles a single row from an HBase scan.
  */
 public interface ResultHandler {
 
-    void handle(byte[] row, ResultCell[] resultCells);
+    void handle(byte[] row, ResultCell[] resultCells) throws IOException;
 
 }

Reply via email to