This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 62ecae86 fix(parquet): return error instead of panicking on first-write failure (#824)
62ecae86 is described below
commit 62ecae86fea101501bf47ab9068443dc5a4319f8
Author: Matt Topol <[email protected]>
AuthorDate: Thu May 28 14:02:55 2026 -0400
fix(parquet): return error instead of panicking on first-write failure (#824)
### Rationale for this change
Closes #820.
`(*file.Writer).startFile()` panics with the literal string `"failed to
write magic number"` when the underlying sink's first `Write` call
returns an error or short-writes. Because `file.NewParquetWriter` calls
`startFile` synchronously inside the constructor and has no `error`
return, callers had no idiomatic Go recovery path. The higher-level
`pqarrow.NewFileWriter` already returns `(*FileWriter, error)`, but the
panic propagated through it unchanged, so its `error` return was never
reached for this failure mode.
In production this manifests when the sink is a network-attached writer
(`*storage.Writer` from GCS, an S3 multipart upload, an HDFS client,
etc.). A transient network blip on the first 4-byte magic-header write
reliably crashes the worker process, orphans any in-flight upload
session, and forces a container restart.
### What changes are included in this PR?
1. **`(*Writer).startFile()` now returns `error`** instead of panicking
on sink-write failures. The configuration-validation panic for
"encrypted column not found in file schema" is preserved — that path is
a programmer error, not an I/O failure.
2. **New public constructor `file.NewParquetWriterWithError(w, sc,
opts...) (*Writer, error)`** is the preferred entry point for callers
whose sink may transiently fail.
3. **`file.NewParquetWriter` is now a thin wrapper** over
`NewParquetWriterWithError` that panics with the *exact same string*
(`"failed to write magic number"`) on init failure. Any consumer with a
`recover()` block that string-matches the panic value continues to work
unchanged.
4. **`pqarrow.NewFileWriter` switches to `NewParquetWriterWithError`**
internally. Its public signature is unchanged; its `error` return now
meaningfully covers transient sink failures, with no API change for any
of its callers.
### Are these changes tested?
Yes. Six new tests in `parquet/file/file_writer_test.go` exercise the
new behavior using a `flakyMagicSink` test helper modeled on the
existing `errCloseWriter` pattern in the same file:
- `TestNewParquetWriterWithError_Success` — happy path on a
`bytes.Buffer`.
- `TestNewParquetWriterWithError_FirstWriteFails` — first `Write`
returns `io.ErrUnexpectedEOF`; constructor returns a wrapped error and
does not panic.
- `TestNewParquetWriterWithError_FirstWriteShortWrites` — first `Write`
returns `n=len(p)-1, nil`; constructor returns an error mentioning the
short write.
- `TestNewParquetWriter_PreservesPanicMessage` — the legacy constructor
still panics, the panic value is still a `string`, and the message is
still the literal `"failed to write magic number"` (back-compat guard).
- `TestPqarrowNewFileWriter_PropagatesInitError` —
`pqarrow.NewFileWriter` returns the wrapped sink error rather than
panicking.
- `TestPqarrowNewFileWriter_PropagatesShortWrite` — same for the
short-write case.
The existing `TestCloseError` test and the full `parquet/file/...` and
`parquet/pqarrow/...` test suites all pass (with `PARQUET_TEST_DATA`
pointed at `parquet-testing/data`).
### Are there any user-facing changes?
**No breaking changes.**
- `file.NewParquetWriter(w, sc, opts...) *Writer` — signature unchanged,
still panics on first-write failure with the identical literal string
`"failed to write magic number"`. Existing `recover()` workarounds keep
working.
- `file.NewParquetWriterWithError(w, sc, opts...) (*Writer, error)` —
new exported symbol. Adding an exported function is non-breaking in Go.
- `pqarrow.NewFileWriter(...) (*FileWriter, error)` — signature
unchanged. Its `error` return now also covers the magic-header-write
failure that previously escaped as a panic. Callers who already check
`err` get strictly better behavior.
The new constructor's docstring recommends it for users writing to
cloud-storage upload sinks. The legacy constructor's docstring now
documents the panic behavior that the original issue called out as
undocumented.
### Out of scope (separate follow-up)
While investigating, I noticed `WriteBatchSpaced` in
`parquet/file/column_writer_types.gen.go` lacks the `defer recover()`
that `WriteBatch` has, so an I/O error during a spaced write panics out
of the calling goroutine instead of returning an error. Same class of
bug as #820 but a different code path; happy to file a separate issue.
---
parquet/file/file_writer.go | 45 +++++++++++++++---
parquet/file/file_writer_test.go | 98 ++++++++++++++++++++++++++++++++++++++++
parquet/pqarrow/file_writer.go | 5 +-
3 files changed, 141 insertions(+), 7 deletions(-)
diff --git a/parquet/file/file_writer.go b/parquet/file/file_writer.go
index 5456e7f9..291f2335 100644
--- a/parquet/file/file_writer.go
+++ b/parquet/file/file_writer.go
@@ -65,11 +65,38 @@ func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
}
}
-// NewParquetWriter returns a Writer that writes to the provided WriteSeeker with the given schema.
+// NewParquetWriter returns a Writer that writes to the provided io.Writer with the given schema.
//
// If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil,
// it will be added to the file.
+//
+// This constructor panics with the literal string "failed to write magic
+// number" if the initial write of the parquet magic header to the
+// underlying sink fails. The behavior is preserved for backward
+// compatibility with callers that string-match the panic value in a
+// recover() block; new code should prefer [NewParquetWriterWithError],
+// which returns the failure as an error instead.
func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer {
+ fw, err := NewParquetWriterWithError(w, sc, opts...)
+ if err != nil {
+ // Preserve the historical panic value verbatim so any consumer
+ // performing a string-match in a recover() block continues to work.
+ panic("failed to write magic number")
+ }
+ return fw
+}
+
+// NewParquetWriterWithError returns a Writer that writes to the provided
+// io.Writer with the given schema.
+//
+// If props is nil, then the default Writer Properties will be used. If the
+// key value metadata is not nil, it will be added to the file.
+//
+// An error is returned if the initial write of the parquet magic header to
+// the underlying sink fails or short-writes, which can happen when the
+// sink is a flaky or network-attached writer (for example, a cloud-storage
+// upload writer).
+func NewParquetWriterWithError(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) (*Writer, error) {
config := &writerConfig{}
for _, o := range opts {
o(config)
@@ -87,8 +114,10 @@ func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *W
}
fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata)
- fw.startFile()
- return fw
+ if err := fw.startFile(); err != nil {
+ return nil, err
+ }
+ return fw, nil
}
// NumColumns returns the number of columns to write as defined by the schema.
@@ -167,7 +196,7 @@ func (fw *Writer) appendRowGroup(buffered bool) *rowGroupWriter {
return fw.rowGroupWriter
}
-func (fw *Writer) startFile() {
+func (fw *Writer) startFile() error {
encryptionProps := fw.props.FileEncryptionProperties()
magic := magicBytes
if encryptionProps != nil {
@@ -199,8 +228,11 @@ func (fw *Writer) startFile() {
}
n, err := fw.sink.Write(magic)
- if n != 4 || err != nil {
- panic("failed to write magic number")
+ if err != nil {
+ return fmt.Errorf("parquet: failed to write magic number: %w", err)
+ }
+ if n != len(magic) {
+ return fmt.Errorf("parquet: short write of magic number: wrote %d of %d bytes", n, len(magic))
}
if fw.props.PageIndexEnabled() {
@@ -209,6 +241,7 @@ func (fw *Writer) startFile() {
Encryptor: fw.fileEncryptor,
}
}
+ return nil
}
func (fw *Writer) writePageIndex() {
diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go
index 7a6c6ae1..88d82d73 100644
--- a/parquet/file/file_writer_test.go
+++ b/parquet/file/file_writer_test.go
@@ -18,7 +18,9 @@ package file_test
import (
"bytes"
+ "errors"
"fmt"
+ "io"
"math"
"reflect"
"slices"
@@ -1324,3 +1326,99 @@ func TestBufferedStreamDictionaryCompressed(t *testing.T) {
assert.Equal(t, int32(i), readValues[i])
}
}
+
+// flakyMagicSink models a transient I/O failure on the first Write call,
+// the failure mode reported in apache/arrow-go#820 for cloud-storage upload
+// writers. Set firstErr to fail with that error on the first Write, or set
+// short to true to return n=len(p)-1, nil on the first Write. All subsequent
+// writes succeed and forward to the embedded buffer.
+type flakyMagicSink struct {
+ buf bytes.Buffer
+ writes int
+ firstErr error
+ short bool
+}
+
+func (f *flakyMagicSink) Write(p []byte) (int, error) {
+ f.writes++
+ if f.writes == 1 {
+ if f.firstErr != nil {
+ return 0, f.firstErr
+ }
+ if f.short {
+ return len(p) - 1, nil
+ }
+ }
+ return f.buf.Write(p)
+}
+
+func newSingleColumnSchema(t *testing.T) *schema.GroupNode {
+ t.Helper()
+ fields := schema.FieldList{schema.NewInt32Node("col", parquet.Repetitions.Required, 1)}
+ sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, 0)
+ require.NoError(t, err)
+ return sc
+}
+
+func TestNewParquetWriterWithError_Success(t *testing.T) {
+ var buf bytes.Buffer
+ writer, err := file.NewParquetWriterWithError(&buf, newSingleColumnSchema(t))
+ require.NoError(t, err)
+ require.NotNil(t, writer)
+ require.NoError(t, writer.Close())
+}
+
+func TestNewParquetWriterWithError_FirstWriteFails(t *testing.T) {
+ sink := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}
+ writer, err := file.NewParquetWriterWithError(sink, newSingleColumnSchema(t))
+ require.Error(t, err)
+ require.Nil(t, writer)
+ require.True(t, errors.Is(err, io.ErrUnexpectedEOF),
+ "expected returned error to wrap io.ErrUnexpectedEOF, got %v", err)
+}
+
+func TestNewParquetWriterWithError_FirstWriteShortWrites(t *testing.T) {
+ sink := &flakyMagicSink{short: true}
+ writer, err := file.NewParquetWriterWithError(sink, newSingleColumnSchema(t))
+ require.Error(t, err)
+ require.Nil(t, writer)
+ require.Contains(t, err.Error(), "short write of magic number")
+}
+
+func TestNewParquetWriter_PreservesPanicMessage(t *testing.T) {
+ sink := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}
+
+ defer func() {
+ r := recover()
+ require.NotNil(t, r, "NewParquetWriter should panic on first-write failure")
+ msg, ok := r.(string)
+ require.True(t, ok, "panic value should remain a string for back-compat (got %T)", r)
+ require.Equal(t, "failed to write magic number", msg)
+ }()
+
+ _ = file.NewParquetWriter(sink, newSingleColumnSchema(t))
+ t.Fatalf("expected NewParquetWriter to panic, but it returned normally")
+}
+
+func TestPqarrowNewFileWriter_PropagatesInitError(t *testing.T) {
+ arrowSchema := arrow.NewSchema([]arrow.Field{{Name: "f", Type: arrow.PrimitiveTypes.Int32}}, nil)
+ sink := &flakyMagicSink{firstErr: io.ErrUnexpectedEOF}
+
+ writer, err := pqarrow.NewFileWriter(arrowSchema, sink,
+ parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
+ require.Error(t, err)
+ require.Nil(t, writer)
+ require.True(t, errors.Is(err, io.ErrUnexpectedEOF),
+ "expected returned error to wrap io.ErrUnexpectedEOF, got %v", err)
+}
+
+func TestPqarrowNewFileWriter_PropagatesShortWrite(t *testing.T) {
+ arrowSchema := arrow.NewSchema([]arrow.Field{{Name: "f", Type: arrow.PrimitiveTypes.Int32}}, nil)
+ sink := &flakyMagicSink{short: true}
+
+ writer, err := pqarrow.NewFileWriter(arrowSchema, sink,
+ parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
+ require.Error(t, err)
+ require.Nil(t, writer)
+ require.Contains(t, err.Error(), "short write of magic number")
+}
diff --git a/parquet/pqarrow/file_writer.go b/parquet/pqarrow/file_writer.go
index eb576e74..442ec8b4 100644
--- a/parquet/pqarrow/file_writer.go
+++ b/parquet/pqarrow/file_writer.go
@@ -163,7 +163,10 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterPr
}
schemaNode := pqschema.Root()
- baseWriter := file.NewParquetWriter(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))
+ baseWriter, err := file.NewParquetWriterWithError(w, schemaNode, file.WithWriterProps(props), file.WithWriteMetadata(meta))
+ if err != nil {
+ return nil, err
+ }
manifest, err := NewSchemaManifest(pqschema, nil, &ArrowReadProperties{})
if err != nil {