exceptionfactory commented on code in PR #8152: URL: https://github.com/apache/nifi/pull/8152#discussion_r1458179457
########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/pom.xml: ########## @@ -0,0 +1,67 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-questdb-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-questdb</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.questdb</groupId> + <artifactId>questdb</artifactId> + <version>7.3.7</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + </dependency> + + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + <version>6.1.2</version> Review Comment: This version can be removed because it is managed at the root pom.xml. Is this dependency required? ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/rollover/DeleteOldRolloverStrategy.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.questdb.rollover; + +import org.apache.nifi.questdb.Client; +import org.apache.nifi.questdb.QueryResultProcessor; +import org.apache.nifi.questdb.QueryRowContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Supplier; + +final class DeleteOldRolloverStrategy implements RolloverStrategy { + private static final Logger LOGGER = LoggerFactory.getLogger(DeleteOldRolloverStrategy.class); + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); Review Comment: Is there a reason for using `UTC` in this case? It seems like it would be more intuitive to use `LocalDateTime`, matching the system time. ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedClient.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.questdb.embedded; + +import io.questdb.cairo.CairoEngine; +import io.questdb.cairo.CairoError; +import io.questdb.cairo.TableToken; +import io.questdb.cairo.TableWriter; +import io.questdb.cairo.sql.RecordCursor; +import io.questdb.cairo.sql.RecordCursorFactory; +import io.questdb.griffin.CompiledQuery; +import io.questdb.griffin.SqlCompiler; +import io.questdb.griffin.SqlCompilerFactoryImpl; +import io.questdb.griffin.SqlException; +import io.questdb.griffin.SqlExecutionContext; +import io.questdb.mp.SCSequence; +import io.questdb.mp.TimeoutBlockingWaitStrategy; +import org.apache.nifi.questdb.Client; +import org.apache.nifi.questdb.DatabaseException; +import org.apache.nifi.questdb.InsertRowDataSource; +import org.apache.nifi.questdb.QueryResultProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +final class EmbeddedClient implements Client { + private final static Logger LOGGER = LoggerFactory.getLogger(EmbeddedClient.class); + + private final Supplier<CairoEngine> engine; + private final AtomicBoolean disconnected = new AtomicBoolean(false); + + EmbeddedClient(final Supplier<CairoEngine> engine) { + this.engine = engine; + } + + @Override + public void execute(final String query) throws DatabaseException { + checkConnectionState(); + + try (final SqlCompiler compiler = getCompiler()) { + final CompiledQuery compile = compiler.compile(query, getSqlExecutionContext()); + compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5, TimeUnit.SECONDS))); + } catch (final SqlException | CairoError e) { + throw new DatabaseException(e); + } + } + + @Override + public void insert( + final String tableName, + final InsertRowDataSource rowDataSource + ) throws DatabaseException { + checkConnectionState(); + + if (!rowDataSource.hasNextToInsert()) { + LOGGER.debug("No rows to insert into {}", tableName); + return; + } + + final TableToken tableToken = engine.get().getTableTokenIfExists(tableName); + + if (tableToken == null) { + throw new DatabaseException(String.format("Table Token for table [%s] not found", tableName)); + } + + try ( + final TableWriter tableWriter = engine.get().getWriter(tableToken, "adding rows") + ) { + final TableWriterBasedInsertRowContext context = new TableWriterBasedInsertRowContext(tableWriter); + + while (rowDataSource.hasNextToInsert()) { + context.addRow(rowDataSource); + } + + LOGGER.debug("Committing {} rows", tableWriter.getRowCount()); + tableWriter.commit(); + } catch (final Exception | CairoError e) { + // CairoError might be thrown in extreme cases, for example when no space left on the disk + throw new DatabaseException(e); + } finally { + engine.get().releaseInactive(); + } + } + + @Override + public <T> T query(final String query, final QueryResultProcessor<T> rowProcessor) throws DatabaseException { + checkConnectionState(); + + final CompiledQuery compiledQuery; + + try (final SqlCompiler compiler = getCompiler()) { + compiledQuery = compiler.compile(query, getSqlExecutionContext()); + } catch (final SqlException | CairoError e) { + throw new DatabaseException(e); + } + + try ( + final RecordCursorFactory factory = compiledQuery.getRecordCursorFactory(); + final RecordCursor cursor = factory.getCursor(getSqlExecutionContext()); + ) { + final CursorBasedQueryRowContext rowContext = new CursorBasedQueryRowContext(cursor); + + while ((rowContext.hasNext())) { + rowContext.moveToNext(); + rowProcessor.processRow(rowContext); + } + + return rowProcessor.getResult(); + } catch (final Exception e) { + throw new DatabaseException(e); + } + } + + @Override + public void disconnect() throws DatabaseException { + checkConnectionState(); + disconnected.set(true); + LOGGER.info("Client disconnects"); Review Comment: In that case, recommend the following wording adjustment: ```suggestion LOGGER.info("Client disconnected"); ``` ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.questdb.embedded; + +import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.CairoEngine; +import io.questdb.cairo.DefaultCairoConfiguration; +import io.questdb.cairo.TableToken; +import io.questdb.cairo.sql.TableRecordMetadata; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.nifi.questdb.Client; +import org.apache.nifi.questdb.DatabaseException; +import org.apache.nifi.questdb.DatabaseManager; +import org.apache.nifi.util.file.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +final class EmbeddedDatabaseManager implements DatabaseManager { + private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedDatabaseManager.class); + + private final String id = UUID.randomUUID().toString(); + private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED); + private final ReadWriteLock databaseStructureLock = new ReentrantReadWriteLock(); + private final EmbeddedDatabaseManagerContext context; + private final AtomicReference<CairoEngine> engine = new AtomicReference<>(); + private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>(); + private final ScheduledExecutorService scheduledExecutorService = Executors + .newScheduledThreadPool(2, new BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id + "-%d").build()); + + EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) { + this.context = context; + } + + @Override + public void init() { + if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) { + throw new IllegalStateException("Manager is already initialized"); + } + + ensureDatabaseIsReady(); + startRollover(); + } + + private void ensureDatabaseIsReady() { + boolean successful = false; + + try { + databaseStructureLock.writeLock().lock(); + state.set(EmbeddedDatabaseManagerStatus.REPAIRING); + + try { + ensurePersistLocationIsAccessible(); + ensureConnectionEstablished(); + ensureTablesAreInPlaceAndHealthy(); + successful = true; + } catch (final CorruptedDatabaseException e) { + boolean couldMoveOldToBackup = false; + + try { + LOGGER.error("Database is corrupted. Recreation is triggered. Manager tries to move corrupted database files to the backup location: {}", context.getBackupLocation(), e); + final File backupFolder = new File(context.getBackupLocationAsPath().toFile(), "backup_" + System.currentTimeMillis()); + FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsPath().toFile()); + Files.move(context.getPersistLocationAsPath(), backupFolder.toPath()); + couldMoveOldToBackup = true; + } catch (IOException ex) { + LOGGER.error("Could not create backup", ex); + } + + if (!couldMoveOldToBackup) { + try { + FileUtils.deleteFile(context.getPersistLocationAsFile(), true); + couldMoveOldToBackup = true; + } catch (IOException ex) { + LOGGER.error("Could not clean up corrupted database", ex); + } + } + + if (couldMoveOldToBackup) { + try { + ensurePersistLocationIsAccessible(); + ensureConnectionEstablished(); + ensureTablesAreInPlaceAndHealthy(); + successful = true; + } catch (CorruptedDatabaseException ex) { + LOGGER.error("Could not create backup", ex); + } + } + } + } finally { + state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : EmbeddedDatabaseManagerStatus.CORRUPTED); + + if (!successful) { + engine.set(null); + } + + databaseStructureLock.writeLock().unlock(); + } + + } + + private void ensurePersistLocationIsAccessible() throws CorruptedDatabaseException { + try { + FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile()); + } catch (final Exception e) { + throw new CorruptedDatabaseException(String.format("Database directory creation failed [%s]", context.getPersistLocationAsPath()), e); + } + } + + private void ensureConnectionEstablished() throws CorruptedDatabaseException { + if (engine.get() != null) { + engine.getAndSet(null).close(); + } + + final String absolutePath = context.getPersistLocationAsFile().getAbsolutePath(); + final CairoConfiguration configuration = new DefaultCairoConfiguration(absolutePath); + + try { + final CairoEngine engine = new CairoEngine(configuration); + LOGGER.info("Database connection successful [{}]", absolutePath); + this.engine.set(engine); + } catch (final Exception e) { + throw new CorruptedDatabaseException(String.format("Database connection failed [%s]", absolutePath), e); + } + } + + private void ensureTablesAreInPlaceAndHealthy() throws CorruptedDatabaseException { + final Map<String, File> databaseFiles = Arrays.stream(context.getPersistLocationAsFile().listFiles()) + .collect(Collectors.toMap(f -> f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length() + 1), f -> f)); + final Client client = getUnmanagedClient(); + + try { + for (final ManagedTableDefinition tableDefinition : context.getTableDefinitions()) { + if (!databaseFiles.containsKey(tableDefinition.getName())) { + try { + LOGGER.debug("Creating table {}", tableDefinition.getName()); + client.execute(tableDefinition.getDefinition()); + LOGGER.debug("Table {} is created", tableDefinition.getName()); + } catch (DatabaseException e) { + throw new CorruptedDatabaseException(String.format("Creating table [%s] has failed", tableDefinition.getName()), e); + } + } else if (!databaseFiles.get(tableDefinition.getName()).isDirectory()) { + throw new CorruptedDatabaseException(String.format("Table %s cannot be created because there is already a file exists with the given name", tableDefinition.getName())); + } + } + + // Checking if tables are healthy. + for (final ManagedTableDefinition tableDefinition : context.getTableDefinitions()) { + try { + final TableToken tableToken = this.engine.get().getTableTokenIfExists(tableDefinition.getName()); + final TableRecordMetadata metadata = this.engine.get().getSequencerMetadata(tableToken); + metadata.close(); + + client.execute(String.format("SELECT * FROM %S LIMIT 1", tableDefinition.getName())); + } catch (final Exception e) { + throw new CorruptedDatabaseException(e); + } + } + } finally { + try { + client.disconnect(); + } catch (DatabaseException e) { + throw new CorruptedDatabaseException(e); + } + } + } + + private void startRollover() { + final RolloverWorker rolloverWorker = new RolloverWorker(acquireClient(), context.getTableDefinitions()); + final ScheduledFuture<?> rolloverFuture = scheduledExecutorService.scheduleWithFixedDelay( + rolloverWorker, context.getRolloverFrequency().toMillis(), context.getRolloverFrequency().toMillis(), TimeUnit.MILLISECONDS); + scheduledFutures.add(rolloverFuture); + LOGGER.debug("Rollover started"); + } + + private void stopRollover() { + LOGGER.debug("Rollover shutdown started"); + + int cancelCompleted = 0; + int cancelFailed = 0; + for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) { + final boolean cancelled = scheduledFuture.cancel(true); + if (cancelled) { + cancelCompleted++; + } else { + cancelFailed++; + } + } + + LOGGER.debug("Rollover shutdown task cancellation status: completed [{}] failed [{}]", cancelCompleted, cancelFailed); + final List<Runnable> tasks = scheduledExecutorService.shutdownNow(); + LOGGER.debug("Rollover Scheduled Task Service shutdown remaining tasks [{}]", tasks.size()); + } + + private Client getUnmanagedClient() { + return new EmbeddedClient(() -> engine.get()); + } + + public Client acquireClient() { + checkIfManagerIsInitialised(); + final Client fallback = new NoOpClient(); + + if (state.get() == EmbeddedDatabaseManagerStatus.CORRUPTED) { + LOGGER.warn("The database is corrupted. Dummy client is returned"); Review Comment: I recommend changing this to and error and adjusting the wording to indicate the behavior. ```suggestion LOGGER.error("The database is corrupted: Status History will not be stored"); ``` ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.questdb.embedded; + +import io.questdb.cairo.CairoConfiguration; +import io.questdb.cairo.CairoEngine; +import io.questdb.cairo.DefaultCairoConfiguration; +import io.questdb.cairo.TableToken; +import io.questdb.cairo.sql.TableRecordMetadata; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.nifi.questdb.Client; +import org.apache.nifi.questdb.DatabaseException; +import org.apache.nifi.questdb.DatabaseManager; +import org.apache.nifi.util.file.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +final class EmbeddedDatabaseManager implements DatabaseManager { + private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedDatabaseManager.class); + + private final String id = UUID.randomUUID().toString(); + private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED); + private final ReadWriteLock databaseStructureLock = new ReentrantReadWriteLock(); + private final EmbeddedDatabaseManagerContext context; + private final AtomicReference<CairoEngine> engine = new AtomicReference<>(); + private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>(); + private final ScheduledExecutorService scheduledExecutorService = Executors + .newScheduledThreadPool(2, new BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id + "-%d").build()); + + EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) { + this.context = context; + } + + @Override + public void init() { + if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) { + throw new IllegalStateException("Manager is already initialized"); + } + + ensureDatabaseIsReady(); + startRollover(); + } + + private void ensureDatabaseIsReady() { + boolean successful = false; + + try { + databaseStructureLock.writeLock().lock(); + state.set(EmbeddedDatabaseManagerStatus.REPAIRING); + + try { + ensurePersistLocationIsAccessible(); + ensureConnectionEstablished(); + ensureTablesAreInPlaceAndHealthy(); + successful = true; + } catch (final CorruptedDatabaseException e) { + boolean couldMoveOldToBackup = false; + + try { + LOGGER.error("Database is corrupted. Recreation is triggered. Manager tries to move corrupted database files to the backup location: {}", context.getBackupLocation(), e); + final File backupFolder = new File(context.getBackupLocationAsPath().toFile(), "backup_" + System.currentTimeMillis()); + FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsPath().toFile()); + Files.move(context.getPersistLocationAsPath(), backupFolder.toPath()); + couldMoveOldToBackup = true; + } catch (IOException ex) { + LOGGER.error("Could not create backup", ex); + } + + if (!couldMoveOldToBackup) { + try { + FileUtils.deleteFile(context.getPersistLocationAsFile(), true); + couldMoveOldToBackup = true; + } catch (IOException ex) { + LOGGER.error("Could not clean up corrupted database", ex); + } + } + + if (couldMoveOldToBackup) { + try { + ensurePersistLocationIsAccessible(); + ensureConnectionEstablished(); + ensureTablesAreInPlaceAndHealthy(); + successful = true; + } catch (CorruptedDatabaseException ex) { + LOGGER.error("Could not create backup", ex); + } + } + } + } finally { + state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : EmbeddedDatabaseManagerStatus.CORRUPTED); + + if (!successful) { + engine.set(null); + } + + databaseStructureLock.writeLock().unlock(); + } + + } + + private void ensurePersistLocationIsAccessible() throws CorruptedDatabaseException { + try { + FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile()); + } catch (final Exception e) { + throw new CorruptedDatabaseException(String.format("Database directory creation failed [%s]", context.getPersistLocationAsPath()), e); + } + } + + private void ensureConnectionEstablished() throws CorruptedDatabaseException { + if (engine.get() != null) { + engine.getAndSet(null).close(); + } + + final String absolutePath = context.getPersistLocationAsFile().getAbsolutePath(); + final CairoConfiguration configuration = new DefaultCairoConfiguration(absolutePath); + + try { + final CairoEngine engine = new CairoEngine(configuration); + LOGGER.info("Database connection successful [{}]", absolutePath); + this.engine.set(engine); + } catch (final Exception e) { + throw new CorruptedDatabaseException(String.format("Database connection failed [%s]", absolutePath), e); + } + } + + private void ensureTablesAreInPlaceAndHealthy() throws CorruptedDatabaseException { + final Map<String, File> databaseFiles = Arrays.stream(context.getPersistLocationAsFile().listFiles()) + .collect(Collectors.toMap(f -> f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length() + 1), f -> f)); + final Client client = getUnmanagedClient(); + + try { + for (final ManagedTableDefinition tableDefinition : context.getTableDefinitions()) { + if (!databaseFiles.containsKey(tableDefinition.getName())) { + try { + LOGGER.debug("Creating table {}", tableDefinition.getName()); + client.execute(tableDefinition.getDefinition()); + LOGGER.debug("Table {} is created", tableDefinition.getName()); + } catch (DatabaseException e) { + throw new CorruptedDatabaseException(String.format("Creating table [%s] has failed", tableDefinition.getName()), e); + } + } else if (!databaseFiles.get(tableDefinition.getName()).isDirectory()) { + throw new CorruptedDatabaseException(String.format("Table %s cannot be created because there is already a file exists with the given name", tableDefinition.getName())); + } + } + + // Checking if tables are healthy. + for (final ManagedTableDefinition tableDefinition : context.getTableDefinitions()) { + try { + final TableToken tableToken = this.engine.get().getTableTokenIfExists(tableDefinition.getName()); + final TableRecordMetadata metadata = this.engine.get().getSequencerMetadata(tableToken); + metadata.close(); + + client.execute(String.format("SELECT * FROM %S LIMIT 1", tableDefinition.getName())); + } catch (final Exception e) { + throw new CorruptedDatabaseException(e); + } + } + } finally { + try { + client.disconnect(); + } catch (DatabaseException e) { + throw new CorruptedDatabaseException(e); + } + } + } + + private void startRollover() { + final RolloverWorker rolloverWorker = new RolloverWorker(acquireClient(), context.getTableDefinitions()); + final ScheduledFuture<?> rolloverFuture = scheduledExecutorService.scheduleWithFixedDelay( + rolloverWorker, context.getRolloverFrequency().toMillis(), context.getRolloverFrequency().toMillis(), TimeUnit.MILLISECONDS); + scheduledFutures.add(rolloverFuture); + LOGGER.debug("Rollover started"); + } + + private void stopRollover() { + LOGGER.debug("Rollover shutdown started"); + + int cancelCompleted = 0; + int cancelFailed = 0; + for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) { + final boolean cancelled = scheduledFuture.cancel(true); + if (cancelled) { + cancelCompleted++; + } else { + cancelFailed++; + } + } + + LOGGER.debug("Rollover shutdown task cancellation status: completed [{}] failed [{}]", cancelCompleted, cancelFailed); + final List<Runnable> tasks = scheduledExecutorService.shutdownNow(); + LOGGER.debug("Rollover Scheduled Task Service shutdown remaining tasks [{}]", tasks.size()); + } + + private Client getUnmanagedClient() { + return new EmbeddedClient(() -> engine.get()); + } + + public Client acquireClient() { + checkIfManagerIsInitialised(); + final Client fallback = new NoOpClient(); + + if (state.get() == EmbeddedDatabaseManagerStatus.CORRUPTED) { + LOGGER.warn("The database is corrupted. Dummy client is returned"); + return fallback; + } + + final LockedClient lockedClient = new LockedClient( + databaseStructureLock.readLock(), + context.getLockAttemptTime(), + new ConditionAwareClient(() -> state.get() == EmbeddedDatabaseManagerStatus.HEALTHY, getUnmanagedClient()) + ); + + return SpringRetryingClient.getInstance(context.getNumberOfAttemptedRetries(), this::errorAction, lockedClient, fallback); + } + + private void checkIfManagerIsInitialised() { + if (state.get() == EmbeddedDatabaseManagerStatus.UNINITIALIZED) { + throw new IllegalStateException("The state of the database manager is not initialized"); + } + } + + private void errorAction(final int attemptNumber, final Throwable throwable) { + if (shouldRestoreDatabase(attemptNumber, throwable)) { + LOGGER.error("Database manager tries to restore database after the first failed attempt if necessary"); + ensureDatabaseIsReady(); + } else { + LOGGER.warn("Error happened at attempt {}: {}", attemptNumber, throwable); Review Comment: ```suggestion LOGGER.warn("Error happened at attempt {}", attemptNumber, throwable); ``` ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/test/java/org/apache/nifi/questdb/util/Event.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.questdb.util; + +import java.time.Instant; +import java.util.Objects; + +/** + * This class exists for test purposes + */ +public class Event { + private Instant capturedAt; Review Comment: ```suggestion private Instant captured; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org