exceptionfactory commented on code in PR #8152:
URL: https://github.com/apache/nifi/pull/8152#discussion_r1476061099


##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/BufferedStatusHistoryStorage.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+final class BufferedStatusHistoryStorage implements StatusHistoryStorage {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BufferedStatusHistoryStorage.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(1, new 
BasicThreadFactory.Builder().namingPattern("BufferedStatusHistoryStorage-" + id 
+ "-%d").build());
+
+    private final StatusHistoryStorage storage;
+    private final long persistFrequencyInMs;
+    private final int persistBatchSize;
+
+    private final BlockingQueue<CapturedStatus<NodeStatus>> nodeStatusQueue = 
new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<GarbageCollectionStatus>> 
garbageCollectionStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<ProcessGroupStatus>> 
processGroupStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<ConnectionStatus>> 
connectionStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<RemoteProcessGroupStatus>> 
remoteProcessGroupStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<ProcessorStatus>> 
processorStatusQueue = new LinkedBlockingQueue<>();
+
+    public BufferedStatusHistoryStorage(final StatusHistoryStorage storage, 
final long persistFrequencyInMs, final int persistBatchSize) {
+        this.storage = storage;
+        this.persistFrequencyInMs = persistFrequencyInMs;
+        this.persistBatchSize = persistBatchSize;
+    }
+
+    @Override
+    public void init() {
+        storage.init();
+        final ScheduledFuture<?> future = 
scheduledExecutorService.scheduleWithFixedDelay(
+                new BufferedStatusHistoryStorageWorker(), 
persistFrequencyInMs, persistFrequencyInMs, TimeUnit.MILLISECONDS);
+        scheduledFutures.add(future);
+        LOGGER.info("Flushing is initiated");
+    }
+
+    @Override
+    public void close() {
+        storage.close();
+        LOGGER.debug("Flushing shutdown started");
+        int cancelCompleted = 0;
+        int cancelFailed = 0;
+
+        for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            final boolean cancelled = scheduledFuture.cancel(true);
+            if (cancelled) {
+                cancelCompleted++;
+            } else {
+                cancelFailed++;
+            }
+        }
+
+        LOGGER.debug("Flushing shutdown task cancellation status: completed 
[{}] failed [{}]", cancelCompleted, cancelFailed);
+        final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+        LOGGER.debug(" Scheduled Task Service shutdown remaining tasks [{}]", 
tasks.size());

Review Comment:
   ```suggestion
           LOGGER.debug("Scheduled Task Service shutdown remaining tasks [{}]", 
tasks.size());
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/StorateStatusDataSource.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.StorageStatus;
+import org.apache.nifi.questdb.InsertRowContext;
+import org.apache.nifi.questdb.InsertRowDataSource;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+final class StorateStatusDataSource implements InsertRowDataSource {

Review Comment:
   ```suggestion
   final class StorageStatusDataSource implements InsertRowDataSource {
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered. Manager tries to move corrupted database files to the backup 
location: {}", context.getBackupLocation(), e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsPath().toFile(), "backup_" + 
System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsPath().toFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsPath().toFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            throw new CorruptedDatabaseException(String.format("Database 
directory creation failed [%s]", context.getPersistLocationAsPath()), e);
+        }
+    }
+
+    private void ensureConnectionEstablished() throws 
CorruptedDatabaseException {
+        if (engine.get() != null) {
+            engine.getAndSet(null).close();
+        }
+
+        final String absolutePath = 
context.getPersistLocationAsPath().toFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
+
+        try {
+            final CairoEngine engine = new CairoEngine(configuration);
+            LOGGER.info("Database connection successful [{}]", absolutePath);
+            this.engine.set(engine);
+        } catch (final Exception e) {
+            throw new CorruptedDatabaseException(String.format("Database 
connection failed [%s]", absolutePath), e);
+        }
+    }
+
+    private void ensureTablesAreInPlaceAndHealthy() throws 
CorruptedDatabaseException {
+        final Map<String, File> databaseFiles = 
Arrays.stream(context.getPersistLocationAsPath().toFile().listFiles())
+                .collect(Collectors.toMap(f -> 
f.getAbsolutePath().substring(context.getPersistLocationAsPath().toFile().getAbsolutePath().length()
 + 1), f -> f));
+        final Client client = getUnmanagedClient();
+
+        try {
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                if (!databaseFiles.containsKey(tableDefinition.getName())) {
+                    try {
+                        LOGGER.debug("Creating table {}", 
tableDefinition.getName());
+                        client.execute(tableDefinition.getDefinition());
+                        LOGGER.debug("Table {} is created", 
tableDefinition.getName());
+                    } catch (DatabaseException e) {
+                        throw new 
CorruptedDatabaseException(String.format("Creating table [%s] has failed", 
tableDefinition.getName()), e);
+                    }
+                } else if 
(!databaseFiles.get(tableDefinition.getName()).isDirectory()) {
+                    throw new CorruptedDatabaseException(String.format("Table 
%s cannot be created because there is already a file exists with the given 
name", tableDefinition.getName()));
+                }
+            }
+
+            // Checking if tables are healthy.
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                try {
+                    final TableToken tableToken = 
this.engine.get().getTableTokenIfExists(tableDefinition.getName());
+                    final TableRecordMetadata metadata = 
this.engine.get().getSequencerMetadata(tableToken);
+                    metadata.close();
+
+                    client.execute(String.format("SELECT * FROM %S LIMIT 1", 
tableDefinition.getName()));
+                } catch (final Exception e) {
+                    throw new CorruptedDatabaseException(e);
+                }
+            }
+        } finally {
+            try {
+                client.disconnect();
+            } catch (DatabaseException e) {
+                throw new CorruptedDatabaseException(e);
+            }
+        }
+    }
+
+    private void startRollover() {
+        final RolloverWorker rolloverWorker = new 
RolloverWorker(acquireClient(), context.getTableDefinitions());
+        final ScheduledFuture<?> rolloverFuture = 
scheduledExecutorService.scheduleWithFixedDelay(
+                rolloverWorker, context.getRolloverFrequency().toMillis(), 
context.getRolloverFrequency().toMillis(), TimeUnit.MILLISECONDS);
+        scheduledFutures.add(rolloverFuture);
+        LOGGER.debug("Rollover started");
+    }
+
+    private void stopRollover() {
+        LOGGER.debug("Rollover shutdown started");
+
+        int cancelCompleted = 0;
+        int cancelFailed = 0;
+        for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            final boolean cancelled = scheduledFuture.cancel(true);
+            if (cancelled) {
+                cancelCompleted++;
+            } else {
+                cancelFailed++;
+            }
+        }
+
+        LOGGER.debug("Rollover shutdown task cancellation status: completed 
[{}] failed [{}]", cancelCompleted, cancelFailed);
+        final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+        LOGGER.debug("Rollover Scheduled Task Service shutdown remaining tasks 
[{}]", tasks.size());
+    }
+
+    private Client getUnmanagedClient() {
+        return new EmbeddedClient(() -> engine.get());
+    }
+
+    public Client acquireClient() {
+        checkIfManagerIsInitialised();
+        final Client fallback = new NoOpClient();
+
+        if (state.get() == EmbeddedDatabaseManagerStatus.CORRUPTED) {
+            LOGGER.error("The database is corrupted: Status History will not 
be stored");

Review Comment:
   On further review of this behavior, falling back to the `NoOpClient` seems 
problematic as it effectively disables all Status History capabilities.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/test/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManagerTest.java:
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.questdb.mapping.RequestMapping;
+import org.apache.nifi.questdb.rollover.RolloverStrategy;
+import org.apache.nifi.questdb.util.Event;
+import org.apache.nifi.questdb.util.QuestDbTestUtil;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.StreamSupport;
+
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.CREATE_EVENT2_TABLE;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.CREATE_EVENT_TABLE;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.EVENT2_TABLE_NAME;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.EVENT_TABLE_NAME;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.SELECT_QUERY;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.getRandomTestData;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.getTestData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class EmbeddedDatabaseManagerTest extends EmbeddedQuestDbTest {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManagerTest.class);
+    private static final int DAYS_TO_KEEP_EVENT = 1;
+    private static final String NON_EXISTING_PLACE = SystemUtils.IS_OS_WINDOWS 
? "T:/nonExistingPlace" : "/nonExistingPlace";

Review Comment:
   Is this still necessary with the methods disabled on Windows?



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/test/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManagerTest.java:
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.questdb.mapping.RequestMapping;
+import org.apache.nifi.questdb.rollover.RolloverStrategy;
+import org.apache.nifi.questdb.util.Event;
+import org.apache.nifi.questdb.util.QuestDbTestUtil;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.StreamSupport;
+
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.CREATE_EVENT2_TABLE;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.CREATE_EVENT_TABLE;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.EVENT2_TABLE_NAME;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.EVENT_TABLE_NAME;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.SELECT_QUERY;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.getRandomTestData;
+import static org.apache.nifi.questdb.util.QuestDbTestUtil.getTestData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class EmbeddedDatabaseManagerTest extends EmbeddedQuestDbTest {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManagerTest.class);
+    private static final int DAYS_TO_KEEP_EVENT = 1;
+    private static final String NON_EXISTING_PLACE = SystemUtils.IS_OS_WINDOWS 
? "T:/nonExistingPlace" : "/nonExistingPlace";
+
+    @Test
+    public void testAcquiringWithoutInitialization() {
+        final EmbeddedDatabaseManager testSubject = new 
EmbeddedDatabaseManager(new SimpleEmbeddedDatabaseManagerContext());
+        assertThrows(IllegalStateException.class, () -> 
testSubject.acquireClient());
+    }
+
+    @Test
+    public void testHappyPath() throws DatabaseException {
+        final List<Event> testData = getTestData();
+        final DatabaseManager testSubject = getTestSubject();
+        assertDatabaseFolderIsNotEmpty();
+
+        final Client client = testSubject.acquireClient();
+
+        client.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+        final List<Event> result = client.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+
+        assertIterableEquals(testData, result);
+
+        testSubject.close();
+
+        // Even if the client itself is not connected, manager prevents client 
to reach database after closing
+        assertFalse(client.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING)).iterator().hasNext());
+    }
+
+    @Test
+    public void testRollover() throws DatabaseException, InterruptedException {
+        final List<Event> testData = new ArrayList<>();
+        testData.add(new Event(Instant.now().minus((DAYS_TO_KEEP_EVENT + 1), 
ChronoUnit.DAYS), "A", 1));
+        testData.add(new Event(Instant.now(), "B", 2));
+        final DatabaseManager testSubject = getTestSubject();
+
+        final Client client = testSubject.acquireClient();
+        client.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+
+        // The rollover runs in every 5 seconds
+        Thread.sleep(TimeUnit.SECONDS.toMillis(6));
+
+        final List<Event> result = client.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+        testSubject.close();
+
+        assertEquals(1, result.size());
+    }
+
+    @Test
+    public void testParallelClientsOnSameThread() throws DatabaseException {
+        final List<Event> testData = getTestData();
+        final DatabaseManager testSubject = getTestSubject();
+
+        final Client client1 = testSubject.acquireClient();
+        client1.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+        final List<Event> result1 = client1.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+
+        final Client client2 = testSubject.acquireClient();
+        final List<Event> result2 = client2.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+
+        testSubject.close();
+        assertEquals(3, result2.size());
+        assertIterableEquals(result1, result2);
+    }
+
+    @Test
+    public void testParallelClientsDifferentThread() throws 
InterruptedException {
+        final List<Event> testData = getTestData();
+        final DatabaseManager testSubject = getTestSubject();
+        final CountDownLatch step1 = new CountDownLatch(1);
+        final CountDownLatch step2 = new CountDownLatch(1);
+        final AtomicReference<List<Event>> result1 = new AtomicReference<>();
+        final AtomicReference<List<Event>> result2 = new AtomicReference<>();
+
+        new Thread(() -> {
+            try {
+                final Client client1 = testSubject.acquireClient();
+                client1.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+                result1.set(client1.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING)));
+                step1.countDown();
+            } catch (final DatabaseException e) {
+                throw new RuntimeException(e);
+            }
+        }).start();
+
+        new Thread(() -> {
+            try {
+                step1.await();
+                final Client client2 = testSubject.acquireClient();
+                result2.set(client2.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING)));
+                step2.countDown();
+            } catch (final DatabaseException | InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }).start();
+
+        step2.await();
+
+        testSubject.close();
+        assertEquals(3, result1.get().size());
+        assertIterableEquals(result1.get(), result2.get());
+    }
+
+    @Test
+    public void testContactingToDatabaseWithDifferentManager() throws 
DatabaseException {
+        final List<Event> testData = getTestData();
+        final DatabaseManager testSubject1 = getTestSubject();
+
+        final Client client1 = testSubject1.acquireClient();
+        client1.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+        final List<Event> result1 = client1.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+        client1.disconnect();
+        testSubject1.close();
+
+        assertDatabaseFolderIsNotEmpty();
+
+        final DatabaseManager testSubject2 = getTestSubject();
+        final Client client2 = testSubject2.acquireClient();
+        final List<Event> result2 = client2.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+        client2.disconnect();
+        testSubject2.close();
+
+        assertIterableEquals(result1, result2);
+    }
+
+    @Test
+    public void testDatabaseRestorationAfterLostDatabase() throws 
DatabaseException, IOException {
+        final List<Event> testData = getTestData();
+        final DatabaseManager testSubject = getTestSubject();
+        final Client client = testSubject.acquireClient();
+        client.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+
+        FileUtils.deleteFilesInDir(testDbPathDirectory.toFile(), (dir, name) 
-> true, LOGGER, true, true);
+
+        client.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+        final List<Event> result = client.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+
+        testSubject.close();
+
+        // Ensuring that not the fallback client answers
+        assertEquals(3, result.size());
+    }
+
+    @Test
+    public void testDatabaseRestorationAfterLosingTableFiles() throws 
DatabaseException, IOException {
+        final List<Event> testData = getTestData();
+        final DatabaseManager testSubject = getTestSubject();
+        final Client client = testSubject.acquireClient();
+        client.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+
+        final File eventTableDirectory = new 
File(testDbPathDirectory.toFile(), "event");
+        FileUtils.deleteFilesInDir(eventTableDirectory, (dir, name) -> true, 
LOGGER, true, true);
+        FileUtils.deleteFile(eventTableDirectory, LOGGER);
+
+        client.insert(EVENT_TABLE_NAME, 
QuestDbTestUtil.getEventTableDataSource(testData));
+        final List<Event> result = client.query(SELECT_QUERY, 
RequestMapping.getResultProcessor(QuestDbTestUtil.EVENT_TABLE_REQUEST_MAPPING));
+
+        testDbPathDirectory.toFile().list((dir, name) -> dir.isDirectory());
+
+        testSubject.close();
+
+        // Ensuring that not the fallback client answers
+        assertEquals(3, result.size());
+    }
+
+    @Test
+    @DisabledOnOs(OS.WINDOWS)
+    // This testcase cannot be reproduced under Windows using Junit

Review Comment:
   This comment can be moved to the `disabledReason` attribute of 
`DisabledOnOs`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to