Copilot commented on code in PR #9862: URL: https://github.com/apache/seatunnel/pull/9862#discussion_r2347892848
########## seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/tikadocument/extractor/TikaDocumentExtractor.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.tikadocument.extractor; + +import org.apache.tika.detect.DefaultDetector; +import org.apache.tika.detect.Detector; +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.mime.MediaType; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.parser.Parser; +import org.apache.tika.sax.BodyContentHandler; +import org.apache.tika.sax.WriteOutContentHandler; + +import org.xml.sax.SAXException; + +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +/** Apache Tika implementation of DocumentExtractor */ +@Slf4j +public class TikaDocumentExtractor implements DocumentExtractor { + + private final Parser 
parser; + private final Detector detector; + private long timeoutMs = 30000; // 30 seconds default timeout + + // Supported MIME types for first phase (MVP) + private static final Set<String> SUPPORTED_MIME_TYPES = + new HashSet<String>() { + { + add("application/pdf"); + add("application/msword"); + add("application/vnd.openxmlformats-officedocument.wordprocessingml.document"); + add("application/vnd.ms-excel"); + add("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"); + add("application/vnd.ms-powerpoint"); + add( + "application/vnd.openxmlformats-officedocument.presentationml.presentation"); + add("text/plain"); + add("text/html"); + add("application/rtf"); + } + }; Review Comment: [nitpick] Use modern collection initialization syntax instead of anonymous inner class. Replace with `Set.of()` or `Arrays.asList()` wrapped in `new HashSet<>()` for better readability and performance. ########## seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/tikadocument/extractor/TikaDocumentExtractor.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.tikadocument.extractor; + +import org.apache.tika.detect.DefaultDetector; +import org.apache.tika.detect.Detector; +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.mime.MediaType; +import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.parser.Parser; +import org.apache.tika.sax.BodyContentHandler; +import org.apache.tika.sax.WriteOutContentHandler; + +import org.xml.sax.SAXException; + +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +/** Apache Tika implementation of DocumentExtractor */ +@Slf4j +public class TikaDocumentExtractor implements DocumentExtractor { + + private final Parser parser; + private final Detector detector; + private long timeoutMs = 30000; // 30 seconds default timeout + + // Supported MIME types for first phase (MVP) + private static final Set<String> SUPPORTED_MIME_TYPES = + new HashSet<String>() { + { + add("application/pdf"); + add("application/msword"); + add("application/vnd.openxmlformats-officedocument.wordprocessingml.document"); + add("application/vnd.ms-excel"); + add("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"); + add("application/vnd.ms-powerpoint"); + add( + "application/vnd.openxmlformats-officedocument.presentationml.presentation"); + add("text/plain"); + add("text/html"); + add("application/rtf"); + } + }; + + public TikaDocumentExtractor() { + this.parser = new AutoDetectParser(); + this.detector = new DefaultDetector(); + } + + @Override + public DocumentMetadata extract( + byte[] documentData, + boolean extractText, + boolean extractMetadata, + int maxStringLength) { + try 
(InputStream inputStream = new ByteArrayInputStream(documentData)) { + return extract(inputStream, extractText, extractMetadata, maxStringLength); + } catch (IOException e) { + log.error("Error creating input stream from document data", e); + return DocumentMetadata.builder() + .successful(false) + .errorMessage("Failed to create input stream: " + e.getMessage()) + .build(); + } + } + + @Override + public DocumentMetadata extract( + InputStream inputStream, + boolean extractText, + boolean extractMetadata, + int maxStringLength) { + DocumentMetadata.DocumentMetadataBuilder builder = DocumentMetadata.builder(); + + try (TikaInputStream tikaInputStream = TikaInputStream.get(inputStream)) { + Metadata metadata = new Metadata(); + ParseContext parseContext = new ParseContext(); + parseContext.set(Parser.class, parser); + + String content = null; + + // Extract text content if requested + if (extractText) { + BodyContentHandler handler = + new BodyContentHandler(new WriteOutContentHandler(maxStringLength)); + + long startTime = System.currentTimeMillis(); + parser.parse(tikaInputStream, handler, metadata, parseContext); + long parseTime = System.currentTimeMillis() - startTime; + + if (parseTime > timeoutMs) { + log.warn( + "Document parsing took {} ms, which exceeds timeout of {} ms", + parseTime, + timeoutMs); + } + + content = handler.toString(); + } else { + // Parse only for metadata + parser.parse(tikaInputStream, new BodyContentHandler(-1), metadata, parseContext); + } + + // Build metadata object + builder.successful(true).content(content); + + if (extractMetadata) { + populateMetadata(builder, metadata); + } + + // Set file size + try { + tikaInputStream.reset(); + builder.fileSize((long) tikaInputStream.available()); + } catch (IOException e) { + log.debug("Could not determine file size", e); + } + + } catch (TikaException e) { + log.error("Tika parsing error", e); + return builder.successful(false) + .errorMessage("Tika parsing failed: " + e.getMessage()) + 
.build(); + } catch (IOException e) { + log.error("IO error during document parsing", e); + return builder.successful(false).errorMessage("IO error: " + e.getMessage()).build(); + } catch (SAXException e) { + log.error("SAX parsing error", e); + return builder.successful(false) + .errorMessage("SAX parsing failed: " + e.getMessage()) + .build(); + } catch (Exception e) { + log.error("Unexpected error during document parsing", e); + return builder.successful(false) + .errorMessage("Unexpected error: " + e.getMessage()) + .build(); + } + + return builder.build(); + } + + private void populateMetadata( + DocumentMetadata.DocumentMetadataBuilder builder, Metadata metadata) { + // Basic metadata + builder.title(metadata.get(TikaCoreProperties.TITLE)) + .author(metadata.get(TikaCoreProperties.CREATOR)) + .subject(metadata.get(TikaCoreProperties.SUBJECT)) + .keywords(metadata.get("Keywords")) // Use string key for keywords + .contentType(metadata.get(Metadata.CONTENT_TYPE)) + .language(metadata.get(TikaCoreProperties.LANGUAGE)); + + // Dates + Date creationDate = metadata.getDate(TikaCoreProperties.CREATED); + if (creationDate != null) { + builder.creationDate(creationDate); + } + + Date modificationDate = metadata.getDate(TikaCoreProperties.MODIFIED); + if (modificationDate != null) { + builder.modificationDate(modificationDate); + } + + // Page count (for PDFs and other documents that support it) + String pageCountStr = metadata.get("xmpTPg:NPages"); + if (pageCountStr == null) { + pageCountStr = metadata.get("meta:page-count"); + } + if (pageCountStr != null) { + try { + builder.pageCount(Integer.parseInt(pageCountStr)); + } catch (NumberFormatException e) { + log.debug("Could not parse page count: {}", pageCountStr); + } + } + + // Add all other metadata as custom metadata + DocumentMetadata tempMetadata = builder.build(); + for (String name : metadata.names()) { + if (!isStandardMetadata(name)) { + tempMetadata.setCustomMetadata(name, metadata.get(name)); + } + } 
Review Comment: The DocumentMetadata object is built prematurely and then modified, but the builder pattern suggests immutability. The tempMetadata object should not be used to set custom metadata after building. Consider collecting custom metadata before building the final object. ```suggestion // Collect all other metadata as custom metadata before building java.util.Map<String, String> customMetadata = new java.util.HashMap<>(); for (String name : metadata.names()) { if (!isStandardMetadata(name)) { customMetadata.put(name, metadata.get(name)); } } builder.customMetadata(customMetadata); ``` ########## seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/tikadocument/TikaDocumentTransformTest.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.tikadocument; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.transform.common.ErrorHandleWay; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** Unit tests for TikaDocumentTransform */ +public class TikaDocumentTransformTest { + + private CatalogTable catalogTable; + private TikaDocumentTransformConfig config; + + @BeforeEach + public void setUp() { + // Create test catalog table + TableSchema tableSchema = + TableSchema.builder() + .columns( + Arrays.asList( + PhysicalColumn.of( + "id", BasicType.LONG_TYPE, 0, false, null, ""), + PhysicalColumn.of( + "filename", + BasicType.STRING_TYPE, + 200, + true, + null, + ""), + PhysicalColumn.of( + "document_data", + BasicType.BYTE_ARRAY_TYPE, + 0, + true, + null, + ""))) + .build(); + + catalogTable = + CatalogTable.of( + TableIdentifier.of("test", "test_table"), + tableSchema, + new HashMap<>(), + Arrays.asList(), + ""); + + // Create test configuration + Map<String, Object> configMap = new HashMap<>(); + configMap.put("source_field", "document_data"); + + Map<String, String> outputFields = new HashMap<>(); + outputFields.put("content", "extracted_text"); + outputFields.put("content_type", "mime_type"); + outputFields.put("title", "doc_title"); + configMap.put("output_fields", outputFields); + + 
configMap.put("parse_options.extract_text", true); + configMap.put("parse_options.extract_metadata", true); + configMap.put("parse_options.max_string_length", 10000); + configMap.put("content_processing.remove_empty_lines", true); + configMap.put("content_processing.trim_whitespace", true); + configMap.put("error_handling.on_parse_error", "skip"); + configMap.put("error_handling.log_errors", true); + + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap); + config = TikaDocumentTransformConfig.of(readonlyConfig); + } + + @Test + public void testPluginName() { + TikaDocumentTransform transform = new TikaDocumentTransform(config, catalogTable); + Assertions.assertEquals("TikaDocument", transform.getPluginName()); + } + + @Test + public void testGetOutputColumns() { + TikaDocumentTransform transform = new TikaDocumentTransform(config, catalogTable); + Column[] outputColumns = transform.getOutputColumns(); + + Assertions.assertEquals(3, outputColumns.length); + + // Check column names + String[] expectedNames = {"extracted_text", "mime_type", "doc_title"}; + for (int i = 0; i < outputColumns.length; i++) { + boolean found = false; + for (String expectedName : expectedNames) { + if (expectedName.equals(outputColumns[i].getName())) { + found = true; + break; + } + } Review Comment: [nitpick] Use modern Java 8+ streams or convert expectedNames to a Set for more efficient lookup instead of nested loops. This could be replaced with `Set.of(expectedNames).contains(outputColumns[i].getName())`. ########## seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/tikadocument/TikaDocumentTransformConfig.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.tikadocument; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.transform.common.ErrorHandleWay; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +@Getter +@Setter +public class TikaDocumentTransformConfig implements Serializable { + + // Source field configuration + public static final Option<String> SOURCE_FIELD = + Options.key("source_field") + .stringType() + .noDefaultValue() + .withDescription( + "Source field name containing document data (byte[] or base64 string)"); + + // Output fields configuration + public static final Option<Map<String, String>> OUTPUT_FIELDS = + Options.key("output_fields") + .mapType() + .defaultValue( + new HashMap<String, String>() { + { + put("content", "extracted_text"); + put("content_type", "mime_type"); + } + }) Review Comment: [nitpick] Use modern Map initialization syntax instead of anonymous inner class. Replace with `Map.of()` for better readability and performance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: users@infra.apache.org
