This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 4250-add-csv-dataset-import-with-preview-for-data-sets
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 30660e94dc1e177694975b4c0c9bfa7d2af6277d
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Mar 13 16:15:16 2026 +0100

    feat(#4250): Improve upload dialog
---
 .../datalake/importer/CsvImportPreviewRequest.java |   9 +
 .../datalake/importer/CsvImportPreviewResult.java  |   9 +
 .../model/datalake/importer/CsvImportRequest.java  |   9 +
 .../impl/datalake/CsvDataLakeImportService.java    | 578 ++++++++++++++++++---
 .../rest/impl/datalake/CsvImportUploadStorage.java | 130 +++++
 .../rest/impl/datalake/DataLakeDataWriter.java     |  47 +-
 .../rest/impl/datalake/DataLakeImportResource.java |  44 +-
 .../datalake/CsvDataLakeImportServiceTest.java     | 109 +++-
 .../rest/impl/datalake/DataLakeDataWriterTest.java |  56 ++
 ...achine-data-simulator-import-missing-values.csv |   8 +
 10 files changed, 914 insertions(+), 85 deletions(-)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java
index 84e52dbeb1..7af3a0f44b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java
@@ -22,12 +22,21 @@ import java.util.List;
 
 public class CsvImportPreviewRequest {
 
+  private String uploadId;
   private String fileName;
   private CsvImportConfiguration csvConfig;
   private List<String> headers;
   private List<List<String>> rows;
   private CsvImportTarget target;
 
+  public String getUploadId() {
+    return uploadId;
+  }
+
+  public void setUploadId(String uploadId) {
+    this.uploadId = uploadId;
+  }
+
   public String getFileName() {
     return fileName;
   }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java
index 6f6ebcf227..4bb9b84cf8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 public class CsvImportPreviewResult {
 
+  private String uploadId;
   private List<String> headers = new ArrayList<>();
   private List<List<String>> previewRows = new ArrayList<>();
   private List<CsvImportColumn> columns = new ArrayList<>();
@@ -33,6 +34,14 @@ public class CsvImportPreviewResult {
   private boolean valid;
   private List<CsvImportValidationMessage> validationMessages = new ArrayList<>();
 
+  public String getUploadId() {
+    return uploadId;
+  }
+
+  public void setUploadId(String uploadId) {
+    this.uploadId = uploadId;
+  }
+
   public List<String> getHeaders() {
     return headers;
   }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java
index e571bae89e..c882562ae7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 public class CsvImportRequest {
 
+  private String uploadId;
   private CsvImportConfiguration csvConfig;
   private List<String> headers = new ArrayList<>();
   private List<List<String>> rows = new ArrayList<>();
@@ -30,6 +31,14 @@ public class CsvImportRequest {
   private String timestampColumn;
   private List<CsvImportColumn> columns = new ArrayList<>();
 
+  public String getUploadId() {
+    return uploadId;
+  }
+
+  public void setUploadId(String uploadId) {
+    this.uploadId = uploadId;
+  }
+
   public CsvImportConfiguration getCsvConfig() {
     return csvConfig;
   }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java
index a36980446e..8848170b08 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.connect.management.util.EventSchemaUtils;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataSeriesBuilder;
-import org.apache.streampipes.model.datalake.SpQueryResult;
 import org.apache.streampipes.model.datalake.SpQueryResultBuilder;
 import org.apache.streampipes.model.datalake.importer.CsvImportColumn;
 import org.apache.streampipes.model.datalake.importer.CsvImportConfiguration;
@@ -44,6 +43,14 @@ import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.IOException;
+import java.io.PushbackReader;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
@@ -61,47 +68,85 @@ import java.util.stream.Collectors;
 public class CsvDataLakeImportService {
 
   private static final int MAX_PREVIEW_ROWS = 50;
+  private static final int MAX_ANALYSIS_ROWS = 200;
+  private static final int IMPORT_BATCH_SIZE = 5000;
   private static final String STREAM_PREFIX = "s0::";
 
   private final IDataExplorerSchemaManagement schemaManagement;
   private final DataLakeDataWriter dataWriter;
+  private final CsvImportUploadStorage uploadStorage;
 
   public CsvDataLakeImportService(IDataExplorerSchemaManagement schemaManagement) {
     this(
         schemaManagement,
-        new DataLakeDataWriter(false)
+        new DataLakeDataWriter(false, true),
+        new CsvImportUploadStorage()
     );
   }
 
   CsvDataLakeImportService(
       IDataExplorerSchemaManagement schemaManagement,
       DataLakeDataWriter dataWriter
+  ) {
+    this(schemaManagement, dataWriter, new CsvImportUploadStorage());
+  }
+
+  CsvDataLakeImportService(
+      IDataExplorerSchemaManagement schemaManagement,
+      DataLakeDataWriter dataWriter,
+      CsvImportUploadStorage uploadStorage
   ) {
     this.schemaManagement = schemaManagement;
     this.dataWriter = dataWriter;
+    this.uploadStorage = uploadStorage;
   }
 
   public CsvImportPreviewResult preview(CsvImportPreviewRequest request) {
     var validationMessages = validatePreviewRequest(request);
     var headers = sanitizeHeaders(request.getHeaders());
     var rows = Optional.ofNullable(request.getRows()).orElseGet(Collections::emptyList);
+    return buildPreviewResult(request, headers, rows, validationMessages, null);
+  }
 
-    var columns = inferColumns(headers, rows, request.getCsvConfig());
-    var eventSchema = buildEventSchema(columns, rows, request.getCsvConfig(), null);
-    validationMessages.addAll(validatePreviewTarget(request.getTarget()));
+  public CsvImportPreviewResult preview(CsvImportPreviewRequest request, String principalSid) {
+    if (!hasUploadId(request)) {
+      return preview(request);
+    }
 
-    var result = new CsvImportPreviewResult();
-    result.setHeaders(headers);
-    result.setPreviewRows(rows.stream().limit(MAX_PREVIEW_ROWS).collect(Collectors.toList()));
-    result.setColumns(columns);
-    result.setGuessedEventSchema(eventSchema);
-    result.setTimestampCandidates(columns.stream()
-        .filter(CsvImportColumn::isTimestampCandidate)
-        .map(CsvImportColumn::getRuntimeName)
-        .collect(Collectors.toList()));
-    result.setValidationMessages(validationMessages);
-    result.setValid(validationMessages.isEmpty());
-    return result;
+    var validationMessages = validatePreviewConfiguration(request);
+    if (!validationMessages.isEmpty()) {
+      return buildInvalidPreviewResult(validationMessages, request.getUploadId());
+    }
+
+    try {
+      var upload = resolveUpload(request.getUploadId(), principalSid);
+      var csvSample = readCsvSample(upload.path(), request.getCsvConfig(), MAX_ANALYSIS_ROWS);
+      return buildPreviewResult(request, csvSample.headers(), csvSample.rows(), validationMessages, upload.uploadId());
+    } catch (CsvImportValidationException e) {
+      return buildInvalidPreviewResult(e.getValidationMessages(), request.getUploadId());
+    } catch (IOException | UncheckedIOException e) {
+      return buildInvalidPreviewResult(
+          List.of(message("file", "The CSV file could not be parsed with the current settings.")),
+          request.getUploadId()
+      );
+    }
+  }
+
+  public CsvImportPreviewResult preview(MultipartFile file, CsvImportPreviewRequest request, String principalSid)
+      throws IOException {
+    var validationMessages = validatePreviewConfiguration(request);
+    if (!validationMessages.isEmpty()) {
+      return buildInvalidPreviewResult(validationMessages, null);
+    }
+
+    var upload = uploadStorage.store(file, principalSid);
+    try {
+      var csvSample = readCsvSample(upload.path(), request.getCsvConfig(), MAX_ANALYSIS_ROWS);
+      return buildPreviewResult(request, csvSample.headers(), csvSample.rows(), validationMessages, upload.uploadId());
+    } catch (IOException | UncheckedIOException e) {
+      uploadStorage.remove(upload.uploadId());
+      throw e;
+    }
   }
 
   public CsvImportSchemaValidationResult validateSchema(CsvImportSchemaValidationRequest request) {
@@ -127,7 +172,11 @@ public class CsvDataLakeImportService {
   }
 
   public CsvImportResult importData(CsvImportRequest request, String principalSid) {
-    var validationMessages = validateImportRequest(request);
+    if (hasUploadId(request)) {
+      return importUploadedData(request, principalSid);
+    }
+
+    var validationMessages = validateInlineImportRequest(request);
     if (!validationMessages.isEmpty()) {
       throw new CsvImportValidationException(validationMessages);
     }
@@ -158,8 +207,14 @@ public class CsvDataLakeImportService {
       measure = requireExistingMeasurement(request.getTarget().getMeasurementName());
     }
 
-    var queryResult = toQueryResult(request);
-    dataWriter.writeData(measure, queryResult);
+    var queryResult = DataSeriesBuilder.create()
+        .withHeaders(request.getColumns().stream().map(CsvImportColumn::getRuntimeName).collect(Collectors.toList()))
+        .withRows(toImportRows(request))
+        .build();
+    dataWriter.writeData(
+        measure,
+        SpQueryResultBuilder.create(queryResult.getHeaders()).withDataSeries(queryResult).build()
+    );
 
     var result = new CsvImportResult();
     result.setMeasurementId(measure.getElementId());
@@ -170,12 +225,211 @@ public class CsvDataLakeImportService {
     return result;
   }
 
-  private List<CsvImportValidationMessage> validatePreviewRequest(CsvImportPreviewRequest request) {
+  private CsvImportResult importUploadedData(CsvImportRequest request, String principalSid) {
+    var validationMessages = validateStoredImportRequest(request);
+    if (!validationMessages.isEmpty()) {
+      throw new CsvImportValidationException(validationMessages);
+    }
+
+    var upload = resolveUpload(request.getUploadId(), principalSid);
+    var sanitizedColumns = sanitizeImportColumns(request.getColumns());
+    var eventSchema = buildConfiguredEventSchema(sanitizedColumns, request.getTimestampColumn());
+
+    validationMessages.addAll(validateImportTarget(request.getTarget(), eventSchema, request.getTimestampColumn()));
+    if (!validationMessages.isEmpty()) {
+      throw new CsvImportValidationException(validationMessages);
+    }
+
+    var createdNewMeasurement = false;
+    DataLakeMeasure measure;
+
+    if (request.getTarget().getMode() == CsvImportTargetMode.NEW) {
+      measure = new DataLakeMeasure();
+      measure.setMeasureName(request.getTarget().getMeasurementName().trim());
+      measure.setTimestampField(STREAM_PREFIX + request.getTimestampColumn());
+      measure.setEventSchema(eventSchema);
+      measure = schemaManagement.createOrUpdateMeasurement(measure, principalSid);
+      createdNewMeasurement = true;
+    } else {
+      measure = requireExistingMeasurement(request.getTarget().getMeasurementName());
+    }
+
+    try {
+      var importedRowCount = importCsvFile(upload.path(), request, measure);
+      uploadStorage.remove(upload.uploadId());
+
+      var result = new CsvImportResult();
+      result.setMeasurementId(measure.getElementId());
+      result.setMeasurementName(measure.getMeasureName());
+      result.setCreatedNewMeasurement(createdNewMeasurement);
+      result.setImportedRowCount(importedRowCount);
+      result.setValidationMessages(List.of());
+      return result;
+    } catch (IOException | UncheckedIOException e) {
+      throw new CsvImportValidationException(List.of(
+          message("file", "The CSV file could not be parsed with the current settings.")
+      ));
+    }
+  }
+
+  private CsvImportPreviewResult buildPreviewResult(
+      CsvImportPreviewRequest request,
+      List<String> headers,
+      List<List<String>> rows,
+      List<CsvImportValidationMessage> validationMessages,
+      String uploadId
+  ) {
+    var messages = new ArrayList<>(validationMessages);
+    var columns = inferColumns(headers, rows, request.getCsvConfig());
+    var eventSchema = buildEventSchema(columns, rows, request.getCsvConfig(), null);
+    messages.addAll(validatePreviewTarget(request.getTarget()));
+
+    var result = new CsvImportPreviewResult();
+    result.setUploadId(uploadId);
+    result.setHeaders(headers);
+    result.setPreviewRows(rows.stream().limit(MAX_PREVIEW_ROWS).collect(Collectors.toList()));
+    result.setColumns(columns);
+    result.setGuessedEventSchema(eventSchema);
+    result.setTimestampCandidates(columns.stream()
+        .filter(CsvImportColumn::isTimestampCandidate)
+        .map(CsvImportColumn::getRuntimeName)
+        .collect(Collectors.toList()));
+    result.setValidationMessages(messages);
+    result.setValid(messages.isEmpty());
+    return result;
+  }
+
+  private CsvImportPreviewResult buildInvalidPreviewResult(
+      List<CsvImportValidationMessage> validationMessages,
+      String uploadId
+  ) {
+    var result = new CsvImportPreviewResult();
+    result.setUploadId(uploadId);
+    result.setValidationMessages(validationMessages);
+    result.setValid(false);
+    return result;
+  }
+
+  private List<List<Object>> toImportRows(CsvImportRequest request) {
+    var rows = new ArrayList<List<Object>>();
+    for (int rowIndex = 0; rowIndex < request.getRows().size(); rowIndex++) {
+      rows.add(convertRow(request.getRows().get(rowIndex), request, rowIndex + 1));
+    }
+    return rows;
+  }
+
+  private int importCsvFile(Path path, CsvImportRequest request, DataLakeMeasure measure) throws IOException {
+    var runtimeHeaders = request.getColumns().stream()
+        .map(CsvImportColumn::getRuntimeName)
+        .collect(Collectors.toList());
+    var batch = new ArrayList<List<Object>>();
+    var importedRows = new int[]{0};
+
+    parseCsvFile(path, request.getCsvConfig(), new CsvRowConsumer() {
+      private List<String> parsedHeaders;
+
+      @Override
+      public void onHeaders(List<String> headers) {
+        parsedHeaders = headers;
+        validateUploadedHeaders(headers, request.getColumns());
+      }
+
+      @Override
+      public void onRow(int rowNumber, List<String> row) {
+        if (row.size() != parsedHeaders.size()) {
+          throw new CsvImportValidationException(List.of(
+              message("rows", "Row " + rowNumber + " does not match the header size.")
+          ));
+        }
+        batch.add(convertRow(row, request, rowNumber));
+        if (batch.size() >= IMPORT_BATCH_SIZE) {
+          flushImportBatch(measure, runtimeHeaders, batch);
+          importedRows[0] += IMPORT_BATCH_SIZE;
+          batch.clear();
+        }
+      }
+    });
+
+    if (!batch.isEmpty()) {
+      var batchSize = batch.size();
+      flushImportBatch(measure, runtimeHeaders, batch);
+      importedRows[0] += batchSize;
+    }
+
+    return importedRows[0];
+  }
+
+  private void flushImportBatch(DataLakeMeasure measure, List<String> runtimeHeaders, List<List<Object>> batch) {
+    dataWriter.writeData(measure, runtimeHeaders, new ArrayList<>(batch));
+  }
+
+  private List<Object> convertRow(List<String> row, CsvImportRequest request, int rowNumber) {
+    var converted = new ArrayList<Object>();
+    for (int i = 0; i < row.size(); i++) {
+      converted.add(convertValue(
+          row.get(i),
+          request.getColumns().get(i),
+          request.getCsvConfig(),
+          request.getTimestampColumn(),
+          rowNumber
+      ));
+    }
+    return converted;
+  }
+
+  private void validateUploadedHeaders(List<String> headers, List<CsvImportColumn> columns) {
+    if (headers.size() != columns.size()) {
+      throw new CsvImportValidationException(List.of(
+          message("headers", "The uploaded CSV file no longer matches the previewed column count.")
+      ));
+    }
+
+    for (int i = 0; i < headers.size(); i++) {
+      if (!Objects.equals(headers.get(i), columns.get(i).getCsvColumn())) {
+        throw new CsvImportValidationException(List.of(
+            message("headers", "The uploaded CSV file no longer matches the previewed headers.")
+        ));
+      }
+    }
+  }
+
+  private CsvImportUploadStorage.StoredUpload resolveUpload(String uploadId, String principalSid) {
+    var upload = uploadStorage.get(uploadId).orElseThrow(() -> new CsvImportValidationException(List.of(
+        message("uploadId", "The uploaded CSV file was not found. Please upload the file again.")
+    )));
+
+    if (!Objects.equals(upload.ownerSid(), principalSid)) {
+      throw new CsvImportValidationException(List.of(
+          message("uploadId", "The uploaded CSV file is no longer available for this user.")
+      ));
+    }
+
+    return upload;
+  }
+
+  private boolean hasUploadId(CsvImportPreviewRequest request) {
+    return request != null && request.getUploadId() != null && !request.getUploadId().isBlank();
+  }
+
+  private boolean hasUploadId(CsvImportRequest request) {
+    return request != null && request.getUploadId() != null && !request.getUploadId().isBlank();
+  }
+
+  private List<CsvImportValidationMessage> validatePreviewConfiguration(CsvImportPreviewRequest request) {
     var messages = new ArrayList<CsvImportValidationMessage>();
     if (request == null) {
       messages.add(message("request", "Import request must be provided."));
       return messages;
     }
+    validateCsvConfig(request.getCsvConfig(), messages);
+    return messages;
+  }
+
+  private List<CsvImportValidationMessage> validatePreviewRequest(CsvImportPreviewRequest request) {
+    var messages = validatePreviewConfiguration(request);
+    if (request == null) {
+      return messages;
+    }
     if (request.getHeaders() == null || request.getHeaders().isEmpty()) {
       messages.add(message("headers", "At least one header must be provided."));
     }
@@ -183,14 +437,12 @@ public class CsvDataLakeImportService {
       messages.add(message("rows", "At least one row must be provided."));
     }
     validateRowsMatchHeaders(request.getHeaders(), request.getRows(), messages);
-    validateCsvConfig(request.getCsvConfig(), messages);
     return messages;
   }
 
-  private List<CsvImportValidationMessage> validateImportRequest(CsvImportRequest request) {
-    var messages = new ArrayList<CsvImportValidationMessage>();
+  private List<CsvImportValidationMessage> validateInlineImportRequest(CsvImportRequest request) {
+    var messages = validateStoredImportRequest(request);
     if (request == null) {
-      messages.add(message("request", "Import request must be provided."));
       return messages;
     }
     if (request.getHeaders() == null || request.getHeaders().isEmpty()) {
@@ -200,6 +452,15 @@ public class CsvDataLakeImportService {
       messages.add(message("rows", "At least one row must be provided."));
     }
     validateRowsMatchHeaders(request.getHeaders(), request.getRows(), messages);
+    return messages;
+  }
+
+  private List<CsvImportValidationMessage> validateStoredImportRequest(CsvImportRequest request) {
+    var messages = new ArrayList<CsvImportValidationMessage>();
+    if (request == null) {
+      messages.add(message("request", "Import request must be provided."));
+      return messages;
+    }
     validateCsvConfig(request.getCsvConfig(), messages);
 
     if (request.getTarget() == null || request.getTarget().getMode() == null) {
@@ -211,6 +472,11 @@ public class CsvDataLakeImportService {
     if (request.getColumns() == null || request.getColumns().isEmpty()) {
       messages.add(message("columns", "Column configuration must be provided."));
     }
+    if (!hasUploadId(request)
+        && (request.getRows() == null || request.getRows().isEmpty())
+        && (request.getHeaders() == null || request.getHeaders().isEmpty())) {
+      messages.add(message("uploadId", "Either an uploadId or inline CSV rows must be provided."));
+    }
 
     return messages;
   }
@@ -397,8 +663,7 @@ public class CsvDataLakeImportService {
         continue;
       }
 
-      if (!Objects.equals(getRuntimeType(entry.getValue()), getRuntimeType(imported))
-      ) {
+      if (!Objects.equals(getRuntimeType(entry.getValue()), getRuntimeType(imported))) {
         issues.add(issue(
             CsvImportSchemaIssueType.COLUMN_TYPE_MISMATCH,
             entry.getKey(),
@@ -427,28 +692,6 @@ public class CsvDataLakeImportService {
         )));
   }
 
-  private SpQueryResult toQueryResult(CsvImportRequest request) {
-    var headers = request.getColumns().stream()
-        .map(CsvImportColumn::getRuntimeName)
-        .collect(Collectors.toList());
-    var rows = new ArrayList<List<Object>>();
-    for (var row : request.getRows()) {
-      var converted = new ArrayList<Object>();
-      for (int i = 0; i < row.size(); i++) {
-        converted.add(convertValue(row.get(i), request.getColumns().get(i), request.getCsvConfig(), request.getTimestampColumn()));
-      }
-      rows.add(converted);
-    }
-
-    var dataSeries = DataSeriesBuilder.create()
-        .withHeaders(headers)
-        .withRows(rows)
-        .build();
-    return SpQueryResultBuilder.create(headers)
-        .withDataSeries(dataSeries)
-        .build();
-  }
-
   private EventSchema buildEventSchema(
       List<CsvImportColumn> columns,
       List<List<String>> rows,
@@ -555,7 +798,6 @@ public class CsvDataLakeImportService {
     var allBoolean = true;
     var allLong = true;
     var allNumber = true;
-    var timestampCandidate = false;
 
     for (var row : rows) {
       var value = row.get(columnIndex);
@@ -573,7 +815,7 @@ public class CsvDataLakeImportService {
       }
     }
 
-    timestampCandidate = isTimestampCandidate(rows, columnIndex, config);
+    var timestampCandidate = isTimestampCandidate(rows, columnIndex, config);
 
     if (timestampCandidate || allLong) {
       return "LONG";
@@ -621,21 +863,53 @@ public class CsvDataLakeImportService {
       CsvImportColumn column,
       CsvImportConfiguration config,
       String timestampColumn
+  ) {
+    return convertValue(rawValue, column, config, timestampColumn, null);
+  }
+
+  private Object convertValue(
+      String rawValue,
+      CsvImportColumn column,
+      CsvImportConfiguration config,
+      String timestampColumn,
+      Integer rowNumber
   ) {
     if (rawValue == null || rawValue.isBlank()) {
+      if (rowNumber != null && Objects.equals(column.getRuntimeName(), timestampColumn)) {
+        throw new CsvImportValidationException(List.of(
+            message(
+                "rows",
+                "Row " + rowNumber + " is missing a value for timestamp column \"" + column.getCsvColumn() + "\"."
+            )
+        ));
+      }
       return null;
     }
+
     var trimmed = rawValue.trim();
 
-    if (Objects.equals(column.getRuntimeName(), timestampColumn)) {
-      return parseTimestamp(trimmed, config);
-    }
+    try {
+      if (Objects.equals(column.getRuntimeName(), timestampColumn)) {
+        return parseTimestamp(trimmed, config);
+      }
 
-    return switch (finalRuntimeType(column, timestampColumn)) {
-      case "BOOLEAN" -> Boolean.parseBoolean(trimmed.toLowerCase(Locale.ENGLISH));
-      case "LONG" -> Long.parseLong(normalizeNumber(trimmed, config));
-      case "FLOAT" -> Double.parseDouble(normalizeNumber(trimmed, config));
-      default -> trimmed;
-    };
+      return switch (finalRuntimeType(column, timestampColumn)) {
+        case "BOOLEAN" -> Boolean.parseBoolean(trimmed.toLowerCase(Locale.ENGLISH));
+        case "LONG" -> Long.parseLong(normalizeNumber(trimmed, config));
+        case "FLOAT" -> Double.parseDouble(normalizeNumber(trimmed, config));
+        default -> trimmed;
+      };
+    } catch (RuntimeException e) {
+      if (rowNumber == null) {
+        throw e;
+      }
+
+      throw new CsvImportValidationException(List.of(
+          message(
+              "rows",
+              "Row " + rowNumber + " contains an invalid value for column \"" + column.getCsvColumn() + "\"."
+          )
+      ));
+    }
   }
 
   private long parseTimestamp(String value, CsvImportConfiguration config) {
@@ -736,14 +1010,190 @@ public class CsvDataLakeImportService {
     return new CsvImportSchemaIssue(type, columnName, expected, actual);
  }
 
-  private String normalize(String value) {
-    return value == null ? null : value.trim().toLowerCase(Locale.ENGLISH);
-  }
-
   private String getRuntimeType(EventProperty property) {
     if (property instanceof EventPropertyPrimitive primitive) {
       return primitive.getRuntimeType();
     }
     return null;
   }
+
+  private CsvFileSample readCsvSample(Path path, CsvImportConfiguration config, int maxRows) throws IOException {
+    var headers = new ArrayList<String>();
+    var rows = new ArrayList<List<String>>();
+
+    parseCsvFile(path, config, new CsvRowConsumer() {
+      @Override
+      public void onHeaders(List<String> parsedHeaders) {
+        headers.addAll(parsedHeaders);
+      }
+
+      @Override
+      public void onRow(int rowNumber, List<String> row) {
+        if (row.size() != headers.size()) {
+          throw new CsvImportValidationException(List.of(
+              message("rows", "Row " + rowNumber + " does not match the header size.")
+          ));
+        }
+        if (rows.size() < maxRows) {
+          rows.add(row);
+        }
+      }
+    });
+
+    if (headers.isEmpty()) {
+      throw new CsvImportValidationException(List.of(message("headers", "At least one header must be provided.")));
+    }
+    if (rows.isEmpty()) {
+      throw new CsvImportValidationException(List.of(message("rows", "At least one row must be provided.")));
+    }
+
+    return new CsvFileSample(headers, rows);
+  }
+
+  private void parseCsvFile(Path path, CsvImportConfiguration config, CsvRowConsumer consumer) throws IOException {
+    try (var reader = new PushbackReader(Files.newBufferedReader(path, StandardCharsets.UTF_8), 1)) {
+      var delimiter = normalizeDelimiter(config == null ? null : config.getDelimiter());
+      var hasHeader = config == null || config.isHasHeader();
+      List<String> headers = null;
+      int rowNumber = 0;
+      List<String> row;
+
+      while ((row = readNextRow(reader, delimiter)) != null) {
+        if (isBlankRow(row)) {
+          continue;
+        }
+
+        if (headers == null) {
+          if (hasHeader) {
+            headers = normalizeHeaders(row);
+            consumer.onHeaders(headers);
+          } else {
+            headers = generateHeaders(row.size());
+            consumer.onHeaders(headers);
+            rowNumber += 1;
+            consumer.onRow(rowNumber, row);
+          }
+        } else {
+          rowNumber += 1;
+          consumer.onRow(rowNumber, row);
+        }
+      }
+    }
+  }
+
+  private List<String> readNextRow(PushbackReader reader, char delimiter) throws IOException {
+    var currentRow = new ArrayList<String>();
+    var currentValue = new StringBuilder();
+    var inQuotes = false;
+    var readAny = false;
+
+    while (true) {
+      var nextInt = reader.read();
+      if (nextInt == -1) {
+        if (!readAny && currentValue.length() == 0 && currentRow.isEmpty()) {
+          return null;
+        }
+        currentRow.add(currentValue.toString());
+        return currentRow;
+      }
+
+      readAny = true;
+      var currentChar = (char) nextInt;
+      if (currentChar == '"') {
+        if (inQuotes) {
+          var escapedCandidate = reader.read();
+          if (escapedCandidate == '"') {
+            currentValue.append('"');
+          } else {
+            inQuotes = false;
+            if (escapedCandidate != -1) {
+              reader.unread(escapedCandidate);
+            }
+          }
+        } else {
+          inQuotes = true;
+        }
+      } else if (!inQuotes && currentChar == delimiter) {
+        currentRow.add(currentValue.toString());
+        currentValue = new StringBuilder();
+      } else if (!inQuotes && (currentChar == '\n' || currentChar == '\r')) {
+        if (currentChar == '\r') {
+          var maybeLineFeed = reader.read();
+          if (maybeLineFeed != '\n' && maybeLineFeed != -1) {
+            reader.unread(maybeLineFeed);
+          }
+        }
+        currentRow.add(currentValue.toString());
+        return currentRow;
+      } else {
+        currentValue.append(currentChar);
+      }
+    }
+  }
+
+  private char normalizeDelimiter(String delimiter) {
+    if (delimiter == null || delimiter.isEmpty()) {
+      return ',';
+    }
+    if ("\\t".equals(delimiter)) {
+      return '\t';
+    }
+    return delimiter.charAt(0);
+  }
+
+  private List<String> normalizeHeaders(List<String> headers) {
+    var normalized = new ArrayList<String>();
+    for (int i = 0; i < headers.size(); i++) {
+      var value = headers.get(i);
+      if (i == 0) {
+        value = stripBom(value);
+      }
+      var trimmed = value == null ? "" : value.trim();
+      normalized.add(trimmed.isEmpty() ? "column_" + (i + 1) : trimmed);
+    }
+    return normalized;
+  }
+
+  private String stripBom(String value) {
+    return value == null ? null : value.replace("\uFEFF", "");
+  }
+
+  private List<String> generateHeaders(int size) {
+    var headers = new ArrayList<String>();
+    for (int i = 0; i < size; i++) {
+      headers.add("column_" + (i + 1));
+    }
+    return headers;
+  }
+
+  private boolean isBlankRow(List<String> row) {
+    return row.stream().allMatch(cell -> cell == null || cell.trim().isEmpty());
+  }
+
+  @FunctionalInterface
+  private interface CsvRowConsumer {
+
+    default void onHeaders(List<String> headers) {
+    }
+
+    void onRow(int rowNumber, List<String> row);
+  }
+
+  private static final class CsvFileSample {
+
+    private final List<String> headers;
+    private final List<List<String>> rows;
+
+    private CsvFileSample(List<String> headers, List<List<String>> rows) {
+      this.headers = headers;
+      this.rows = rows;
+    }
+
+    List<String> headers() {
+      return headers;
+    }
+
+    List<List<String>> rows() {
+      return rows;
+    }
+  }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvImportUploadStorage.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvImportUploadStorage.java
new file mode 100644
index 0000000000..0651034f72
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvImportUploadStorage.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+class CsvImportUploadStorage {
+
+  private static final Duration DEFAULT_TTL = Duration.ofHours(12);
+
+  private final ConcurrentMap<String, StoredUpload> uploads;
+  private final Duration ttl;
+
+  CsvImportUploadStorage() {
+    this(DEFAULT_TTL);
+  }
+
+  CsvImportUploadStorage(Duration ttl) {
+    this.uploads = new ConcurrentHashMap<>();
+    this.ttl = ttl;
+  }
+
+  StoredUpload store(MultipartFile file, String ownerSid) throws IOException {
+    cleanupExpired();
+
+    var uploadId = UUID.randomUUID().toString();
+    var tempFile = Files.createTempFile("streampipes-csv-import-", ".csv");
+    try (var inputStream = file.getInputStream()) {
+      Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING);
+    }
+
+    var upload = new StoredUpload(
+        uploadId,
+        tempFile,
+        ownerSid,
+        Instant.now()
+    );
+    uploads.put(uploadId, upload);
+    return upload;
+  }
+
+  Optional<StoredUpload> get(String uploadId) {
+    cleanupExpired();
+    return Optional.ofNullable(uploads.get(uploadId));
+  }
+
+  void remove(String uploadId) {
+    var removed = uploads.remove(uploadId);
+    if (removed != null) {
+      deleteQuietly(removed.path());
+    }
+  }
+
+  private void cleanupExpired() {
+    var expiresBefore = Instant.now().minus(ttl);
+    uploads.entrySet().removeIf(entry -> {
+      var expired = entry.getValue().createdAt().isBefore(expiresBefore);
+      if (expired) {
+        deleteQuietly(entry.getValue().path());
+      }
+      return expired;
+    });
+  }
+
+  private void deleteQuietly(Path path) {
+    try {
+      Files.deleteIfExists(path);
+    } catch (IOException ignored) {
+      // Best-effort cleanup of temporary uploads.
+    }
+  }
+
+  static final class StoredUpload {
+
+    private final String uploadId;
+    private final Path path;
+    private final String ownerSid;
+    private final Instant createdAt;
+
+    StoredUpload(String uploadId, Path path, String ownerSid, Instant createdAt) {
+      this.uploadId = uploadId;
+      this.path = path;
+      this.ownerSid = ownerSid;
+      this.createdAt = createdAt;
+    }
+
+    String uploadId() {
+      return uploadId;
+    }
+
+    Path path() {
+      return path;
+    }
+
+    String ownerSid() {
+      return ownerSid;
+    }
+
+    Instant createdAt() {
+      return createdAt;
+    }
+  }
+}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
index a722b28484..c841e88a34 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -31,17 +31,24 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class DataLakeDataWriter {
 
   private final boolean ignoreSchemaMismatch;
+  private final boolean allowMissingFields;
 
   public DataLakeDataWriter(boolean ignoreSchemaMismatch) {
+    this(ignoreSchemaMismatch, false);
+  }
+
+  public DataLakeDataWriter(boolean ignoreSchemaMismatch, boolean allowMissingFields) {
     this.ignoreSchemaMismatch = ignoreSchemaMismatch;
+    this.allowMissingFields = allowMissingFields;
   }
 
   public void writeData(String measureName, SpQueryResult queryResult) {
@@ -57,6 +64,14 @@ public class DataLakeDataWriter {
     getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
   }
 
+  public void writeData(DataLakeMeasure measure, List<String> headers, List<List<Object>> rows) {
+    var dataSeries = new DataSeries();
+    dataSeries.setHeaders(headers);
+    dataSeries.setRows(rows);
+    dataSeries.setTotal(rows.size());
+    getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
+  }
+
   private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries, DataLakeMeasure measure){
     var timeSeriesStore = getTimeSeriesStore(measure);
 
@@ -99,13 +114,24 @@ public class DataLakeDataWriter {
           .collect(Collectors.toSet());
 
       var runtimeNameSet = new HashSet<>(runtimeNames);
-      if (!runtimeNameSet.equals(strippedEventKeys)){
+      if (!matchesRuntimeNames(runtimeNameSet, strippedEventKeys, allowMissingFields)) {
        throw new SpRuntimeException("The fields of the event do not match. Use \"ignoreSchemaMismatch\" to "
             + "ignore this error. Fields of the event: " + strippedEventKeys);
       }
     }
   }
 
+  static boolean matchesRuntimeNames(
+      Set<String> expectedRuntimeNames,
+      Set<String> actualRuntimeNames,
+      boolean allowMissingFields
+  ) {
+    if (allowMissingFields) {
+      return expectedRuntimeNames.containsAll(actualRuntimeNames);
+    }
+    return expectedRuntimeNames.equals(actualRuntimeNames);
+  }
+
   private List<String> getRuntimeNames(DataLakeMeasure measure) {
     var runtimeNames = new ArrayList<String>();
     runtimeNames.add(measure.getTimestampFieldName());
@@ -124,10 +150,18 @@ public class DataLakeDataWriter {
   }
 
   private Event rowToEvent(List<Object> row, List<String> headers){
-    Map<String, Object> eventMap = IntStream.range(0, headers.size())
-        .boxed()
-        .collect(Collectors.toMap(headers::get, row::get));
-    return EventFactory.fromMap(eventMap);
+    return EventFactory.fromMap(toEventMap(row, headers));
+  }
+
+  static Map<String, Object> toEventMap(List<Object> row, List<String> headers) {
+    var eventMap = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < headers.size(); i++) {
+      var value = row.get(i);
+      if (value != null) {
+        eventMap.put(headers.get(i), value);
+      }
+    }
+    return eventMap;
   }
 
   private void renameTimestampField(Event event, String timestampField){
@@ -137,4 +171,3 @@ public class DataLakeDataWriter {
   }
 
 }
-
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java
index a5cebe0d44..d680e08fc3 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java
@@ -33,7 +33,11 @@ import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestPart;
 import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.IOException;
 
 @RestController
 @RequestMapping("/api/v4/datalake/import")
@@ -53,13 +57,26 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
   )
   @PreAuthorize("this.hasWriteAuthority()")
   public ResponseEntity<CsvImportPreviewResult> preview(@RequestBody CsvImportPreviewRequest request) {
-    if (request.getTarget() != null
-        && request.getTarget().getMode() == CsvImportTargetMode.EXISTING
-        && this.dataLakeMeasureManagement.getExistingMeasureByName(request.getTarget().getMeasurementName()).isPresent()
-        && !this.checkPermissionByName(request.getTarget().getMeasurementName(), "WRITE")) {
+    if (!hasWritePermission(request.getTarget())) {
+      return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
+    }
+    return ok(importService.preview(request, getAuthenticatedUserSid()));
+  }
+
+  @PostMapping(
+      path = "/preview",
+      consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
+      produces = MediaType.APPLICATION_JSON_VALUE
+  )
+  @PreAuthorize("this.hasWriteAuthority()")
+  public ResponseEntity<CsvImportPreviewResult> preview(
+      @RequestPart("file") MultipartFile file,
+      @RequestPart("request") CsvImportPreviewRequest request
+  ) throws IOException {
+    if (!hasWritePermission(request.getTarget())) {
       return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
     }
-    return ok(importService.preview(request));
+    return ok(importService.preview(file, request, getAuthenticatedUserSid()));
   }
 
   @PostMapping(
@@ -71,10 +88,7 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
   public ResponseEntity<CsvImportSchemaValidationResult> validateSchema(
       @RequestBody CsvImportSchemaValidationRequest request
   ) {
-    if (request.getTarget() != null
-        && request.getTarget().getMode() == CsvImportTargetMode.EXISTING
-        && this.dataLakeMeasureManagement.getExistingMeasureByName(request.getTarget().getMeasurementName()).isPresent()
-        && !this.checkPermissionByName(request.getTarget().getMeasurementName(), "WRITE")) {
+    if (!hasWritePermission(request.getTarget())) {
       return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
     }
     return ok(importService.validateSchema(request));
@@ -86,10 +100,7 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
   )
   @PreAuthorize("this.hasWriteAuthority()")
   public ResponseEntity<CsvImportResult> importData(@RequestBody CsvImportRequest request) {
-    if (request.getTarget() != null
-        && request.getTarget().getMode() == CsvImportTargetMode.EXISTING
-        && this.dataLakeMeasureManagement.getExistingMeasureByName(request.getTarget().getMeasurementName()).isPresent()
-        && !this.checkPermissionByName(request.getTarget().getMeasurementName(), "WRITE")) {
+    if (!hasWritePermission(request.getTarget())) {
       return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
     }
 
@@ -101,4 +112,11 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
       return ResponseEntity.badRequest().body(result);
     }
   }
+
+  private boolean hasWritePermission(org.apache.streampipes.model.datalake.importer.CsvImportTarget target) {
+    return target == null
+        || target.getMode() != CsvImportTargetMode.EXISTING
+        || this.dataLakeMeasureManagement.getExistingMeasureByName(target.getMeasurementName()).isEmpty()
+        || this.checkPermissionByName(target.getMeasurementName(), "WRITE");
+  }
 }
diff --git a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java
index 9224094a7f..c8a9a3283a 100644
--- a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java
+++ b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.rest.impl.datalake;
 
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.SpQueryResult;
 import org.apache.streampipes.model.datalake.importer.CsvImportColumn;
 import org.apache.streampipes.model.datalake.importer.CsvImportConfiguration;
 import org.apache.streampipes.model.datalake.importer.CsvImportPreviewRequest;
@@ -34,7 +35,9 @@ import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 
 import org.junit.jupiter.api.Test;
+import org.springframework.web.multipart.MultipartFile;
 
+import java.io.ByteArrayInputStream;
 import java.util.List;
 import java.util.Optional;
 
@@ -43,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -118,7 +122,102 @@ class CsvDataLakeImportServiceTest {
     assertTrue(result.isCreatedNewMeasurement());
     assertEquals(2, result.getImportedRowCount());
     assertEquals("new-measure", result.getMeasurementName());
-    verify(dataWriter).writeData(any(DataLakeMeasure.class), any());
+    verify(dataWriter).writeData(any(DataLakeMeasure.class), any(SpQueryResult.class));
+  }
+
+  @Test
+  void shouldPreviewOnceAndImportFromStoredUpload() throws Exception {
+    var schemaManagement = mock(IDataExplorerSchemaManagement.class);
+    var dataWriter = mock(DataLakeDataWriter.class);
+    var uploadStorage = new CsvImportUploadStorage();
+    var service = new CsvDataLakeImportService(schemaManagement, dataWriter, uploadStorage);
+
+    when(schemaManagement.getExistingMeasureByName("uploaded-measure"))
+        .thenReturn(Optional.empty());
+    when(schemaManagement.createOrUpdateMeasurement(any(DataLakeMeasure.class), any()))
+        .thenAnswer(invocation -> {
+          var measure = invocation.getArgument(0, DataLakeMeasure.class);
+          measure.setElementId("measure-id");
+          return measure;
+        });
+
+    var previewRequest = new CsvImportPreviewRequest();
+    previewRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    previewRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+
+    var multipartFile = mock(MultipartFile.class);
+    when(multipartFile.getOriginalFilename()).thenReturn("upload.csv");
+    when(multipartFile.getInputStream()).thenReturn(new ByteArrayInputStream(
+        "timestamp,temperature\n1710000000000,21.3\n1710000060000,22.1\n".getBytes()
+    ));
+
+    var previewResult = service.preview(
+        multipartFile,
+        previewRequest,
+        "sid"
+    );
+
+    assertTrue(previewResult.isValid());
+    assertEquals(2, previewResult.getPreviewRows().size());
+    assertTrue(previewResult.getUploadId() != null && !previewResult.getUploadId().isBlank());
+
+    var importRequest = new CsvImportRequest();
+    importRequest.setUploadId(previewResult.getUploadId());
+    importRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    importRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+    importRequest.setTimestampColumn("timestamp");
+    importRequest.setColumns(previewResult.getColumns());
+
+    var importResult = service.importData(importRequest, "sid");
+
+    assertTrue(importResult.isCreatedNewMeasurement());
+    assertEquals(2, importResult.getImportedRowCount());
+    verify(dataWriter).writeData(any(DataLakeMeasure.class), anyList(), anyList());
+  }
+
+  @Test
+  void shouldRejectMissingTimestampValuesInUploadedCsv() throws Exception {
+    var schemaManagement = mock(IDataExplorerSchemaManagement.class);
+    var dataWriter = mock(DataLakeDataWriter.class);
+    var uploadStorage = new CsvImportUploadStorage();
+    var service = new CsvDataLakeImportService(schemaManagement, dataWriter, uploadStorage);
+
+    when(schemaManagement.getExistingMeasureByName("uploaded-measure"))
+        .thenReturn(Optional.empty());
+    when(schemaManagement.createOrUpdateMeasurement(any(DataLakeMeasure.class), any()))
+        .thenAnswer(invocation -> {
+          var measure = invocation.getArgument(0, DataLakeMeasure.class);
+          measure.setElementId("measure-id");
+          return measure;
+        });
+
+    var previewRequest = new CsvImportPreviewRequest();
+    previewRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    previewRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+
+    var multipartFile = mock(MultipartFile.class);
+    when(multipartFile.getOriginalFilename()).thenReturn("upload.csv");
+    when(multipartFile.getInputStream()).thenReturn(new ByteArrayInputStream(
+        "timestamp,temperature\n1710000000000,21.3\n,22.1\n".getBytes()
+    ));
+
+    var previewResult = service.preview(multipartFile, previewRequest, "sid");
+
+    var importRequest = new CsvImportRequest();
+    importRequest.setUploadId(previewResult.getUploadId());
+    importRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    importRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+    importRequest.setTimestampColumn("timestamp");
+    importRequest.setColumns(previewResult.getColumns());
+
+    var exception = assertThrows(
+        CsvImportValidationException.class,
+        () -> service.importData(importRequest, "sid")
+    );
+
+    assertTrue(exception.getValidationMessages()
+        .stream()
+        .anyMatch(message -> message.getMessage().contains("missing a value for timestamp column")));
   }
 
   @Test
@@ -202,6 +301,14 @@ class CsvDataLakeImportServiceTest {
     return config;
   }
 
+  private CsvImportConfiguration makeCsvConfigWithCommaDelimiter() {
+    var config = new CsvImportConfiguration();
+    config.setDelimiter(",");
+    config.setDecimalSeparator(".");
+    config.setHasHeader(true);
+    return config;
+  }
+
   private EventSchema makeExistingSchema() {
     var timestamp = new EventPropertyPrimitive();
     timestamp.setRuntimeName("timestamp");
diff --git a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriterTest.java b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriterTest.java
new file mode 100644
index 0000000000..ebeee70cfb
--- /dev/null
+++ b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DataLakeDataWriterTest {
+
+  @Test
+  void shouldSkipNullValuesWhenBuildingEventMap() {
+    var headers = List.of("timestamp", "temperature", "status");
+    List<Object> row = new java.util.ArrayList<>();
+    row.add(1710000000000L);
+    row.add(null);
+    row.add("ok");
+
+    var eventMap = DataLakeDataWriter.toEventMap(row, headers);
+
+    assertEquals(2, eventMap.size());
+    assertEquals(1710000000000L, eventMap.get("timestamp"));
+    assertEquals("ok", eventMap.get("status"));
+    assertFalse(eventMap.containsKey("temperature"));
+  }
+
+  @Test
+  void shouldAllowMissingFieldsWhenConfigured() {
+    var expected = new HashSet<>(List.of("timestamp", "temperature", "status"));
+    var actual = new HashSet<>(List.of("timestamp", "status"));
+
+    assertTrue(DataLakeDataWriter.matchesRuntimeNames(expected, actual, true));
+    assertFalse(DataLakeDataWriter.matchesRuntimeNames(expected, actual, false));
+  }
+}
diff --git a/ui/cypress/fixtures/datalake/machine-data-simulator-import-missing-values.csv b/ui/cypress/fixtures/datalake/machine-data-simulator-import-missing-values.csv
new file mode 100644
index 0000000000..8c3c0cce83
--- /dev/null
+++ b/ui/cypress/fixtures/datalake/machine-data-simulator-import-missing-values.csv
@@ -0,0 +1,8 @@
+timestamp,dmc,druckoelraumAnlagenkompensation,druckoelraumBauteiltemperatur1,druckoelraumDifferenztemperatur,druckoelraumDrehzahl,druckoelraumDruck
+1772722801725,0.0,0.0,,0.0,0.0,0.0
+1772722802743,0,0.0,0.0,0.0,0.0,0.0
+1772722803756,0.0,,0.0,0.0,0.0,0.0
+1772722804768,0.0,0.0,,0.0,0.0,0.0
+1772722805777,0.0,0.0,,,0.0,0.0
+1772722806784,0.0,0.0,0.0,0.0,,0.0
+1772722807791,0.0,0.0,0.0,0.0,0.0,
