exceptionfactory commented on code in PR #8152:
URL: https://github.com/apache/nifi/pull/8152#discussion_r1458179457


##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/pom.xml:
##########
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-questdb-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-questdb</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.questdb</groupId>
+            <artifactId>questdb</artifactId>
+            <version>7.3.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-core</artifactId>
+            <version>6.1.2</version>

Review Comment:
   This version can be removed because it is managed at the root pom.xml. Is 
this dependency required?



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/rollover/DeleteOldRolloverStrategy.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.rollover;
+
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.apache.nifi.questdb.QueryRowContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Supplier;
+
+final class DeleteOldRolloverStrategy implements RolloverStrategy {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DeleteOldRolloverStrategy.class);
+    private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

Review Comment:
   Is there a reason for using `UTC` in this case? It seems like it would be 
more intuitive to use `LocalDateTime`, matching the system time.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedClient.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.CairoError;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.TableWriter;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.CompiledQuery;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlCompilerFactoryImpl;
+import io.questdb.griffin.SqlException;
+import io.questdb.griffin.SqlExecutionContext;
+import io.questdb.mp.SCSequence;
+import io.questdb.mp.TimeoutBlockingWaitStrategy;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+final class EmbeddedClient implements Client {
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedClient.class);
+
+    private final Supplier<CairoEngine> engine;
+    private final AtomicBoolean disconnected = new AtomicBoolean(false);
+
+    EmbeddedClient(final Supplier<CairoEngine> engine) {
+        this.engine = engine;
+    }
+
+    @Override
+    public void execute(final String query) throws DatabaseException {
+        checkConnectionState();
+
+        try (final SqlCompiler compiler = getCompiler()) {
+            final CompiledQuery compile = compiler.compile(query, 
getSqlExecutionContext());
+            compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5, 
TimeUnit.SECONDS)));
+        } catch (final SqlException | CairoError e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @Override
+    public void insert(
+        final String tableName,
+        final InsertRowDataSource rowDataSource
+    ) throws DatabaseException {
+        checkConnectionState();
+
+        if (!rowDataSource.hasNextToInsert()) {
+            LOGGER.debug("No rows to insert into {}", tableName);
+            return;
+        }
+
+        final TableToken tableToken = 
engine.get().getTableTokenIfExists(tableName);
+
+        if (tableToken == null) {
+            throw new DatabaseException(String.format("Table Token for table 
[%s] not found", tableName));
+        }
+
+        try (
+            final TableWriter tableWriter = engine.get().getWriter(tableToken, 
"adding rows")
+        ) {
+            final TableWriterBasedInsertRowContext context = new 
TableWriterBasedInsertRowContext(tableWriter);
+
+            while (rowDataSource.hasNextToInsert()) {
+                context.addRow(rowDataSource);
+            }
+
+            LOGGER.debug("Committing {} rows", tableWriter.getRowCount());
+            tableWriter.commit();
+        } catch (final Exception | CairoError e) {
+            // CairoError might be thrown in extreme cases, for example when 
no space left on the disk
+            throw new DatabaseException(e);
+        } finally {
+            engine.get().releaseInactive();
+        }
+    }
+
+    @Override
+    public <T> T query(final String query, final QueryResultProcessor<T> 
rowProcessor) throws DatabaseException {
+        checkConnectionState();
+
+        final CompiledQuery compiledQuery;
+
+        try (final SqlCompiler compiler = getCompiler()) {
+            compiledQuery = compiler.compile(query, getSqlExecutionContext());
+        } catch (final SqlException | CairoError e) {
+            throw new DatabaseException(e);
+        }
+
+        try (
+            final RecordCursorFactory factory = 
compiledQuery.getRecordCursorFactory();
+            final RecordCursor cursor = 
factory.getCursor(getSqlExecutionContext());
+        ) {
+            final CursorBasedQueryRowContext rowContext = new 
CursorBasedQueryRowContext(cursor);
+
+            while ((rowContext.hasNext())) {
+                rowContext.moveToNext();
+                rowProcessor.processRow(rowContext);
+            }
+
+            return rowProcessor.getResult();
+        } catch (final Exception e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @Override
+    public void disconnect() throws DatabaseException {
+        checkConnectionState();
+        disconnected.set(true);
+        LOGGER.info("Client disconnects");

Review Comment:
   In that case, recommend the following wording adjustment:
   ```suggestion
           LOGGER.info("Client disconnected");
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered. Manager tries to move corrupted database files to the backup 
location: {}", context.getBackupLocation(), e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsPath().toFile(), "backup_" + 
System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsPath().toFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            throw new CorruptedDatabaseException(String.format("Database 
directory creation failed [%s]", context.getPersistLocationAsPath()), e);
+        }
+    }
+
+    private void ensureConnectionEstablished() throws 
CorruptedDatabaseException {
+        if (engine.get() != null) {
+            engine.getAndSet(null).close();
+        }
+
+        final String absolutePath = 
context.getPersistLocationAsFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
+
+        try {
+            final CairoEngine engine = new CairoEngine(configuration);
+            LOGGER.info("Database connection successful [{}]", absolutePath);
+            this.engine.set(engine);
+        } catch (final Exception e) {
+            throw new CorruptedDatabaseException(String.format("Database 
connection failed [%s]", absolutePath), e);
+        }
+    }
+
+    private void ensureTablesAreInPlaceAndHealthy() throws 
CorruptedDatabaseException {
+        final Map<String, File> databaseFiles = 
Arrays.stream(context.getPersistLocationAsFile().listFiles())
+                .collect(Collectors.toMap(f -> 
f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length()
 + 1), f -> f));
+        final Client client = getUnmanagedClient();
+
+        try {
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                if (!databaseFiles.containsKey(tableDefinition.getName())) {
+                    try {
+                        LOGGER.debug("Creating table {}", 
tableDefinition.getName());
+                        client.execute(tableDefinition.getDefinition());
+                        LOGGER.debug("Table {} is created", 
tableDefinition.getName());
+                    } catch (DatabaseException e) {
+                        throw new 
CorruptedDatabaseException(String.format("Creating table [%s] has failed", 
tableDefinition.getName()), e);
+                    }
+                } else if 
(!databaseFiles.get(tableDefinition.getName()).isDirectory()) {
+                    throw new CorruptedDatabaseException(String.format("Table 
%s cannot be created because there is already a file exists with the given 
name", tableDefinition.getName()));
+                }
+            }
+
+            // Checking if tables are healthy.
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                try {
+                    final TableToken tableToken = 
this.engine.get().getTableTokenIfExists(tableDefinition.getName());
+                    final TableRecordMetadata metadata = 
this.engine.get().getSequencerMetadata(tableToken);
+                    metadata.close();
+
+                    client.execute(String.format("SELECT * FROM %S LIMIT 1", 
tableDefinition.getName()));
+                } catch (final Exception e) {
+                    throw new CorruptedDatabaseException(e);
+                }
+            }
+        } finally {
+            try {
+                client.disconnect();
+            } catch (DatabaseException e) {
+                throw new CorruptedDatabaseException(e);
+            }
+        }
+    }
+
+    private void startRollover() {
+        final RolloverWorker rolloverWorker = new 
RolloverWorker(acquireClient(), context.getTableDefinitions());
+        final ScheduledFuture<?> rolloverFuture = 
scheduledExecutorService.scheduleWithFixedDelay(
+                rolloverWorker, context.getRolloverFrequency().toMillis(), 
context.getRolloverFrequency().toMillis(), TimeUnit.MILLISECONDS);
+        scheduledFutures.add(rolloverFuture);
+        LOGGER.debug("Rollover started");
+    }
+
+    private void stopRollover() {
+        LOGGER.debug("Rollover shutdown started");
+
+        int cancelCompleted = 0;
+        int cancelFailed = 0;
+        for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            final boolean cancelled = scheduledFuture.cancel(true);
+            if (cancelled) {
+                cancelCompleted++;
+            } else {
+                cancelFailed++;
+            }
+        }
+
+        LOGGER.debug("Rollover shutdown task cancellation status: completed 
[{}] failed [{}]", cancelCompleted, cancelFailed);
+        final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+        LOGGER.debug("Rollover Scheduled Task Service shutdown remaining tasks 
[{}]", tasks.size());
+    }
+
+    private Client getUnmanagedClient() {
+        return new EmbeddedClient(() -> engine.get());
+    }
+
+    public Client acquireClient() {
+        checkIfManagerIsInitialised();
+        final Client fallback = new NoOpClient();
+
+        if (state.get() == EmbeddedDatabaseManagerStatus.CORRUPTED) {
+            LOGGER.warn("The database is corrupted. Dummy client is returned");

Review Comment:
   I recommend changing this to and error and adjusting the wording to indicate 
the behavior.
   ```suggestion
               LOGGER.error("The database is corrupted: Status History will not 
be stored");
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered. Manager tries to move corrupted database files to the backup 
location: {}", context.getBackupLocation(), e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsPath().toFile(), "backup_" + 
System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsPath().toFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            throw new CorruptedDatabaseException(String.format("Database 
directory creation failed [%s]", context.getPersistLocationAsPath()), e);
+        }
+    }
+
+    private void ensureConnectionEstablished() throws 
CorruptedDatabaseException {
+        if (engine.get() != null) {
+            engine.getAndSet(null).close();
+        }
+
+        final String absolutePath = 
context.getPersistLocationAsFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
+
+        try {
+            final CairoEngine engine = new CairoEngine(configuration);
+            LOGGER.info("Database connection successful [{}]", absolutePath);
+            this.engine.set(engine);
+        } catch (final Exception e) {
+            throw new CorruptedDatabaseException(String.format("Database 
connection failed [%s]", absolutePath), e);
+        }
+    }
+
+    private void ensureTablesAreInPlaceAndHealthy() throws 
CorruptedDatabaseException {
+        final Map<String, File> databaseFiles = 
Arrays.stream(context.getPersistLocationAsFile().listFiles())
+                .collect(Collectors.toMap(f -> 
f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length()
 + 1), f -> f));
+        final Client client = getUnmanagedClient();
+
+        try {
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                if (!databaseFiles.containsKey(tableDefinition.getName())) {
+                    try {
+                        LOGGER.debug("Creating table {}", 
tableDefinition.getName());
+                        client.execute(tableDefinition.getDefinition());
+                        LOGGER.debug("Table {} is created", 
tableDefinition.getName());
+                    } catch (DatabaseException e) {
+                        throw new 
CorruptedDatabaseException(String.format("Creating table [%s] has failed", 
tableDefinition.getName()), e);
+                    }
+                } else if 
(!databaseFiles.get(tableDefinition.getName()).isDirectory()) {
+                    throw new CorruptedDatabaseException(String.format("Table 
%s cannot be created because there is already a file exists with the given 
name", tableDefinition.getName()));
+                }
+            }
+
+            // Checking if tables are healthy.
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                try {
+                    final TableToken tableToken = 
this.engine.get().getTableTokenIfExists(tableDefinition.getName());
+                    final TableRecordMetadata metadata = 
this.engine.get().getSequencerMetadata(tableToken);
+                    metadata.close();
+
+                    client.execute(String.format("SELECT * FROM %S LIMIT 1", 
tableDefinition.getName()));
+                } catch (final Exception e) {
+                    throw new CorruptedDatabaseException(e);
+                }
+            }
+        } finally {
+            try {
+                client.disconnect();
+            } catch (DatabaseException e) {
+                throw new CorruptedDatabaseException(e);
+            }
+        }
+    }
+
+    private void startRollover() {
+        final RolloverWorker rolloverWorker = new 
RolloverWorker(acquireClient(), context.getTableDefinitions());
+        final ScheduledFuture<?> rolloverFuture = 
scheduledExecutorService.scheduleWithFixedDelay(
+                rolloverWorker, context.getRolloverFrequency().toMillis(), 
context.getRolloverFrequency().toMillis(), TimeUnit.MILLISECONDS);
+        scheduledFutures.add(rolloverFuture);
+        LOGGER.debug("Rollover started");
+    }
+
+    private void stopRollover() {
+        LOGGER.debug("Rollover shutdown started");
+
+        int cancelCompleted = 0;
+        int cancelFailed = 0;
+        for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            final boolean cancelled = scheduledFuture.cancel(true);
+            if (cancelled) {
+                cancelCompleted++;
+            } else {
+                cancelFailed++;
+            }
+        }
+
+        LOGGER.debug("Rollover shutdown task cancellation status: completed 
[{}] failed [{}]", cancelCompleted, cancelFailed);
+        final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+        LOGGER.debug("Rollover Scheduled Task Service shutdown remaining tasks 
[{}]", tasks.size());
+    }
+
+    private Client getUnmanagedClient() {
+        return new EmbeddedClient(() -> engine.get());
+    }
+
+    public Client acquireClient() {
+        checkIfManagerIsInitialised();
+        final Client fallback = new NoOpClient();
+
+        if (state.get() == EmbeddedDatabaseManagerStatus.CORRUPTED) {
+            LOGGER.warn("The database is corrupted. Dummy client is returned");
+            return fallback;
+        }
+
+        final LockedClient lockedClient = new LockedClient(
+                databaseStructureLock.readLock(),
+                context.getLockAttemptTime(),
+                new ConditionAwareClient(() -> state.get() == 
EmbeddedDatabaseManagerStatus.HEALTHY, getUnmanagedClient())
+        );
+
+        return 
SpringRetryingClient.getInstance(context.getNumberOfAttemptedRetries(), 
this::errorAction, lockedClient, fallback);
+    }
+
+    private void checkIfManagerIsInitialised() {
+        if (state.get() == EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("The state of the database manager 
is not initialized");
+        }
+    }
+
+    private void errorAction(final int attemptNumber, final Throwable 
throwable) {
+        if (shouldRestoreDatabase(attemptNumber, throwable)) {
+            LOGGER.error("Database manager tries to restore database after the 
first failed attempt if necessary");
+            ensureDatabaseIsReady();
+        } else {
+            LOGGER.warn("Error happened at attempt {}: {}", attemptNumber, 
throwable);

Review Comment:
   ```suggestion
               LOGGER.warn("Error happened at attempt {}", attemptNumber, 
throwable);
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/test/java/org/apache/nifi/questdb/util/Event.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.util;
+
+import java.time.Instant;
+import java.util.Objects;
+
+/**
+ * This class exists for test purposes
+ */
+public class Event {
+    private Instant capturedAt;

Review Comment:
   ```suggestion
       private Instant captured;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to