This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 60e1aa217b TIKA-4670 -- improve exit handling btwn pipesclient and 
pipesserver (#2618)
60e1aa217b is described below

commit 60e1aa217b10b9972d32987194d76328cb59fe7d
Author: Tim Allison <[email protected]>
AuthorDate: Wed Feb 18 16:36:44 2026 -0500

    TIKA-4670 -- improve exit handling btwn pipesclient and pipesserver (#2618)
---
 .../tika/pipes/core/PerClientServerManager.java    |  20 +-
 .../org/apache/tika/pipes/core/PipesClient.java    | 153 +++------
 .../tika/pipes/core/protocol/PipesMessage.java     | 169 ++++++++++
 .../tika/pipes/core/protocol/PipesMessageType.java |  96 ++++++
 .../core/protocol/ProtocolDesyncException.java     |  30 ++
 .../core/protocol/ShutDownReceivedException.java   |  34 ++
 .../tika/pipes/core/server/ConnectionHandler.java  | 255 +++++----------
 .../apache/tika/pipes/core/server/PipesServer.java | 364 ++++++---------------
 .../tika/pipes/core/server/ServerProtocolIO.java   | 133 ++++++++
 .../tika/pipes/core/protocol/PipesMessageTest.java | 202 ++++++++++++
 .../apache/tika/pipes/core/PipesClientTest.java    |  61 +++-
 11 files changed, 970 insertions(+), 547 deletions(-)

diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
index 94c972f013..a8cf40ba10 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PerClientServerManager.java
@@ -34,7 +34,6 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.tika.pipes.core.server.PipesServer;
 import org.apache.tika.utils.ProcessUtils;
 
 /**
@@ -161,17 +160,14 @@ public class PerClientServerManager implements 
ServerManager {
                     int exitValue = process.exitValue();
                     LOG.error("clientId={}: Process exited with code {} before 
connecting to socket",
                             clientId, exitValue);
-                    // Don't treat known crash exit codes as initialization 
failures
-                    // These indicate the server started but crashed during 
processing
-                    if (exitValue == PipesServer.OOM_EXIT_CODE ||
-                            exitValue == PipesServer.TIMEOUT_EXIT_CODE ||
-                            exitValue == 
PipesServer.UNSPECIFIED_CRASH_EXIT_CODE) {
-                        // Mark for restart and throw IOException so caller 
can retry
-                        pendingRestart = true;
-                        throw new IOException("Server crashed (exit code " + 
exitValue + ") - will retry");
-                    }
-                    throw new ServerInitializationException(
-                            "Process failed to start (exit code " + exitValue 
+ "). Check JVM arguments and classpath.");
+                    // Always treat pre-connect death as retryable.
+                    // The only non-retryable paths are:
+                    // 1. pb.start() fails (can't launch process) - handled in 
startServer()
+                    // 2. Server explicitly reports bad config via protocol - 
handled in waitForStartup()
+                    // 3. Exhausted all retry attempts - handled in maybeInit()
+                    pendingRestart = true;
+                    throw new IOException(
+                            "Server process died before connecting (exit code 
" + exitValue + ") - will retry");
                 }
                 // Check if we've exceeded the overall timeout
                 long elapsed = System.currentTimeMillis() - startTime;
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 82c1d7bfea..fb4ae62824 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -19,9 +19,6 @@ package org.apache.tika.pipes.core;
 
 import static org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.TIMEOUT;
 import static 
org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH;
-import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.READY;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -35,9 +32,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.HexFormat;
 import java.util.List;
-import java.util.Locale;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,9 +48,10 @@ import org.apache.tika.pipes.api.FetchEmitTuple;
 import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitDataImpl;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
 import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
 import org.apache.tika.pipes.core.server.IntermediateResult;
-import org.apache.tika.pipes.core.server.PipesServer;
 import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.StringUtils;
 
@@ -71,14 +67,6 @@ import org.apache.tika.utils.StringUtils;
  */
 public class PipesClient implements Closeable {
 
-    public enum COMMANDS {
-        PING, ACK, NEW_REQUEST, SHUT_DOWN;
-
-        public byte getByte() {
-            return (byte) (ordinal() + 1);
-        }
-    }
-
     private static final Logger LOG = 
LoggerFactory.getLogger(PipesClient.class);
     private static final AtomicInteger CLIENT_COUNTER = new AtomicInteger(0);
     public static final int SOCKET_CONNECT_TIMEOUT_MS = 60000;
@@ -86,6 +74,7 @@ public class PipesClient implements Closeable {
 
     private final PipesConfig pipesConfig;
     private final ServerManager serverManager;
+    private final boolean ownsServerManager;
     private final int pipesClientId;
 
     private ConnectionTuple connectionTuple;
@@ -93,6 +82,10 @@ public class PipesClient implements Closeable {
 
     /**
      * Creates a PipesClient with the given server manager.
+     * <p>
+     * The caller retains ownership of the server manager and is responsible
+     * for closing it. This is used in shared mode where multiple clients
+     * share a single server manager.
      *
      * @param pipesConfig the pipes configuration
      * @param serverManager the server manager (per-client or shared)
@@ -100,6 +93,7 @@ public class PipesClient implements Closeable {
     public PipesClient(PipesConfig pipesConfig, ServerManager serverManager) {
         this.pipesConfig = pipesConfig;
         this.serverManager = serverManager;
+        this.ownsServerManager = false;
         this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
     }
 
@@ -117,6 +111,7 @@ public class PipesClient implements Closeable {
         this.pipesConfig = pipesConfig;
         this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
         this.serverManager = new PerClientServerManager(pipesConfig, 
tikaConfigPath, pipesClientId);
+        this.ownsServerManager = true;
     }
 
     public int getFilesProcessed() {
@@ -132,10 +127,9 @@ public class PipesClient implements Closeable {
             return false;
         }
         try {
-            connectionTuple.output.write(COMMANDS.PING.getByte());
-            connectionTuple.output.flush();
-            int ping = connectionTuple.input.read();
-            if (ping == COMMANDS.PING.getByte()) {
+            PipesMessage.ping().write(connectionTuple.output);
+            PipesMessage response = PipesMessage.read(connectionTuple.input);
+            if (response.type() == PipesMessageType.PING) {
                 return true;
             }
         } catch (IOException e) {
@@ -151,6 +145,9 @@ public class PipesClient implements Closeable {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
+        if (ownsServerManager) {
+            serverManager.close();
+        }
     }
 
     public int getPipesClientId() {
@@ -167,8 +164,7 @@ public class PipesClient implements Closeable {
         }
         LOG.debug("pipesClientId={}: closing connection", pipesClientId);
         try {
-            connectionTuple.output.write(COMMANDS.SHUT_DOWN.getByte());
-            connectionTuple.output.flush();
+            PipesMessage.shutDown().write(connectionTuple.output);
         } catch (IOException e) {
             // swallow
         }
@@ -307,17 +303,13 @@ public class PipesClient implements Closeable {
     private void writeTask(FetchEmitTuple t) throws IOException {
         LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", 
pipesClientId, t.getId());
         byte[] bytes = JsonPipesIpc.toBytes(t);
-        connectionTuple.output.write(COMMANDS.NEW_REQUEST.getByte());
-        connectionTuple.output.writeInt(bytes.length);
-        connectionTuple.output.write(bytes);
-        connectionTuple.output.flush();
+        PipesMessage.newRequest(bytes).write(connectionTuple.output);
     }
 
     private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult 
intermediateResult) throws InterruptedException {
         long timeoutMillis = getTimeoutMillis(pipesConfig, 
t.getParseContext());
         Instant start = Instant.now();
         Instant lastUpdate = start;
-        long lastProgressCounter = 0;
 
         while (true) {
             if (Thread.currentThread().isInterrupted()) {
@@ -334,42 +326,49 @@ public class PipesClient implements Closeable {
                         intermediateResult.get());
             }
             try {
-                // Read blocks on the socket
-                PipesServer.PROCESSING_STATUS status = readServerStatus();
-                LOG.trace("clientId={}: switch status id={} status={}", 
pipesClientId, t.getId(), status);
-                String msg = null;
-                switch (status) {
+                PipesMessage msg = PipesMessage.read(connectionTuple.input);
+                LOG.trace("clientId={}: received message type={} id={}", 
pipesClientId, msg.type(), t.getId());
+
+                // Send ACK only for messages that require it
+                if (msg.type().requiresAck()) {
+                    PipesMessage.ack().write(connectionTuple.output);
+                }
+
+                switch (msg.type()) {
                     case OOM:
-                        msg = readResult(String.class);
-                        serverManager.markServerForRestart(); // Signal that 
server is dying
+                        String oomMsg = JsonPipesIpc.fromBytes(msg.payload(), 
String.class);
+                        serverManager.markServerForRestart();
                         closeConnection();
                         return buildFatalResult(t.getId(), t.getEmitKey(), 
PipesResult.RESULT_STATUS.OOM,
-                                intermediateResult.get(), msg);
+                                intermediateResult.get(), oomMsg);
                     case TIMEOUT:
-                        msg = readResult(String.class);
-                        serverManager.markServerForRestart(); // Signal that 
server is dying
+                        String timeoutMsg = 
JsonPipesIpc.fromBytes(msg.payload(), String.class);
+                        serverManager.markServerForRestart();
                         closeConnection();
-                        return buildFatalResult(t.getId(), t.getEmitKey(), 
TIMEOUT, intermediateResult.get(), msg);
+                        return buildFatalResult(t.getId(), t.getEmitKey(), 
TIMEOUT,
+                                intermediateResult.get(), timeoutMsg);
                     case UNSPECIFIED_CRASH:
-                        msg = readResult(String.class);
+                        String crashMsg = 
JsonPipesIpc.fromBytes(msg.payload(), String.class);
+                        serverManager.markServerForRestart();
                         closeConnection();
                         return buildFatalResult(t.getId(), t.getEmitKey(), 
UNSPECIFIED_CRASH,
-                                intermediateResult.get(), msg);
+                                intermediateResult.get(), crashMsg);
                     case INTERMEDIATE_RESULT:
-                        intermediateResult.set(readResult(Metadata.class));
+                        
intermediateResult.set(JsonPipesIpc.fromBytes(msg.payload(), Metadata.class));
                         lastUpdate = Instant.now();
                         break;
                     case WORKING:
-                        lastProgressCounter = readProgressCounter();
                         lastUpdate = Instant.now();
                         break;
                     case FINISHED:
-                        PipesResult result = readResult(PipesResult.class);
+                        PipesResult result = 
JsonPipesIpc.fromBytes(msg.payload(), PipesResult.class);
                         // Restore ParseContext from original FetchEmitTuple 
(not serialized back from server)
                         if (result.emitData() instanceof EmitDataImpl 
emitDataImpl) {
                             emitDataImpl.setParseContext(t.getParseContext());
                         }
                         return result;
+                    default:
+                        throw new IOException("Unexpected message type from 
server: " + msg.type());
                 }
             } catch (SocketTimeoutException e) {
                 LOG.warn("clientId={}: Socket timeout exception while waiting 
for server", pipesClientId, e);
@@ -385,9 +384,9 @@ public class PipesClient implements Closeable {
                 // Handle crash and determine status based on exit code
                 int exitCode = serverManager.handleCrashAndGetExitCode();
                 PipesResult.RESULT_STATUS status = UNSPECIFIED_CRASH;
-                if (exitCode == PipesServer.OOM_EXIT_CODE) {
+                if (exitCode == PipesMessageType.OOM.getExitCode().orElse(-1)) 
{
                     status = PipesResult.RESULT_STATUS.OOM;
-                } else if (exitCode == PipesServer.TIMEOUT_EXIT_CODE) {
+                } else if (exitCode == 
PipesMessageType.TIMEOUT.getExitCode().orElse(-1)) {
                     status = PipesResult.RESULT_STATUS.TIMEOUT;
                 }
                 closeConnection();
@@ -397,10 +396,6 @@ public class PipesClient implements Closeable {
         }
     }
 
-    private long readProgressCounter() throws IOException {
-        return connectionTuple.input.readLong();
-    }
-
     private PipesResult buildFatalResult(String id, EmitKey emitKey, 
PipesResult.RESULT_STATUS status,
                                          Optional<Metadata> 
intermediateResultOpt) {
         return buildFatalResult(id, emitKey, status, intermediateResultOpt, 
null);
@@ -422,63 +417,19 @@ public class PipesClient implements Closeable {
         }
     }
 
-    private PipesServer.PROCESSING_STATUS readServerStatus() throws 
IOException {
-        int statusByte = connectionTuple.input.read();
-        writeAck();
-        PipesServer.PROCESSING_STATUS status = null;
-        try {
-            status = PipesServer.PROCESSING_STATUS.lookup(statusByte);
-        } catch (IllegalArgumentException e) {
-            String byteString = "-1";
-            if (statusByte > -1) {
-                byteString = String.format(Locale.US, "%02x", (byte) 
statusByte);
-            }
-            throw new IOException("problem reading response from server: " + 
byteString, e);
-        }
-        return status;
-    }
-
-    private <T> T readResult(Class<T> clazz) throws IOException {
-        int len = connectionTuple.input.readInt();
-        if (len < 0 || len > PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES) {
-            throw new IOException("Server response length " + len +
-                    " exceeds maximum allowed size of " + 
PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
-        }
-        byte[] bytes = new byte[len];
-        connectionTuple.input.readFully(bytes);
-
-        writeAck();
-        return JsonPipesIpc.fromBytes(bytes, clazz);
-    }
-
-    private void writeAck() throws IOException {
-        connectionTuple.output.write(ACK.getByte());
-        connectionTuple.output.flush();
-    }
-
     private void waitForStartup() throws IOException {
-        // Wait for ready byte
-        int b = connectionTuple.input.read();
-        writeAck();
-        if (b == READY.getByte()) {
+        PipesMessage msg = PipesMessage.read(connectionTuple.input);
+        if (msg.type() == PipesMessageType.READY) {
             LOG.debug("clientId={}: server ready", pipesClientId);
-        } else if (b == FINISHED.getByte()) {
-            int len = connectionTuple.input.readInt();
-            if (len < 0 || len > PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES) {
-                throw new IOException("Server startup error message length " + 
len +
-                        " exceeds maximum allowed size of " + 
PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
-            }
-            byte[] bytes = new byte[len];
-            connectionTuple.input.readFully(bytes);
-            writeAck();
-            String msg = new String(bytes, StandardCharsets.UTF_8);
-            LOG.error("clientId={}: Server failed to start: {}", 
pipesClientId, msg);
-            throw new ServerInitializationException(msg);
+        } else if (msg.type() == PipesMessageType.STARTUP_FAILED) {
+            // Send ACK for startup failure
+            PipesMessage.ack().write(connectionTuple.output);
+            String errorMsg = new String(msg.payload(), 
StandardCharsets.UTF_8);
+            LOG.error("clientId={}: Server failed to start: {}", 
pipesClientId, errorMsg);
+            throw new ServerInitializationException(errorMsg);
         } else {
-            LOG.error("clientId={}: Unexpected first byte: {}", pipesClientId,
-                    HexFormat.of().formatHex(new byte[]{(byte) b}));
-            throw new IOException("Unexpected first byte from server: " +
-                    HexFormat.of().formatHex(new byte[]{(byte) b}));
+            LOG.error("clientId={}: Unexpected first message type: {}", 
pipesClientId, msg.type());
+            throw new IOException("Unexpected first message type from server: 
" + msg.type());
         }
     }
 
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessage.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessage.java
new file mode 100644
index 0000000000..753f780fda
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessage.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Uniform framed message for the PipesClient/PipesServer IPC protocol.
+ * <p>
+ * Wire format: {@code [MAGIC 0x54 0x4B][TYPE 1B][LEN 4B][PAYLOAD]}
+ * <ul>
+ *   <li>MAGIC — two bytes {@code 0x54 0x4B} ("TK") for desync detection</li>
+ *   <li>TYPE — one byte identifying the {@link PipesMessageType}</li>
+ *   <li>LEN — four-byte big-endian payload length (0 for empty payloads)</li>
+ *   <li>PAYLOAD — {@code LEN} bytes of payload data</li>
+ * </ul>
+ */
+public record PipesMessage(PipesMessageType type, byte[] payload) {
+
+    static final byte MAGIC_0 = 0x54; // 'T'
+    static final byte MAGIC_1 = 0x4B; // 'K'
+
+    /** Maximum payload size: 100 MB (same as old MAX_FETCH_EMIT_TUPLE_BYTES). */
+    public static final int MAX_PAYLOAD_BYTES = 100 * 1024 * 1024;
+
+    private static final byte[] EMPTY = new byte[0];
+
+    /**
+     * Validates the record components.
+     * <p>
+     * The payload array is stored as-is (not copied); callers must not mutate
+     * it after handing it to a message.
+     *
+     * @throws NullPointerException if {@code type} or {@code payload} is null
+     */
+    public PipesMessage {
+        Objects.requireNonNull(type, "type");
+        Objects.requireNonNull(payload, "payload");
+    }
+
+    /**
+     * Reads one framed message from the stream.
+     *
+     * @param in stream positioned at the start of a frame
+     * @return the decoded message
+     * @throws ProtocolDesyncException if the magic bytes don't match or the
+     *         type byte is not a known {@link PipesMessageType}
+     * @throws EOFException if the stream ends before a complete message
+     * @throws IOException on payload size violations or I/O errors
+     */
+    public static PipesMessage read(DataInputStream in) throws IOException {
+        int m0 = in.read();
+        if (m0 == -1) {
+            throw new EOFException("Stream closed before magic byte");
+        }
+        int m1 = in.read();
+        if (m1 == -1) {
+            throw new EOFException("Stream closed after first magic byte");
+        }
+        if ((byte) m0 != MAGIC_0 || (byte) m1 != MAGIC_1) {
+            throw new ProtocolDesyncException(
+                    String.format(Locale.ROOT, "Expected magic 0x%02x%02x but got 0x%02x%02x",
+                            MAGIC_0 & 0xFF, MAGIC_1 & 0xFF, m0 & 0xFF, m1 & 0xFF));
+        }
+
+        int typeByte = in.read();
+        if (typeByte == -1) {
+            throw new EOFException("Stream closed before type byte");
+        }
+        PipesMessageType type;
+        try {
+            type = PipesMessageType.lookup(typeByte);
+        } catch (IllegalArgumentException e) {
+            // lookup() throws an unchecked exception; surface it as an IOException so
+            // callers that only handle I/O failures still treat the connection as broken.
+            ProtocolDesyncException desync = new ProtocolDesyncException(e.getMessage());
+            desync.initCause(e);
+            throw desync;
+        }
+
+        int len = in.readInt();
+        if (len < 0) {
+            throw new IOException("Negative payload length: " + len);
+        }
+        if (len > MAX_PAYLOAD_BYTES) {
+            throw new IOException("Payload length " + len +
+                    " exceeds maximum of " + MAX_PAYLOAD_BYTES + " bytes");
+        }
+
+        byte[] payload;
+        if (len == 0) {
+            payload = EMPTY;
+        } else {
+            payload = new byte[len];
+            in.readFully(payload);
+        }
+        return new PipesMessage(type, payload);
+    }
+
+    /**
+     * Writes this message to the stream and flushes.
+     *
+     * @param out destination stream
+     * @throws IOException if the payload exceeds {@link #MAX_PAYLOAD_BYTES}
+     *         (nothing is written in that case) or on I/O errors
+     */
+    public void write(DataOutputStream out) throws IOException {
+        if (payload.length > MAX_PAYLOAD_BYTES) {
+            // Reject before emitting any bytes: read() on the peer would refuse this
+            // frame after its header without consuming the payload, desyncing the stream.
+            throw new IOException("Payload length " + payload.length +
+                    " exceeds maximum of " + MAX_PAYLOAD_BYTES + " bytes");
+        }
+        out.write(MAGIC_0);
+        out.write(MAGIC_1);
+        out.write(type.getByte());
+        out.writeInt(payload.length);
+        if (payload.length > 0) {
+            out.write(payload);
+        }
+        out.flush();
+    }
+
+    // ---- convenience factories ----
+
+    public static PipesMessage ping() {
+        return new PipesMessage(PipesMessageType.PING, EMPTY);
+    }
+
+    public static PipesMessage ack() {
+        return new PipesMessage(PipesMessageType.ACK, EMPTY);
+    }
+
+    public static PipesMessage ready() {
+        return new PipesMessage(PipesMessageType.READY, EMPTY);
+    }
+
+    public static PipesMessage shutDown() {
+        return new PipesMessage(PipesMessageType.SHUT_DOWN, EMPTY);
+    }
+
+    /**
+     * Creates a WORKING heartbeat with a progress counter in the payload.
+     */
+    public static PipesMessage working(long counter) {
+        byte[] payload = ByteBuffer.allocate(Long.BYTES)
+                .order(ByteOrder.BIG_ENDIAN)
+                .putLong(counter)
+                .array();
+        return new PipesMessage(PipesMessageType.WORKING, payload);
+    }
+
+    public static PipesMessage newRequest(byte[] payload) {
+        return new PipesMessage(PipesMessageType.NEW_REQUEST, payload);
+    }
+
+    public static PipesMessage finished(byte[] payload) {
+        return new PipesMessage(PipesMessageType.FINISHED, payload);
+    }
+
+    public static PipesMessage intermediateResult(byte[] payload) {
+        return new PipesMessage(PipesMessageType.INTERMEDIATE_RESULT, payload);
+    }
+
+    public static PipesMessage startupFailed(byte[] payload) {
+        return new PipesMessage(PipesMessageType.STARTUP_FAILED, payload);
+    }
+
+    public static PipesMessage crash(PipesMessageType crashType, byte[] payload) {
+        return new PipesMessage(crashType, payload);
+    }
+
+    /**
+     * Extracts the progress counter from a WORKING message payload.
+     *
+     * @return the big-endian long stored in the first 8 payload bytes
+     * @throws IllegalStateException if this is not a WORKING message or the
+     *         payload is shorter than {@link Long#BYTES}
+     */
+    public long progressCounter() {
+        if (type != PipesMessageType.WORKING) {
+            throw new IllegalStateException("progressCounter() only valid for WORKING messages");
+        }
+        if (payload.length < Long.BYTES) {
+            throw new IllegalStateException("WORKING payload must be at least " + Long.BYTES +
+                    " bytes but was " + payload.length);
+        }
+        return ByteBuffer.wrap(payload)
+                .order(ByteOrder.BIG_ENDIAN)
+                .getLong();
+    }
+
+    /**
+     * Compares type and payload contents; the default record equality would
+     * compare the payload arrays by reference.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PipesMessage other)) {
+            return false;
+        }
+        return type == other.type && Arrays.equals(payload, other.payload);
+    }
+
+    /** Hash consistent with {@link #equals(Object)} (content-based payload hash). */
+    @Override
+    public int hashCode() {
+        return 31 * type.hashCode() + Arrays.hashCode(payload);
+    }
+
+    /** Shows the type and payload length rather than the array identity. */
+    @Override
+    public String toString() {
+        return "PipesMessage[type=" + type + ", payloadLength=" + payload.length + "]";
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessageType.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessageType.java
new file mode 100644
index 0000000000..a0d2e6eb7f
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/PipesMessageType.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.util.Locale;
+import java.util.OptionalInt;
+
+/**
+ * Unified message types for the PipesClient/PipesServer IPC protocol.
+ * <p>
+ * Replaces the separate {@code PipesClient.COMMANDS} and
+ * {@code PipesServer.PROCESSING_STATUS} enums with a single enum
+ * that carries explicit wire bytes, ACK requirements, and exit codes.
+ */
+public enum PipesMessageType {
+
+    PING(0x01, false, -1),
+    ACK(0x02, false, -1),
+    NEW_REQUEST(0x03, false, -1),
+    SHUT_DOWN(0x04, false, -1),
+    READY(0x05, false, -1),
+    STARTUP_FAILED(0x06, true, -1),
+    INTERMEDIATE_RESULT(0x07, true, -1),
+    WORKING(0x08, false, -1),
+    FINISHED(0x09, true, -1),
+    OOM(0x0A, true, 18),
+    TIMEOUT(0x0B, true, 17),
+    UNSPECIFIED_CRASH(0x0C, true, 19);
+
+    /**
+     * Reverse index from wire byte (0-255) to type, built once at class
+     * initialization so that {@link #lookup(int)} -- invoked for every frame
+     * read -- neither clones {@link #values()} nor scans it linearly.
+     */
+    private static final PipesMessageType[] BY_WIRE_BYTE = new PipesMessageType[256];
+
+    static {
+        for (PipesMessageType type : values()) {
+            if (BY_WIRE_BYTE[type.wireByte] != null) {
+                // Two constants sharing a wire byte would be indistinguishable on the wire.
+                throw new IllegalStateException(String.format(Locale.ROOT,
+                        "Duplicate wire byte 0x%02x for %s and %s",
+                        type.wireByte, BY_WIRE_BYTE[type.wireByte], type));
+            }
+            BY_WIRE_BYTE[type.wireByte] = type;
+        }
+    }
+
+    private final int wireByte;
+    private final boolean requiresAck;
+    private final int exitCode;
+
+    PipesMessageType(int wireByte, boolean requiresAck, int exitCode) {
+        this.wireByte = wireByte;
+        this.requiresAck = requiresAck;
+        this.exitCode = exitCode;
+    }
+
+    /**
+     * Returns the single byte used on the wire for this message type.
+     */
+    public byte getByte() {
+        return (byte) wireByte;
+    }
+
+    /**
+     * Returns {@code true} if the receiver must send an ACK after reading
+     * a message of this type.
+     */
+    public boolean requiresAck() {
+        return requiresAck;
+    }
+
+    /**
+     * Returns the exit code the server should use when exiting due to this
+     * condition, or empty if this message type does not trigger an exit.
+     */
+    public OptionalInt getExitCode() {
+        if (exitCode < 0) {
+            return OptionalInt.empty();
+        }
+        return OptionalInt.of(exitCode);
+    }
+
+    /**
+     * Looks up a message type by its wire byte.
+     *
+     * @param b the wire byte, as an unsigned value in 0-255
+     * @return the matching message type
+     * @throws IllegalArgumentException if no type matches (including values
+     *         outside 0-255)
+     */
+    public static PipesMessageType lookup(int b) {
+        PipesMessageType type = (b >= 0 && b < BY_WIRE_BYTE.length) ? BY_WIRE_BYTE[b] : null;
+        if (type == null) {
+            throw new IllegalArgumentException(
+                    String.format(Locale.ROOT, "Unknown PipesMessageType wire byte: 0x%02x", b & 0xFF));
+        }
+        return type;
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ProtocolDesyncException.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ProtocolDesyncException.java
new file mode 100644
index 0000000000..fc89680095
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ProtocolDesyncException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.io.IOException;
+
+/**
+ * Thrown when the IPC framing is invalid (e.g., the magic bytes do not
+ * match), indicating that the stream is desynchronized and the connection
+ * is unsalvageable.
+ */
+public class ProtocolDesyncException extends IOException {
+
+    /**
+     * Creates a desync exception describing what was read.
+     *
+     * @param message detail message
+     */
+    public ProtocolDesyncException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a desync exception that preserves the failure which revealed
+     * the desync as its cause.
+     *
+     * @param message detail message
+     * @param cause underlying failure; may be {@code null}
+     */
+    public ProtocolDesyncException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ShutDownReceivedException.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ShutDownReceivedException.java
new file mode 100644
index 0000000000..de9fedf957
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/protocol/ShutDownReceivedException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a SHUT_DOWN message is received where an ACK was expected.
+ * <p>
+ * This allows callers to distinguish a clean shutdown request from other
+ * I/O errors and respond with the appropriate lifecycle action (e.g.,
+ * {@code System.exit(0)} in PipesServer vs closing only the connection
+ * in ConnectionHandler).
+ */
+public class ShutDownReceivedException extends IOException {
+
+    public ShutDownReceivedException() {
+        super("Received SHUT_DOWN while awaiting ACK");
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
index e33d1f0cef..25e080a941 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ConnectionHandler.java
@@ -16,12 +16,6 @@
  */
 package org.apache.tika.pipes.core.server;
 
-import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.INTERMEDIATE_RESULT;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.Closeable;
@@ -32,7 +26,6 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HexFormat;
 import java.util.Locale;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -55,10 +48,10 @@ import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.core.EmitStrategyConfig;
 import org.apache.tika.pipes.core.PipesClient;
 import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
 import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
 import org.apache.tika.serialization.ParseContextUtils;
-import org.apache.tika.utils.ExceptionUtils;
-import org.apache.tika.utils.StringUtils;
 
 /**
  * Handles a single client connection in shared server mode.
@@ -67,9 +60,11 @@ import org.apache.tika.utils.StringUtils;
  * one PipesClient. It shares resources (parser, fetcher manager, etc.) with
  * other handlers but has its own socket, streams, and executor.
  * <p>
- * Unlike the per-client PipesServer, a ConnectionHandler does NOT call
- * System.exit() on errors - it just closes the connection and terminates
- * its thread. The shared server continues running for other clients.
+ * Unlike the per-client PipesServer, a ConnectionHandler does not call
+ * System.exit() for most errors — it just closes the connection and
+ * terminates its thread. However, OOM and TIMEOUT require a JVM restart,
+ * so those still call System.exit(). For all other crashes the shared
+ * server continues running for other clients.
  */
 public class ConnectionHandler implements Runnable, Closeable {
 
@@ -88,6 +83,7 @@ public class ConnectionHandler implements Runnable, Closeable 
{
     private final ExecutorCompletionService<PipesResult> 
executorCompletionService =
             new ExecutorCompletionService<>(executorService);
 
+    private final ServerProtocolIO protocolIO;
     private volatile boolean running = true;
 
     /**
@@ -107,14 +103,15 @@ public class ConnectionHandler implements Runnable, 
Closeable {
         this.resources = resources;
         this.pipesConfig = pipesConfig;
         this.heartbeatIntervalMs = pipesConfig.getHeartbeatIntervalMs();
+        this.protocolIO = new ServerProtocolIO(input, output);
     }
 
     @Override
     public void run() {
         LOG.debug("handlerId={}: starting connection handler", handlerId);
         try {
-            // Send READY signal
-            write(PipesServer.PROCESSING_STATUS.READY.getByte());
+            // Send READY signal (fire-and-forget, no ACK)
+            PipesMessage.ready().write(output);
             LOG.debug("handlerId={}: sent READY, entering main loop", 
handlerId);
 
             mainLoop();
@@ -134,58 +131,58 @@ public class ConnectionHandler implements Runnable, 
Closeable {
 
         while (running) {
             try {
-                int request = input.read();
-                LOG.trace("handlerId={}: received command byte={}", handlerId,
-                        HexFormat.of().formatHex(new byte[]{(byte) request}));
-
-                if (request == -1) {
-                    LOG.debug("handlerId={}: received -1 from client; closing 
connection", handlerId);
-                    return;
+                PipesMessage msg = PipesMessage.read(input);
+                LOG.trace("handlerId={}: received message type={}", handlerId, 
msg.type());
+
+                switch (msg.type()) {
+                    case PING:
+                        PipesMessage.ping().write(output);
+                        break;
+                    case NEW_REQUEST:
+                        intermediateResult.clear();
+                        CountDownLatch countDownLatch = new CountDownLatch(1);
+
+                        FetchEmitTuple fetchEmitTuple;
+                        try {
+                            fetchEmitTuple = 
JsonPipesIpc.fromBytes(msg.payload(), FetchEmitTuple.class);
+                        } catch (IOException e) {
+                            LOG.error("handlerId={}: problem deserializing 
FetchEmitTuple", handlerId, e);
+                            handleCrash(PipesMessageType.UNSPECIFIED_CRASH, 
"unknown", e);
+                            return; // connection is unsalvageable after 
deserialization failure
+                        }
+                        try {
+                            
ServerProtocolIO.validateFetchEmitTuple(fetchEmitTuple);
+                            ParseContext mergedContext = 
resources.createMergedParseContext(fetchEmitTuple.getParseContext());
+                            ParseContextUtils.resolveAll(mergedContext, 
getClass().getClassLoader());
+
+                            PipesWorker pipesWorker = 
createPipesWorker(intermediateResult, fetchEmitTuple,
+                                    mergedContext, countDownLatch);
+                            executorCompletionService.submit(pipesWorker);
+
+                            loopUntilDone(fetchEmitTuple, mergedContext, 
intermediateResult, countDownLatch);
+                        } catch (TikaConfigException e) {
+                            LOG.error("handlerId={}: config error processing 
request", handlerId, e);
+                            handleCrash(PipesMessageType.UNSPECIFIED_CRASH, 
fetchEmitTuple.getId(), e);
+                        } catch (Throwable t) {
+                            LOG.error("handlerId={}: error processing 
request", handlerId, t);
+                        }
+                        break;
+                    case SHUT_DOWN:
+                        LOG.info("handlerId={}: received SHUT_DOWN, closing 
connection", handlerId);
+                        return;
+                    default:
+                        String errorMsg = String.format(Locale.ROOT,
+                                "handlerId=%d: Unexpected message type %s in 
command position",
+                                handlerId, msg.type());
+                        LOG.error(errorMsg);
+                        throw new IllegalStateException(errorMsg);
                 }
-
-                // Validate command byte
-                if (request == PipesClient.COMMANDS.ACK.getByte()) {
-                    String msg = String.format(Locale.ROOT,
-                            "handlerId=%d: PROTOCOL ERROR - Received ACK when 
expecting command", handlerId);
-                    LOG.error(msg);
-                    throw new IllegalStateException(msg);
-                }
-
-                if (request == PipesClient.COMMANDS.PING.getByte()) {
-                    writeNoAck(PipesClient.COMMANDS.PING.getByte());
-                } else if (request == 
PipesClient.COMMANDS.NEW_REQUEST.getByte()) {
-                    intermediateResult.clear();
-                    CountDownLatch countDownLatch = new CountDownLatch(1);
-
-                    FetchEmitTuple fetchEmitTuple = readFetchEmitTuple();
-                    try {
-                        validateFetchEmitTuple(fetchEmitTuple);
-                        ParseContext mergedContext = 
resources.createMergedParseContext(fetchEmitTuple.getParseContext());
-                        ParseContextUtils.resolveAll(mergedContext, 
getClass().getClassLoader());
-
-                        PipesWorker pipesWorker = 
createPipesWorker(intermediateResult, fetchEmitTuple,
-                                mergedContext, countDownLatch);
-                        executorCompletionService.submit(pipesWorker);
-
-                        loopUntilDone(fetchEmitTuple, mergedContext, 
intermediateResult, countDownLatch);
-                    } catch (TikaConfigException e) {
-                        LOG.error("handlerId={}: config error processing 
request", handlerId, e);
-                        
handleCrash(PipesServer.PROCESSING_STATUS.UNSPECIFIED_CRASH, 
fetchEmitTuple.getId(), e);
-                    } catch (Throwable t) {
-                        LOG.error("handlerId={}: error processing request", 
handlerId, t);
-                    }
-                } else if (request == 
PipesClient.COMMANDS.SHUT_DOWN.getByte()) {
-                    LOG.info("handlerId={}: received SHUT_DOWN, closing 
connection", handlerId);
-                    return;
-                } else {
-                    String msg = String.format(Locale.ROOT,
-                            "handlerId=%d: Unexpected byte 0x%02x in command 
position", handlerId, (byte) request);
-                    LOG.error(msg);
-                    throw new IllegalStateException(msg);
-                }
-                output.flush();
+            } catch (java.io.EOFException e) {
+                // Client disconnected (stream closed)
+                LOG.debug("handlerId={}: client disconnected (EOF)", 
handlerId);
+                return;
             } catch (SocketException e) {
-                // Client disconnected
+                // Client disconnected (socket closed)
                 LOG.debug("handlerId={}: client disconnected", handlerId);
                 return;
             } catch (IOException e) {
@@ -216,7 +213,7 @@ public class ConnectionHandler implements Runnable, 
Closeable {
                                CountDownLatch countDownLatch) throws 
InterruptedException, IOException {
         Instant start = Instant.now();
         long timeoutMillis = PipesClient.getTimeoutMillis(pipesConfig, 
mergedContext);
-        long mockProgressCounter = 0;
+        long progressCounter = 1;
         boolean wroteIntermediateResult = false;
 
         while (running) {
@@ -224,7 +221,7 @@ public class ConnectionHandler implements Runnable, 
Closeable {
             if (!wroteIntermediateResult) {
                 Metadata intermediate = intermediateResult.poll(100, 
TimeUnit.MILLISECONDS);
                 if (intermediate != null) {
-                    writeIntermediate(intermediate);
+                    protocolIO.writeIntermediate(intermediate);
                     countDownLatch.countDown();
                     wroteIntermediateResult = true;
                 }
@@ -237,33 +234,31 @@ public class ConnectionHandler implements Runnable, 
Closeable {
                 try {
                     pipesResult = future.get();
                 } catch (OutOfMemoryError e) {
-                    handleCrash(OOM, fetchEmitTuple.getId(), e);
+                    handleCrash(PipesMessageType.OOM, fetchEmitTuple.getId(), 
e);
                     LOG.error("handlerId={}: exiting server due to OOM", 
handlerId);
-                    System.exit(1);
+                    System.exit(PipesMessageType.OOM.getExitCode().orElse(18));
                 } catch (ExecutionException e) {
                     Throwable t = e.getCause();
                     LOG.error("handlerId={}: crash processing {}", handlerId, 
fetchEmitTuple.getId(), t);
                     if (t instanceof OutOfMemoryError) {
-                        handleCrash(OOM, fetchEmitTuple.getId(), t);
+                        handleCrash(PipesMessageType.OOM, 
fetchEmitTuple.getId(), t);
                         LOG.error("handlerId={}: exiting server due to OOM", 
handlerId);
-                        System.exit(1);
+                        
System.exit(PipesMessageType.OOM.getExitCode().orElse(18));
                     }
-                    
handleCrash(PipesServer.PROCESSING_STATUS.UNSPECIFIED_CRASH, 
fetchEmitTuple.getId(), t);
+                    handleCrash(PipesMessageType.UNSPECIFIED_CRASH, 
fetchEmitTuple.getId(), t);
                     return;
                 }
                 LOG.debug("handlerId={}: finished task id={} status={}", 
handlerId,
                         fetchEmitTuple.getId(), pipesResult.status());
-                write(FINISHED, pipesResult);
+                protocolIO.writeFinished(pipesResult);
                 return;
             }
 
-            // Send heartbeat
+            // Send fire-and-forget heartbeat
             long elapsed = System.currentTimeMillis() - start.toEpochMilli();
-            if (elapsed > mockProgressCounter * heartbeatIntervalMs) {
-                LOG.trace("handlerId={}: still processing, counter={}", 
handlerId, mockProgressCounter);
-                write(PipesServer.PROCESSING_STATUS.WORKING.getByte());
-                output.writeLong(mockProgressCounter++);
-                output.flush();
+            if (elapsed > progressCounter * heartbeatIntervalMs) {
+                LOG.trace("handlerId={}: still processing, counter={}", 
handlerId, progressCounter);
+                PipesMessage.working(progressCounter++).write(output);
             }
 
             // Check timeout
@@ -276,19 +271,17 @@ public class ConnectionHandler implements Runnable, 
Closeable {
 
     private void handleTimeout(String id) throws IOException {
         LOG.warn("handlerId={}: timeout processing id={}", handlerId, id);
-        write(TIMEOUT.getByte());
+        handleCrash(PipesMessageType.TIMEOUT, id,
+                new RuntimeException("Server-side timeout processing " + id));
         // Timeout means a parsing thread is stuck - the JVM must be restarted
         LOG.error("handlerId={}: exiting server due to timeout", handlerId);
-        System.exit(1);
+        System.exit(PipesMessageType.TIMEOUT.getExitCode().orElse(17));
     }
 
-    private void handleCrash(PipesServer.PROCESSING_STATUS processingStatus, 
String id, Throwable t) {
-        LOG.error("handlerId={}: {} processing id={}", handlerId, 
processingStatus, id, t);
-        String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
+    private void handleCrash(PipesMessageType crashType, String id, Throwable 
t) {
+        LOG.error("handlerId={}: {} processing id={}", handlerId, crashType, 
id, t);
         try {
-            byte[] bytes = JsonPipesIpc.toBytes(msg);
-            write(processingStatus, bytes);
-            // Note: write() already awaits ACKs internally, don't call 
awaitAck() again
+            protocolIO.writeCrash(crashType, t);
         } catch (IOException e) {
             LOG.warn("handlerId={}: problem writing crash info to client", 
handlerId, e);
         }
@@ -296,96 +289,6 @@ public class ConnectionHandler implements Runnable, 
Closeable {
         // For other crashes (UNSPECIFIED_CRASH), we just close this connection
     }
 
-    private FetchEmitTuple readFetchEmitTuple() throws IOException {
-        int length = input.readInt();
-        if (length < 0 || length > PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES) {
-            throw new IOException("FetchEmitTuple length " + length +
-                    " exceeds maximum allowed size of " + 
PipesServer.MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
-        }
-        byte[] bytes = new byte[length];
-        input.readFully(bytes);
-        return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class);
-    }
-
-    private void validateFetchEmitTuple(FetchEmitTuple fetchEmitTuple) throws 
TikaConfigException {
-        ParseContext requestContext = fetchEmitTuple.getParseContext();
-        if (requestContext == null) {
-            return;
-        }
-        org.apache.tika.pipes.core.extractor.UnpackConfig unpackConfig =
-                
requestContext.get(org.apache.tika.pipes.core.extractor.UnpackConfig.class);
-        org.apache.tika.pipes.api.ParseMode parseMode =
-                requestContext.get(org.apache.tika.pipes.api.ParseMode.class);
-
-        if (unpackConfig != null && 
!StringUtils.isBlank(unpackConfig.getEmitter())
-                && parseMode != org.apache.tika.pipes.api.ParseMode.UNPACK) {
-            throw new TikaConfigException(
-                    "FetchEmitTuple has UnpackConfig with emitter '" + 
unpackConfig.getEmitter() +
-                            "' but ParseMode is " + parseMode + ". " +
-                            "To extract embedded bytes, set ParseMode.UNPACK 
in the ParseContext.");
-        }
-    }
-
-    private void write(PipesServer.PROCESSING_STATUS processingStatus, 
PipesResult pipesResult) {
-        try {
-            byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
-            write(processingStatus, bytes);
-        } catch (IOException e) {
-            LOG.error("handlerId={}: problem writing emit data", handlerId, e);
-        }
-    }
-
-    private void writeIntermediate(Metadata metadata) {
-        try {
-            byte[] bytes = JsonPipesIpc.toBytes(metadata);
-            write(INTERMEDIATE_RESULT, bytes);
-        } catch (IOException e) {
-            LOG.error("handlerId={}: problem writing intermediate data", 
handlerId, e);
-        }
-    }
-
-    private void awaitAck() throws IOException {
-        int b = input.read();
-        if (b == ACK.getByte()) {
-            return;
-        }
-        LOG.error("handlerId={}: expected ACK but got byte={}", handlerId,
-                HexFormat.of().formatHex(new byte[]{(byte) b}));
-        throw new IOException("Expected ACK but got byte=" + 
HexFormat.of().formatHex(new byte[]{(byte) b}));
-    }
-
-    private void writeNoAck(byte b) {
-        try {
-            output.write(b);
-            output.flush();
-        } catch (IOException e) {
-            LOG.error("handlerId={}: problem writing data", handlerId, e);
-        }
-    }
-
-    private void write(byte b) {
-        try {
-            output.write(b);
-            output.flush();
-            awaitAck();
-        } catch (IOException e) {
-            LOG.error("handlerId={}: problem writing data", handlerId, e);
-        }
-    }
-
-    private void write(PipesServer.PROCESSING_STATUS status, byte[] bytes) {
-        try {
-            write(status.getByte());
-            int len = bytes.length;
-            output.writeInt(len);
-            output.write(bytes);
-            output.flush();
-            awaitAck();
-        } catch (IOException e) {
-            LOG.error("handlerId={}: problem writing data", handlerId, e);
-        }
-    }
-
     @Override
     public void close() {
         running = false;
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
index 66d0b7c333..e69137e5f7 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java
@@ -16,12 +16,6 @@
  */
 package org.apache.tika.pipes.core.server;
 
-import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.INTERMEDIATE_RESULT;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM;
-import static 
org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.TIMEOUT;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -64,7 +58,6 @@ import org.apache.tika.parser.AutoDetectParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.RecursiveParserWrapper;
 import org.apache.tika.pipes.api.FetchEmitTuple;
-import org.apache.tika.pipes.api.ParseMode;
 import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.core.EmitStrategy;
 import org.apache.tika.pipes.core.EmitStrategyConfig;
@@ -73,16 +66,17 @@ import org.apache.tika.pipes.core.PipesConfig;
 import org.apache.tika.pipes.core.config.ConfigStore;
 import org.apache.tika.pipes.core.config.ConfigStoreFactory;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
-import org.apache.tika.pipes.core.extractor.UnpackConfig;
 import org.apache.tika.pipes.core.extractor.UnpackExtractorFactory;
 import org.apache.tika.pipes.core.fetcher.FetcherManager;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
+import org.apache.tika.pipes.core.protocol.ShutDownReceivedException;
 import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
 import org.apache.tika.plugins.ExtensionConfig;
 import org.apache.tika.plugins.TikaPluginManager;
 import org.apache.tika.sax.ContentHandlerFactory;
 import org.apache.tika.serialization.ParseContextUtils;
 import org.apache.tika.utils.ExceptionUtils;
-import org.apache.tika.utils.StringUtils;
 
 /**
  * This server is forked from the PipesClient.  This class isolates
@@ -97,60 +91,14 @@ public class PipesServer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(PipesServer.class);
 
     public static final int AUTH_TOKEN_LENGTH_BYTES = 32;
-    public static final int MAX_FETCH_EMIT_TUPLE_BYTES = 100 * 1024 * 1024; // 
100MB
 
     private final long heartbeatIntervalMs;
     private final String pipesClientId;
 
-    //this has to be some number not close to 0-3
-    //it looks like the server crashes with exit value 3 on uncaught OOM, for 
example
-    public static final int TIMEOUT_EXIT_CODE = 17;
-    public static final int OOM_EXIT_CODE = 18;
-    public static final int UNSPECIFIED_CRASH_EXIT_CODE = 19;
-
-
-    public enum PROCESSING_STATUS {
-        READY, INTERMEDIATE_RESULT, WORKING, FINISHED,
-        OOM(OOM_EXIT_CODE), TIMEOUT(TIMEOUT_EXIT_CODE), 
UNSPECIFIED_CRASH(UNSPECIFIED_CRASH_EXIT_CODE);
-
-        int exitCode = -1;
-        public static PROCESSING_STATUS lookup(int b) {
-            if (b < 1) {
-                throw new IllegalArgumentException("bad result value: " + b);
-            }
-            int ordinal = b - 1;
-            if (ordinal >= PROCESSING_STATUS.values().length) {
-                throw new IllegalArgumentException("ordinal > than array 
length? " + ordinal);
-            }
-            return PROCESSING_STATUS.values()[ordinal];
-        }
-        PROCESSING_STATUS() {
-
-        }
-
-        PROCESSING_STATUS(int exitCode) {
-            this.exitCode = exitCode;
-        }
-
-        public int getExitCode() {
-            return exitCode;
-        }
-
-        public byte getByte() {
-            return (byte) (ordinal() + 1);
-        }
-    }
-
     private Detector detector;
 
-
-
-    private final Object[] lock = new Object[0];
     private final DataInputStream input;
     private final DataOutputStream output;
-    //if an extract is larger than this value, emit it directly;
-    //if it is smaller than this value, write it back to the
-    //PipesClient so that it can cache the extracts and then batch emit.
 
     private final TikaLoader tikaLoader;
     private final PipesConfig pipesConfig;
@@ -166,6 +114,7 @@ public class PipesServer implements AutoCloseable {
     private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
     private final ExecutorCompletionService<PipesResult> 
executorCompletionService = new ExecutorCompletionService<>(executorService);
     private final EmitStrategy emitStrategy;
+    private final ServerProtocolIO protocolIO;
 
     public static PipesServer load(int port, Path tikaConfigPath) throws 
Exception {
             String pipesClientId = System.getProperty("pipesClientId", 
"unknown");
@@ -195,23 +144,12 @@ public class PipesServer implements AutoCloseable {
         } catch (Exception e) {
             LOG.error("Failed to start up", e);
             try {
-                // Write FINISHED status byte and await ACK
-                dos.writeByte(FINISHED.getByte());
-                dos.flush();
-                int ack = dis.read();
-                if (ack != PipesClient.COMMANDS.ACK.getByte()) {
-                    LOG.warn("Expected ACK but got: {}", ack);
-                }
-
-                // Write error message and await ACK
                 String msg = ExceptionUtils.getStackTrace(e);
                 byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
-                dos.writeInt(bytes.length);
-                dos.write(bytes);
-                dos.flush();
-                ack = dis.read();
-                if (ack != PipesClient.COMMANDS.ACK.getByte()) {
-                    LOG.warn("Expected ACK but got: {}", ack);
+                PipesMessage.startupFailed(bytes).write(dos);
+                PipesMessage ackMsg = PipesMessage.read(dis);
+                if (ackMsg.type() != PipesMessageType.ACK) {
+                    LOG.warn("Expected ACK but got: {}", ackMsg.type());
                 }
             } catch (IOException ioException) {
                 LOG.error("Failed to send startup failure message to client", 
ioException);
@@ -250,6 +188,7 @@ public class PipesServer implements AutoCloseable {
         }
 
         emitStrategy = pipesConfig.getEmitStrategy().getType();
+        this.protocolIO = new ServerProtocolIO(input, output);
     }
 
 
@@ -365,84 +304,73 @@ public class PipesServer implements AutoCloseable {
     }
 
     public void mainLoop() {
-        write(PROCESSING_STATUS.READY.getByte());
+        try {
+            PipesMessage.ready().write(output);
+        } catch (IOException e) {
+            LOG.error("pipesClientId={}: failed to send READY", pipesClientId, 
e);
+            exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
+            return;
+        }
         LOG.debug("pipesClientId={}: sent READY, entering main loop", 
pipesClientId);
         ArrayBlockingQueue<Metadata> intermediateResult = new 
ArrayBlockingQueue<>(1);
 
         //main loop
         try {
-            long start = System.currentTimeMillis();
             while (true) {
-                int request = input.read();
-                LOG.trace("pipesClientId={}: received command byte={}", 
pipesClientId, HexFormat.of().formatHex(new byte[]{(byte)request}));
-                if (request == -1) {
-                    LOG.debug("received -1 from client; shutting down");
-                    exit(0);
-                }
-
-                // Validate that we received a command byte, not a status/ACK 
byte
-                if (request == PipesClient.COMMANDS.ACK.getByte()) {
-                    String msg = String.format(Locale.ROOT,
-                            "pipesClientId=%s: PROTOCOL ERROR - Received ACK 
(byte=0x%02x) when expecting a command. " +
-                            "This indicates a protocol synchronization issue 
where the server missed consuming an ACK. " +
-                            "Valid commands are: PING(0x%02x), 
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x). " +
-                            "This is likely a bug in the server's message 
handling - check that all status messages " +
-                            "that trigger client ACKs are properly awaiting 
those ACKs.",
-                            pipesClientId, (byte)request,
-                            PipesClient.COMMANDS.PING.getByte(),
-                            PipesClient.COMMANDS.NEW_REQUEST.getByte(),
-                            PipesClient.COMMANDS.SHUT_DOWN.getByte());
-                    LOG.error(msg);
-                    throw new IllegalStateException(msg);
-                }
-
-                if (request == PipesClient.COMMANDS.PING.getByte()) {
-                    writeNoAck(PipesClient.COMMANDS.PING.getByte());
-                } else if (request == 
PipesClient.COMMANDS.NEW_REQUEST.getByte()) {
-                    intermediateResult.clear();
-                    CountDownLatch countDownLatch = new CountDownLatch(1);
-
-                    FetchEmitTuple fetchEmitTuple = readFetchEmitTuple();
-                    // Validate before merging with global config
-                    validateFetchEmitTuple(fetchEmitTuple);
-                    // Create merged ParseContext: defaults from tika-config + 
request overrides
-                    ParseContext mergedContext = 
createMergedParseContext(fetchEmitTuple.getParseContext());
-                    // Resolve friendly-named configs in ParseContext to 
actual objects
-                    ParseContextUtils.resolveAll(mergedContext, 
getClass().getClassLoader());
-
-                    PipesWorker pipesWorker = 
getPipesWorker(intermediateResult, fetchEmitTuple, mergedContext, 
countDownLatch);
-                    executorCompletionService.submit(pipesWorker);
-                    //set progress counter
-                    try {
-                        loopUntilDone(fetchEmitTuple, mergedContext, 
executorCompletionService, intermediateResult, countDownLatch);
-                    } catch (Throwable t) {
-                        LOG.error("Serious problem: {}", 
HexFormat.of().formatHex(new byte[]{(byte)request}), t);
-                    }
-                } else if (request == 
PipesClient.COMMANDS.SHUT_DOWN.getByte()) {
-                    LOG.debug("shutting down");
-                    try {
-                        close();
-                    } catch (Exception e) {
-                        //swallow
-                    }
-                    System.exit(0);
-                } else {
-                    String msg = String.format(Locale.ROOT,
-                            "pipesClientId=%s: Unexpected byte 0x%02x in 
command position. " +
-                            "Expected one of: PING(0x%02x), ACK(0x%02x), 
NEW_REQUEST(0x%02x), SHUT_DOWN(0x%02x)",
-                            pipesClientId, (byte)request,
-                            PipesClient.COMMANDS.PING.getByte(),
-                            PipesClient.COMMANDS.ACK.getByte(),
-                            PipesClient.COMMANDS.NEW_REQUEST.getByte(),
-                            PipesClient.COMMANDS.SHUT_DOWN.getByte());
-                    LOG.error(msg);
-                    throw new IllegalStateException(msg);
+                PipesMessage msg = PipesMessage.read(input);
+                LOG.trace("pipesClientId={}: received message type={}", 
pipesClientId, msg.type());
+
+                switch (msg.type()) {
+                    case PING:
+                        PipesMessage.ping().write(output);
+                        break;
+                    case NEW_REQUEST:
+                        intermediateResult.clear();
+                        CountDownLatch countDownLatch = new CountDownLatch(1);
+
+                        FetchEmitTuple fetchEmitTuple;
+                        try {
+                            fetchEmitTuple = 
JsonPipesIpc.fromBytes(msg.payload(), FetchEmitTuple.class);
+                        } catch (IOException e) {
+                            LOG.error("problem deserializing FetchEmitTuple", 
e);
+                            handleCrash(PipesMessageType.UNSPECIFIED_CRASH, 
"unknown", e);
+                            break; // unreachable after handleCrash/exit, but 
needed for compilation
+                        }
+                        // Validate before merging with global config
+                        
ServerProtocolIO.validateFetchEmitTuple(fetchEmitTuple);
+                        // Create merged ParseContext: defaults from 
tika-config + request overrides
+                        ParseContext mergedContext = 
createMergedParseContext(fetchEmitTuple.getParseContext());
+                        // Resolve friendly-named configs in ParseContext to 
actual objects
+                        ParseContextUtils.resolveAll(mergedContext, 
getClass().getClassLoader());
+
+                        PipesWorker pipesWorker = 
getPipesWorker(intermediateResult, fetchEmitTuple, mergedContext, 
countDownLatch);
+                        executorCompletionService.submit(pipesWorker);
+                        try {
+                            loopUntilDone(fetchEmitTuple, mergedContext, 
executorCompletionService, intermediateResult, countDownLatch);
+                        } catch (Throwable t) {
+                            LOG.error("Serious problem processing request", t);
+                        }
+                        break;
+                    case SHUT_DOWN:
+                        LOG.debug("shutting down");
+                        try {
+                            close();
+                        } catch (Exception e) {
+                            //swallow
+                        }
+                        System.exit(0);
+                        break;
+                    default:
+                        String errorMsg = String.format(Locale.ROOT,
+                                "pipesClientId=%s: Unexpected message type %s 
in command position",
+                                pipesClientId, msg.type());
+                        LOG.error(errorMsg);
+                        throw new IllegalStateException(errorMsg);
                 }
-                output.flush();
             }
         } catch (Throwable t) {
             LOG.error("main loop error (did the forking process shut down?)", 
t);
-            exit(1);
+            exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
         }
     }
 
@@ -463,7 +391,7 @@ public class PipesServer implements AutoCloseable {
                                ArrayBlockingQueue<Metadata> 
intermediateResult, CountDownLatch countDownLatch) throws InterruptedException, 
IOException {
         Instant start = Instant.now();
         long timeoutMillis = PipesClient.getTimeoutMillis(pipesConfig, 
mergedContext);
-        long mockProgressCounter = 0;
+        long progressCounter = 1;
         boolean wroteIntermediateResult = false;
 
         while (true) {
@@ -484,54 +412,54 @@ public class PipesServer implements AutoCloseable {
                 try {
                     pipesResult = future.get();
                 } catch (OutOfMemoryError e) {
-                    handleCrash(OOM, fetchEmitTuple.getId(), e);
+                    handleCrash(PipesMessageType.OOM, fetchEmitTuple.getId(), 
e);
+                    return; // handleCrash calls exit(), but guard against 
unexpected return
                 } catch (ExecutionException e) {
                     Throwable t = e.getCause();
                     LOG.error("crash: {}", fetchEmitTuple.getId(), t);
                     if (t instanceof OutOfMemoryError) {
-                        handleCrash(OOM, fetchEmitTuple.getId(), t);
+                        handleCrash(PipesMessageType.OOM, 
fetchEmitTuple.getId(), t);
+                        return;
                     }
-                    handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH, 
fetchEmitTuple.getId(), t);
+                    handleCrash(PipesMessageType.UNSPECIFIED_CRASH, 
fetchEmitTuple.getId(), t);
+                    return;
                 }
                 LOG.debug("executor completionService finished task: id={} 
status={}", fetchEmitTuple.getId(), pipesResult.status());
-                write(FINISHED, pipesResult);
+                writeFinished(pipesResult);
                 return;
             }
 
-            // Send heartbeat if we've waited long enough
+            // Send fire-and-forget heartbeat if we've waited long enough
             long elapsed = System.currentTimeMillis() - start.toEpochMilli();
-            if (elapsed > mockProgressCounter * heartbeatIntervalMs) {
-                LOG.debug("still processing: {}", mockProgressCounter);
-                write(PROCESSING_STATUS.WORKING.getByte());
-                output.writeLong(mockProgressCounter++);
-                output.flush();
+            if (elapsed > progressCounter * heartbeatIntervalMs) {
+                LOG.debug("still processing: {}", progressCounter);
+                PipesMessage.working(progressCounter++).write(output);
             }
 
-            checkTimeout(start, timeoutMillis);
+            if (checkTimeout(start, timeoutMillis, fetchEmitTuple.getId())) {
+                return; // handleCrash calls exit(), but guard against 
unexpected return
+            }
         }
 
     }
 
-    private void checkTimeout(Instant start, long timeoutMillis) throws 
IOException {
-
+    private boolean checkTimeout(Instant start, long timeoutMillis, String id) 
{
         if (Duration.between(start, Instant.now()).toMillis() > timeoutMillis) 
{
-            write(TIMEOUT.getByte());
-            exit(TIMEOUT_EXIT_CODE);
+            handleCrash(PipesMessageType.TIMEOUT, id,
+                    new RuntimeException("Server-side timeout after " + 
timeoutMillis + "ms"));
+            return true;
         }
+        return false;
     }
 
-    private void handleCrash(PROCESSING_STATUS processingStatus, String id, 
Throwable t) {
-        LOG.error("{}: {}", processingStatus, id, t);
-        String msg = (t != null) ? ExceptionUtils.getStackTrace(t) : "";
+    private void handleCrash(PipesMessageType crashType, String id, Throwable 
t) {
+        LOG.error("{}: {}", crashType, id, t);
         try {
-            byte[] bytes = JsonPipesIpc.toBytes(msg);
-            write(processingStatus, bytes);
-            awaitAck();
+            protocolIO.writeCrash(crashType, t);
         } catch (IOException e) {
-            //swallow
             LOG.warn("problem writing crash info to client", e);
         }
-        exit(processingStatus.getExitCode());
+        exit(crashType.getExitCode().orElse(19));
     }
 
 
@@ -550,48 +478,6 @@ public class PipesServer implements AutoCloseable {
         System.exit(exitCode);
     }
 
-
-    private FetchEmitTuple readFetchEmitTuple() {
-        try {
-            int length = input.readInt();
-            if (length < 0 || length > MAX_FETCH_EMIT_TUPLE_BYTES) {
-                throw new IOException("FetchEmitTuple length " + length +
-                        " exceeds maximum allowed size of " + 
MAX_FETCH_EMIT_TUPLE_BYTES + " bytes");
-            }
-            byte[] bytes = new byte[length];
-            input.readFully(bytes);
-            return JsonPipesIpc.fromBytes(bytes, FetchEmitTuple.class);
-        } catch (IOException e) {
-            LOG.error("problem reading/deserializing FetchEmitTuple", e);
-            handleCrash(PROCESSING_STATUS.UNSPECIFIED_CRASH, "unknown", e);
-        }
-        //unreachable - handleCrash calls exit
-        return null;
-    }
-
-    /**
-     * Validates the FetchEmitTuple before merging with global config.
-     * If the tuple explicitly sets UnpackConfig with an emitter but ParseMode 
is not UNPACK,
-     * that's a configuration error.
-     */
-    private void validateFetchEmitTuple(FetchEmitTuple fetchEmitTuple) throws 
TikaConfigException {
-        ParseContext requestContext = fetchEmitTuple.getParseContext();
-        if (requestContext == null) {
-            return;
-        }
-        UnpackConfig unpackConfig = requestContext.get(UnpackConfig.class);
-        ParseMode parseMode = requestContext.get(ParseMode.class);
-
-        // If tuple explicitly has UnpackConfig with emitter but not UNPACK 
mode, that's an error
-        if (unpackConfig != null && 
!StringUtils.isBlank(unpackConfig.getEmitter())
-                && parseMode != ParseMode.UNPACK) {
-            throw new TikaConfigException(
-                    "FetchEmitTuple has UnpackConfig with emitter '" + 
unpackConfig.getEmitter() +
-                    "' but ParseMode is " + parseMode + ". " +
-                    "To extract embedded bytes, set ParseMode.UNPACK in the 
ParseContext.");
-        }
-    }
-
     protected void initializeResources() throws TikaException, IOException, 
SAXException {
 
         TikaJsonConfig tikaJsonConfig = tikaLoader.getConfig();
@@ -634,84 +520,50 @@ public class PipesServer implements AutoCloseable {
     private ConfigStore createConfigStore(PipesConfig pipesConfig, 
TikaPluginManager tikaPluginManager) throws TikaException {
         String configStoreType = pipesConfig.getConfigStoreType();
         String configStoreParams = pipesConfig.getConfigStoreParams();
-        
+
         if (configStoreType == null || "memory".equals(configStoreType)) {
             // Use default in-memory store (no persistence)
             return null;
         }
-        
+
         ExtensionConfig storeConfig = new ExtensionConfig(
             configStoreType, configStoreType, configStoreParams);
-        
+
         return ConfigStoreFactory.createConfigStore(
                 tikaPluginManager,
                 configStoreType,
                 storeConfig);
     }
 
-
-    private void write(PROCESSING_STATUS processingStatus, PipesResult 
pipesResult) {
+    private void writeFinished(PipesResult pipesResult) {
         try {
-            byte[] bytes = JsonPipesIpc.toBytes(pipesResult);
-            write(processingStatus, bytes);
+            protocolIO.writeFinished(pipesResult);
+        } catch (ShutDownReceivedException e) {
+            handleShutDown();
         } catch (IOException e) {
             LOG.error("problem writing emit data (forking process shutdown?)", 
e);
-            exit(1);
+            exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
         }
     }
 
     private void writeIntermediate(Metadata metadata) {
         try {
-            byte[] bytes = JsonPipesIpc.toBytes(metadata);
-            write(INTERMEDIATE_RESULT, bytes);
+            protocolIO.writeIntermediate(metadata);
+        } catch (ShutDownReceivedException e) {
+            handleShutDown();
         } catch (IOException e) {
             LOG.error("problem writing intermediate data (forking process 
shutdown?)", e);
-            exit(1);
-        }
-    }
-
-    private void awaitAck() throws IOException {
-        int b = input.read();
-        if (b == ACK.getByte()) {
-            return;
-        }
-        LOG.error("pipesClientId={}: expected ACK but got byte={}", 
pipesClientId, HexFormat.of().formatHex(new byte[]{ (byte) b}));
-        throw new IOException("Wasn't expecting byte=" + 
HexFormat.of().formatHex(new byte[]{ (byte) b}));
-    }
-
-    private void writeNoAck(byte b) {
-        try {
-            output.write(b);
-            output.flush();
-        } catch (IOException e) {
-            LOG.error("problem writing data (forking process shutdown?)", e);
-            exit(1);
+            exit(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().orElse(19));
         }
     }
 
-    private void write(byte b) {
+    private void handleShutDown() {
+        LOG.info("pipesClientId={}: received SHUT_DOWN, shutting down 
gracefully", pipesClientId);
         try {
-            output.write(b);
-            output.flush();
-            awaitAck();
-        } catch (IOException e) {
-            LOG.error("pipesClientId={}: problem writing data (forking process 
shutdown?)", pipesClientId, e);
-            exit(1);
-        }
-    }
-
-
-    private void write(PROCESSING_STATUS status, byte[] bytes) {
-        try {
-            write(status.getByte());
-            int len = bytes.length;
-            output.writeInt(len);
-            output.write(bytes);
-            output.flush();
-            awaitAck();
-        } catch (IOException e) {
-            LOG.error("problem writing data (forking process shutdown?)", e);
-            exit(1);
+            close();
+        } catch (Exception e) {
+            //swallow
         }
+        exit(0);
     }
 }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ServerProtocolIO.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ServerProtocolIO.java
new file mode 100644
index 0000000000..3d71f87457
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/ServerProtocolIO.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.server;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.FetchEmitTuple;
+import org.apache.tika.pipes.api.ParseMode;
+import org.apache.tika.pipes.api.PipesResult;
+import org.apache.tika.pipes.core.extractor.UnpackConfig;
+import org.apache.tika.pipes.core.protocol.PipesMessage;
+import org.apache.tika.pipes.core.protocol.PipesMessageType;
+import org.apache.tika.pipes.core.protocol.ShutDownReceivedException;
+import org.apache.tika.pipes.core.serialization.JsonPipesIpc;
+import org.apache.tika.utils.ExceptionUtils;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Server-side protocol helper shared by {@link PipesServer} and
+ * {@link ConnectionHandler}.
+ * <p>
+ * Its job is limited to wire mechanics: serializing payloads, framing them as
+ * {@link PipesMessage}s, and waiting for the client's ACK. It never decides
+ * lifecycle questions (exit vs. return, closing the connection vs. shutting
+ * down the JVM); callers catch the exceptions thrown here and apply their own
+ * lifecycle policy.
+ */
+public class ServerProtocolIO {
+
+    private final DataInputStream input;
+    private final DataOutputStream output;
+
+    /**
+     * @param input  stream from which the client's framed replies are read (see {@link #awaitAck()})
+     * @param output stream to which framed messages are written
+     */
+    public ServerProtocolIO(DataInputStream input, DataOutputStream output) {
+        this.output = output;
+        this.input = input;
+    }
+
+    /**
+     * Sends {@code pipesResult}, serialized via {@link JsonPipesIpc}, as a FINISHED
+     * message and then blocks until the client replies.
+     *
+     * @throws ShutDownReceivedException if the client replies SHUT_DOWN instead of ACK
+     * @throws IOException on serialization or I/O errors, or any other non-ACK reply
+     */
+    public void writeFinished(PipesResult pipesResult) throws IOException {
+        sendAndAwaitAck(PipesMessage.finished(JsonPipesIpc.toBytes(pipesResult)));
+    }
+
+    /**
+     * Sends {@code metadata}, serialized via {@link JsonPipesIpc}, as an
+     * INTERMEDIATE_RESULT message and then blocks until the client replies.
+     *
+     * @throws ShutDownReceivedException if the client replies SHUT_DOWN instead of ACK
+     * @throws IOException on serialization or I/O errors, or any other non-ACK reply
+     */
+    public void writeIntermediate(Metadata metadata) throws IOException {
+        sendAndAwaitAck(PipesMessage.intermediateResult(JsonPipesIpc.toBytes(metadata)));
+    }
+
+    /**
+     * Sends a crash message of type {@code crashType} (callers use OOM, TIMEOUT or
+     * UNSPECIFIED_CRASH) whose payload is the serialized stack trace of {@code t},
+     * or an empty string when {@code t} is {@code null}, then blocks until the
+     * client replies.
+     *
+     * @throws IOException on serialization or I/O errors, or a non-ACK reply
+     */
+    public void writeCrash(PipesMessageType crashType, Throwable t) throws IOException {
+        String stackTrace;
+        if (t == null) {
+            stackTrace = "";
+        } else {
+            stackTrace = ExceptionUtils.getStackTrace(t);
+        }
+        sendAndAwaitAck(PipesMessage.crash(crashType, JsonPipesIpc.toBytes(stackTrace)));
+    }
+
+    /**
+     * Reads one framed message from the client and checks that it is an ACK.
+     *
+     * @throws ShutDownReceivedException if the message is SHUT_DOWN
+     * @throws IOException if the message is of any other non-ACK type, or on I/O error
+     */
+    public void awaitAck() throws IOException {
+        PipesMessageType replyType = PipesMessage.read(input).type();
+        if (replyType == PipesMessageType.SHUT_DOWN) {
+            throw new ShutDownReceivedException();
+        }
+        if (replyType != PipesMessageType.ACK) {
+            throw new IOException("Expected ACK but got " + replyType);
+        }
+    }
+
+    /** Writes {@code message} to the client and waits for the matching ACK. */
+    private void sendAndAwaitAck(PipesMessage message) throws IOException {
+        message.write(output);
+        awaitAck();
+    }
+
+    /**
+     * Checks that a FetchEmitTuple's request-level configuration is consistent.
+     * <p>
+     * A tuple whose ParseContext carries an UnpackConfig with a non-blank emitter
+     * is rejected unless its ParseMode is UNPACK. A tuple without a ParseContext
+     * passes trivially.
+     *
+     * @throws TikaConfigException if an unpack emitter is set without ParseMode.UNPACK
+     */
+    public static void validateFetchEmitTuple(FetchEmitTuple fetchEmitTuple)
+            throws TikaConfigException {
+        ParseContext requestContext = fetchEmitTuple.getParseContext();
+        if (requestContext == null) {
+            return;
+        }
+        UnpackConfig unpackConfig = requestContext.get(UnpackConfig.class);
+        ParseMode parseMode = requestContext.get(ParseMode.class);
+        if (unpackConfig == null) {
+            return;
+        }
+        String emitter = unpackConfig.getEmitter();
+        if (StringUtils.isBlank(emitter) || parseMode == ParseMode.UNPACK) {
+            return;
+        }
+        throw new TikaConfigException("FetchEmitTuple has UnpackConfig with emitter '" + emitter
+                + "' but ParseMode is " + parseMode + ". "
+                + "To extract embedded bytes, set ParseMode.UNPACK in the ParseContext.");
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/protocol/PipesMessageTest.java
 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/protocol/PipesMessageTest.java
new file mode 100644
index 0000000000..372cf49bee
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/protocol/PipesMessageTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.protocol;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the framed wire format of {@link PipesMessage} and the
+ * type / exit-code table of {@link PipesMessageType}.
+ * <p>
+ * Hand-built frames take their magic bytes from {@link PipesMessage#MAGIC_0} and
+ * {@link PipesMessage#MAGIC_1} instead of hard-coding them. The tests therefore
+ * keep exercising the intended failure mode even if the magic values change.
+ */
+class PipesMessageTest {
+
+    @Test
+    void testRoundTripEmptyPayload() throws IOException {
+        for (PipesMessageType type : new PipesMessageType[]{
+                PipesMessageType.PING, PipesMessageType.ACK,
+                PipesMessageType.READY, PipesMessageType.SHUT_DOWN}) {
+            PipesMessage original = new PipesMessage(type, new byte[0]);
+            PipesMessage roundTripped = roundTrip(original);
+            assertEquals(type, roundTripped.type());
+            assertEquals(0, roundTripped.payload().length);
+        }
+    }
+
+    @Test
+    void testRoundTripWithPayload() throws IOException {
+        byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8);
+        PipesMessage original = PipesMessage.finished(payload);
+        PipesMessage roundTripped = roundTrip(original);
+        assertEquals(PipesMessageType.FINISHED, roundTripped.type());
+        assertArrayEquals(payload, roundTripped.payload());
+    }
+
+    @Test
+    void testRoundTripAllTypes() throws IOException {
+        byte[] payload = "test".getBytes(StandardCharsets.UTF_8);
+        for (PipesMessageType type : PipesMessageType.values()) {
+            PipesMessage original = new PipesMessage(type, payload);
+            PipesMessage roundTripped = roundTrip(original);
+            assertEquals(type, roundTripped.type());
+            assertArrayEquals(payload, roundTripped.payload());
+        }
+    }
+
+    @Test
+    void testWorkingMessageRoundTrip() throws IOException {
+        PipesMessage original = PipesMessage.working(42L);
+        PipesMessage roundTripped = roundTrip(original);
+        assertEquals(PipesMessageType.WORKING, roundTripped.type());
+        assertEquals(42L, roundTripped.progressCounter());
+    }
+
+    @Test
+    void testDesyncDetectionBadMagic() {
+        // Both magic bytes are complemented, so neither can match; the rest is a valid
+        // zero-length FINISHED frame so only the magic check can fail.
+        byte[] bad = new byte[]{notMagic0(), notMagic1(), PipesMessageType.FINISHED.getByte(),
+                0x00, 0x00, 0x00, 0x00};
+        assertThrows(ProtocolDesyncException.class, () -> readFrom(bad));
+    }
+
+    @Test
+    void testDesyncDetectionPartialMagic() {
+        // First magic byte correct, second one guaranteed wrong
+        byte[] bad = new byte[]{(byte) PipesMessage.MAGIC_0, notMagic1(),
+                PipesMessageType.FINISHED.getByte(), 0x00, 0x00, 0x00, 0x00};
+        assertThrows(ProtocolDesyncException.class, () -> readFrom(bad));
+    }
+
+    @Test
+    void testEofBeforeMagic() {
+        assertThrows(EOFException.class, () -> readFrom(new byte[0]));
+    }
+
+    @Test
+    void testEofAfterFirstMagicByte() {
+        byte[] partial = new byte[]{(byte) PipesMessage.MAGIC_0};
+        assertThrows(EOFException.class, () -> readFrom(partial));
+    }
+
+    @Test
+    void testNegativePayloadLength() throws IOException {
+        byte[] frame = finishedHeaderWithLength(-1);
+        assertThrows(IOException.class, () -> readFrom(frame));
+    }
+
+    @Test
+    void testOversizedPayloadRejection() throws IOException {
+        byte[] frame = finishedHeaderWithLength(PipesMessage.MAX_PAYLOAD_BYTES + 1);
+        assertThrows(IOException.class, () -> readFrom(frame));
+    }
+
+    @Test
+    void testRequiresAckAssertions() {
+        assertFalse(PipesMessageType.PING.requiresAck());
+        assertFalse(PipesMessageType.ACK.requiresAck());
+        assertFalse(PipesMessageType.NEW_REQUEST.requiresAck());
+        assertFalse(PipesMessageType.SHUT_DOWN.requiresAck());
+        assertFalse(PipesMessageType.READY.requiresAck());
+        assertFalse(PipesMessageType.WORKING.requiresAck());
+
+        assertTrue(PipesMessageType.STARTUP_FAILED.requiresAck());
+        assertTrue(PipesMessageType.INTERMEDIATE_RESULT.requiresAck());
+        assertTrue(PipesMessageType.FINISHED.requiresAck());
+        assertTrue(PipesMessageType.OOM.requiresAck());
+        assertTrue(PipesMessageType.TIMEOUT.requiresAck());
+        assertTrue(PipesMessageType.UNSPECIFIED_CRASH.requiresAck());
+    }
+
+    @Test
+    void testGetByteAndLookupInverse() {
+        for (PipesMessageType type : PipesMessageType.values()) {
+            byte b = type.getByte();
+            PipesMessageType looked = PipesMessageType.lookup(b);
+            assertEquals(type, looked, "lookup(getByte()) failed for " + type);
+        }
+    }
+
+    @Test
+    void testLookupUnknownByte() {
+        assertThrows(IllegalArgumentException.class, () -> PipesMessageType.lookup(0xFF));
+        assertThrows(IllegalArgumentException.class, () -> PipesMessageType.lookup(0x00));
+    }
+
+    @Test
+    void testExitCodes() {
+        assertTrue(PipesMessageType.OOM.getExitCode().isPresent());
+        assertEquals(18, PipesMessageType.OOM.getExitCode().getAsInt());
+
+        assertTrue(PipesMessageType.TIMEOUT.getExitCode().isPresent());
+        assertEquals(17, PipesMessageType.TIMEOUT.getExitCode().getAsInt());
+
+        assertTrue(PipesMessageType.UNSPECIFIED_CRASH.getExitCode().isPresent());
+        assertEquals(19, PipesMessageType.UNSPECIFIED_CRASH.getExitCode().getAsInt());
+
+        assertFalse(PipesMessageType.PING.getExitCode().isPresent());
+        assertFalse(PipesMessageType.FINISHED.getExitCode().isPresent());
+        assertFalse(PipesMessageType.READY.getExitCode().isPresent());
+    }
+
+    @Test
+    void testConvenienceFactories() throws IOException {
+        assertEquals(PipesMessageType.PING, roundTrip(PipesMessage.ping()).type());
+        assertEquals(PipesMessageType.ACK, roundTrip(PipesMessage.ack()).type());
+        assertEquals(PipesMessageType.READY, roundTrip(PipesMessage.ready()).type());
+        assertEquals(PipesMessageType.SHUT_DOWN, roundTrip(PipesMessage.shutDown()).type());
+
+        byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+        assertEquals(PipesMessageType.NEW_REQUEST, roundTrip(PipesMessage.newRequest(data)).type());
+        assertEquals(PipesMessageType.FINISHED, roundTrip(PipesMessage.finished(data)).type());
+        assertEquals(PipesMessageType.INTERMEDIATE_RESULT,
+                roundTrip(PipesMessage.intermediateResult(data)).type());
+        assertEquals(PipesMessageType.STARTUP_FAILED,
+                roundTrip(PipesMessage.startupFailed(data)).type());
+        // All three crash types the server emits via ServerProtocolIO.writeCrash
+        assertEquals(PipesMessageType.OOM,
+                roundTrip(PipesMessage.crash(PipesMessageType.OOM, data)).type());
+        assertEquals(PipesMessageType.TIMEOUT,
+                roundTrip(PipesMessage.crash(PipesMessageType.TIMEOUT, data)).type());
+        assertEquals(PipesMessageType.UNSPECIFIED_CRASH,
+                roundTrip(PipesMessage.crash(PipesMessageType.UNSPECIFIED_CRASH, data)).type());
+    }
+
+    /** Serializes {@code msg} and parses it back. */
+    private PipesMessage roundTrip(PipesMessage msg) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        msg.write(new DataOutputStream(baos));
+        return readFrom(baos.toByteArray());
+    }
+
+    /** Parses a single message from a raw byte array. */
+    private static PipesMessage readFrom(byte[] bytes) throws IOException {
+        return PipesMessage.read(new DataInputStream(new ByteArrayInputStream(bytes)));
+    }
+
+    /** Returns a byte whose low 8 bits differ from {@link PipesMessage#MAGIC_0}. */
+    private static byte notMagic0() {
+        return (byte) ~PipesMessage.MAGIC_0;
+    }
+
+    /** Returns a byte whose low 8 bits differ from {@link PipesMessage#MAGIC_1}. */
+    private static byte notMagic1() {
+        return (byte) ~PipesMessage.MAGIC_1;
+    }
+
+    /** Builds a frame header (valid magic, FINISHED type) followed by a raw length field. */
+    private static byte[] finishedHeaderWithLength(int length) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.write(PipesMessage.MAGIC_0);
+        dos.write(PipesMessage.MAGIC_1);
+        dos.write(PipesMessageType.FINISHED.getByte());
+        dos.writeInt(length);
+        dos.flush();
+        return baos.toByteArray();
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
index 5cef444687..a84edf5275 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/core/PipesClientTest.java
@@ -294,7 +294,8 @@ public class PipesClientTest {
             Assertions.assertNotNull(pipesResult.message(), "Should have error 
message");
             assertTrue(pipesResult.message().contains("exit code") ||
                             pipesResult.message().contains("JVM") ||
-                            pipesResult.message().contains("Process failed"),
+                            pipesResult.message().contains("Process failed") ||
+                            pipesResult.message().contains("couldn't connect 
to server"),
                     "Error message should indicate process failure: " + 
pipesResult.message());
         }
     }
@@ -353,7 +354,9 @@ public class PipesClientTest {
             // Should have error message about the crash
             Assertions.assertNotNull(pipesResult.message(), "Should have error 
message");
             assertTrue(pipesResult.message().contains("problem reading 
response") |
-                    pipesResult.message().contains("SocketException"),
+                    pipesResult.message().contains("SocketException") |
+                    pipesResult.message().contains("EOFException") |
+                    pipesResult.message().contains("Stream closed"),
                     "Error message should mention the detection crash: " + 
pipesResult.message());
 
             // Note: Because crash happens during pre-parse (before 
intermediate result is sent),
@@ -779,6 +782,60 @@ public class PipesClientTest {
                 "User filter should take priority over CONTENT_ONLY filter");
     }
 
+    @Test
+    public void testRecoveryAfterServerCrash(@TempDir Path tmp) throws Exception {
+        // A document that makes the forked server call System.exit must surface as a
+        // process crash; the next request on the same client must then go through the
+        // crash -> restart -> reconnect path and succeed normally.
+        Path inputDir = Files.createDirectories(tmp.resolve("input"));
+        String xmlProlog = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>";
+
+        // Mock document that kills the server JVM after emitting some content
+        String crashFile = "mock-crash.xml";
+        String crashXml = xmlProlog + "<mock>"
+                + "<metadata action=\"add\" name=\"dc:creator\">Crash Test</metadata>"
+                + "<write element=\"p\">content before crash</write>"
+                + "<system_exit/>"
+                + "</mock>";
+        Files.writeString(inputDir.resolve(crashFile), crashXml, StandardCharsets.UTF_8);
+
+        // Well-behaved mock document used to verify recovery
+        String normalFile = "mock-normal.xml";
+        String normalXml = xmlProlog + "<mock>"
+                + "<metadata action=\"add\" name=\"dc:creator\">Normal Author</metadata>"
+                + "<write element=\"p\">normal content</write>"
+                + "</mock>";
+        Files.writeString(inputDir.resolve(normalFile), normalXml, StandardCharsets.UTF_8);
+
+        Path outputDir = tmp.resolve("output");
+        Path tikaConfigPath = PluginsTestHelper.getFileSystemFetcherConfig(tmp, inputDir, outputDir);
+        PipesConfig pipesConfig = PipesConfig.load(TikaJsonConfig.load(tikaConfigPath));
+
+        try (PipesClient pipesClient = new PipesClient(pipesConfig, tikaConfigPath)) {
+            // Step 1: the crashing document takes the server down
+            FetchEmitTuple crashTuple = new FetchEmitTuple(crashFile,
+                    new FetchKey(fetcherName, crashFile), new EmitKey(), new Metadata(),
+                    new ParseContext(), FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+            PipesResult crashResult = pipesClient.process(crashTuple);
+            assertTrue(crashResult.isProcessCrash(),
+                    "Crash file should result in process crash, got: " + crashResult.status());
+
+            // Step 2: the client must restart the server and handle the normal document
+            FetchEmitTuple normalTuple = new FetchEmitTuple(normalFile,
+                    new FetchKey(fetcherName, normalFile), new EmitKey(), new Metadata(),
+                    new ParseContext(), FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+            PipesResult normalResult = pipesClient.process(normalTuple);
+            assertTrue(normalResult.isSuccess(),
+                    "Normal file should succeed after crash recovery, got: " + normalResult.status()
+                            + " message: " + normalResult.message());
+
+            var metadataList = normalResult.emitData().getMetadataList();
+            Assertions.assertNotNull(metadataList);
+            assertEquals(1, metadataList.size());
+            assertEquals("Normal Author", metadataList.get(0).get("dc:creator"));
+        }
+    }
+
     @Test
     public void testConcatenateMode(@TempDir Path tmp) throws Exception {
         // Test that CONCATENATE mode returns a single metadata object with 
content


Reply via email to