awelless commented on code in PR #10716:
URL: https://github.com/apache/nifi/pull/10716#discussion_r2676242596
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-services/src/main/java/org/apache/nifi/box/controllerservices/JsonConfigBasedBoxClientService.java:
##########
@@ -257,5 +233,8 @@ public void migrateProperties(PropertyConfiguration config)
{
config.renameProperty("app-config-file", APP_CONFIG_FILE.getName());
config.renameProperty("app-config-json", APP_CONFIG_JSON.getName());
ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
+ // Remove timeout properties that are no longer supported in Box SDK
10.x
+ config.removeProperty("Connect Timeout");
+ config.removeProperty("Read Timeout");
Review Comment:
It should be possible to still configure these fields.
```java
final OkHttpClient okHttpClient =
BoxNetworkClient.getDefaultOkHttpClientBuilder()
.connectTimeout(context.getProperty(CONNECT_TIMEOUT).asDuration())
.readTimeout(context.getProperty(READ_TIMEOUT).asDuration())
.build();
final NetworkClient networkClient = new
BoxNetworkClient(okHttpClient);
final NetworkSession networkSession = new NetworkSession.Builder()
.networkClient(networkClient)
.build();
return new BoxClient.Builder(new BoxJWTAuth(...))
.networkSession(networkSession)
.build();
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java:
##########
@@ -174,55 +176,80 @@ private String initializeStartEventPosition(final
ProcessContext context) {
}
private String retrieveLatestStreamPosition() {
- final EventLog eventLog = getEventLog(LATEST_POSITION);
- return eventLog.getNextStreamPosition();
+ final Events events = getEvents(LATEST_POSITION);
+ return extractStreamPosition(events.getNextStreamPosition());
+ }
+
+ /**
+ * Extracts the stream position value from EventsNextStreamPositionField.
+ * The field can contain either a String or Long value.
+ */
+ private String extractStreamPosition(final EventsNextStreamPositionField
positionField) {
+ if (positionField == null) {
+ return null;
+ }
+ if (positionField.isString()) {
+ return positionField.getString();
+ } else if (positionField.isLongNumber()) {
+ return String.valueOf(positionField.getLongNumber());
+ }
+ return null;
Review Comment:
Since `EventsNextStreamPositionField ` is `OneOfTwo`, if nothing is matched
an `IllegalStateException` (or another exception) should be thrown instead of
returning `null`.
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEventsTest.java:
##########
@@ -77,9 +78,7 @@ void testCaptureEvents() {
final String content = ff0.getContent();
assertTrue(content.contains("\"id\":\"1\""));
- assertTrue(content.contains("\"eventType\":\"ITEM_CREATE\""));
assertTrue(content.contains("\"id\":\"2\""));
- assertTrue(content.contains("\"eventType\":\"ITEM_TRASH\""));
Review Comment:
Why did we stop writing the event types?
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxFileUtils.java:
##########
@@ -16,46 +16,75 @@
*/
package org.apache.nifi.processors.box;
-import com.box.sdk.BoxFile;
-import com.box.sdk.BoxFolder;
-import com.box.sdk.BoxItem;
+import com.box.sdkgen.schemas.file.File;
+import com.box.sdkgen.schemas.folder.Folder;
+import com.box.sdkgen.schemas.foldermini.FolderMini;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static java.lang.String.valueOf;
-import static java.util.stream.Collectors.joining;
public final class BoxFileUtils {
public static final String BOX_URL = "https://app.box.com/file/";
- public static String getParentIds(final BoxItem.Info info) {
- return info.getPathCollection().stream()
- .map(BoxItem.Info::getID)
- .collect(joining(","));
+ public static String getParentIds(final File fileInfo) {
+ if (fileInfo.getPathCollection() == null ||
fileInfo.getPathCollection().getEntries() == null) {
+ return "";
+ }
+ return fileInfo.getPathCollection().getEntries().stream()
+ .map(FolderMini::getId)
+ .collect(Collectors.joining(","));
}
- public static String getParentPath(BoxItem.Info info) {
- return "/" + info.getPathCollection().stream()
- .filter(pathItemInfo -> !pathItemInfo.getID().equals("0"))
- .map(BoxItem.Info::getName)
- .collect(joining("/"));
+
+ public static String getParentPath(final File fileInfo) {
+ if (fileInfo.getPathCollection() == null ||
fileInfo.getPathCollection().getEntries() == null) {
+ return "/";
+ }
+ return "/" + fileInfo.getPathCollection().getEntries().stream()
+ .filter(pathItem -> !pathItem.getId().equals("0"))
+ .map(FolderMini::getName)
+ .collect(Collectors.joining("/"));
Review Comment:
Nitpicking.
```suggestion
return getParentPath(fileInfo.getPathCollection().getEntries());
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/DeleteBoxFileMetadataInstance.java:
##########
@@ -177,12 +177,12 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
/**
- * Returns a BoxFile object for the given file ID.
+ * Deletes metadata from a file.
*
- * @param fileId The ID of the file.
- * @return A BoxFile object for the given file ID.
+ * @param fileId The ID of the file.
+ * @param templateKey The template key of the metadata to delete.
*/
- BoxFile getBoxFile(final String fileId) {
- return new BoxFile(boxAPIConnection, fileId);
+ void deleteFileMetadata(final String fileId, final String templateKey) {
+ boxClient.getFileMetadata().deleteFileMetadataById(fileId,
DeleteFileMetadataByIdScope.ENTERPRISE, templateKey);
Review Comment:
We shouldn't always use ENTERPRISE.
The older SDK had the logic where it determined a scope based on the
template key. It seems we have to do the same in the processor now.
From old SDK Metadata class:
```java
public static final String DEFAULT_METADATA_TYPE = "properties";
static String scopeBasedOnType(String typeName) {
String scope;
if (typeName.equals(DEFAULT_METADATA_TYPE)) {
scope = GLOBAL_METADATA_SCOPE;
} else {
scope = ENTERPRISE_METADATA_SCOPE;
}
return scope;
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/UpdateBoxFileMetadataInstance.java:
##########
@@ -236,106 +242,116 @@ private Map<String, Object> readDesiredState(final
ProcessSession session,
return desiredState;
}
- private void updateMetadata(final Metadata metadata,
- final Map<String, Object> desiredState) {
- final List<String> currentKeys = metadata.getPropertyPaths();
+ private List<UpdateFileMetadataByIdRequestBody>
buildUpdateOperations(final MetadataFull currentMetadata,
+
final Map<String, Object> desiredState) {
+ final List<UpdateFileMetadataByIdRequestBody> operations = new
ArrayList<>();
- // Remove fields not in desired state
- for (final String propertyPath : currentKeys) {
- final String fieldName = propertyPath.substring(1); // Remove
leading '/'
+ // Get current field names from extra data
+ final Set<String> currentKeys = new HashSet<>();
+ final Map<String, Object> extraData = currentMetadata.getExtraData();
+ if (extraData != null) {
+ currentKeys.addAll(extraData.keySet());
+ }
+ // Remove fields not in desired state
+ for (final String fieldName : currentKeys) {
if (!desiredState.containsKey(fieldName)) {
- metadata.remove(propertyPath);
+ final String path = "/" + fieldName;
getLogger().debug("Removing metadata field: {}", fieldName);
+ operations.add(new UpdateFileMetadataByIdRequestBody.Builder()
+ .op(UpdateFileMetadataByIdRequestBodyOpField.REMOVE)
+ .path(path)
+ .build());
}
}
// Add or update fields
for (final Map.Entry<String, Object> entry : desiredState.entrySet()) {
final String fieldName = entry.getKey();
final Object value = entry.getValue();
- final String propertyPath = "/" + fieldName;
+ final String path = "/" + fieldName;
+ final boolean exists = currentKeys.contains(fieldName);
- updateField(metadata, propertyPath, value,
currentKeys.contains(propertyPath));
+ final UpdateFileMetadataByIdRequestBody operation =
buildFieldOperation(path, value, exists, extraData);
+ if (operation != null) {
+ operations.add(operation);
+ }
}
+
+ return operations;
}
- private void updateField(final Metadata metadata,
- final String propertyPath,
- final Object value,
- final boolean exists) {
+ private UpdateFileMetadataByIdRequestBody buildFieldOperation(final String
path,
+ final Object
value,
+ final
boolean exists,
+ final
Map<String, Object> extraData) {
if (value == null) {
- throw new IllegalArgumentException("Null value found for property
path: " + propertyPath);
+ throw new IllegalArgumentException("Null value found for property
path: " + path);
}
- if (exists) {
- final Object currentValue = metadata.getValue(propertyPath);
-
- // Only update if values are different
+ // If exists, check if values are different
+ if (exists && extraData != null) {
+ final String fieldName = path.substring(1);
+ final Object currentValue = extraData.get(fieldName);
if (Objects.equals(currentValue, value)) {
- return;
+ return null; // No change needed
Review Comment:
Nitpicking. I suggest using `Optional` instead of returning `null` values.
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java:
##########
@@ -16,196 +16,115 @@
*/
package org.apache.nifi.processors.box;
-import com.box.sdk.BoxEvent;
-import com.box.sdk.EventLog;
-import com.eclipsesource.json.Json;
-import com.eclipsesource.json.JsonValue;
+import com.box.sdkgen.schemas.event.Event;
+import com.box.sdkgen.schemas.event.EventEventTypeField;
+import com.box.sdkgen.schemas.events.Events;
+import com.box.sdkgen.schemas.events.EventsNextStreamPositionField;
+import com.box.sdkgen.serialization.json.EnumWrapper;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import
org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents.StartEventPosition;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Stream;
-import static java.util.Collections.emptyList;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
-import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class ConsumeBoxEnterpriseEventsTest extends AbstractBoxFileTest {
- private TestConsumeBoxEnterpriseEvents processor;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private Events mockEvents;
+ private Events emptyEvents;
@Override
@BeforeEach
void setUp() throws Exception {
- processor = new TestConsumeBoxEnterpriseEvents();
+ // Create the empty events mock (returned after first call to break
the loop)
+ emptyEvents = mock(Events.class);
+ when(emptyEvents.getEntries()).thenReturn(List.of());
+ EventsNextStreamPositionField nextStreamPositionEmpty =
mock(EventsNextStreamPositionField.class);
+ when(nextStreamPositionEmpty.isString()).thenReturn(true);
+ when(nextStreamPositionEmpty.getString()).thenReturn("end");
+
when(emptyEvents.getNextStreamPosition()).thenReturn(nextStreamPositionEmpty);
+
+ // Create a test subclass that overrides getEvents to use our mock data
+ final AtomicInteger callCount = new AtomicInteger(0);
+ final ConsumeBoxEnterpriseEventsTest testReference = this;
+
+ final ConsumeBoxEnterpriseEvents testSubject = new
ConsumeBoxEnterpriseEvents() {
+ @Override
+ Events getEvents(String position) {
+ // Return events on first call, empty on subsequent calls to
break the loop
+ if (callCount.getAndIncrement() == 0) {
+ return testReference.mockEvents != null ?
testReference.mockEvents : emptyEvents;
+ }
+ return emptyEvents;
+ }
+ };
- testRunner = TestRunners.newTestRunner(processor);
+ testRunner = TestRunners.newTestRunner(testSubject);
super.setUp();
}
- @ParameterizedTest
- @MethodSource("dataFor_testConsumeEvents")
- void testConsumeEvents(
- final StartEventPosition startEventPosition,
- final @Nullable String startOffset,
- final int expectedFlowFiles,
- final List<Integer> expectedEventIds) {
-
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION,
startEventPosition);
- if (startOffset != null) {
- testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_OFFSET,
startOffset);
- }
-
- final TestEventStream eventStream = new TestEventStream();
- processor.overrideGetEventLog(eventStream::consume);
-
- eventStream.addEvent(0);
- eventStream.addEvent(1);
- eventStream.addEvent(2);
- testRunner.run();
+ @Test
+ void testConsumeEventsFromEarliest() throws Exception {
+
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION,
StartEventPosition.EARLIEST);
+
+ // Create mock events
+ List<Event> events = new ArrayList<>();
+ events.add(createMockEvent("1", EventEventTypeField.ITEM_CREATE));
+ events.add(createMockEvent("2", EventEventTypeField.ITEM_TRASH));
+ events.add(createMockEvent("3", EventEventTypeField.ITEM_UPLOAD));
+
+ mockEvents = mock(Events.class);
+ when(mockEvents.getEntries()).thenReturn(events);
+ EventsNextStreamPositionField nextStreamPosition3 =
mock(EventsNextStreamPositionField.class);
+ when(nextStreamPosition3.isString()).thenReturn(true);
+ when(nextStreamPosition3.getString()).thenReturn("3");
+
when(mockEvents.getNextStreamPosition()).thenReturn(nextStreamPosition3);
- eventStream.addEvent(3);
testRunner.run();
-
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
expectedFlowFiles);
-
- final List<Integer> eventIds =
testRunner.getFlowFilesForRelationship(ConsumeBoxEnterpriseEvents.REL_SUCCESS).stream()
- .flatMap(this::extractEventIds)
- .toList();
+
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
1);
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(ConsumeBoxEnterpriseEvents.REL_SUCCESS).getFirst();
- assertEquals(expectedEventIds, eventIds);
+ // Parse and verify the content
+ final String content = flowFile.getContent();
+ final JsonNode jsonArray = OBJECT_MAPPER.readTree(content);
+ assertEquals(3, jsonArray.size());
- assertEquals(eventIds.size(),
testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
- }
-
- static List<Arguments> dataFor_testConsumeEvents() {
- return List.of(
- arguments(StartEventPosition.EARLIEST, null, 2, List.of(0, 1,
2, 3)),
- arguments(StartEventPosition.OFFSET, "1", 2, List.of(1, 2, 3)),
- arguments(StartEventPosition.OFFSET, "12345", 1, List.of(3)),
- arguments(StartEventPosition.LATEST, null, 1, List.of(3))
- );
+ assertEquals(3,
testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
}
@Test
- void testGracefulTermination() throws InterruptedException {
- final CountDownLatch scheduledLatch = new CountDownLatch(1);
- final AtomicInteger consumedEvents = new AtomicInteger(0);
-
- // Infinite stream.
- processor.overrideGetEventLog(__ -> {
- scheduledLatch.countDown();
- consumedEvents.incrementAndGet();
- return createEventLog(List.of(createBoxEvent(1)), "");
- });
-
- final ExecutorService runExecutor =
Executors.newSingleThreadExecutor();
-
- try {
- // Starting the processor that consumes an infinite stream.
- final Future<?> runFuture = runExecutor.submit(() ->
testRunner.run(/*iterations=*/ 1, /*stopOnFinish=*/ false));
-
- assertTrue(scheduledLatch.await(5, TimeUnit.SECONDS), "Processor
did not start");
-
- // Triggering the processor to stop.
- testRunner.unSchedule();
-
- assertDoesNotThrow(() -> runFuture.get(5, TimeUnit.SECONDS),
"Processor did not stop gracefully");
-
-
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
consumedEvents.get());
- assertEquals(consumedEvents.get(),
testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
- } finally {
- // We can't use try with resources, as Executors use a shutdown
method
- // which indefinitely waits for submitted tasks.
- runExecutor.shutdownNow();
- }
- }
-
- private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
- final JsonValue json = Json.parse(flowFile.getContent());
- return json.asArray().values().stream()
- .map(JsonValue::asObject)
- .map(jsonObject -> jsonObject.get("id").asString())
- .map(Integer::parseInt);
- }
-
- /**
- * This class is used to override external call in {@link
ConsumeBoxEnterpriseEvents#getEventLog(String)}.
- */
- private static class TestConsumeBoxEnterpriseEvents extends
ConsumeBoxEnterpriseEvents {
+ void testNoEventsReturned() {
+
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION,
StartEventPosition.EARLIEST);
- private volatile Function<String, EventLog> fakeEventLog;
+ // Set mockEvents to null so it returns emptyEvents
+ mockEvents = null;
- void overrideGetEventLog(final Function<String, EventLog>
fakeEventLog) {
- this.fakeEventLog = fakeEventLog;
- }
-
- @Override
- EventLog getEventLog(String position) {
- return fakeEventLog.apply(position);
- }
- }
-
- private static class TestEventStream {
-
- private static final String NOW_POSITION = "now";
-
- private final List<BoxEvent> events = new ArrayList<>();
-
- void addEvent(final int eventId) {
- events.add(createBoxEvent(eventId));
- }
-
- EventLog consume(final String position) {
- final String nextPosition = String.valueOf(events.size());
-
- if (NOW_POSITION.equals(position)) {
- return createEventLog(emptyList(), nextPosition);
- }
-
- final int streamPosition = Integer.parseInt(position);
- if (streamPosition > events.size()) {
- // Real Box API returns the latest offset position, even if
streamPosition was greater.
- return createEventLog(emptyList(), nextPosition);
- }
-
- final List<BoxEvent> consumedEvents =
events.subList(streamPosition, events.size());
-
- return createEventLog(consumedEvents, nextPosition);
- }
- }
+ testRunner.run();
- private static BoxEvent createBoxEvent(final int eventId) {
- return new BoxEvent(null, "{\"event_id\": \"%d\"}".formatted(eventId));
+ testRunner.assertTransferCount(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
0);
}
- private static EventLog createEventLog(final List<BoxEvent>
consumedEvents, final String nextPosition) {
- // EventLog is not designed for being extended. Thus, mocking it.
- final EventLog eventLog = mock();
-
- when(eventLog.getNextStreamPosition()).thenReturn(nextPosition);
- lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
-
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
-
- return eventLog;
+ private Event createMockEvent(String eventId, EventEventTypeField
eventType) {
+ Event event = mock(Event.class);
+ when(event.getEventId()).thenReturn(eventId);
+ when(event.getEventType()).thenReturn(new EnumWrapper<>(eventType));
+ when(event.getCreatedAt()).thenReturn(null);
+ when(event.getSessionId()).thenReturn(null);
+ when(event.getCreatedBy()).thenReturn(null);
+ when(event.getSource()).thenReturn(null);
+ when(event.getAdditionalDetails()).thenReturn(null);
+ return event;
Review Comment:
Nitpicking. What do you think about using real objects provided by SDK
instead of mocks?
Same applies to other usages of mocks for SDK data classes.
```java
return new Event.Builder()
.eventId(eventId)
.eventType(eventType)
.build();
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java:
##########
@@ -174,55 +176,80 @@ private String initializeStartEventPosition(final
ProcessContext context) {
}
private String retrieveLatestStreamPosition() {
- final EventLog eventLog = getEventLog(LATEST_POSITION);
- return eventLog.getNextStreamPosition();
+ final Events events = getEvents(LATEST_POSITION);
+ return extractStreamPosition(events.getNextStreamPosition());
+ }
+
+ /**
+ * Extracts the stream position value from EventsNextStreamPositionField.
+ * The field can contain either a String or Long value.
+ */
+ private String extractStreamPosition(final EventsNextStreamPositionField
positionField) {
+ if (positionField == null) {
+ return null;
+ }
+ if (positionField.isString()) {
+ return positionField.getString();
+ } else if (positionField.isLongNumber()) {
+ return String.valueOf(positionField.getLongNumber());
+ }
+ return null;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
while (isScheduled()) {
getLogger().debug("Consuming Box Events from position: {}",
streamPosition);
- final EventLog eventLog = getEventLog(streamPosition);
- streamPosition = eventLog.getNextStreamPosition();
+ final Events events = getEvents(streamPosition);
+ final String newPosition =
extractStreamPosition(events.getNextStreamPosition());
+ streamPosition = newPosition != null ? newPosition :
streamPosition;
- getLogger().debug("Consumed {} Box Enterprise Events. New
position: {}", eventLog.getSize(), streamPosition);
+ final int eventCount = events.getEntries() != null ?
events.getEntries().size() : 0;
+ getLogger().debug("Consumed {} Box Enterprise Events. New
position: {}", eventCount, streamPosition);
writeStreamPosition(streamPosition, session);
- if (eventLog.getSize() == 0) {
+ if (eventCount == 0) {
break;
}
- writeLogAsRecords(eventLog, session);
+ writeEventsAsRecords(events, session);
}
}
// Package-private for testing.
- EventLog getEventLog(final String position) {
- final EnterpriseEventsStreamRequest request = new
EnterpriseEventsStreamRequest()
+ Events getEvents(final String position) {
+ final GetEventsQueryParams.Builder queryParamsBuilder = new
GetEventsQueryParams.Builder()
.limit(LIMIT)
- .position(position)
- .typeNames(eventTypes);
+ .streamPosition(position)
+
.streamType(GetEventsQueryParamsStreamTypeField.ADMIN_LOGS_STREAMING);
+
+ // Note: Event type filtering has been removed in SDK v10 migration
+ // The eventType filter now requires
GetEventsQueryParamsEventTypeField enum values
+ // TODO: Implement event type filtering with proper enum conversion if
needed
Review Comment:
Does it mean we're going to bypass the filter and return all events for now?
If so, this can break the existing flows, which expect only particular event
types.
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java:
##########
@@ -135,47 +138,71 @@ public void onScheduled(final ProcessContext context) {
events = new LinkedBlockingQueue<>(queueCapacity);
} else {
// create new one with events from the old queue in case capacity
has changed
- final BlockingQueue<BoxEvent> newQueue = new
LinkedBlockingQueue<>(queueCapacity);
+ final BlockingQueue<Event> newQueue = new
LinkedBlockingQueue<>(queueCapacity);
newQueue.addAll(events);
events = newQueue;
}
- eventStream.addListener(new EventListener() {
+ // Start polling for events in a background thread
+ pollingExecutor = Executors.newSingleThreadScheduledExecutor();
+ pollingExecutor.scheduleWithFixedDelay(() -> {
+ try {
+ pollEvents(context);
+ } catch (Exception e) {
+ getLogger().warn("Error polling Box events", e);
+ }
+ }, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ }
- @Override
- public void onEvent(BoxEvent event) {
- try {
- events.put(event);
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while trying to
put the event into the queue", e);
+ private void pollEvents(final ProcessContext context) {
+ try {
+ final GetEventsQueryParams queryParams = new
GetEventsQueryParams.Builder()
+ .streamPosition(position.get())
+ .streamType(GetEventsQueryParamsStreamTypeField.ALL)
+ .build();
+
+ final Events eventResult =
boxClient.getEvents().getEvents(queryParams);
+
+ if (eventResult.getEntries() != null) {
+ for (Event event : eventResult.getEntries()) {
+ events.offer(event);
}
}
- @Override
- public void onNextPosition(long pos) {
+ final String newPosition =
extractStreamPosition(eventResult.getNextStreamPosition());
+ if (newPosition != null) {
+ position.set(newPosition);
try {
- context.getStateManager().setState(Map.of(POSITION_KEY,
String.valueOf(pos)), Scope.CLUSTER);
- position.set(pos);
+ context.getStateManager().setState(Map.of(POSITION_KEY,
newPosition), Scope.CLUSTER);
} catch (IOException e) {
- getLogger().warn("Failed to save position {} in processor
state", pos, e);
+ getLogger().warn("Failed to save position {} in processor
state", newPosition, e);
}
}
+ } catch (Exception e) {
+ getLogger().warn("An error occurred while polling Box events. Last
tracked position {}", position.get(), e);
+ }
+ }
- @Override
- public boolean onException(Throwable e) {
- getLogger().warn("An error has been received from the stream.
Last tracked position {}", position.get(), e);
- return true;
- }
-
- });
-
- eventStream.start();
+ /**
+ * Extracts the stream position value from EventsNextStreamPositionField.
+ * The field can contain either a String or Long value.
+ */
+ private String extractStreamPosition(final EventsNextStreamPositionField
positionField) {
+ if (positionField == null) {
+ return null;
+ }
+ if (positionField.isString()) {
+ return positionField.getString();
+ } else if (positionField.isLongNumber()) {
+ return String.valueOf(positionField.getLongNumber());
+ }
+ return null;
Review Comment:
Same here. It should be an exception instead of returning a `null`.
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java:
##########
@@ -98,10 +104,10 @@ public class ConsumeBoxEvents extends AbstractBoxProcessor
implements Verifiable
private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
- private volatile BoxAPIConnection boxAPIConnection;
- private volatile EventStream eventStream;
- protected volatile BlockingQueue<BoxEvent> events;
- private volatile AtomicLong position = new AtomicLong(0);
+ private volatile BoxClient boxClient;
+ protected volatile BlockingQueue<Event> events;
+ private volatile AtomicReference<String> position = new
AtomicReference<>("0");
Review Comment:
This filed can be final.
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriter.java:
##########
@@ -16,86 +16,215 @@
*/
package org.apache.nifi.processors.box;
-import com.box.sdk.BoxEvent;
-import com.eclipsesource.json.Json;
-import com.eclipsesource.json.JsonObject;
+import com.box.sdkgen.schemas.event.Event;
+import com.box.sdkgen.schemas.eventsource.EventSource;
+import com.box.sdkgen.schemas.eventsourceresource.EventSourceResource;
+import com.box.sdkgen.schemas.file.File;
+import com.box.sdkgen.schemas.folder.Folder;
+import com.box.sdkgen.schemas.foldermini.FolderMini;
+import com.box.sdkgen.schemas.user.User;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.Objects;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.Map;
/**
- * A class responsible for writing {@link BoxEvent} objects into a JSON array.
+ * A class responsible for writing {@link Event} objects into a JSON array.
* Not thread-safe.
*/
final class BoxEventJsonArrayWriter implements Closeable {
- private final Writer writer;
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final JsonGenerator generator;
private boolean hasBegun;
- private boolean hasEntries;
private boolean closed;
- private BoxEventJsonArrayWriter(final Writer writer) {
- this.writer = writer;
+ private BoxEventJsonArrayWriter(final JsonGenerator generator) {
+ this.generator = generator;
this.hasBegun = false;
- this.hasEntries = false;
this.closed = false;
}
static BoxEventJsonArrayWriter create(final OutputStream outputStream)
throws IOException {
- final Writer writer = new OutputStreamWriter(outputStream, UTF_8);
- return new BoxEventJsonArrayWriter(writer);
+ final JsonGenerator generator =
JSON_FACTORY.createGenerator(outputStream);
+ return new BoxEventJsonArrayWriter(generator);
}
- void write(final BoxEvent event) throws IOException {
+ void write(final Event event) throws IOException {
if (closed) {
throw new IOException("The Writer is closed");
}
if (!hasBegun) {
- beginArray();
+ generator.writeStartArray();
hasBegun = true;
}
- if (hasEntries) {
- writer.write(',');
+ writeEvent(event);
+ }
+
+ private void writeEvent(final Event event) throws IOException {
+ generator.writeStartObject();
+
+ // Map Event fields to JSON using camelCase to match the original NiFi
Box processor format
+ writeStringField("createdAt", event.getCreatedAt() != null ?
event.getCreatedAt().toString() : null);
+ writeStringField("recordedAt", event.getRecordedAt() != null ?
event.getRecordedAt().toString() : null);
+ writeStringField("eventType", event.getEventType() != null ?
event.getEventType().getValue() : null);
+ writeStringField("id", event.getEventId());
+ writeStringField("sessionID", event.getSessionId());
+ writeStringField("type", event.getType());
+
+ // Handle createdBy if present (camelCase for field name, but inner
fields match Box API)
+ if (event.getCreatedBy() != null) {
+ generator.writeObjectFieldStart("createdBy");
+ writeStringField("id", event.getCreatedBy().getId());
+ writeStringField("type", event.getCreatedBy().getType() != null ?
event.getCreatedBy().getType().getValue() : null);
+ writeStringField("name", event.getCreatedBy().getName());
+ writeStringField("login", event.getCreatedBy().getLogin());
+ generator.writeEndObject();
+ } else {
+ generator.writeNullField("createdBy");
}
- final JsonObject json = toRecord(event);
- json.writeTo(writer);
+ // Handle source if present - use snake_case for inner fields to match
Box API format
+ writeSource(event.getSource());
+
+ // Handle additionalDetails if present - serialize as proper JSON
object
+ writeAdditionalDetails(event.getAdditionalDetails());
- hasEntries = true;
+ generator.writeEndObject();
}
- private JsonObject toRecord(final BoxEvent event) {
- final JsonObject json = Json.object();
-
- json.add("accessibleBy", event.getAccessibleBy() == null ? Json.NULL :
Json.parse(event.getAccessibleBy().getJson()));
- json.add("actionBy", event.getActionBy() == null ? Json.NULL :
Json.parse(event.getActionBy().getJson()));
- json.add("additionalDetails",
Objects.requireNonNullElse(event.getAdditionalDetails(), Json.NULL));
- json.add("createdAt", event.getCreatedAt() == null ? Json.NULL :
Json.value(event.getCreatedAt().toString()));
- json.add("createdBy", event.getCreatedBy() == null ? Json.NULL :
Json.parse(event.getCreatedBy().getJson()));
- json.add("eventType", event.getEventType() == null ? Json.NULL :
Json.value(event.getEventType().name()));
- json.add("id", Objects.requireNonNullElse(Json.value(event.getID()),
Json.NULL));
- json.add("ipAddress",
Objects.requireNonNullElse(Json.value(event.getIPAddress()), Json.NULL));
- json.add("sessionID",
Objects.requireNonNullElse(Json.value(event.getSessionID()), Json.NULL));
- json.add("source", Objects.requireNonNullElse(event.getSourceJSON(),
Json.NULL));
- json.add("typeName",
Objects.requireNonNullElse(Json.value(event.getTypeName()), Json.NULL));
-
- return json;
+ private void writeSource(final EventSourceResource source) throws
IOException {
+ if (source == null) {
+ generator.writeNullField("source");
+ return;
+ }
+
+ generator.writeObjectFieldStart("source");
+ try {
+ // EventSourceResource is a union type (OneOfSix) - check what
kind of source we have
+ if (source.isFile()) {
+ // File source - contains file_id, file_name, and parent
folder info
+ File file = source.getFile();
+ writeStringField("item_type", "file");
+ writeStringField("item_id", file.getId());
+ writeStringField("item_name", file.getName());
+ // Add file-specific fields for collaboration events
+ writeStringField("file_id", file.getId());
+ writeStringField("file_name", file.getName());
+ // Add parent folder info if available
+ FolderMini parent = file.getParent();
+ if (parent != null) {
+ writeStringField("folder_id", parent.getId());
+ writeStringField("folder_name", parent.getName());
+ }
+ } else if (source.isFolder()) {
+ // Folder source - contains folder_id, folder_name
+ Folder folder = source.getFolder();
+ writeStringField("item_type", "folder");
+ writeStringField("item_id", folder.getId());
+ writeStringField("item_name", folder.getName());
+ // Add folder-specific fields for collaboration events
+ writeStringField("folder_id", folder.getId());
+ writeStringField("folder_name", folder.getName());
+ } else if (source.isEventSource()) {
+ // Generic EventSource - has item_type, item_id, item_name
+ EventSource eventSource = source.getEventSource();
+ String itemType = eventSource.getItemType() != null ?
eventSource.getItemType().getValue() : null;
+ writeStringField("item_type", itemType);
+ writeStringField("item_id", eventSource.getItemId());
+ writeStringField("item_name", eventSource.getItemName());
+ // For EventSource, also populate file/folder specific fields
based on item_type
+ if ("file".equals(itemType)) {
+ writeStringField("file_id", eventSource.getItemId());
+ writeStringField("file_name", eventSource.getItemName());
+ } else if ("folder".equals(itemType)) {
+ writeStringField("folder_id", eventSource.getItemId());
+ writeStringField("folder_name", eventSource.getItemName());
+ }
+ // Add parent folder info if available
+ FolderMini parent = eventSource.getParent();
+ if (parent != null) {
+ writeStringField("parent_id", parent.getId());
+ writeStringField("parent_name", parent.getName());
+ }
+ } else if (source.isUser()) {
+ // User source
+ User user = source.getUser();
+ writeStringField("item_type", "user");
+ writeStringField("id", user.getId());
+ writeStringField("name", user.getName());
+ writeStringField("login", user.getLogin());
+ } else if (source.isMap()) {
+ // Generic map - write all entries
+ Map<String, Object> map = source.getMap();
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ Object value = entry.getValue();
+ if (value != null) {
+ generator.writeFieldName(entry.getKey());
+ generator.writeObject(value);
+ }
+ }
+ } else if (source.isAppItemEventSource()) {
+ // AppItemEventSource
+ writeStringField("item_type", "app_item");
+ } else {
+ writeStringField("item_type", "unknown");
+ }
+ } catch (Exception e) {
+ writeStringField("error", "Could not serialize source: " +
e.getMessage());
+ }
+ generator.writeEndObject();
}
- private void beginArray() throws IOException {
- writer.write('[');
+ private void writeAdditionalDetails(final Map<String, Object>
additionalDetails) throws IOException {
+ if (additionalDetails == null) {
+ generator.writeNullField("additionalDetails");
+ return;
+ }
+
+ try {
+ // Write additionalDetails as a proper JSON object, not a string
+ generator.writeFieldName("additionalDetails");
+ generator.writeStartObject();
+ for (Map.Entry<String, Object> entry :
additionalDetails.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if (value == null) {
+ generator.writeNullField(key);
+ } else if (value instanceof String) {
Review Comment:
Nitpicking. We can use [pattern matching in
instanceof](https://docs.oracle.com/en/java/javase/17/language/pattern-matching-instanceof.html).
```java
if (value instanceof String strValue) {
generator.writeStringField(key, strValue);
}
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/UpdateBoxFileMetadataInstance.java:
##########
@@ -236,106 +242,116 @@ private Map<String, Object> readDesiredState(final
ProcessSession session,
return desiredState;
}
- private void updateMetadata(final Metadata metadata,
- final Map<String, Object> desiredState) {
- final List<String> currentKeys = metadata.getPropertyPaths();
+ private List<UpdateFileMetadataByIdRequestBody>
buildUpdateOperations(final MetadataFull currentMetadata,
+
final Map<String, Object> desiredState) {
+ final List<UpdateFileMetadataByIdRequestBody> operations = new
ArrayList<>();
- // Remove fields not in desired state
- for (final String propertyPath : currentKeys) {
- final String fieldName = propertyPath.substring(1); // Remove
leading '/'
+ // Get current field names from extra data
+ final Set<String> currentKeys = new HashSet<>();
+ final Map<String, Object> extraData = currentMetadata.getExtraData();
+ if (extraData != null) {
+ currentKeys.addAll(extraData.keySet());
+ }
+ // Remove fields not in desired state
+ for (final String fieldName : currentKeys) {
if (!desiredState.containsKey(fieldName)) {
- metadata.remove(propertyPath);
+ final String path = "/" + fieldName;
getLogger().debug("Removing metadata field: {}", fieldName);
+ operations.add(new UpdateFileMetadataByIdRequestBody.Builder()
+ .op(UpdateFileMetadataByIdRequestBodyOpField.REMOVE)
+ .path(path)
+ .build());
}
}
// Add or update fields
for (final Map.Entry<String, Object> entry : desiredState.entrySet()) {
final String fieldName = entry.getKey();
final Object value = entry.getValue();
- final String propertyPath = "/" + fieldName;
+ final String path = "/" + fieldName;
+ final boolean exists = currentKeys.contains(fieldName);
- updateField(metadata, propertyPath, value,
currentKeys.contains(propertyPath));
+ final UpdateFileMetadataByIdRequestBody operation =
buildFieldOperation(path, value, exists, extraData);
+ if (operation != null) {
+ operations.add(operation);
+ }
}
+
+ return operations;
}
- private void updateField(final Metadata metadata,
- final String propertyPath,
- final Object value,
- final boolean exists) {
+ private UpdateFileMetadataByIdRequestBody buildFieldOperation(final String
path,
+ final Object
value,
+ final
boolean exists,
+ final
Map<String, Object> extraData) {
if (value == null) {
- throw new IllegalArgumentException("Null value found for property
path: " + propertyPath);
+ throw new IllegalArgumentException("Null value found for property
path: " + path);
}
- if (exists) {
- final Object currentValue = metadata.getValue(propertyPath);
-
- // Only update if values are different
+ // If exists, check if values are different
+ if (exists && extraData != null) {
+ final String fieldName = path.substring(1);
+ final Object currentValue = extraData.get(fieldName);
if (Objects.equals(currentValue, value)) {
- return;
+ return null; // No change needed
}
+ }
+
+ final MetadataInstanceValue metadataValue =
convertToMetadataInstanceValue(value, path);
+
+ // Box API uses replace for both adding new fields and updating
existing fields
+ return new UpdateFileMetadataByIdRequestBody.Builder()
+ .op(UpdateFileMetadataByIdRequestBodyOpField.REPLACE)
+ .path(path)
+ .value(metadataValue)
+ .build();
+ }
- // Update
- switch (value) {
- case Number n -> metadata.replace(propertyPath,
n.doubleValue());
- case List<?> l -> metadata.replace(propertyPath,
convertListToStringList(l, propertyPath));
- case LocalDate d -> metadata.replace(propertyPath,
BoxDate.of(d).format());
- default -> metadata.replace(propertyPath, value.toString());
+ private MetadataInstanceValue convertToMetadataInstanceValue(final Object
value, final String path) {
+ if (value instanceof Number n) {
+ if (value instanceof Double || value instanceof Float) {
+ return new MetadataInstanceValue(n.doubleValue());
+ } else {
+ return new MetadataInstanceValue(n.longValue());
}
+ } else if (value instanceof List<?> l) {
+ final List<String> stringList = l.stream()
+ .map(obj -> {
+ if (obj == null) {
+ throw new IllegalArgumentException("Null value
found in list for field: " + path);
+ }
+ return obj.toString();
+ })
+ .collect(Collectors.toList());
+ return new MetadataInstanceValue(stringList);
+ } else if (value instanceof LocalDate d) {
+ return new MetadataInstanceValue(BoxDate.of(d).format());
} else {
- // Add new field
- switch (value) {
- case Number n -> metadata.add(propertyPath, n.doubleValue());
- case List<?> l -> metadata.add(propertyPath,
convertListToStringList(l, propertyPath));
- case LocalDate d -> metadata.add(propertyPath,
BoxDate.of(d).format());
- default -> metadata.add(propertyPath, value.toString());
- }
+ return new MetadataInstanceValue(value.toString());
}
}
- private List<String> convertListToStringList(final List<?> list,
- final String fieldName) {
- return list.stream()
- .map(obj -> {
- if (obj == null) {
- throw new IllegalArgumentException("Null value found
in list for field: " + fieldName);
- }
- return obj.toString();
- })
- .collect(Collectors.toList());
- }
-
/**
* Retrieves the metadata for a Box file.
* Visible for testing purposes.
*
- * @param boxFile The Box file to retrieve metadata from.
+ * @param fileId The ID of the file.
* @param templateKey The key of the metadata template.
* @return The metadata for the Box file.
*/
- Metadata getMetadata(final BoxFile boxFile,
- final String templateKey) {
- return boxFile.getMetadata(templateKey);
- }
-
- /**
- * Returns a BoxFile object for the given file ID.
- *
- * @param fileId The ID of the file.
- * @return A BoxFile object for the given file ID.
- */
- BoxFile getBoxFile(final String fileId) {
- return new BoxFile(boxAPIConnection, fileId);
+ MetadataFull getMetadata(final String fileId, final String templateKey) {
+ return boxClient.getFileMetadata().getFileMetadataById(fileId,
GetFileMetadataByIdScope.ENTERPRISE, templateKey);
Review Comment:
We shouldn't always use ENTERPRISE.
The older SDK had the logic where it determined a scope based on the
template key. It seems we have to do the same in the processor now.
From old SDK Metadata class:
```java
public static final String DEFAULT_METADATA_TYPE = "properties";
static String scopeBasedOnType(String typeName) {
String scope;
if (typeName.equals(DEFAULT_METADATA_TYPE)) {
scope = GLOBAL_METADATA_SCOPE;
} else {
scope = ENTERPRISE_METADATA_SCOPE;
}
return scope;
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/CreateBoxFileMetadataInstance.java:
##########
@@ -281,12 +280,13 @@ private boolean isArray(final RecordFieldType fieldType) {
}
/**
- * Returns a BoxFile object for the given file ID.
+ * Creates metadata for a file.
*
- * @param fileId The ID of the file.
- * @return A BoxFile object for the given file ID.
+ * @param fileId The ID of the file.
+ * @param templateKey The template key of the metadata.
+ * @param metadataValues The metadata key-value pairs.
*/
- BoxFile getBoxFile(final String fileId) {
- return new BoxFile(boxAPIConnection, fileId);
+ void createFileMetadata(final String fileId, final String templateKey,
final Map<String, Object> metadataValues) {
+ boxClient.getFileMetadata().createFileMetadataById(fileId,
CreateFileMetadataByIdScope.ENTERPRISE, templateKey, metadataValues);
Review Comment:
We shouldn't always use `ENTERPRISE`.
The older SDK had the logic where it determined a scope based on the
template key. It seems we have to do the same in the processor now.
From old SDK `Metadata` class:
```java
public static final String DEFAULT_METADATA_TYPE = "properties";
static String scopeBasedOnType(String typeName) {
String scope;
if (typeName.equals(DEFAULT_METADATA_TYPE)) {
scope = GLOBAL_METADATA_SCOPE;
} else {
scope = ENTERPRISE_METADATA_SCOPE;
}
return scope;
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-services/src/main/java/org/apache/nifi/box/controllerservices/JsonConfigBasedBoxClientService.java:
##########
@@ -257,5 +233,8 @@ public void migrateProperties(PropertyConfiguration config)
{
config.renameProperty("app-config-file", APP_CONFIG_FILE.getName());
config.renameProperty("app-config-json", APP_CONFIG_JSON.getName());
ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
+ // Remove timeout properties that are no longer supported in Box SDK
10.x
+ config.removeProperty("Connect Timeout");
+ config.removeProperty("Read Timeout");
Review Comment:
Another option to consider, although a more radical, is to use NiFi
web-client instead of okhttp.
`NetworkClient` provides a concise interface which we can use as an adapter
to a web-client.
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java:
##########
@@ -135,47 +138,71 @@ public void onScheduled(final ProcessContext context) {
events = new LinkedBlockingQueue<>(queueCapacity);
} else {
// create new one with events from the old queue in case capacity
has changed
- final BlockingQueue<BoxEvent> newQueue = new
LinkedBlockingQueue<>(queueCapacity);
+ final BlockingQueue<Event> newQueue = new
LinkedBlockingQueue<>(queueCapacity);
newQueue.addAll(events);
events = newQueue;
}
- eventStream.addListener(new EventListener() {
+ // Start polling for events in a background thread
+ pollingExecutor = Executors.newSingleThreadScheduledExecutor();
+ pollingExecutor.scheduleWithFixedDelay(() -> {
+ try {
+ pollEvents(context);
Review Comment:
If I understand correctly, now we aren't pushed with events and we poll them
instead.
In that case it seems we can remove the buffering entirely, and perform
polling straight in `onTrigger`. When done this way, we will lose prefetching,
but it seems to be the pattern in other processors where polling is supported.
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/UpdateBoxFileMetadataInstance.java:
##########
@@ -236,106 +242,116 @@ private Map<String, Object> readDesiredState(final
ProcessSession session,
return desiredState;
}
- private void updateMetadata(final Metadata metadata,
- final Map<String, Object> desiredState) {
- final List<String> currentKeys = metadata.getPropertyPaths();
+ private List<UpdateFileMetadataByIdRequestBody>
buildUpdateOperations(final MetadataFull currentMetadata,
+
final Map<String, Object> desiredState) {
+ final List<UpdateFileMetadataByIdRequestBody> operations = new
ArrayList<>();
- // Remove fields not in desired state
- for (final String propertyPath : currentKeys) {
- final String fieldName = propertyPath.substring(1); // Remove
leading '/'
+ // Get current field names from extra data
+ final Set<String> currentKeys = new HashSet<>();
+ final Map<String, Object> extraData = currentMetadata.getExtraData();
+ if (extraData != null) {
+ currentKeys.addAll(extraData.keySet());
+ }
+ // Remove fields not in desired state
+ for (final String fieldName : currentKeys) {
if (!desiredState.containsKey(fieldName)) {
- metadata.remove(propertyPath);
+ final String path = "/" + fieldName;
getLogger().debug("Removing metadata field: {}", fieldName);
+ operations.add(new UpdateFileMetadataByIdRequestBody.Builder()
+ .op(UpdateFileMetadataByIdRequestBodyOpField.REMOVE)
+ .path(path)
+ .build());
}
}
// Add or update fields
for (final Map.Entry<String, Object> entry : desiredState.entrySet()) {
final String fieldName = entry.getKey();
final Object value = entry.getValue();
- final String propertyPath = "/" + fieldName;
+ final String path = "/" + fieldName;
+ final boolean exists = currentKeys.contains(fieldName);
- updateField(metadata, propertyPath, value,
currentKeys.contains(propertyPath));
+ final UpdateFileMetadataByIdRequestBody operation =
buildFieldOperation(path, value, exists, extraData);
+ if (operation != null) {
+ operations.add(operation);
+ }
}
+
+ return operations;
}
- private void updateField(final Metadata metadata,
- final String propertyPath,
- final Object value,
- final boolean exists) {
+ private UpdateFileMetadataByIdRequestBody buildFieldOperation(final String
path,
+ final Object
value,
+ final
boolean exists,
+ final
Map<String, Object> extraData) {
if (value == null) {
- throw new IllegalArgumentException("Null value found for property
path: " + propertyPath);
+ throw new IllegalArgumentException("Null value found for property
path: " + path);
}
- if (exists) {
- final Object currentValue = metadata.getValue(propertyPath);
-
- // Only update if values are different
+ // If exists, check if values are different
+ if (exists && extraData != null) {
+ final String fieldName = path.substring(1);
+ final Object currentValue = extraData.get(fieldName);
if (Objects.equals(currentValue, value)) {
- return;
+ return null; // No change needed
}
+ }
+
+ final MetadataInstanceValue metadataValue =
convertToMetadataInstanceValue(value, path);
+
+ // Box API uses replace for both adding new fields and updating
existing fields
+ return new UpdateFileMetadataByIdRequestBody.Builder()
+ .op(UpdateFileMetadataByIdRequestBodyOpField.REPLACE)
+ .path(path)
+ .value(metadataValue)
+ .build();
+ }
- // Update
- switch (value) {
- case Number n -> metadata.replace(propertyPath,
n.doubleValue());
- case List<?> l -> metadata.replace(propertyPath,
convertListToStringList(l, propertyPath));
- case LocalDate d -> metadata.replace(propertyPath,
BoxDate.of(d).format());
- default -> metadata.replace(propertyPath, value.toString());
+ private MetadataInstanceValue convertToMetadataInstanceValue(final Object
value, final String path) {
Review Comment:
Nitpicking. What do you think about using pattern matching with switch?
```java
private MetadataInstanceValue convertToMetadataInstanceValue(final
Object value, final String path) {
return switch (value) {
case Float f -> new MetadataInstanceValue(f.doubleValue());
case Double d -> new MetadataInstanceValue(d);
case Number n -> new MetadataInstanceValue(n.longValue());
case List<?> l -> {
final List<String> stringList = l.stream()
.map(obj -> {
if (obj == null) {
throw new IllegalArgumentException("Null
value found in list for field: " + path);
}
return obj.toString();
})
.toList();
yield new MetadataInstanceValue(stringList);
}
case LocalDate d -> new
MetadataInstanceValue(BoxDate.of(d).format());
default -> new MetadataInstanceValue(value.toString());
};
}
```
##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/UpdateBoxFileMetadataInstance.java:
##########
@@ -236,106 +242,116 @@ private Map<String, Object> readDesiredState(final
ProcessSession session,
return desiredState;
}
- private void updateMetadata(final Metadata metadata,
- final Map<String, Object> desiredState) {
- final List<String> currentKeys = metadata.getPropertyPaths();
+ private List<UpdateFileMetadataByIdRequestBody>
buildUpdateOperations(final MetadataFull currentMetadata,
+
final Map<String, Object> desiredState) {
+ final List<UpdateFileMetadataByIdRequestBody> operations = new
ArrayList<>();
- // Remove fields not in desired state
- for (final String propertyPath : currentKeys) {
- final String fieldName = propertyPath.substring(1); // Remove
leading '/'
+ // Get current field names from extra data
+ final Set<String> currentKeys = new HashSet<>();
+ final Map<String, Object> extraData = currentMetadata.getExtraData();
+ if (extraData != null) {
+ currentKeys.addAll(extraData.keySet());
+ }
+ // Remove fields not in desired state
+ for (final String fieldName : currentKeys) {
if (!desiredState.containsKey(fieldName)) {
- metadata.remove(propertyPath);
+ final String path = "/" + fieldName;
getLogger().debug("Removing metadata field: {}", fieldName);
+ operations.add(new UpdateFileMetadataByIdRequestBody.Builder()
+ .op(UpdateFileMetadataByIdRequestBodyOpField.REMOVE)
+ .path(path)
+ .build());
}
}
// Add or update fields
for (final Map.Entry<String, Object> entry : desiredState.entrySet()) {
final String fieldName = entry.getKey();
final Object value = entry.getValue();
- final String propertyPath = "/" + fieldName;
+ final String path = "/" + fieldName;
+ final boolean exists = currentKeys.contains(fieldName);
- updateField(metadata, propertyPath, value,
currentKeys.contains(propertyPath));
+ final UpdateFileMetadataByIdRequestBody operation =
buildFieldOperation(path, value, exists, extraData);
+ if (operation != null) {
+ operations.add(operation);
+ }
}
+
+ return operations;
}
- private void updateField(final Metadata metadata,
- final String propertyPath,
- final Object value,
- final boolean exists) {
+ private UpdateFileMetadataByIdRequestBody buildFieldOperation(final String
path,
+ final Object
value,
+ final
boolean exists,
+ final
Map<String, Object> extraData) {
if (value == null) {
- throw new IllegalArgumentException("Null value found for property
path: " + propertyPath);
+ throw new IllegalArgumentException("Null value found for property
path: " + path);
}
- if (exists) {
- final Object currentValue = metadata.getValue(propertyPath);
-
- // Only update if values are different
+ // If exists, check if values are different
+ if (exists && extraData != null) {
+ final String fieldName = path.substring(1);
+ final Object currentValue = extraData.get(fieldName);
if (Objects.equals(currentValue, value)) {
- return;
+ return null; // No change needed
}
+ }
+
+ final MetadataInstanceValue metadataValue =
convertToMetadataInstanceValue(value, path);
+
+ // Box API uses replace for both adding new fields and updating
existing fields
+ return new UpdateFileMetadataByIdRequestBody.Builder()
+ .op(UpdateFileMetadataByIdRequestBodyOpField.REPLACE)
+ .path(path)
+ .value(metadataValue)
+ .build();
+ }
- // Update
- switch (value) {
- case Number n -> metadata.replace(propertyPath,
n.doubleValue());
- case List<?> l -> metadata.replace(propertyPath,
convertListToStringList(l, propertyPath));
- case LocalDate d -> metadata.replace(propertyPath,
BoxDate.of(d).format());
- default -> metadata.replace(propertyPath, value.toString());
+ private MetadataInstanceValue convertToMetadataInstanceValue(final Object
value, final String path) {
+ if (value instanceof Number n) {
+ if (value instanceof Double || value instanceof Float) {
+ return new MetadataInstanceValue(n.doubleValue());
+ } else {
+ return new MetadataInstanceValue(n.longValue());
}
+ } else if (value instanceof List<?> l) {
+ final List<String> stringList = l.stream()
+ .map(obj -> {
+ if (obj == null) {
+ throw new IllegalArgumentException("Null value
found in list for field: " + path);
+ }
+ return obj.toString();
+ })
+ .collect(Collectors.toList());
+ return new MetadataInstanceValue(stringList);
+ } else if (value instanceof LocalDate d) {
+ return new MetadataInstanceValue(BoxDate.of(d).format());
} else {
- // Add new field
- switch (value) {
- case Number n -> metadata.add(propertyPath, n.doubleValue());
- case List<?> l -> metadata.add(propertyPath,
convertListToStringList(l, propertyPath));
- case LocalDate d -> metadata.add(propertyPath,
BoxDate.of(d).format());
- default -> metadata.add(propertyPath, value.toString());
- }
+ return new MetadataInstanceValue(value.toString());
}
}
- private List<String> convertListToStringList(final List<?> list,
- final String fieldName) {
- return list.stream()
- .map(obj -> {
- if (obj == null) {
- throw new IllegalArgumentException("Null value found
in list for field: " + fieldName);
- }
- return obj.toString();
- })
- .collect(Collectors.toList());
- }
-
/**
* Retrieves the metadata for a Box file.
* Visible for testing purposes.
*
- * @param boxFile The Box file to retrieve metadata from.
+ * @param fileId The ID of the file.
* @param templateKey The key of the metadata template.
* @return The metadata for the Box file.
*/
- Metadata getMetadata(final BoxFile boxFile,
- final String templateKey) {
- return boxFile.getMetadata(templateKey);
- }
-
- /**
- * Returns a BoxFile object for the given file ID.
- *
- * @param fileId The ID of the file.
- * @return A BoxFile object for the given file ID.
- */
- BoxFile getBoxFile(final String fileId) {
- return new BoxFile(boxAPIConnection, fileId);
+ MetadataFull getMetadata(final String fileId, final String templateKey) {
+ return boxClient.getFileMetadata().getFileMetadataById(fileId,
GetFileMetadataByIdScope.ENTERPRISE, templateKey);
}
/**
* Updates the metadata for a Box file.
*
- * @param boxFile The Box file to update.
- * @param metadata The metadata to update.
+ * @param fileId The ID of the file.
+ * @param templateKey The key of the metadata template.
+ * @param operations The list of update operations.
*/
- void updateBoxFileMetadata(final BoxFile boxFile, final Metadata metadata)
{
- boxFile.updateMetadata(metadata);
+ void updateBoxFileMetadata(final String fileId, final String templateKey,
final List<UpdateFileMetadataByIdRequestBody> operations) {
+ boxClient.getFileMetadata().updateFileMetadataById(fileId,
UpdateFileMetadataByIdScope.ENTERPRISE, templateKey, operations);
Review Comment:
Same here, shouldn't be enterprise
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]