This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 68228921a4 NIFI-12664 Removed deprecated DMC in GetHBase 68228921a4 is described below commit 68228921a43c506e92a0476f550951e164dfb90e Author: Pierre Villard <pierre.villard...@gmail.com> AuthorDate: Thu Jan 25 09:22:41 2024 +0400 NIFI-12664 Removed deprecated DMC in GetHBase This closes #8301 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../main/java/org/apache/nifi/hbase/GetHBase.java | 125 ++------------------ .../java/org/apache/nifi/hbase/TestGetHBase.java | 128 +-------------------- 2 files changed, 8 insertions(+), 245 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index 5777a67861..8b2cf0d973 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -26,25 +26,20 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; -import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hbase.io.JsonRowSerializer; import org.apache.nifi.hbase.io.RowSerializer; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; -import org.apache.nifi.hbase.util.ObjectSerDe; -import org.apache.nifi.hbase.util.StringSerDe; +import org.apache.nifi.migration.PropertyConfiguration; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -52,11 +47,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -103,13 +94,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor .required(true) .identifiesControllerService(HBaseClientService.class) .build(); - static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() - .name("Distributed Cache Service") - .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" + - " so that if a new node begins pulling data, it won't duplicate all of the work that has been done.") - .required(false) - .identifiesControllerService(DistributedMapCacheClient.class) - .build(); static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() .name("Character Set") .description("Specifies which character set is used to encode the data in HBase") @@ -157,7 +141,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor private final AtomicReference<ScanResult> lastResult = new AtomicReference<>(); private volatile List<Column> columns = new ArrayList<>(); - private volatile boolean justElectedPrimaryNode = false; private volatile String previousTable = null; @Override @@ -169,7 +152,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(HBASE_CLIENT_SERVICE); - properties.add(DISTRIBUTED_CACHE_SERVICE); properties.add(TABLE_NAME); properties.add(COLUMNS); properties.add(AUTHORIZATIONS); @@ -204,21 +186,14 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor } } + @Override + public void migrateProperties(PropertyConfiguration config) { + super.migrateProperties(config); + config.removeProperty("Distributed Cache Service"); + } + @OnScheduled public void parseColumns(final ProcessContext context) throws IOException { - final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); - if (!stateMap.getStateVersion().isPresent()) { - // no state has been stored in the State Manager - check if we have state stored in the - // DistributedMapCacheClient service and migrate it if so - final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - final ScanResult scanResult = getState(client); - if (scanResult != null) { - context.getStateManager().setState(scanResult.toFlatMap(), Scope.CLUSTER); - } - - clearState(client); - } - final String columnsValue = context.getProperty(COLUMNS).evaluateAttributeExpressions().getValue(); final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(",")); @@ -236,19 +211,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor } } - @OnPrimaryNodeStateChange - public void onPrimaryNodeChange(final PrimaryNodeState newState) { - justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE); - } - - @OnRemoved - public void onRemoved(final ProcessContext context) { - final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - if (client != null) { - clearState(client); - } - } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); @@ -427,91 +389,18 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor return 500; } - protected File getStateDir() { - return new File("conf/state"); - } - - protected File getStateFile() { - return new File(getStateDir(), "getHBase-" + getIdentifier()); - } - - protected String getKey() { - return "getHBase-" + getIdentifier() + "-state"; - } - protected List<Column> getColumns() { return columns; } - private void clearState(final DistributedMapCacheClient client) { - final File localState = getStateFile(); - if (localState.exists()) { - localState.delete(); - } - - if (client != null) { - try { - client.remove(getKey(), new StringSerDe()); - } catch (IOException e) { - getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e}); - } - } - } - - private ScanResult getState(final ProcessSession session) throws IOException { final StateMap stateMap = session.getState(Scope.CLUSTER); if (!stateMap.getStateVersion().isPresent()) { return null; } - return ScanResult.fromFlatMap(stateMap.toMap()); } - private ScanResult getState(final DistributedMapCacheClient client) throws IOException { - final StringSerDe stringSerDe = new StringSerDe(); - final ObjectSerDe objectSerDe = new ObjectSerDe(); - - ScanResult scanResult = lastResult.get(); - // if we have no previous result, or we just became primary, pull from distributed cache - if (scanResult == null || justElectedPrimaryNode) { - if (client != null) { - final Object obj = client.get(getKey(), stringSerDe, objectSerDe); - if (obj == null || !(obj instanceof ScanResult)) { - scanResult = null; - } else { - scanResult = (ScanResult) obj; - getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}", new Object[] {scanResult.getTimestamp()}); - } - } - - // no requirement to pull an update from the distributed cache anymore. - justElectedPrimaryNode = false; - } - - // Check the persistence file. We want to use the latest timestamp that we have so that - // we don't duplicate data. - final File file = getStateFile(); - if (file.exists()) { - try (final InputStream fis = new FileInputStream(file); - final ObjectInputStream ois = new ObjectInputStream(fis)) { - - final Object obj = ois.readObject(); - if (obj != null && (obj instanceof ScanResult)) { - final ScanResult localScanResult = (ScanResult) obj; - if (scanResult == null || localScanResult.getTimestamp() > scanResult.getTimestamp()) { - scanResult = localScanResult; - getLogger().debug("Using last timestamp from local state because it was newer than the distributed cache, or no value existed in the cache"); - } - } - } catch (final IOException | ClassNotFoundException ioe) { - getLogger().warn("Failed to recover persisted state from {} due to {}. Assuming that state from distributed cache is correct.", new Object[]{file, ioe}); - } - } - - return scanResult; - } - public static class ScanResult implements Serializable { private static final long serialVersionUID = 1L; diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java index 5245d142d7..6ca69c1cd7 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java @@ -16,23 +16,18 @@ */ package org.apache.nifi.hbase; -import org.apache.nifi.annotation.notification.PrimaryNodeState; -import org.apache.nifi.components.state.Scope; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.hbase.GetHBase.ScanResult; import org.apache.nifi.hbase.scan.Column; -import org.apache.nifi.hbase.util.StringSerDe; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.File; import java.io.IOException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; @@ -47,11 +42,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; public class TestGetHBase { @@ -74,22 +67,12 @@ public class TestGetHBase { runner.enableControllerService(hBaseClient); runner.setProperty(GetHBase.TABLE_NAME, "nifi"); - runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient"); runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); runner.setProperty(GetHBase.AUTHORIZATIONS, ""); runner.setValidateExpressionUsage(true); } - @AfterEach - public void cleanup() { - final File file = proc.getStateFile(); - if (file.exists()) { - file.delete(); - } - assertFalse(file.exists()); - } - @Test public void testColumnsValidation() { runner.assertValid(); @@ -139,12 +122,6 @@ public class TestGetHBase { @Test public void testPersistAndRecoverFromLocalState() { - final File stateFile = new File("target/test-recover-state.bin"); - if (!stateFile.delete() && stateFile.exists()) { - fail("Could not delete state file " + stateFile); - } - proc.setStateFile(stateFile); - final long now = System.currentTimeMillis(); final Map<String, String> cells = new HashMap<>(); @@ -164,7 +141,7 @@ public class TestGetHBase { runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5); runner.clearTransferState(); - proc = new MockGetHBase(stateFile); + proc = new MockGetHBase(); hBaseClient.addResult("row0", cells, now - 2); hBaseClient.addResult("row1", cells, now - 1); @@ -195,14 +172,6 @@ public class TestGetHBase { runner.run(); runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5); - // delete the processor's local state to simulate becoming the primary node - // for the first time, should use the state from distributed cache - final File stateFile = proc.getStateFile(); - if (!stateFile.delete() && stateFile.exists()) { - fail("Could not delete state file " + stateFile); - } - proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); - hBaseClient.addResult("row0", cells, now - 2); hBaseClient.addResult("row1", cells, now - 1); hBaseClient.addResult("row2", cells, now - 1); @@ -214,76 +183,6 @@ public class TestGetHBase { runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0); } - @Test - public void testBecomePrimaryWithNewerLocalState() throws InitializationException { - final long now = System.currentTimeMillis(); - - final Map<String, String> cells = new HashMap<>(); - cells.put("greeting", "hello"); - cells.put("name", "nifi"); - - hBaseClient.addResult("row0", cells, now - 2); - hBaseClient.addResult("row1", cells, now - 1); - hBaseClient.addResult("row2", cells, now - 1); - hBaseClient.addResult("row3", cells, now); - - runner.run(100); - runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4); - - // trick for testing so that row4 gets written to local state but not to the real cache - final MockCacheClient otherCacheClient = new MockCacheClient(); - runner.addControllerService("otherCacheClient", otherCacheClient); - runner.enableControllerService(otherCacheClient); - runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "otherCacheClient"); - - hBaseClient.addResult("row4", cells, now + 1); - runner.run(); - runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5); - - // set back the original cache cacheClient which is missing row4 - runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient"); - - // become the primary node, but we have existing local state with rows 0-4 - // so we shouldn't get any output because we should use the local state - proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); - - hBaseClient.addResult("row0", cells, now - 2); - hBaseClient.addResult("row1", cells, now - 1); - hBaseClient.addResult("row2", cells, now - 1); - hBaseClient.addResult("row3", cells, now); - hBaseClient.addResult("row4", cells, now + 1); - - runner.clearTransferState(); - runner.run(100); - runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0); - } - - @Test - public void testOnRemovedClearsState() throws IOException { - final long now = System.currentTimeMillis(); - - final Map<String, String> cells = new HashMap<>(); - cells.put("greeting", "hello"); - cells.put("name", "nifi"); - - hBaseClient.addResult("row0", cells, now - 2); - hBaseClient.addResult("row1", cells, now - 1); - hBaseClient.addResult("row2", cells, now - 1); - hBaseClient.addResult("row3", cells, now); - - runner.run(100); - runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4); - - // should have a local state file and a cache entry before removing - runner.getStateManager().assertStateSet(Scope.CLUSTER); - - proc.onRemoved(runner.getProcessContext()); - - // onRemoved should have cleared both - assertFalse(proc.getStateFile().exists()); - assertFalse(cacheClient.containsKey(proc.getKey(), new StringSerDe())); - } - @Test public void testChangeTableNameClearsState() { final long now = System.currentTimeMillis(); @@ -412,36 +311,11 @@ public class TestGetHBase { // Mock processor to override the location of the state file private static class MockGetHBase extends GetHBase { - private static final String DEFAULT_STATE_FILE_NAME = "target/TestGetHBase.bin"; - - private File stateFile; - - public MockGetHBase() { - this(new File(DEFAULT_STATE_FILE_NAME)); - } - - public MockGetHBase(final File stateFile) { - this.stateFile = stateFile; - } - - public void setStateFile(final File stateFile) { - this.stateFile = stateFile; - } - @Override protected int getBatchSize() { return 2; } - @Override - protected File getStateDir() { - return new File("target"); - } - - @Override - protected File getStateFile() { - return stateFile; - } } private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {