This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new bee65b8447 NIFI-12825: implemented ListHBaseRegions processor

bee65b8447 is described below

commit bee65b8447303a49a5a244aed027ea387c96a2d8
Author: Emilio Setiadarma <emiliosetiada...@gmail.com>
AuthorDate: Mon Feb 12 01:22:52 2024 -0800

    NIFI-12825: implemented ListHBaseRegions processor

    Signed-off-by: Matt Burgess <mattyb...@apache.org>

    This closes #8439
---
 .../org/apache/nifi/hbase/ListHBaseRegions.java    | 152 +++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../apache/nifi/hbase/MockHBaseClientService.java  |  15 ++
 .../apache/nifi/hbase/TestListHBaseRegions.java    | 209 +++++++++++++++++++++
 .../apache/nifi/hbase/HBaseClientException.java    |  27 +++
 .../org/apache/nifi/hbase/HBaseClientService.java  |  10 +
 .../org/apache/nifi/hbase/scan/HBaseRegion.java    |  57 ++++++
 .../apache/nifi/hbase/HBase_2_ClientService.java   |  35 ++++
 8 files changed, 506 insertions(+)

diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ListHBaseRegions.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ListHBaseRegions.java
new file mode 100644
index 0000000000..032ec74f78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ListHBaseRegions.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.scan.HBaseRegion;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"hbase", "regions", "scan", "rowkey"})
+@CapabilityDescription("Returns the information about the regions of an HBase table, including ID, name and row key ranges. " +
+        "This information is helpful to feed into start row key and end row key for scans to HBase, e.g. using the ScanHBase processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "hbase.region.name", description = "The name of the HBase region."),
+        @WritesAttribute(attribute = "hbase.region.id", description = "The id of the HBase region."),
+        @WritesAttribute(attribute = "hbase.region.startRowKey", description = "The starting row key (inclusive) of the HBase region. " +
+                "The bytes returned from HBase is converted into a UTF-8 encoded string."),
+        @WritesAttribute(attribute = "hbase.region.endRowKey", description = "The ending row key (exclusive) of the HBase region. " +
+                "The bytes returned from HBase is converted into a UTF-8 encoded string.")
+})
+public class ListHBaseRegions extends AbstractProcessor {
+    static final String HBASE_REGION_NAME_ATTR = "hbase.region.name";
+    static final String HBASE_REGION_ID_ATTR = "hbase.region.id";
+    static final String HBASE_REGION_START_ROW_ATTR = "hbase.region.startRowKey";
+    static final String HBASE_REGION_END_ROW_ATTR = "hbase.region.endRowKey";
+    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("HBase Client Service")
+            .description("Specifies the Controller Service to use for accessing HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the HBase Table to list regions for")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor ROUTE_DEGENERATE_REGIONS = new PropertyDescriptor.Builder()
+            .name("Route Degenerate Regions")
+            .description("If true, FlowFiles describing degenerate regions are routed to the 'degenerate' relationship instead of 'success'.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles with information on regions of the HBase table are routed to this relationship.")
+            .build();
+
+    static final Relationship REL_DEGENERATE = new Relationship.Builder()
+            .name("degenerate")
+            .description("If \"Route Degenerate Regions\" is set, any " +
+                    "FlowFile(s) that contains information about a region that is degenerate will be routed " +
+                    "to this relationship. Otherwise, they will be sent to the success relationship.")
+            .autoTerminateDefault(true)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS, REL_DEGENERATE);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(
+                HBASE_CLIENT_SERVICE,
+                TABLE_NAME,
+                ROUTE_DEGENERATE_REGIONS
+        );
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        if (StringUtils.isBlank(tableName)) {
+            getLogger().error("Table Name is blank or null, no regions information to be fetched.");
+            context.yield();
+            return;
+        }
+
+        final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+
+        final boolean routeDegenerateRegions = context.getProperty(ROUTE_DEGENERATE_REGIONS).asBoolean();
+
+        try {
+            final List<HBaseRegion> hBaseRegions = hBaseClientService.listHBaseRegions(tableName);
+            for (final HBaseRegion region : hBaseRegions) {
+                // FlowFiles are immutable: always continue with the reference returned by putAttribute
+                FlowFile flowFile = session.create();
+                flowFile = session.putAttribute(flowFile, HBASE_REGION_NAME_ATTR, region.getRegionName());
+                flowFile = session.putAttribute(flowFile, HBASE_REGION_ID_ATTR, String.valueOf(region.getRegionId()));
+                if (region.getStartRowKey() == null) {
+                    flowFile = session.putAttribute(flowFile, HBASE_REGION_START_ROW_ATTR, "");
+                } else {
+                    flowFile = session.putAttribute(flowFile, HBASE_REGION_START_ROW_ATTR, new String(region.getStartRowKey(), StandardCharsets.UTF_8));
+                }
+
+                if (region.getEndRowKey() == null) {
+                    flowFile = session.putAttribute(flowFile, HBASE_REGION_END_ROW_ATTR, "");
+                } else {
+                    flowFile = session.putAttribute(flowFile, HBASE_REGION_END_ROW_ATTR, new String(region.getEndRowKey(), StandardCharsets.UTF_8));
+                }
+
+                if (region.isDegenerate() && routeDegenerateRegions) {
+                    getLogger().warn("Region with id {} and name {} is degenerate. Routing to degenerate relationship.", region.getRegionId(), region.getRegionName());
+                    session.transfer(flowFile, REL_DEGENERATE);
+                } else {
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+            }
+        } catch (final HBaseClientException e) {
+            context.yield();
+            throw new ProcessException("Failed to receive information on HBase regions for table " + tableName, e);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a55064ba52..c0ea17418e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,6 +16,7 @@
 org.apache.nifi.hbase.DeleteHBaseCells
 org.apache.nifi.hbase.DeleteHBaseRow
 org.apache.nifi.hbase.GetHBase
+org.apache.nifi.hbase.ListHBaseRegions
 org.apache.nifi.hbase.PutHBaseCell
 org.apache.nifi.hbase.PutHBaseJSON
 org.apache.nifi.hbase.PutHBaseRecord
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index b913edf440..e51de5a6b3 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -20,6 +20,7 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.HBaseRegion;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
 
@@ -42,6 +43,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
     private int numScans = 0;
     private int numPuts = 0;
     private int linesBeforeException = -1;
+    private List<HBaseRegion> regionsToReturn = new ArrayList<>();
 
     @Override
     public void put(String tableName, Collection<PutFlowFile> puts) throws IOException {
@@ -244,6 +246,19 @@ public class MockHBaseClientService extends AbstractControllerService implements
         return numScans;
     }
 
+    @Override
+    public List<HBaseRegion> listHBaseRegions(final String tableName) throws HBaseClientException {
+        return regionsToReturn;
+    }
+
+    public void addHBaseRegion(final HBaseRegion region) {
+        regionsToReturn.add(region);
+    }
+
+    public void addHBaseRegions(final List<HBaseRegion> regions) {
+        regionsToReturn.addAll(regions);
+    }
+
     @Override
     public byte[] toBytes(final boolean b) {
         return new byte[] { b ? (byte) -1 : (byte) 0 };
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestListHBaseRegions.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestListHBaseRegions.java
new file mode 100644
index 0000000000..1618041ead
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestListHBaseRegions.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.hbase.scan.HBaseRegion;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(MockitoExtension.class)
+public class TestListHBaseRegions {
+    private static final String TABLE_NAME = "nifi";
+    private static final String HBASE_CLIENT_SERVICE_NAME = "hBaseClientService";
+
+    private TestRunner runner;
+    private ListHBaseRegions proc;
+
+    private MockHBaseClientService hBaseClientService;
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        proc = new ListHBaseRegions();
+        runner = TestRunners.newTestRunner(proc);
+
+        hBaseClientService = new MockHBaseClientService();
+        runner.addControllerService(HBASE_CLIENT_SERVICE_NAME, hBaseClientService);
+        runner.enableControllerService(hBaseClientService);
+
+        runner.setProperty(ListHBaseRegions.TABLE_NAME, TABLE_NAME);
+        runner.setProperty(ListHBaseRegions.HBASE_CLIENT_SERVICE, HBASE_CLIENT_SERVICE_NAME);
+    }
+
+    @Test
+    public void testAllFlowFilesToSuccess() throws HBaseClientException {
+        runner.setProperty(ListHBaseRegions.ROUTE_DEGENERATE_REGIONS, "false");
+        runner.assertValid();
+
+        final String startRowKey1 = "1";
+        final String endRowKey1 = "5";
+        final String regionName1 = "region-1";
+        final long regionId1 = 1L;
+        final boolean isDegenerate1 = false;
+        final HBaseRegion hBaseRegion1 = new HBaseRegion(
+                startRowKey1.getBytes(StandardCharsets.UTF_8),
+                endRowKey1.getBytes(StandardCharsets.UTF_8),
+                regionName1,
+                regionId1,
+                isDegenerate1
+        );
+
+        // this is a "degenerate" region where startRowKey > endRowKey
+        final String startRowKey2 = "10";
+        final String endRowKey2 = "6";
+        final String regionName2 = "region-2";
+        final long regionId2 = 2L;
+        final boolean isDegenerate2 = true;
+        final HBaseRegion hBaseRegion2 = new HBaseRegion(
+                startRowKey2.getBytes(StandardCharsets.UTF_8),
+                endRowKey2.getBytes(StandardCharsets.UTF_8),
+                regionName2,
+                regionId2,
+                isDegenerate2
+        );
+
+        final List<HBaseRegion> regions = Arrays.asList(hBaseRegion1, hBaseRegion2);
+        hBaseClientService.addHBaseRegions(regions);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(ListHBaseRegions.REL_SUCCESS, 2);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHBaseRegions.REL_SUCCESS);
+
+        assertEquals(String.valueOf(regionId1), flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_ID_ATTR));
+        assertEquals(regionName1, flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_NAME_ATTR));
+        assertEquals(startRowKey1, flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_START_ROW_ATTR));
+        assertEquals(endRowKey1, flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_END_ROW_ATTR));
+
+        assertEquals(String.valueOf(regionId2), flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_ID_ATTR));
+        assertEquals(regionName2, flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_NAME_ATTR));
+        assertEquals(startRowKey2, flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_START_ROW_ATTR));
+        assertEquals(endRowKey2, flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_END_ROW_ATTR));
+    }
+
+    @Test
+    public void testDegenerateRegionsToDegenerateRelationship() throws HBaseClientException {
+        runner.setProperty(ListHBaseRegions.ROUTE_DEGENERATE_REGIONS, "true");
+        runner.assertValid();
+
+        final String startRowKey1 = "1";
+        final String endRowKey1 = "5";
+        final String regionName1 = "region-1";
+        final long regionId1 = 1L;
+        final boolean isDegenerate1 = false;
+        final HBaseRegion hBaseRegion1 = new HBaseRegion(
+                startRowKey1.getBytes(StandardCharsets.UTF_8),
+                endRowKey1.getBytes(StandardCharsets.UTF_8),
+                regionName1,
+                regionId1,
+                isDegenerate1
+        );
+
+        // this is a "degenerate" region where startRowKey > endRowKey
+        final String startRowKey2 = "10";
+        final String endRowKey2 = "6";
+        final String regionName2 = "region-2";
+        final long regionId2 = 2L;
+        final boolean isDegenerate2 = true;
+        final HBaseRegion hBaseRegion2 = new HBaseRegion(
+                startRowKey2.getBytes(StandardCharsets.UTF_8),
+                endRowKey2.getBytes(StandardCharsets.UTF_8),
+                regionName2,
+                regionId2,
+                isDegenerate2
+        );
+
+        final List<HBaseRegion> regions = Arrays.asList(hBaseRegion1, hBaseRegion2);
+        hBaseClientService.addHBaseRegions(regions);
+
+        runner.run(1);
+        runner.assertTransferCount(ListHBaseRegions.REL_SUCCESS, 1);
+        final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(ListHBaseRegions.REL_SUCCESS);
+
+        assertEquals(String.valueOf(regionId1), successFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_ID_ATTR));
+        assertEquals(regionName1, successFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_NAME_ATTR));
+        assertEquals(startRowKey1, successFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_START_ROW_ATTR));
+        assertEquals(endRowKey1, successFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_END_ROW_ATTR));
+
+        runner.assertTransferCount(ListHBaseRegions.REL_DEGENERATE, 1);
+        final List<MockFlowFile> degenerateFlowFiles = runner.getFlowFilesForRelationship(ListHBaseRegions.REL_DEGENERATE);
+
+        assertEquals(String.valueOf(regionId2), degenerateFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_ID_ATTR));
+        assertEquals(regionName2, degenerateFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_NAME_ATTR));
+        assertEquals(startRowKey2, degenerateFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_START_ROW_ATTR));
+        assertEquals(endRowKey2, degenerateFlowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_END_ROW_ATTR));
+    }
+
+    @Test
+    public void testShouldNotRouteToDegenerateIfNoDegenerateRegions() throws HBaseClientException {
+        runner.setProperty(ListHBaseRegions.ROUTE_DEGENERATE_REGIONS, "false");
+        runner.assertValid();
+
+        final String startRowKey1 = "1";
+        final String endRowKey1 = "5";
+        final String regionName1 = "region-1";
+        final long regionId1 = 1L;
+        final boolean isDegenerate1 = false;
+        final HBaseRegion hBaseRegion1 = new HBaseRegion(
+                startRowKey1.getBytes(StandardCharsets.UTF_8),
+                endRowKey1.getBytes(StandardCharsets.UTF_8),
+                regionName1,
+                regionId1,
+                isDegenerate1
+        );
+
+        final String startRowKey2 = "5";
+        final String endRowKey2 = "10";
+        final String regionName2 = "region-2";
+        final long regionId2 = 2L;
+        final boolean isDegenerate2 = false;
+        final HBaseRegion hBaseRegion2 = new HBaseRegion(
+                startRowKey2.getBytes(StandardCharsets.UTF_8),
+                endRowKey2.getBytes(StandardCharsets.UTF_8),
+                regionName2,
+                regionId2,
+                isDegenerate2
+        );
+
+        final List<HBaseRegion> regions = Arrays.asList(hBaseRegion1, hBaseRegion2);
+        hBaseClientService.addHBaseRegions(regions);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(ListHBaseRegions.REL_SUCCESS, 2);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHBaseRegions.REL_SUCCESS);
+
+        assertEquals(String.valueOf(regionId1), flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_ID_ATTR));
+        assertEquals(regionName1, flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_NAME_ATTR));
+        assertEquals(startRowKey1, flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_START_ROW_ATTR));
+        assertEquals(endRowKey1, flowFiles.get(0).getAttribute(ListHBaseRegions.HBASE_REGION_END_ROW_ATTR));
+
+        assertEquals(String.valueOf(regionId2), flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_ID_ATTR));
+        assertEquals(regionName2, flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_NAME_ATTR));
+        assertEquals(startRowKey2, flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_START_ROW_ATTR));
+        assertEquals(endRowKey2, flowFiles.get(1).getAttribute(ListHBaseRegions.HBASE_REGION_END_ROW_ATTR));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientException.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientException.java
new file mode 100644
index 0000000000..4208d17299
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+public class HBaseClientException extends Exception {
+    public HBaseClientException(final String message) {
+        super(message);
+    }
+
+    public HBaseClientException(final Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index cc7709056e..87972fdc36 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -22,6 +22,7 @@ import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.HBaseRegion;
 import org.apache.nifi.hbase.scan.ResultHandler;
 
 import java.io.IOException;
@@ -169,6 +170,15 @@ public interface HBaseClientService extends ControllerService {
     void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
             Boolean isReversed, Boolean blockCache, Collection<Column> columns, List<String> authorizations, ResultHandler handler) throws IOException;
 
+    /**
+     * Returns a {@link List} of {@link HBaseRegion} objects that represent information about the HBase table
+     * regions for all regions in the HBase table.
+     * @param tableName the name of the HBase table to fetch region information for
+     * @return one {@link HBaseRegion} per region of the given table
+     * @throws HBaseClientException if the region information could not be retrieved from HBase
+     */
+    List<HBaseRegion> listHBaseRegions(String tableName) throws HBaseClientException;
+
     /**
      * Converts the given boolean to it's byte representation.
      *
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/HBaseRegion.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/HBaseRegion.java
new file mode 100644
index 0000000000..df23c07ed2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/HBaseRegion.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase.scan;
+
+public class HBaseRegion {
+    private final byte[] startRowKey;
+    private final byte[] endRowKey;
+    private final String regionName;
+    private final long regionId;
+    private final boolean isDegenerate;
+
+    public HBaseRegion(final byte[] startRowKey,
+                       final byte[] endRowKey,
+                       final String regionName,
+                       final long regionId,
+                       final boolean isDegenerate) {
+        this.startRowKey = startRowKey;
+        this.endRowKey = endRowKey;
+        this.regionName = regionName;
+        this.regionId = regionId;
+        this.isDegenerate = isDegenerate;
+    }
+
+    public byte[] getStartRowKey() {
+        return startRowKey;
+    }
+
+    public byte[] getEndRowKey() {
+        return endRowKey;
+    }
+
+    public String getRegionName() {
+        return regionName;
+    }
+
+    public long getRegionId() {
+        return regionId;
+    }
+
+    public boolean isDegenerate() {
+        return isDegenerate;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
index 627ba83b16..80894e73bb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -76,6 +79,7 @@ import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.HBaseRegion;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
@@ -864,6 +868,37 @@ public class HBase_2_ClientService extends AbstractControllerService implements
         return resultCell;
     }
 
+    @Override
+    public List<HBaseRegion> listHBaseRegions(final String tableName) throws HBaseClientException {
+        if (connection == null || connection.isClosed() || connection.isAborted()) {
+            final String errorMsg = String.format(
+                    "Unable to fetch regions for table %s since there is no active connection to HBase.",
+                    tableName
+            );
+            throw new HBaseClientException(errorMsg);
+        }
+
+        try (final org.apache.hadoop.hbase.client.Admin admin = connection.getAdmin()) {
+            final List<RegionInfo> regionInfos = admin.getRegions(TableName.valueOf(tableName));
+            // maps to the NiFi HBaseRegion object
+            final List<HBaseRegion> regions = regionInfos.stream()
+                    .map(regionInfo ->
+                            new HBaseRegion(
+                                    regionInfo.getStartKey(),
+                                    regionInfo.getEndKey(),
+                                    regionInfo.getRegionNameAsString(),
+                                    regionInfo.getRegionId(),
+                                    regionInfo.isDegenerate()
+                            )
+                    )
+                    .collect(Collectors.toList());
+            return regions;
+        } catch (final IOException e) {
+            logger.error("Failed to fetch regions for HBase table {}", tableName, e);
+            throw new HBaseClientException(e);
+        }
+    }
+
     static protected class ValidationResources {
         private final String configResources;
         private final Configuration configuration;