This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new f4a1004d30 NIFI-12731: Ensure state is updated in GetHBase whenever the session is committed f4a1004d30 is described below commit f4a1004d30b7be6edcf26781b5c39cdb395c0f81 Author: Matt Burgess <mattyb...@apache.org> AuthorDate: Thu Feb 1 19:24:34 2024 -0500 NIFI-12731: Ensure state is updated in GetHBase whenever the session is committed Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #8347. --- .../main/java/org/apache/nifi/hbase/GetHBase.java | 93 +++++++++++----------- .../org/apache/nifi/hbase/scan/ResultHandler.java | 4 +- 2 files changed, 48 insertions(+), 49 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index d11fcd5042..8aaf7849c2 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -53,13 +53,13 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -156,7 +156,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor .build(); private final AtomicReference<ScanResult> lastResult = new AtomicReference<>(); - private volatile List<Column> columns = new ArrayList<>(); + private final List<Column> columns = new ArrayList<>(); private 
volatile boolean justElectedPrimaryNode = false; private volatile String previousTable = null; @@ -226,11 +226,11 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor for (final String column : columns) { if (column.contains(":")) { final String[] parts = column.split(":"); - final byte[] cf = parts[0].getBytes(Charset.forName("UTF-8")); - final byte[] cq = parts[1].getBytes(Charset.forName("UTF-8")); + final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8); + final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8); this.columns.add(new Column(cf, cq)); } else { - final byte[] cf = column.getBytes(Charset.forName("UTF-8")); + final byte[] cf = column.getBytes(StandardCharsets.UTF_8); this.columns.add(new Column(cf, null)); } } @@ -345,11 +345,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()); final String rowHash = new String(rowValue, StandardCharsets.UTF_8); - Set<String> cellHashes = cellsMatchingTimestamp.get(rowHash); - if (cellHashes == null) { - cellHashes = new HashSet<>(); - cellsMatchingTimestamp.put(rowHash, cellHashes); - } + Set<String> cellHashes = cellsMatchingTimestamp.computeIfAbsent(rowHash, k -> new HashSet<>()); cellHashes.add(new String(cellValue, StandardCharsets.UTF_8)); } } @@ -374,40 +370,12 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor rowsPulledHolder.set(++rowsPulled); if (++rowsPulled % getBatchSize() == 0) { - session.commitAsync(); + updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp); } }); - final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp); - - final ScanResult latestResult = lastResult.get(); - if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) { - 
session.setState(scanResults.toFlatMap(), Scope.CLUSTER); - session.commitAsync(() -> updateScanResultsIfNewer(scanResults)); - } else if (scanResults.getTimestamp() == latestResult.getTimestamp()) { - final Map<String, Set<String>> combinedResults = new HashMap<>(scanResults.getMatchingCells()); - - // copy the results of result.getMatchingCells() to combinedResults. - // do a deep copy because the Set may be modified below. - for (final Map.Entry<String, Set<String>> entry : scanResults.getMatchingCells().entrySet()) { - combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); - } - - // combined the results from 'lastResult' - for (final Map.Entry<String, Set<String>> entry : latestResult.getMatchingCells().entrySet()) { - final Set<String> existing = combinedResults.get(entry.getKey()); - if (existing == null) { - combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); - } else { - existing.addAll(entry.getValue()); - } - } - - final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults); - session.setState(scanResult.toFlatMap(), Scope.CLUSTER); + updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp); - session.commitAsync(() -> updateScanResultsIfNewer(scanResult)); - } } catch (final IOException e) { getLogger().error("Failed to receive data from HBase due to {}", e); session.rollback(); @@ -418,6 +386,39 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor } } + private void updateStateAndCommit(final ProcessSession session, final long latestTimestamp, final Map<String, Set<String>> cellsMatchingTimestamp) throws IOException { + final ScanResult scanResults = new ScanResult(latestTimestamp, cellsMatchingTimestamp); + + final ScanResult latestResult = lastResult.get(); + if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) { + session.setState(scanResults.toFlatMap(), Scope.CLUSTER); + session.commitAsync(() -> 
updateScanResultsIfNewer(scanResults)); + } else if (scanResults.getTimestamp() == latestResult.getTimestamp()) { + final Map<String, Set<String>> combinedResults = new HashMap<>(scanResults.getMatchingCells()); + + // copy the results of result.getMatchingCells() to combinedResults. + // do a deep copy because the Set may be modified below. + for (final Map.Entry<String, Set<String>> entry : scanResults.getMatchingCells().entrySet()) { + combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); + } + + // combined the results from 'lastResult' + for (final Map.Entry<String, Set<String>> entry : latestResult.getMatchingCells().entrySet()) { + final Set<String> existing = combinedResults.get(entry.getKey()); + if (existing == null) { + combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); + } else { + existing.addAll(entry.getValue()); + } + } + + final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults); + session.setState(scanResult.toFlatMap(), Scope.CLUSTER); + + session.commitAsync(() -> updateScanResultsIfNewer(scanResult)); + } + } + private void updateScanResultsIfNewer(final ScanResult scanResult) { lastResult.getAndUpdate(current -> (current == null || scanResult.getTimestamp() > current.getTimestamp()) ? scanResult : current); } @@ -477,7 +478,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor if (scanResult == null || justElectedPrimaryNode) { if (client != null) { final Object obj = client.get(getKey(), stringSerDe, objectSerDe); - if (obj == null || !(obj instanceof ScanResult)) { + if (!(obj instanceof ScanResult)) { scanResult = null; } else { scanResult = (ScanResult) obj; @@ -493,11 +494,11 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor // we don't duplicate data. 
final File file = getStateFile(); if (file.exists()) { - try (final InputStream fis = new FileInputStream(file); + try (final InputStream fis = Files.newInputStream(file.toPath()); final ObjectInputStream ois = new ObjectInputStream(fis)) { final Object obj = ois.readObject(); - if (obj != null && (obj instanceof ScanResult)) { + if (obj instanceof ScanResult) { final ScanResult localScanResult = (ScanResult) obj; if (scanResult == null || localScanResult.getTimestamp() > scanResult.getTimestamp()) { scanResult = localScanResult; @@ -606,11 +607,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor final String rowIndex = matcher.group(1); final String cellIndex = matcher.group(3); - Set<String> cellHashes = rowIndexToMatchingCellHashes.get(rowIndex); - if (cellHashes == null) { - cellHashes = new HashSet<>(); - rowIndexToMatchingCellHashes.put(rowIndex, cellHashes); - } + Set<String> cellHashes = rowIndexToMatchingCellHashes.computeIfAbsent(rowIndex, k -> new HashSet<>()); if (cellIndex == null) { // this provides a Row ID. diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java index d0f1eab9ca..9d5e744d12 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java @@ -16,11 +16,13 @@ */ package org.apache.nifi.hbase.scan; +import java.io.IOException; + /** * Handles a single row from an HBase scan. */ public interface ResultHandler { - void handle(byte[] row, ResultCell[] resultCells); + void handle(byte[] row, ResultCell[] resultCells) throws IOException; }