Copilot commented on code in PR #2655: URL: https://github.com/apache/tika/pull/2655#discussion_r2873737650
########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ExternalTestBase.java: ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; + +/** + * Base class for Tika gRPC end-to-end tests. + * Can run with either local server (default in CI) or Docker Compose. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +public abstract class ExternalTestBase { + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final int MAX_STARTUP_TIMEOUT = 120; + public static final String GOV_DOCS_FOLDER = "/tika/govdocs1"; + public static final File TEST_FOLDER = new File("target", "govdocs1"); + public static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + public static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + public static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); + private static final int GRPC_PORT = Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052")); + + public static DockerComposeContainer<?> composeContainer; + private static Process localGrpcProcess; + + @BeforeAll + static void setup() throws Exception { + loadGovdocs1(); + + if (USE_LOCAL_SERVER) { + startLocalGrpcServer(); + } else { + startDockerGrpcServer(); + } + } + + private static void startLocalGrpcServer() throws Exception { + log.info("Starting local tika-grpc server using Maven exec"); + + Path tikaGrpcDir = findTikaGrpcDirectory(); + Path configFile = Path.of("src/test/resources/tika-config.json").toAbsolutePath(); + + if (!Files.exists(configFile)) { + throw new IllegalStateException("Config file not found: " + configFile); + } + + log.info("Using tika-grpc from: {}", tikaGrpcDir); + log.info("Using config file: {}", configFile); + + String javaHome = System.getProperty("java.home"); + boolean isWindows = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win"); + String javaCmd = javaHome + (isWindows ? "\\bin\\java.exe" : "/bin/java"); + String mvnCmd = isWindows ? "mvn.cmd" : "mvn"; + + ProcessBuilder pb = new ProcessBuilder( + mvnCmd, + "exec:exec", + "-Dexec.executable=" + javaCmd, + "-Dexec.args=" + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "-classpath %classpath " + + "org.apache.tika.pipes.grpc.TikaGrpcServer " + + "-c " + configFile + " " + + "-p " + GRPC_PORT + ); Review Comment: Local server startup shells out to `mvn`/`mvn.cmd`, which requires Maven to be installed and on PATH. Since this repo includes `mvnw`, using the Maven wrapper (resolved from the Tika root dir you already locate) would make local mode much more reliable across CI and developer machines. ########## tika-e2e-tests/README.md: ########## @@ -0,0 +1,59 @@ +# Apache Tika End-to-End Tests + +End-to-end integration tests for Apache Tika components. + +## Overview + +This module contains standalone end-to-end (E2E) tests for various Apache Tika distribution formats and deployment modes. Unlike unit and integration tests in the main Tika build, these E2E tests validate complete deployment scenarios using Docker containers and real-world test data. + +**Note:** This module is intentionally **NOT** included in the main Tika parent POM. It is designed to be built and run independently to avoid slowing down the primary build process. + Review Comment: This README states the E2E module is intentionally NOT included in the main build, but the root `pom.xml` now includes `tika-e2e-tests` as a module. Please update this note to match the actual build behavior (or keep it excluded from the root reactor). ########## tika-e2e-tests/tika-grpc/src/test/resources/tika-config-ignite-local.json: ########## @@ -0,0 +1,52 @@ +{ + "plugin-roots": ["/var/cache/tika/plugins"], + "pipes": { + "numClients": 1, + "configStoreType": "ignite", + "configStoreParams": "{\"tableName\": \"tika_e2e_test\", \"igniteInstanceName\": \"TikaE2ETest\", \"replicas\": 2, \"partitions\": 10, \"autoClose\": true}", Review Comment: `replicas` is set to 2 in the *local* Ignite config. In local server mode this is a single-node cluster, so requesting 2 replicas is likely to fail table creation or never fully satisfy assignments. Set `replicas` to 1 for `tika-config-ignite-local.json` (and keep higher replica counts only when the test actually brings up multiple Ignite nodes). ```suggestion "configStoreParams": "{\"tableName\": \"tika_e2e_test\", \"igniteInstanceName\": \"TikaE2ETest\", \"replicas\": 1, \"partitions\": 10, \"autoClose\": true}", ``` ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ExternalTestBase.java: ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; + +/** + * Base class for Tika gRPC end-to-end tests. + * Can run with either local server (default in CI) or Docker Compose. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +public abstract class ExternalTestBase { + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final int MAX_STARTUP_TIMEOUT = 120; + public static final String GOV_DOCS_FOLDER = "/tika/govdocs1"; + public static final File TEST_FOLDER = new File("target", "govdocs1"); + public static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + public static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + public static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); Review Comment: `USE_LOCAL_SERVER` defaults to `false` here, but the module POM sets `<tika.e2e.useLocalServer>true</tika.e2e.useLocalServer>` and the PR description says local mode is the default. Consider defaulting the system property to `true` in code as well (or centralize the default) so running a test directly in an IDE behaves the same as Maven. ```suggestion private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "true")); ``` ########## tika-e2e-tests/pom.xml: ########## @@ -0,0 +1,174 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.tika</groupId> + <artifactId>tika-parent</artifactId> + <version>${revision}</version> + <relativePath>../tika-parent/pom.xml</relativePath> + </parent> + + <artifactId>tika-e2e-tests</artifactId> + <packaging>pom</packaging> + <name>Apache Tika End-to-End Tests</name> + <description>End-to-end integration tests for Apache Tika components</description> + + <properties> + <maven.compiler.source>17</maven.compiler.source> + <maven.compiler.target>17</maven.compiler.target> + <maven.compiler.release>17</maven.compiler.release> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + + <!-- Tika version --> + <tika.version>4.0.0-SNAPSHOT</tika.version> Review Comment: `tika.version` is hard-coded to `4.0.0-SNAPSHOT` even though this module is part of the same multi-module build (which already defines `${revision}`). This will break when the project version changes or when building release candidates; prefer `${revision}`/`${project.version}` for intra-reactor alignment. ```suggestion <tika.version>${revision}</tika.version> ``` ########## pom.xml: ########## @@ -57,6 +57,7 @@ <module>tika-example</module> <module>tika-java7</module> <module>tika-handlers</module> + <module>tika-e2e-tests</module> Review Comment: This adds `tika-e2e-tests` to the root reactor, which means `mvn test`/`mvn verify` from the repo root will now run these E2E tests by default (including downloading GovDocs1 from the internet and potentially starting Docker/local servers). If these are meant to be opt-in, put this module behind a Maven profile or configure surefire to skip by default unless explicitly enabled. ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.ignite; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; +import org.apache.tika.FetchAndParseRequest; +import org.apache.tika.SaveFetcherReply; +import org.apache.tika.SaveFetcherRequest; +import org.apache.tika.TikaGrpc; +import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig; + +/** + * End-to-end test for Ignite ConfigStore. + * Tests that fetchers saved via gRPC are persisted in Ignite + * and available in the forked PipesServer process. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and Docker/Testcontainers not supported on Windows CI") +class IgniteConfigStoreTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int MAX_STARTUP_TIMEOUT = 120; + private static final File TEST_FOLDER = new File("target", "govdocs1"); + private static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + private static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + private static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); Review Comment: This class reimplements a lot of logic that already exists in `ExternalTestBase` (GovDocs download/unzip, result assertions, gRPC channel creation, server startup switching). Consider extending `ExternalTestBase` and overriding only what’s Ignite-specific to reduce duplicated maintenance and keep behavior consistent across E2E tests. ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.ignite; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; +import org.apache.tika.FetchAndParseRequest; +import org.apache.tika.SaveFetcherReply; +import org.apache.tika.SaveFetcherRequest; +import org.apache.tika.TikaGrpc; +import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig; + +/** + * End-to-end test for Ignite ConfigStore. + * Tests that fetchers saved via gRPC are persisted in Ignite + * and available in the forked PipesServer process. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and Docker/Testcontainers not supported on Windows CI") +class IgniteConfigStoreTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int MAX_STARTUP_TIMEOUT = 120; + private static final File TEST_FOLDER = new File("target", "govdocs1"); + private static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + private static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + private static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); + private static final int GRPC_PORT = Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052")); + + private static DockerComposeContainer<?> igniteComposeContainer; + private static Process localGrpcProcess; + + @BeforeAll + static void setupIgnite() throws Exception { + // Clean up any orphaned processes from previous runs + if (USE_LOCAL_SERVER) { + log.info("Cleaning up any orphaned processes from previous runs"); + try { + killProcessOnPort(GRPC_PORT); + killProcessOnPort(3344); + killProcessOnPort(10800); + } catch (Exception e) { + log.debug("No orphaned processes to clean up"); + } + } + + // Load govdocs1 if not already loaded + if (!TEST_FOLDER.exists() || TEST_FOLDER.listFiles().length == 0) { + downloadAndUnzipGovdocs1(GOV_DOCS_FROM_IDX, GOV_DOCS_TO_IDX); + } + + if (USE_LOCAL_SERVER) { + startLocalGrpcServer(); + } else { + startDockerGrpcServer(); + } + } + + private static void startLocalGrpcServer() throws Exception { + log.info("Starting local tika-grpc server using Maven"); + + // Find the tika root directory - it should contain both tika-grpc and tika-e2e-tests + Path currentDir = Path.of("").toAbsolutePath(); + Path tikaRootDir = currentDir; + + // Navigate up to find the directory that contains both tika-grpc and tika-e2e-tests + while (tikaRootDir != null && + !(Files.exists(tikaRootDir.resolve("tika-grpc")) && + Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) { + tikaRootDir = tikaRootDir.getParent(); + } + + if (tikaRootDir == null) { + throw new IllegalStateException("Cannot find tika root directory. " + + "Current dir: " + currentDir + ". " + + "Please run from within the tika project."); + } + + Path tikaGrpcDir = tikaRootDir.resolve("tika-grpc"); + if (!Files.exists(tikaGrpcDir)) { + throw new IllegalStateException("Cannot find tika-grpc directory at: " + tikaGrpcDir); + } + + // Use different config for local vs Docker + String configFileName = "tika-config-ignite-local.json"; + Path configFile = Path.of("src/test/resources/" + configFileName).toAbsolutePath(); + + if (!Files.exists(configFile)) { + throw new IllegalStateException("Config file not found: " + configFile); + } + + log.info("Tika root: {}", tikaRootDir); + log.info("Using tika-grpc from: {}", tikaGrpcDir); + log.info("Using config file: {}", configFile); + + // Use mvn exec:exec to run as external process (not exec:java which breaks ServiceLoader) + String javaHome = System.getProperty("java.home"); + String javaCmd = javaHome + "/bin/java"; + + ProcessBuilder pb = new ProcessBuilder( + "mvn", + "exec:exec", + "-Dexec.executable=" + javaCmd, + "-Dexec.args=" + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " + + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " + Review Comment: Local server startup uses a hard-coded `mvn` command. This requires Maven on PATH and ignores the repo’s `mvnw` wrapper (and also differs from `ExternalTestBase`, which already has OS-specific handling). Consider using the Maven wrapper from the located Tika root directory for consistent, hermetic test execution. ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.ignite; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; +import org.apache.tika.FetchAndParseRequest; +import org.apache.tika.SaveFetcherReply; +import org.apache.tika.SaveFetcherRequest; +import org.apache.tika.TikaGrpc; +import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig; + +/** + * End-to-end test for Ignite ConfigStore. + * Tests that fetchers saved via gRPC are persisted in Ignite + * and available in the forked PipesServer process. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and Docker/Testcontainers not supported on Windows CI") +class IgniteConfigStoreTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int MAX_STARTUP_TIMEOUT = 120; + private static final File TEST_FOLDER = new File("target", "govdocs1"); + private static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + private static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + private static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); + private static final int GRPC_PORT = Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052")); + + private static DockerComposeContainer<?> igniteComposeContainer; + private static Process localGrpcProcess; + + @BeforeAll + static void setupIgnite() throws Exception { + // Clean up any orphaned processes from previous runs + if (USE_LOCAL_SERVER) { + log.info("Cleaning up any orphaned processes from previous runs"); + try { + killProcessOnPort(GRPC_PORT); + killProcessOnPort(3344); + killProcessOnPort(10800); + } catch (Exception e) { + log.debug("No orphaned processes to clean up"); + } + } + + // Load govdocs1 if not already loaded + if (!TEST_FOLDER.exists() || TEST_FOLDER.listFiles().length == 0) { + downloadAndUnzipGovdocs1(GOV_DOCS_FROM_IDX, GOV_DOCS_TO_IDX); + } + + if (USE_LOCAL_SERVER) { + startLocalGrpcServer(); + } else { + startDockerGrpcServer(); + } + } + + private static void startLocalGrpcServer() throws Exception { + log.info("Starting local tika-grpc server using Maven"); + + // Find the tika root directory - it should contain both tika-grpc and tika-e2e-tests + Path currentDir = Path.of("").toAbsolutePath(); + Path tikaRootDir = currentDir; + + // Navigate up to find the directory that contains both tika-grpc and tika-e2e-tests + while (tikaRootDir != null && + !(Files.exists(tikaRootDir.resolve("tika-grpc")) && + Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) { + tikaRootDir = tikaRootDir.getParent(); + } + + if (tikaRootDir == null) { + throw new IllegalStateException("Cannot find tika root directory. " + + "Current dir: " + currentDir + ". " + + "Please run from within the tika project."); + } + + Path tikaGrpcDir = tikaRootDir.resolve("tika-grpc"); + if (!Files.exists(tikaGrpcDir)) { + throw new IllegalStateException("Cannot find tika-grpc directory at: " + tikaGrpcDir); + } + + // Use different config for local vs Docker + String configFileName = "tika-config-ignite-local.json"; + Path configFile = Path.of("src/test/resources/" + configFileName).toAbsolutePath(); + + if (!Files.exists(configFile)) { + throw new IllegalStateException("Config file not found: " + configFile); + } + + log.info("Tika root: {}", tikaRootDir); + log.info("Using tika-grpc from: {}", tikaGrpcDir); + log.info("Using config file: {}", configFile); + + // Use mvn exec:exec to run as external process (not exec:java which breaks ServiceLoader) + String javaHome = System.getProperty("java.home"); + String javaCmd = javaHome + "/bin/java"; + + ProcessBuilder pb = new ProcessBuilder( + "mvn", + "exec:exec", + "-Dexec.executable=" + javaCmd, + "-Dexec.args=" + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " + + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " + + "--add-opens=java.base/java.io=ALL-UNNAMED " + + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.math=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED " + + "--add-opens=java.base/java.time=ALL-UNNAMED " + + "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED " + + "--add-opens=java.base/jdk.internal.access=ALL-UNNAMED " + + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " + + "--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED " + + "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED " + + "-Dio.netty.tryReflectionSetAccessible=true " + + "-Dignite.work.dir=" + tikaGrpcDir.resolve("target/ignite-work") + " " + + "-classpath %classpath " + + "org.apache.tika.pipes.grpc.TikaGrpcServer " + + "-c " + configFile + " " + + "-p " + GRPC_PORT + ); + + pb.directory(tikaGrpcDir.toFile()); + pb.redirectErrorStream(true); + pb.redirectOutput(ProcessBuilder.Redirect.PIPE); + + localGrpcProcess = pb.start(); + + // Track whether Ignite has started + final boolean[] igniteStarted = {false}; + + // Start a thread to consume and log output, watching for Ignite startup + Thread logThread = new Thread(() -> { + try (java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(localGrpcProcess.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + log.info("tika-grpc: {}", line); + + // Look for signs that Ignite has fully started + if (line.contains("Ignite server started") || + line.contains("Table") && line.contains("created successfully") || + line.contains("Server started, listening on")) { + synchronized (igniteStarted) { + igniteStarted[0] = true; + igniteStarted.notifyAll(); + } + } + } + } catch (IOException e) { + log.error("Error reading server output", e); + } + }); + logThread.setDaemon(true); + logThread.start(); + + // Wait for Ignite to start - check both log messages and gRPC connectivity + log.info("Waiting for local gRPC server and Ignite to start (timeout: 180 seconds)..."); + + try { + org.awaitility.Awaitility.await() + .atMost(java.time.Duration.ofSeconds(180)) + .pollInterval(java.time.Duration.ofSeconds(2)) + .until(() -> { + boolean igniteReady; + synchronized (igniteStarted) { + igniteReady = igniteStarted[0]; + } + + if (!igniteReady) { + log.debug("Waiting for Ignite to start..."); + return false; + } + + // Try to actually test gRPC readiness with a real (lightweight) call + try { + ManagedChannel testChannel = ManagedChannelBuilder + .forAddress("localhost", GRPC_PORT) + .usePlaintext() + .build(); + + try { + // Try to use the health check service + io.grpc.health.v1.HealthGrpc.HealthBlockingStub healthStub = + io.grpc.health.v1.HealthGrpc.newBlockingStub(testChannel) + .withDeadlineAfter(2, TimeUnit.SECONDS); + + io.grpc.health.v1.HealthCheckResponse response = healthStub.check( + io.grpc.health.v1.HealthCheckRequest.getDefaultInstance()); + + boolean serving = response.getStatus() == + io.grpc.health.v1.HealthCheckResponse.ServingStatus.SERVING; + + if (serving) { + log.info("gRPC server is healthy and serving!"); + return true; + } else { + log.debug("gRPC server responding but not serving yet: {}", response.getStatus()); + return false; + } + } finally { + testChannel.shutdown(); + testChannel.awaitTermination(1, TimeUnit.SECONDS); + } + } catch (io.grpc.StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.UNIMPLEMENTED) { + // Health check not implemented, just verify channel works + log.info("Health check not available, assuming server is ready"); + return true; + } + log.debug("gRPC server not ready yet: {}", e.getMessage()); + return false; + } catch (Exception e) { + log.debug("gRPC server not ready yet: {}", e.getMessage()); + return false; + } + }); + + log.info("Both gRPC server and Ignite are ready!"); + } catch (org.awaitility.core.ConditionTimeoutException e) { + if (localGrpcProcess.isAlive()) { + localGrpcProcess.destroyForcibly(); + } + throw new RuntimeException("Local gRPC server or Ignite failed to start within timeout", e); + } + + log.info("Local tika-grpc server started successfully on port {}", GRPC_PORT); + } + + + private static void startDockerGrpcServer() { + log.info("Starting Docker Compose tika-grpc server"); + + igniteComposeContainer = new DockerComposeContainer<>( + new File("src/test/resources/docker-compose-ignite.yml")) + .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath()) + .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT, ChronoUnit.SECONDS)) + .withExposedService("tika-grpc", 50052, + Wait.forLogMessage(".*Server started.*\\n", 1)) + .withLogConsumer("tika-grpc", new Slf4jLogConsumer(log)); + + igniteComposeContainer.start(); + Review Comment: This test references `src/test/resources/docker-compose-ignite.yml`, but there is no such file in this module/repo. Docker mode will fail immediately unless the compose file is added or the path updated. ```suggestion URL composeFileUrl = IgniteConfigStoreTest.class.getClassLoader() .getResource("docker-compose-ignite.yml"); if (composeFileUrl == null) { throw new IllegalStateException( "Could not find 'docker-compose-ignite.yml' on the test classpath."); } igniteComposeContainer = new DockerComposeContainer<>( new File(composeFileUrl.getFile())) .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath()) .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT, ChronoUnit.SECONDS)) .withExposedService("tika-grpc", 50052, Wait.forLogMessage(".*Server started.*\\n", 1)) .withLogConsumer("tika-grpc", new Slf4jLogConsumer(log)); igniteComposeContainer.start(); ``` ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.ignite; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; +import org.apache.tika.FetchAndParseRequest; +import org.apache.tika.SaveFetcherReply; +import org.apache.tika.SaveFetcherRequest; +import org.apache.tika.TikaGrpc; +import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig; + +/** + * End-to-end test for Ignite ConfigStore. + * Tests that fetchers saved via gRPC are persisted in Ignite + * and available in the forked PipesServer process. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and Docker/Testcontainers not supported on Windows CI") +class IgniteConfigStoreTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int MAX_STARTUP_TIMEOUT = 120; + private static final File TEST_FOLDER = new File("target", "govdocs1"); + private static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + private static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + private static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); Review Comment: `USE_LOCAL_SERVER` defaults to `false` here, which conflicts with the module default (`tika.e2e.useLocalServer=true`) and the PR description (local mode is default). This makes the test try Docker mode unless the property is explicitly set, which is surprising when running from an IDE. ```suggestion private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "true")); ``` ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ExternalTestBase.java: ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; + +/** + * Base class for Tika gRPC end-to-end tests. + * Can run with either local server (default in CI) or Docker Compose. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +public abstract class ExternalTestBase { + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final int MAX_STARTUP_TIMEOUT = 120; + public static final String GOV_DOCS_FOLDER = "/tika/govdocs1"; + public static final File TEST_FOLDER = new File("target", "govdocs1"); + public static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + public static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + public static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); + private static final int GRPC_PORT = Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052")); + + public static DockerComposeContainer<?> composeContainer; + private static Process localGrpcProcess; + + @BeforeAll + static void setup() throws Exception { + loadGovdocs1(); + + if (USE_LOCAL_SERVER) { + startLocalGrpcServer(); + } else { + startDockerGrpcServer(); + } + } + + private static void startLocalGrpcServer() throws Exception { + log.info("Starting local tika-grpc server using Maven exec"); + + Path tikaGrpcDir = findTikaGrpcDirectory(); + Path configFile = Path.of("src/test/resources/tika-config.json").toAbsolutePath(); + + if (!Files.exists(configFile)) { + throw new IllegalStateException("Config file not found: " + configFile); + } + + log.info("Using tika-grpc from: {}", tikaGrpcDir); + log.info("Using config file: {}", configFile); + + String javaHome = System.getProperty("java.home"); + boolean isWindows = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win"); + String javaCmd = javaHome + (isWindows ? "\\bin\\java.exe" : "/bin/java"); + String mvnCmd = isWindows ? "mvn.cmd" : "mvn"; + + ProcessBuilder pb = new ProcessBuilder( + mvnCmd, + "exec:exec", + "-Dexec.executable=" + javaCmd, + "-Dexec.args=" + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "-classpath %classpath " + + "org.apache.tika.pipes.grpc.TikaGrpcServer " + + "-c " + configFile + " " + + "-p " + GRPC_PORT + ); + + pb.directory(tikaGrpcDir.toFile()); + pb.redirectErrorStream(true); + pb.redirectOutput(ProcessBuilder.Redirect.PIPE); + + localGrpcProcess = pb.start(); + + // Start thread to consume output + Thread logThread = new Thread(() -> { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(localGrpcProcess.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + log.info("tika-grpc: {}", line); + } + } catch (IOException e) { + log.error("Error reading server output", e); + } + }); + logThread.setDaemon(true); + logThread.start(); + + // Wait for server to be ready + waitForServerReady(); + + log.info("Local tika-grpc server started successfully on port {}", GRPC_PORT); + } + + private static Path findTikaGrpcDirectory() { + Path currentDir = Path.of("").toAbsolutePath(); + Path tikaRootDir = currentDir; + + while (tikaRootDir != null && + !(Files.exists(tikaRootDir.resolve("tika-grpc")) && + Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) { + tikaRootDir = tikaRootDir.getParent(); + } + + if (tikaRootDir == null) { + throw new IllegalStateException("Cannot find tika root directory. " + + "Current dir: " + currentDir); + } + + return tikaRootDir.resolve("tika-grpc"); + } + + private static void waitForServerReady() throws Exception { + int maxAttempts = 60; + for (int i = 0; i < maxAttempts; i++) { + try { + ManagedChannel testChannel = ManagedChannelBuilder + .forAddress("localhost", GRPC_PORT) + .usePlaintext() + .build(); + + try { + // Try a simple connection + testChannel.getState(true); + TimeUnit.MILLISECONDS.sleep(100); + if (testChannel.getState(false).toString().contains("READY")) { + log.info("gRPC server is ready!"); + return; + } + } finally { + testChannel.shutdown(); + testChannel.awaitTermination(1, TimeUnit.SECONDS); + } + } catch (Exception e) { + // Server not ready yet + } + TimeUnit.SECONDS.sleep(1); + } + + if (localGrpcProcess != null && localGrpcProcess.isAlive()) { + localGrpcProcess.destroyForcibly(); + } + throw new RuntimeException("Local gRPC server failed to start within timeout"); + } + + private static void startDockerGrpcServer() { + log.info("Starting Docker Compose tika-grpc server"); + + composeContainer = new DockerComposeContainer<>( + new File("src/test/resources/docker-compose.yml")) + .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath()) + .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT, ChronoUnit.SECONDS)) + .withExposedService("tika-grpc", 50052, + Wait.forLogMessage(".*Server started.*\\n", 1)) + .withLogConsumer("tika-grpc", new Slf4jLogConsumer(log)); + + composeContainer.start(); + + log.info("Docker Compose containers started successfully"); + } + + private static void loadGovdocs1() throws IOException, InterruptedException { + int retries = 3; + int attempt = 0; + while (true) { + try { + downloadAndUnzipGovdocs1(GOV_DOCS_FROM_IDX, GOV_DOCS_TO_IDX); + break; + } catch (IOException e) { + attempt++; + if (attempt >= retries) { + throw e; + } + log.warn("Download attempt {} failed, retrying in 10 seconds...", attempt, e); + TimeUnit.SECONDS.sleep(10); + } + } + } + + @AfterAll + void close() { + if (USE_LOCAL_SERVER && localGrpcProcess != null) { + log.info("Stopping local gRPC server"); + localGrpcProcess.destroy(); + try { + if (!localGrpcProcess.waitFor(10, TimeUnit.SECONDS)) { + localGrpcProcess.destroyForcibly(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + localGrpcProcess.destroyForcibly(); + } + } else if (composeContainer != null) { + composeContainer.close(); + } + } + + public static void downloadAndUnzipGovdocs1(int fromIndex, int toIndex) throws IOException { + Path targetDir = TEST_FOLDER.toPath(); + Files.createDirectories(targetDir); + + for (int i = fromIndex; i <= toIndex; i++) { + String zipName = String.format(java.util.Locale.ROOT, "%03d.zip", i); + String url = DIGITAL_CORPORA_ZIP_FILES_URL + "/" + zipName; + Path zipPath = targetDir.resolve(zipName); + + if (Files.exists(zipPath)) { + log.info("{} already exists, skipping download", zipName); + continue; + } + + log.info("Downloading {} from {}...", zipName, url); + try (InputStream in = new URL(url).openStream()) { + Files.copy(in, zipPath, StandardCopyOption.REPLACE_EXISTING); + } + + log.info("Unzipping {}...", zipName); + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zipPath.toFile()))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + Path outPath = targetDir.resolve(entry.getName()); + if (entry.isDirectory()) { + Files.createDirectories(outPath); + } else { + Files.createDirectories(outPath.getParent()); + try (OutputStream out = Files.newOutputStream(outPath)) { + zis.transferTo(out); + } + } + zis.closeEntry(); + } + } + } + + log.info("Finished downloading and extracting govdocs1 files"); + } + + public static void assertAllFilesFetched(Path baseDir, List<FetchAndParseReply> successes, + List<FetchAndParseReply> errors) { + Set<String> allFetchKeys = new HashSet<>(); + for (FetchAndParseReply reply : successes) { + allFetchKeys.add(reply.getFetchKey()); + } + for (FetchAndParseReply reply : errors) { + allFetchKeys.add(reply.getFetchKey()); + } + + Set<String> keysFromGovdocs1 = new HashSet<>(); + try (Stream<Path> paths = Files.walk(baseDir)) { + paths.filter(Files::isRegularFile) + .forEach(file -> { + String relPath = baseDir.relativize(file).toString(); + if (Pattern.compile("\\d{3}\\.zip").matcher(relPath).find()) { + return; + } + keysFromGovdocs1.add(relPath); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + + Assertions.assertNotEquals(0, successes.size(), "Should have some successful fetches"); + // Note: errors.size() can be 0 if all files parse successfully + log.info("Processed {} files: {} successes, {} errors", allFetchKeys.size(), successes.size(), errors.size()); + Assertions.assertEquals(keysFromGovdocs1, allFetchKeys, () -> { + Set<String> missing = new HashSet<>(keysFromGovdocs1); + missing.removeAll(allFetchKeys); + return "Missing fetch keys: " + missing; + }); + } + + public static ManagedChannel getManagedChannel() { + if (USE_LOCAL_SERVER) { + return ManagedChannelBuilder + .forAddress("localhost", GRPC_PORT) + .usePlaintext() + .executor(Executors.newCachedThreadPool()) + .maxInboundMessageSize(160 * 1024 * 1024) + .build(); Review Comment: `Executors.newCachedThreadPool()` is created per channel build and never shut down. Because gRPC will not manage the lifecycle of a user-supplied executor, this can leak threads across tests/runs. Consider removing the custom executor or using a shared, explicitly closed executor. ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/filesystem/FileSystemFetcherTest.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.filesystem; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import org.apache.tika.FetchAndParseReply; +import org.apache.tika.FetchAndParseRequest; +import org.apache.tika.SaveFetcherReply; +import org.apache.tika.SaveFetcherRequest; +import org.apache.tika.TikaGrpc; +import org.apache.tika.pipes.ExternalTestBase; +import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig; + +@Slf4j +@DisabledOnOs(value = OS.WINDOWS, disabledReason = "exec:exec classpath exceeds Windows CreateProcess command-line length limit") +class FileSystemFetcherTest extends ExternalTestBase { + + @Test + void testFileSystemFetcher() throws Exception { + String fetcherId = "defaultFetcher"; + ManagedChannel channel = getManagedChannel(); + TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(channel); + TikaGrpc.TikaStub tikaStub = TikaGrpc.newStub(channel); + + // Create and save the fetcher dynamically + FileSystemFetcherConfig config = new FileSystemFetcherConfig(); + // Use local path when running in local mode, Docker path otherwise + boolean useLocalServer = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); Review Comment: This local-vs-Docker switch reads `tika.e2e.useLocalServer` with a default of `false`, which conflicts with the module default (`tika.e2e.useLocalServer=true`). Consider defaulting to `true` here as well so IDE runs match Maven runs. ```suggestion boolean useLocalServer = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "true")); ``` ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ExternalTestBase.java: ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; + +/** + * Base class for Tika gRPC end-to-end tests. + * Can run with either local server (default in CI) or Docker Compose. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +public abstract class ExternalTestBase { + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final int MAX_STARTUP_TIMEOUT = 120; + public static final String GOV_DOCS_FOLDER = "/tika/govdocs1"; + public static final File TEST_FOLDER = new File("target", "govdocs1"); + public static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + public static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + public static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); + private static final int GRPC_PORT = Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052")); + + public static DockerComposeContainer<?> composeContainer; + private static Process localGrpcProcess; + + @BeforeAll + static void setup() throws Exception { + loadGovdocs1(); + + if (USE_LOCAL_SERVER) { + startLocalGrpcServer(); + } else { + startDockerGrpcServer(); + } + } + + private static void startLocalGrpcServer() throws Exception { + log.info("Starting local tika-grpc server using Maven exec"); + + Path tikaGrpcDir = findTikaGrpcDirectory(); + Path configFile = Path.of("src/test/resources/tika-config.json").toAbsolutePath(); + + if (!Files.exists(configFile)) { + throw new IllegalStateException("Config file not found: " + configFile); + } + + log.info("Using tika-grpc from: {}", tikaGrpcDir); + log.info("Using config file: {}", configFile); + + String javaHome = System.getProperty("java.home"); + boolean isWindows = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win"); + String javaCmd = javaHome + (isWindows ? "\\bin\\java.exe" : "/bin/java"); + String mvnCmd = isWindows ? "mvn.cmd" : "mvn"; + + ProcessBuilder pb = new ProcessBuilder( + mvnCmd, + "exec:exec", + "-Dexec.executable=" + javaCmd, + "-Dexec.args=" + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "-classpath %classpath " + + "org.apache.tika.pipes.grpc.TikaGrpcServer " + + "-c " + configFile + " " + + "-p " + GRPC_PORT + ); + + pb.directory(tikaGrpcDir.toFile()); + pb.redirectErrorStream(true); + pb.redirectOutput(ProcessBuilder.Redirect.PIPE); + + localGrpcProcess = pb.start(); + + // Start thread to consume output + Thread logThread = new Thread(() -> { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(localGrpcProcess.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + log.info("tika-grpc: {}", line); + } + } catch (IOException e) { + log.error("Error reading server output", e); + } + }); + logThread.setDaemon(true); + logThread.start(); + + // Wait for server to be ready + waitForServerReady(); + + log.info("Local tika-grpc server started successfully on port {}", GRPC_PORT); + } + + private static Path findTikaGrpcDirectory() { + Path currentDir = Path.of("").toAbsolutePath(); + Path tikaRootDir = currentDir; + + while (tikaRootDir != null && + !(Files.exists(tikaRootDir.resolve("tika-grpc")) && + Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) { + tikaRootDir = tikaRootDir.getParent(); + } + + if (tikaRootDir == null) { + throw new IllegalStateException("Cannot find tika root directory. " + + "Current dir: " + currentDir); + } + + return tikaRootDir.resolve("tika-grpc"); + } + + private static void waitForServerReady() throws Exception { + int maxAttempts = 60; + for (int i = 0; i < maxAttempts; i++) { + try { + ManagedChannel testChannel = ManagedChannelBuilder + .forAddress("localhost", GRPC_PORT) + .usePlaintext() + .build(); + + try { + // Try a simple connection + testChannel.getState(true); + TimeUnit.MILLISECONDS.sleep(100); + if (testChannel.getState(false).toString().contains("READY")) { + log.info("gRPC server is ready!"); + return; + } + } finally { + testChannel.shutdown(); + testChannel.awaitTermination(1, TimeUnit.SECONDS); + } + } catch (Exception e) { + // Server not ready yet + } + TimeUnit.SECONDS.sleep(1); + } + + if (localGrpcProcess != null && localGrpcProcess.isAlive()) { + localGrpcProcess.destroyForcibly(); + } + throw new RuntimeException("Local gRPC server failed to start within timeout"); + } + + private static void startDockerGrpcServer() { + log.info("Starting Docker Compose tika-grpc server"); + + composeContainer = new DockerComposeContainer<>( + new File("src/test/resources/docker-compose.yml")) + .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath()) + .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT, ChronoUnit.SECONDS)) + .withExposedService("tika-grpc", 50052, + Wait.forLogMessage(".*Server started.*\\n", 1)) Review Comment: This test references `src/test/resources/docker-compose.yml`, but that file does not exist in this module (and there are no `docker-compose*.yml` files anywhere under the repo). Docker mode will fail immediately unless the compose file is added or the path updated. ```suggestion String composeFilePath = System.getProperty("tika.docker.compose.file"); if (composeFilePath == null || composeFilePath.isBlank()) { throw new IllegalStateException( "Docker Compose mode requires system property 'tika.docker.compose.file' " + "pointing to a valid docker-compose.yml file."); } File composeFile = new File(composeFilePath); if (!composeFile.isFile()) { throw new IllegalStateException( "Docker Compose file not found at path: " + composeFile.getAbsolutePath()); } composeContainer = new DockerComposeContainer<>(composeFile) .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath()) .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT, ChronoUnit.SECONDS)) .withExposedService("tika-grpc", 50052, Wait.forLogMessage(".*Server started.*\\n", 1)) ``` ########## tika-e2e-tests/tika-grpc/pom.xml: ########## @@ -0,0 +1,173 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.tika</groupId> + <artifactId>tika-e2e-tests</artifactId> + <version>${revision}</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>tika-grpc-e2e-test</artifactId> + <name>Apache Tika gRPC End-to-End Tests</name> + <description>End-to-end tests for Apache Tika gRPC Server using test containers</description> + + <properties> + <!-- Use local server mode by default in CI (faster, no Docker required) --> + <!-- Override with -Dtika.e2e.useLocalServer=false to use Docker --> + <tika.e2e.useLocalServer>true</tika.e2e.useLocalServer> + <corpa.numdocs>2</corpa.numdocs> + </properties> + + <dependencies> + <!-- Tika gRPC --> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-grpc</artifactId> + <version>${tika.version}</version> + </dependency> + + <!-- Tika Fetchers --> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-file-system</artifactId> + <version>${tika.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-core</artifactId> + <version>${tika.version}</version> + </dependency> + + <!-- Jackson for JSON --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <!-- Lombok --> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <optional>true</optional> + </dependency> + + <!-- JUnit 5 --> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + + <!-- Testcontainers --> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + + <!-- Logging --> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <!-- Awaitility for robust waiting --> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>4.2.0</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <includes> + <include>**/*Test.java</include> + </includes> + <systemPropertyVariables> + <govdocs1.fromIndex>1</govdocs1.fromIndex> + <govdocs1.toIndex>1</govdocs1.toIndex> Review Comment: Surefire hard-codes `govdocs1.fromIndex` and `govdocs1.toIndex` to `1`, which overrides any `-Dgovdocs1.fromIndex=...`/`-Dgovdocs1.toIndex=...` passed on the Maven command line. If you want these to be user-configurable as the README suggests, wire them to Maven properties (with defaults) instead of fixed literals. ```suggestion <govdocs1.fromIndex>${govdocs1.fromIndex}</govdocs1.fromIndex> <govdocs1.toIndex>${govdocs1.toIndex}</govdocs1.toIndex> ``` ########## tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.ignite; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.apache.tika.FetchAndParseReply; +import org.apache.tika.FetchAndParseRequest; +import org.apache.tika.SaveFetcherReply; +import org.apache.tika.SaveFetcherRequest; +import org.apache.tika.TikaGrpc; +import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig; + +/** + * End-to-end test for Ignite ConfigStore. + * Tests that fetchers saved via gRPC are persisted in Ignite + * and available in the forked PipesServer process. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Testcontainers +@Slf4j +@Tag("E2ETest") +@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and Docker/Testcontainers not supported on Windows CI") +class IgniteConfigStoreTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int MAX_STARTUP_TIMEOUT = 120; + private static final File TEST_FOLDER = new File("target", "govdocs1"); + private static final int GOV_DOCS_FROM_IDX = Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1")); + private static final int GOV_DOCS_TO_IDX = Integer.parseInt(System.getProperty("govdocs1.toIndex", "1")); + private static final String DIGITAL_CORPORA_ZIP_FILES_URL = "https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles"; + private static final boolean USE_LOCAL_SERVER = Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false")); + private static final int GRPC_PORT = Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052")); + + private static DockerComposeContainer<?> igniteComposeContainer; + private static Process localGrpcProcess; + + @BeforeAll + static void setupIgnite() throws Exception { + // Clean up any orphaned processes from previous runs + if (USE_LOCAL_SERVER) { + log.info("Cleaning up any orphaned processes from previous runs"); + try { + killProcessOnPort(GRPC_PORT); + killProcessOnPort(3344); + killProcessOnPort(10800); + } catch (Exception e) { + log.debug("No orphaned processes to clean up"); + } + } + + // Load govdocs1 if not already loaded + if (!TEST_FOLDER.exists() || TEST_FOLDER.listFiles().length == 0) { + downloadAndUnzipGovdocs1(GOV_DOCS_FROM_IDX, GOV_DOCS_TO_IDX); + } + + if (USE_LOCAL_SERVER) { + startLocalGrpcServer(); + } else { + startDockerGrpcServer(); + } + } + + private static void startLocalGrpcServer() throws Exception { + log.info("Starting local tika-grpc server using Maven"); + + // Find the tika root directory - it should contain both tika-grpc and tika-e2e-tests + Path currentDir = Path.of("").toAbsolutePath(); + Path tikaRootDir = currentDir; + + // Navigate up to find the directory that contains both tika-grpc and tika-e2e-tests + while (tikaRootDir != null && + !(Files.exists(tikaRootDir.resolve("tika-grpc")) && + Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) { + tikaRootDir = tikaRootDir.getParent(); + } + + if (tikaRootDir == null) { + throw new IllegalStateException("Cannot find tika root directory. " + + "Current dir: " + currentDir + ". " + + "Please run from within the tika project."); + } + + Path tikaGrpcDir = tikaRootDir.resolve("tika-grpc"); + if (!Files.exists(tikaGrpcDir)) { + throw new IllegalStateException("Cannot find tika-grpc directory at: " + tikaGrpcDir); + } + + // Use different config for local vs Docker + String configFileName = "tika-config-ignite-local.json"; + Path configFile = Path.of("src/test/resources/" + configFileName).toAbsolutePath(); + + if (!Files.exists(configFile)) { + throw new IllegalStateException("Config file not found: " + configFile); + } + + log.info("Tika root: {}", tikaRootDir); + log.info("Using tika-grpc from: {}", tikaGrpcDir); + log.info("Using config file: {}", configFile); + + // Use mvn exec:exec to run as external process (not exec:java which breaks ServiceLoader) + String javaHome = System.getProperty("java.home"); + String javaCmd = javaHome + "/bin/java"; + + ProcessBuilder pb = new ProcessBuilder( + "mvn", + "exec:exec", + "-Dexec.executable=" + javaCmd, + "-Dexec.args=" + + "--add-opens=java.base/java.lang=ALL-UNNAMED " + + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " + + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " + + "--add-opens=java.base/java.io=ALL-UNNAMED " + + "--add-opens=java.base/java.nio=ALL-UNNAMED " + + "--add-opens=java.base/java.math=ALL-UNNAMED " + + "--add-opens=java.base/java.util=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED " + + "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED " + + "--add-opens=java.base/java.time=ALL-UNNAMED " + + "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED " + + "--add-opens=java.base/jdk.internal.access=ALL-UNNAMED " + + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " + + "--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED " + + "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED " + + "-Dio.netty.tryReflectionSetAccessible=true " + + "-Dignite.work.dir=" + tikaGrpcDir.resolve("target/ignite-work") + " " + + "-classpath %classpath " + + "org.apache.tika.pipes.grpc.TikaGrpcServer " + + "-c " + configFile + " " + + "-p " + GRPC_PORT + ); + + pb.directory(tikaGrpcDir.toFile()); + pb.redirectErrorStream(true); + pb.redirectOutput(ProcessBuilder.Redirect.PIPE); + + localGrpcProcess = pb.start(); + + // Track whether Ignite has started + final boolean[] igniteStarted = {false}; + + // Start a thread to consume and log output, watching for Ignite startup + Thread logThread = new Thread(() -> { + try (java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(localGrpcProcess.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + log.info("tika-grpc: {}", line); + + // Look for signs that Ignite has fully started + if (line.contains("Ignite server started") || + line.contains("Table") && line.contains("created successfully") || + line.contains("Server started, listening on")) { + synchronized (igniteStarted) { + igniteStarted[0] = true; + igniteStarted.notifyAll(); + } + } + } + } catch (IOException e) { + log.error("Error reading server output", e); + } + }); + logThread.setDaemon(true); + logThread.start(); + + // Wait for Ignite to start - check both log messages and gRPC connectivity + log.info("Waiting for local gRPC server and Ignite to start (timeout: 180 seconds)..."); + + try { + org.awaitility.Awaitility.await() + .atMost(java.time.Duration.ofSeconds(180)) + .pollInterval(java.time.Duration.ofSeconds(2)) + .until(() -> { + boolean igniteReady; + synchronized (igniteStarted) { + igniteReady = igniteStarted[0]; + } + + if (!igniteReady) { + log.debug("Waiting for Ignite to start..."); + return false; + } + + // Try to actually test gRPC readiness with a real (lightweight) call + try { + ManagedChannel testChannel = ManagedChannelBuilder + .forAddress("localhost", GRPC_PORT) + .usePlaintext() + .build(); + + try { + // Try to use the health check service + io.grpc.health.v1.HealthGrpc.HealthBlockingStub healthStub = + io.grpc.health.v1.HealthGrpc.newBlockingStub(testChannel) + .withDeadlineAfter(2, TimeUnit.SECONDS); + + io.grpc.health.v1.HealthCheckResponse response = healthStub.check( + io.grpc.health.v1.HealthCheckRequest.getDefaultInstance()); + + boolean serving = response.getStatus() == + io.grpc.health.v1.HealthCheckResponse.ServingStatus.SERVING; + + if (serving) { + log.info("gRPC server is healthy and serving!"); + return true; + } else { + log.debug("gRPC server responding but not serving yet: {}", response.getStatus()); + return false; + } + } finally { + testChannel.shutdown(); + testChannel.awaitTermination(1, TimeUnit.SECONDS); + } + } catch (io.grpc.StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.UNIMPLEMENTED) { + // Health check not implemented, just verify channel works + log.info("Health check not available, assuming server is ready"); + return true; + } + log.debug("gRPC server not ready yet: {}", e.getMessage()); + return false; + } catch (Exception e) { + log.debug("gRPC server not ready yet: {}", e.getMessage()); + return false; + } + }); + + log.info("Both gRPC server and Ignite are ready!"); + } catch (org.awaitility.core.ConditionTimeoutException e) { + if (localGrpcProcess.isAlive()) { + localGrpcProcess.destroyForcibly(); + } + throw new RuntimeException("Local gRPC server or Ignite failed to start within timeout", e); + } + + log.info("Local tika-grpc server started successfully on port {}", GRPC_PORT); + } + + + private static void startDockerGrpcServer() { + log.info("Starting Docker Compose tika-grpc server"); + + igniteComposeContainer = new DockerComposeContainer<>( + new File("src/test/resources/docker-compose-ignite.yml")) + .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath()) + .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT, ChronoUnit.SECONDS)) + .withExposedService("tika-grpc", 50052, + Wait.forLogMessage(".*Server started.*\\n", 1)) + .withLogConsumer("tika-grpc", new Slf4jLogConsumer(log)); + + igniteComposeContainer.start(); + + log.info("Ignite Docker Compose containers started successfully"); + } + + @AfterAll + static void teardownIgnite() { + if (USE_LOCAL_SERVER && localGrpcProcess != null) { + log.info("Stopping local gRPC server and all child processes"); + + try { + // Get the PID of the Maven process + long mvnPid = localGrpcProcess.pid(); + log.info("Maven process PID: {}", mvnPid); + + // Try graceful shutdown first + localGrpcProcess.destroy(); + + if (!localGrpcProcess.waitFor(10, TimeUnit.SECONDS)) { + log.warn("Process didn't stop gracefully, forcing shutdown"); + localGrpcProcess.destroyForcibly(); + localGrpcProcess.waitFor(5, TimeUnit.SECONDS); + } + + // Give it a moment for cleanup + Thread.sleep(2000); + + // Kill any remaining child processes by finding processes listening on Ignite/gRPC ports + // Only do this if the process is still running + try { + killProcessOnPort(GRPC_PORT); // Kill gRPC server + killProcessOnPort(3344); // Kill Ignite's internal port + killProcessOnPort(10800); // Kill Ignite client connector + } catch (Exception e) { + log.debug("Error killing processes on ports (may already be stopped): {}", e.getMessage()); + } + + log.info("Local gRPC server stopped"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + localGrpcProcess.destroyForcibly(); + } + } else if (igniteComposeContainer != null) { + igniteComposeContainer.close(); + } + } + + private static void killProcessOnPort(int port) throws IOException, InterruptedException { + // Find process listening on the port using lsof + ProcessBuilder findPb = new ProcessBuilder("lsof", "-ti", ":" + port); + findPb.redirectErrorStream(true); + Process findProcess = findPb.start(); + + try (java.io.BufferedReader reader = new java.io.BufferedReader( + new java.io.InputStreamReader(findProcess.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) { + String pidStr = reader.readLine(); + if (pidStr != null && !pidStr.trim().isEmpty()) { + long pid = Long.parseLong(pidStr.trim()); + long myPid = ProcessHandle.current().pid(); + + // Don't kill ourselves or our parent + if (pid == myPid || isParentProcess(pid)) { + log.debug("Skipping kill of PID {} on port {} (test process or parent)", pid, port); + return; + } + + log.info("Found process {} listening on port {}, killing it", pid, port); + + ProcessBuilder killPb = new ProcessBuilder("kill", String.valueOf(pid)); + Process killProcess = killPb.start(); + killProcess.waitFor(2, TimeUnit.SECONDS); + + // If still alive, force kill + Thread.sleep(1000); + ProcessBuilder forceKillPb = new ProcessBuilder("kill", "-9", String.valueOf(pid)); + Process forceKillProcess = forceKillPb.start(); + forceKillProcess.waitFor(2, TimeUnit.SECONDS); + } + } + + findProcess.waitFor(2, TimeUnit.SECONDS); + } + + private static boolean isParentProcess(long pid) { + try { + ProcessHandle current = ProcessHandle.current(); + while (current.parent().isPresent()) { + current = current.parent().get(); + if (current.pid() == pid) { + return true; + } + } + } catch (Exception e) { + log.debug("Error checking parent process", e); + } + return false; + } + + @Test + void testIgniteConfigStore() throws Exception { + String fetcherId = "dynamicIgniteFetcher"; + ManagedChannel channel = getManagedChannelForIgnite(); + + try { + TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(channel); + TikaGrpc.TikaStub tikaStub = TikaGrpc.newStub(channel); + + // Create and save the fetcher dynamically + FileSystemFetcherConfig config = new FileSystemFetcherConfig(); + // Use local path when running in local mode, Docker path otherwise + String basePath = USE_LOCAL_SERVER ? TEST_FOLDER.getAbsolutePath() : "/tika/govdocs1"; + config.setBasePath(basePath); + + String configJson = OBJECT_MAPPER.writeValueAsString(config); + log.info("Creating fetcher with Ignite ConfigStore (basePath={}): {}", basePath, configJson); + + SaveFetcherReply saveReply = blockingStub.saveFetcher(SaveFetcherRequest + .newBuilder() + .setFetcherId(fetcherId) + .setFetcherClass("org.apache.tika.pipes.fetcher.fs.FileSystemFetcher") + .setFetcherConfigJson(configJson) + .build()); + + log.info("Fetcher saved to Ignite: {}", saveReply.getFetcherId()); + + List<FetchAndParseReply> successes = Collections.synchronizedList(new ArrayList<>()); + List<FetchAndParseReply> errors = Collections.synchronizedList(new ArrayList<>()); + + CountDownLatch countDownLatch = new CountDownLatch(1); + StreamObserver<FetchAndParseRequest> + requestStreamObserver = tikaStub.fetchAndParseBiDirectionalStreaming(new StreamObserver<>() { + @Override + public void onNext(FetchAndParseReply fetchAndParseReply) { + log.debug("Reply from fetch-and-parse - key={}, status={}", + fetchAndParseReply.getFetchKey(), fetchAndParseReply.getStatus()); + if ("FETCH_AND_PARSE_EXCEPTION".equals(fetchAndParseReply.getStatus())) { + errors.add(fetchAndParseReply); + } else { + successes.add(fetchAndParseReply); + } + } + + @Override + public void onError(Throwable throwable) { + log.error("Received an error", throwable); + Assertions.fail(throwable); + countDownLatch.countDown(); + } + + @Override + public void onCompleted() { + log.info("Finished streaming fetch and parse replies"); + countDownLatch.countDown(); + } + }); + + // Submit files for parsing - limit to configured number + int maxDocs = Integer.parseInt(System.getProperty("corpa.numdocs", "-1")); + log.info("Document limit: {}", maxDocs == -1 ? "unlimited" : maxDocs); + + try (Stream<Path> paths = Files.walk(TEST_FOLDER.toPath())) { + Stream<Path> fileStream = paths.filter(Files::isRegularFile); + + if (maxDocs > 0) { + fileStream = fileStream.limit(maxDocs); + } + + fileStream.forEach(file -> { + try { + String relPath = TEST_FOLDER.toPath().relativize(file).toString(); + requestStreamObserver.onNext(FetchAndParseRequest + .newBuilder() + .setFetcherId(fetcherId) + .setFetchKey(relPath) + .build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + log.info("Done submitting files to Ignite-backed fetcher {}", fetcherId); + + requestStreamObserver.onCompleted(); + + // Wait for all parsing to complete + try { + if (!countDownLatch.await(3, TimeUnit.MINUTES)) { + log.error("Timed out waiting for parse to complete"); + Assertions.fail("Timed out waiting for parsing to complete"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + Assertions.fail("Interrupted while waiting for parsing to complete"); + } + + // Verify documents were processed + if (maxDocs == -1) { + assertAllFilesFetched(TEST_FOLDER.toPath(), successes, errors); + } else { + int totalProcessed = successes.size() + errors.size(); + log.info("Processed {} documents with Ignite ConfigStore (limit was {})", + totalProcessed, maxDocs); + Assertions.assertTrue(totalProcessed <= maxDocs, + "Should not process more than " + maxDocs + " documents"); + Assertions.assertTrue(totalProcessed > 0, + "Should have processed at least one document"); + } + + log.info("Ignite ConfigStore test completed successfully - {} successes, {} errors", + successes.size(), errors.size()); + } finally { + // Properly shutdown gRPC channel to avoid resource leak + channel.shutdown(); + try { + if (!channel.awaitTermination(5, TimeUnit.SECONDS)) { + channel.shutdownNow(); + } + } catch (InterruptedException e) { + channel.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + // Helper method for downloading test data + private static void downloadAndUnzipGovdocs1(int fromIndex, int toIndex) throws IOException { + Path targetDir = TEST_FOLDER.toPath(); + Files.createDirectories(targetDir); + + for (int i = fromIndex; i <= toIndex; i++) { + String zipName = String.format(java.util.Locale.ROOT, "%03d.zip", i); + String url = DIGITAL_CORPORA_ZIP_FILES_URL + "/" + zipName; + Path zipPath = targetDir.resolve(zipName); + + if (Files.exists(zipPath)) { + log.info("{} already exists, skipping download", zipName); + continue; + } + + log.info("Downloading {} from {}...", zipName, url); + try (InputStream in = new URL(url).openStream()) { + Files.copy(in, zipPath, StandardCopyOption.REPLACE_EXISTING); + } + + log.info("Unzipping {}...", zipName); + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zipPath.toFile()))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + Path outPath = targetDir.resolve(entry.getName()); + if (entry.isDirectory()) { + Files.createDirectories(outPath); + } else { + Files.createDirectories(outPath.getParent()); + try (OutputStream out = Files.newOutputStream(outPath)) { + zis.transferTo(out); + } + } + zis.closeEntry(); + } + } + } + + log.info("Finished downloading and extracting govdocs1 files"); + } + + // Helper method to validate all files were fetched + private static void assertAllFilesFetched(Path baseDir, List<FetchAndParseReply> successes, + List<FetchAndParseReply> errors) { + java.util.Set<String> allFetchKeys = new java.util.HashSet<>(); + for (FetchAndParseReply reply : successes) { + allFetchKeys.add(reply.getFetchKey()); + } + for (FetchAndParseReply reply : errors) { + allFetchKeys.add(reply.getFetchKey()); + } + + java.util.Set<String> keysFromGovdocs1 = new java.util.HashSet<>(); + try (Stream<Path> paths = Files.walk(baseDir)) { + paths.filter(Files::isRegularFile) + .forEach(file -> { + String relPath = baseDir.relativize(file).toString(); + if (java.util.regex.Pattern.compile("\\d{3}\\.zip").matcher(relPath).find()) { + return; + } + keysFromGovdocs1.add(relPath); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + + Assertions.assertNotEquals(0, successes.size(), "Should have some successful fetches"); + log.info("Processed {} files: {} successes, {} errors", allFetchKeys.size(), successes.size(), errors.size()); + Assertions.assertEquals(keysFromGovdocs1, allFetchKeys, () -> { + java.util.Set<String> missing = new java.util.HashSet<>(keysFromGovdocs1); + missing.removeAll(allFetchKeys); + return "Missing fetch keys: " + missing; + }); + } + + // Helper method to create gRPC channel + private static ManagedChannel getManagedChannelForIgnite() { + if (USE_LOCAL_SERVER) { + return ManagedChannelBuilder + .forAddress("localhost", GRPC_PORT) + .usePlaintext() + .executor(Executors.newCachedThreadPool()) + .maxInboundMessageSize(160 * 1024 * 1024) + .build(); + } else { Review Comment: `Executors.newCachedThreadPool()` is created for the channel and never shut down. Because gRPC won’t close a user-supplied executor, this can leak threads across the test suite. Prefer the default executor or a shared executor that is explicitly shut down in `@AfterAll`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
