This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 68228921a4 NIFI-12664 Removed deprecated DMC in GetHBase
68228921a4 is described below

commit 68228921a43c506e92a0476f550951e164dfb90e
Author: Pierre Villard <pierre.villard...@gmail.com>
AuthorDate: Thu Jan 25 09:22:41 2024 +0400

    NIFI-12664 Removed deprecated DMC in GetHBase
    
    This closes #8301
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../main/java/org/apache/nifi/hbase/GetHBase.java  | 125 ++------------------
 .../java/org/apache/nifi/hbase/TestGetHBase.java   | 128 +--------------------
 2 files changed, 8 insertions(+), 245 deletions(-)
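
As background for the diff below: the change replaces the deprecated Distributed Cache Service handling with NiFi's property-migration hook. The following is a minimal, self-contained sketch of that pattern; the example class name and the no-op onTrigger stub are illustrative only and are not part of this commit, while the actual override added to GetHBase appears in the diff itself.

import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

// Illustrative class name; not part of the commit below.
public class DeprecatedPropertyCleanupExample extends AbstractProcessor {

    @Override
    public void migrateProperties(final PropertyConfiguration config) {
        super.migrateProperties(config);
        // Drop the obsolete property so existing flows stay valid after upgrade;
        // GetHBase now tracks incremental state in the cluster-scoped State Manager.
        config.removeProperty("Distributed Cache Service");
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // No-op stub for the sketch.
    }
}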

diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 5777a67861..8b2cf0d973 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -26,25 +26,20 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hbase.io.JsonRowSerializer;
 import org.apache.nifi.hbase.io.RowSerializer;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.hbase.util.ObjectSerDe;
-import org.apache.nifi.hbase.util.StringSerDe;
+import org.apache.nifi.migration.PropertyConfiguration;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -52,11 +47,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -103,13 +94,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
             .required(true)
             .identifiesControllerService(HBaseClientService.class)
             .build();
-    static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
-            .name("Distributed Cache Service")
-            .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" +
-                    " so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
-            .required(false)
-            .identifiesControllerService(DistributedMapCacheClient.class)
-            .build();
     static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
             .name("Character Set")
             .description("Specifies which character set is used to encode the data in HBase")
@@ -157,7 +141,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
 
     private final AtomicReference<ScanResult> lastResult = new AtomicReference<>();
     private volatile List<Column> columns = new ArrayList<>();
-    private volatile boolean justElectedPrimaryNode = false;
     private volatile String previousTable = null;
 
     @Override
@@ -169,7 +152,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(HBASE_CLIENT_SERVICE);
-        properties.add(DISTRIBUTED_CACHE_SERVICE);
         properties.add(TABLE_NAME);
         properties.add(COLUMNS);
         properties.add(AUTHORIZATIONS);
@@ -204,21 +186,14 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
         }
     }
 
+    @Override
+    public void migrateProperties(PropertyConfiguration config) {
+        super.migrateProperties(config);
+        config.removeProperty("Distributed Cache Service");
+    }
+
     @OnScheduled
     public void parseColumns(final ProcessContext context) throws IOException {
-        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
-        if (!stateMap.getStateVersion().isPresent()) {
-            // no state has been stored in the State Manager - check if we have state stored in the
-            // DistributedMapCacheClient service and migrate it if so
-            final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-            final ScanResult scanResult = getState(client);
-            if (scanResult != null) {
-                context.getStateManager().setState(scanResult.toFlatMap(), Scope.CLUSTER);
-            }
-
-            clearState(client);
-        }
-
         final String columnsValue = context.getProperty(COLUMNS).evaluateAttributeExpressions().getValue();
         final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
 
@@ -236,19 +211,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
         }
     }
 
-    @OnPrimaryNodeStateChange
-    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
-    }
-
-    @OnRemoved
-    public void onRemoved(final ProcessContext context) {
-        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-        if (client != null) {
-            clearState(client);
-        }
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
@@ -427,91 +389,18 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
         return 500;
     }
 
-    protected File getStateDir() {
-        return new File("conf/state");
-    }
-
-    protected File getStateFile() {
-        return new File(getStateDir(), "getHBase-" + getIdentifier());
-    }
-
-    protected String getKey() {
-        return "getHBase-" + getIdentifier() + "-state";
-    }
-
     protected List<Column> getColumns() {
         return columns;
     }
 
-    private void clearState(final DistributedMapCacheClient client) {
-        final File localState = getStateFile();
-        if (localState.exists()) {
-            localState.delete();
-        }
-
-        if (client != null) {
-            try {
-                client.remove(getKey(), new StringSerDe());
-            } catch (IOException e) {
-                getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e});
-            }
-        }
-    }
-
-
     private ScanResult getState(final ProcessSession session) throws IOException {
         final StateMap stateMap = session.getState(Scope.CLUSTER);
         if (!stateMap.getStateVersion().isPresent()) {
             return null;
         }
-
         return ScanResult.fromFlatMap(stateMap.toMap());
     }
 
-    private ScanResult getState(final DistributedMapCacheClient client) throws IOException {
-        final StringSerDe stringSerDe = new StringSerDe();
-        final ObjectSerDe objectSerDe = new ObjectSerDe();
-
-        ScanResult scanResult = lastResult.get();
-        // if we have no previous result, or we just became primary, pull from distributed cache
-        if (scanResult == null || justElectedPrimaryNode) {
-            if (client != null) {
-                final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
-                if (obj == null || !(obj instanceof ScanResult)) {
-                    scanResult = null;
-                } else {
-                    scanResult = (ScanResult) obj;
-                    getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}", new Object[] {scanResult.getTimestamp()});
-                }
-            }
-
-            // no requirement to pull an update from the distributed cache anymore.
-            justElectedPrimaryNode = false;
-        }
-
-        // Check the persistence file. We want to use the latest timestamp that we have so that
-        // we don't duplicate data.
-        final File file = getStateFile();
-        if (file.exists()) {
-            try (final InputStream fis = new FileInputStream(file);
-                 final ObjectInputStream ois = new ObjectInputStream(fis)) {
-
-                final Object obj = ois.readObject();
-                if (obj != null && (obj instanceof ScanResult)) {
-                    final ScanResult localScanResult = (ScanResult) obj;
-                    if (scanResult == null || localScanResult.getTimestamp() > scanResult.getTimestamp()) {
-                        scanResult = localScanResult;
-                        getLogger().debug("Using last timestamp from local state because it was newer than the distributed cache, or no value existed in the cache");
-                    }
-                }
-            } catch (final IOException | ClassNotFoundException ioe) {
-                getLogger().warn("Failed to recover persisted state from {} due to {}. Assuming that state from distributed cache is correct.", new Object[]{file, ioe});
-            }
-        }
-
-        return scanResult;
-    }
-
     public static class ScanResult implements Serializable {
 
         private static final long serialVersionUID = 1L;
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
index 5245d142d7..6ca69c1cd7 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
@@ -16,23 +16,18 @@
  */
 package org.apache.nifi.hbase;
 
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.hbase.GetHBase.ScanResult;
 import org.apache.nifi.hbase.scan.Column;
-import org.apache.nifi.hbase.util.StringSerDe;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
@@ -47,11 +42,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestGetHBase {
 
@@ -74,22 +67,12 @@ public class TestGetHBase {
         runner.enableControllerService(hBaseClient);
 
         runner.setProperty(GetHBase.TABLE_NAME, "nifi");
-        runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
         runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
         runner.setProperty(GetHBase.AUTHORIZATIONS, "");
 
         runner.setValidateExpressionUsage(true);
     }
 
-    @AfterEach
-    public void cleanup() {
-        final File file = proc.getStateFile();
-        if (file.exists()) {
-            file.delete();
-        }
-        assertFalse(file.exists());
-    }
-
     @Test
     public void testColumnsValidation() {
         runner.assertValid();
@@ -139,12 +122,6 @@ public class TestGetHBase {
 
     @Test
     public void testPersistAndRecoverFromLocalState() {
-        final File stateFile = new File("target/test-recover-state.bin");
-        if (!stateFile.delete() && stateFile.exists()) {
-            fail("Could not delete state file " + stateFile);
-        }
-        proc.setStateFile(stateFile);
-
         final long now = System.currentTimeMillis();
 
         final Map<String, String> cells = new HashMap<>();
@@ -164,7 +141,7 @@ public class TestGetHBase {
         runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
         runner.clearTransferState();
 
-        proc = new MockGetHBase(stateFile);
+        proc = new MockGetHBase();
 
         hBaseClient.addResult("row0", cells, now - 2);
         hBaseClient.addResult("row1", cells, now - 1);
@@ -195,14 +172,6 @@ public class TestGetHBase {
         runner.run();
         runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
 
-        // delete the processor's local state to simulate becoming the primary node
-        // for the first time, should use the state from distributed cache
-        final File stateFile = proc.getStateFile();
-        if (!stateFile.delete() && stateFile.exists()) {
-            fail("Could not delete state file " + stateFile);
-        }
-        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
-
         hBaseClient.addResult("row0", cells, now - 2);
         hBaseClient.addResult("row1", cells, now - 1);
         hBaseClient.addResult("row2", cells, now - 1);
@@ -214,76 +183,6 @@ public class TestGetHBase {
         runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
     }
 
-    @Test
-    public void testBecomePrimaryWithNewerLocalState() throws InitializationException {
-        final long now = System.currentTimeMillis();
-
-        final Map<String, String> cells = new HashMap<>();
-        cells.put("greeting", "hello");
-        cells.put("name", "nifi");
-
-        hBaseClient.addResult("row0", cells, now - 2);
-        hBaseClient.addResult("row1", cells, now - 1);
-        hBaseClient.addResult("row2", cells, now - 1);
-        hBaseClient.addResult("row3", cells, now);
-
-        runner.run(100);
-        runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
-
-        // trick for testing so that row4 gets written to local state but not to the real cache
-        final MockCacheClient otherCacheClient = new MockCacheClient();
-        runner.addControllerService("otherCacheClient", otherCacheClient);
-        runner.enableControllerService(otherCacheClient);
-        runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "otherCacheClient");
-
-        hBaseClient.addResult("row4", cells, now + 1);
-        runner.run();
-        runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
-
-        // set back the original cache cacheClient which is missing row4
-        runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
-
-        // become the primary node, but we have existing local state with rows 0-4
-        // so we shouldn't get any output because we should use the local state
-        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
-
-        hBaseClient.addResult("row0", cells, now - 2);
-        hBaseClient.addResult("row1", cells, now - 1);
-        hBaseClient.addResult("row2", cells, now - 1);
-        hBaseClient.addResult("row3", cells, now);
-        hBaseClient.addResult("row4", cells, now + 1);
-
-        runner.clearTransferState();
-        runner.run(100);
-        runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
-    }
-
-    @Test
-    public void testOnRemovedClearsState() throws IOException {
-        final long now = System.currentTimeMillis();
-
-        final Map<String, String> cells = new HashMap<>();
-        cells.put("greeting", "hello");
-        cells.put("name", "nifi");
-
-        hBaseClient.addResult("row0", cells, now - 2);
-        hBaseClient.addResult("row1", cells, now - 1);
-        hBaseClient.addResult("row2", cells, now - 1);
-        hBaseClient.addResult("row3", cells, now);
-
-        runner.run(100);
-        runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
-
-        // should have a local state file and a cache entry before removing
-        runner.getStateManager().assertStateSet(Scope.CLUSTER);
-
-        proc.onRemoved(runner.getProcessContext());
-
-        // onRemoved should have cleared both
-        assertFalse(proc.getStateFile().exists());
-        assertFalse(cacheClient.containsKey(proc.getKey(), new StringSerDe()));
-    }
-
     @Test
     public void testChangeTableNameClearsState() {
         final long now = System.currentTimeMillis();
@@ -412,36 +311,11 @@ public class TestGetHBase {
     // Mock processor to override the location of the state file
     private static class MockGetHBase extends GetHBase {
 
-        private static final String DEFAULT_STATE_FILE_NAME = "target/TestGetHBase.bin";
-
-        private File stateFile;
-
-        public MockGetHBase() {
-            this(new File(DEFAULT_STATE_FILE_NAME));
-        }
-
-        public MockGetHBase(final File stateFile) {
-            this.stateFile = stateFile;
-        }
-
-        public void setStateFile(final File stateFile) {
-            this.stateFile = stateFile;
-        }
-
         @Override
         protected int getBatchSize() {
             return 2;
         }
 
-        @Override
-        protected File getStateDir() {
-            return new File("target");
-        }
-
-        @Override
-        protected File getStateFile() {
-            return stateFile;
-        }
     }
 
     private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
