This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6d6a4f5ac0 NIFI-14941 Updated deprecated method usage for Azure EventHubs (#10300)
6d6a4f5ac0 is described below
commit 6d6a4f5ac052bca19c10014065d06bae07576a43
Author: dan-s1 <[email protected]>
AuthorDate: Sat Sep 13 17:32:12 2025 -0400
NIFI-14941 Updated deprecated method usage for Azure EventHubs (#10300)
Signed-off-by: David Handermann <[email protected]>
---
.../processors/azure/eventhub/ConsumeAzureEventHub.java | 2 +-
.../nifi/processors/azure/eventhub/GetAzureEventHub.java | 2 +-
.../checkpoint/ComponentStateCheckpointStoreUtils.java | 6 +++---
.../processors/azure/eventhub/GetAzureEventHubTest.java | 6 +++---
.../eventhub/checkpoint/AbstractCheckpointStoreTest.java | 14 +++++++-------
.../checkpoint/ComponentStateCheckpointStoreTest.java | 6 +++---
.../checkpoint/ComponentStateCheckpointStoreUtilsTest.java | 2 +-
7 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index bcc98d92d2..bab1979352 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -620,7 +620,7 @@ public class ConsumeAzureEventHub extends
AbstractSessionFactoryProcessor implem
) {
if (lastEnqueuedEventProperties != null) {
attributes.put("eventhub.enqueued.timestamp",
String.valueOf(lastEnqueuedEventProperties.getEnqueuedTime()));
- attributes.put("eventhub.offset",
String.valueOf(lastEnqueuedEventProperties.getOffset()));
+ attributes.put("eventhub.offset",
lastEnqueuedEventProperties.getOffsetString());
attributes.put("eventhub.sequence",
String.valueOf(lastEnqueuedEventProperties.getSequenceNumber()));
}
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 95857f01cc..1ee8389595 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -431,7 +431,7 @@ public class GetAzureEventHub extends AbstractProcessor
implements AzureEventHub
final EventData eventData = partitionEvent.getData();
attributes.put("eventhub.enqueued.timestamp",
String.valueOf(eventData.getEnqueuedTime()));
- attributes.put("eventhub.offset",
String.valueOf(eventData.getOffset()));
+ attributes.put("eventhub.offset", eventData.getOffsetString());
attributes.put("eventhub.sequence",
String.valueOf(eventData.getSequenceNumber()));
final PartitionContext partitionContext =
partitionEvent.getPartitionContext();
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java
index fb745aea01..3b1741424a 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java
@@ -64,7 +64,7 @@ final class ComponentStateCheckpointStoreUtils {
.setEventHubName(context.getEventHubName())
.setConsumerGroup(context.getConsumerGroup())
.setPartitionId(context.getPartitionId())
- .setOffset(StringUtils.isNotEmpty(parts[0]) ?
Long.parseLong(parts[0]) : null)
+ .setOffsetString(StringUtils.isNotEmpty(parts[0]) ? parts[0] :
null)
.setSequenceNumber(StringUtils.isNotEmpty(parts[1]) ?
Long.parseLong(parts[1]) : null);
}
@@ -127,7 +127,7 @@ final class ComponentStateCheckpointStoreUtils {
static String createCheckpointValue(Checkpoint checkpoint) {
return String.format("%s/%s",
- checkpoint.getOffset() != null ?
checkpoint.getOffset().toString() : "",
+ checkpoint.getOffsetString() != null ?
checkpoint.getOffsetString() : "",
checkpoint.getSequenceNumber() != null ?
checkpoint.getSequenceNumber().toString() : "");
}
@@ -155,7 +155,7 @@ final class ComponentStateCheckpointStoreUtils {
", eventHubName='" + checkpoint.getEventHubName() + '\'' +
", consumerGroup='" + checkpoint.getConsumerGroup() + '\'' +
", partitionId='" + checkpoint.getPartitionId() + '\'' +
- ", offset=" + checkpoint.getOffset() +
+ ", offset=" + checkpoint.getOffsetString() +
", sequenceNumber=" + checkpoint.getSequenceNumber() +
'}';
}
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 6844274bdf..654b275ef6 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -55,7 +55,7 @@ public class GetAzureEventHubTest {
private static final String CONSUMER_GROUP = "$Default";
private static final Instant ENQUEUED_TIME = Instant.now();
private static final long SEQUENCE_NUMBER = 32;
- private static final long OFFSET = 64;
+ private static final String OFFSET = "64";
private static final String PARTITION_ID = "0";
private static final String CONTENT = String.class.getSimpleName();
@@ -135,7 +135,7 @@ public class GetAzureEventHubTest {
final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(CONTENT);
flowFile.assertAttributeEquals("eventhub.enqueued.timestamp",
ENQUEUED_TIME.toString());
- flowFile.assertAttributeEquals("eventhub.offset",
Long.toString(OFFSET));
+ flowFile.assertAttributeEquals("eventhub.offset", OFFSET);
flowFile.assertAttributeEquals("eventhub.sequence",
Long.toString(SEQUENCE_NUMBER));
flowFile.assertAttributeEquals("eventhub.name", EVENT_HUB_NAME);
}
@@ -217,7 +217,7 @@ public class GetAzureEventHubTest {
}
@Override
- public Long getOffset() {
+ public String getOffsetString() {
return OFFSET;
}
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java
index 97eef55ef4..bbf9592a05 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java
@@ -39,7 +39,7 @@ abstract class AbstractCheckpointStoreTest {
static final Long LAST_MODIFIED_TIME = 1234567890L;
static final String ETAG = "my-etag";
- static final Long OFFSET = 10L;
+ static final String OFFSET = "10";
static final Long SEQUENCE_NUMBER = 1L;
PartitionOwnership partitionOwnership1;
@@ -83,7 +83,7 @@ abstract class AbstractCheckpointStoreTest {
.setETag(null);
}
- Checkpoint createCheckpoint(String partitionId, Long offset, Long
sequenceNumber) {
+ Checkpoint createCheckpoint(String partitionId, String offset, Long
sequenceNumber) {
return createCheckpoint(
EVENT_HUB_NAMESPACE,
EVENT_HUB_NAME,
@@ -99,14 +99,14 @@ abstract class AbstractCheckpointStoreTest {
String eventHubName,
String consumerGroup,
String partitionId,
- Long offset,
+ String offset,
Long sequenceNumber) {
return new TestableCheckpoint()
.setFullyQualifiedNamespace(fullyQualifiedNamespace)
.setEventHubName(eventHubName)
.setConsumerGroup(consumerGroup)
.setPartitionId(partitionId)
- .setOffset(offset)
+ .setOffsetString(offset)
.setSequenceNumber(sequenceNumber);
}
@@ -135,7 +135,7 @@ abstract class AbstractCheckpointStoreTest {
.setEventHubName(original.getEventHubName())
.setConsumerGroup(original.getConsumerGroup())
.setPartitionId(original.getPartitionId())
- .setOffset(original.getOffset())
+ .setOffsetString(original.getOffsetString())
.setSequenceNumber(original.getSequenceNumber());
}
@@ -200,7 +200,7 @@ abstract class AbstractCheckpointStoreTest {
&& Objects.equals(getEventHubName(),
that.getEventHubName())
&& Objects.equals(getConsumerGroup(),
that.getConsumerGroup())
&& Objects.equals(getPartitionId(), that.getPartitionId())
- && Objects.equals(getOffset(), that.getOffset())
+ && Objects.equals(getOffsetString(),
that.getOffsetString())
&& Objects.equals(getSequenceNumber(),
that.getSequenceNumber());
}
@@ -211,7 +211,7 @@ abstract class AbstractCheckpointStoreTest {
getEventHubName(),
getConsumerGroup(),
getPartitionId(),
- getOffset(),
+ getOffsetString(),
getSequenceNumber()
);
}
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java
index 3525188642..4e1fde0cbd 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java
@@ -275,7 +275,7 @@ class ComponentStateCheckpointStoreTest extends
AbstractComponentStateCheckpoint
addToState(checkpoint1);
- Checkpoint newCheckpoint = createCheckpoint(PARTITION_ID_2, 20L, 2L);
+ Checkpoint newCheckpoint = createCheckpoint(PARTITION_ID_2, "20", 2L);
checkpointStore.updateCheckpoint(newCheckpoint).block();
@@ -294,7 +294,7 @@ class ComponentStateCheckpointStoreTest extends
AbstractComponentStateCheckpoint
addToState(checkpoint2);
Checkpoint updatedCheckpoint = copy(checkpoint2)
- .setOffset(20L)
+ .setOffsetString("20")
.setSequenceNumber(2L);
checkpointStore.updateCheckpoint(updatedCheckpoint).block();
@@ -305,7 +305,7 @@ class ComponentStateCheckpointStoreTest extends
AbstractComponentStateCheckpoint
@Test
void testCheckpointWithNullOffset() {
- checkpoint1.setOffset(null);
+ checkpoint1.setOffsetString(null);
checkpointStore.updateCheckpoint(checkpoint1).block();
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java
index 5bfcf7ac34..e8d6322d37 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java
@@ -66,7 +66,7 @@ class ComponentStateCheckpointStoreUtilsTest extends
AbstractCheckpointStoreTest
assertEquals(CONSUMER_GROUP, checkpoint.getConsumerGroup());
assertEquals(PARTITION_ID_1, checkpoint.getPartitionId());
- assertEquals(OFFSET, checkpoint.getOffset());
+ assertEquals(OFFSET, checkpoint.getOffsetString());
assertEquals(SEQUENCE_NUMBER, checkpoint.getSequenceNumber());
}