This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b2aa6bc fix(parquet): align dictionary fallback with parquet-mr (#786)
2b2aa6bc is described below

commit 2b2aa6bcc6efe1b197a98ced4c81c4a0e125ffbd
Author: Tobias Pütz <[email protected]>
AuthorDate: Thu Apr 30 19:40:44 2026 +0200

    fix(parquet): align dictionary fallback with parquet-mr (#786)
    
    ### Rationale for this change
    
    On dictionary overflow, arrow-go always flushed the dictionary page and
    any buffered dict-encoded data pages before switching to PLAIN, even
    when no dict-encoded data page had been cut. On mid-cardinality columns
    the result was a 4-encoding chunk layout (PLAIN_DICTIONARY, PLAIN, RLE,
    PLAIN) that bloated output by 20-30% versus parquet-mr.
    
    This was noticed when testing iceberg-go's recently added compaction
    feature, where some tables with particularly high-cardinality columns
    would see a 30% size increase after compaction.
    
    ### What changes are included in this PR?
    
    Mirror parquet-mr's FallbackValuesWriter:
    
    - Discard the dictionary and re-encode buffered indices as PLAIN when no
    dict-encoded data page has been flushed yet; only emit the dictionary
    page once a dict-encoded page is committed.
    - Before the first dict-encoded page, fall back to PLAIN if the
    dictionary size plus the encoded indices size is >= the raw input bytes.
    - Size dict-encoded pages by raw input bytes (not the RLE indices'
    encoded size) so the page cadence matches PLAIN.
    
    Adds DictEncoder.FallBackTo / ObservedRawSize and exposes
    BinaryMemoTable.Value for the fallback translation.
    
    
    ### Are these changes tested?
    
    Yes, by tests included in this PR and also end-to-end while testing
    compaction in iceberg-go.
    
    ### Are there any user-facing changes?
    
    No public API changes; the only observable difference should be that
    the double encoding is dropped.
---
 parquet/file/column_writer.go                      |  34 +-
 parquet/file/column_writer_test.go                 |  41 +-
 parquet/file/column_writer_types.gen.go            | 488 ++++++++++++++++++---
 parquet/file/column_writer_types.gen.go.tmpl       |  59 ++-
 parquet/file/dict_fallback_repro_test.go           | 372 ++++++++++++++++
 parquet/internal/encoding/byte_array_encoder.go    |  19 +
 parquet/internal/encoding/encoder.go               |  20 +
 .../encoding/fixed_len_byte_array_encoder.go       |  19 +
 parquet/internal/encoding/typed_encoder.go         |  37 ++
 parquet/internal/encoding/types.go                 |  14 +
 parquet/pqarrow/encode_dictionary_test.go          |  18 +-
 parquet/pqarrow/file_writer_test.go                |   8 +-
 12 files changed, 1003 insertions(+), 126 deletions(-)

diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go
index b9d2220c..e72c36d3 100644
--- a/parquet/file/column_writer.go
+++ b/parquet/file/column_writer.go
@@ -140,6 +140,7 @@ type columnWriter struct {
        totalCompressedBytes int64
        closed               bool
        fallbackToNonDict    bool
+       dictPageWritten      bool
 
        pages []DataPage
 
@@ -264,8 +265,20 @@ func (w *columnWriter) 
commitWriteAndCheckPageLimit(numLevels, numValues int64)
        w.numBufferedValues += numLevels
        w.numDataValues += numValues
 
-       enc := w.currentEncoder.EstimatedDataEncodedSize()
-       if enc >= w.props.DataPageSize() {
+       // While dictionary encoding is active we size pages by raw input bytes
+       // instead of the RLE-indices' encoded size. Mirrors parquet-mr's
+       // FallbackValuesWriter.getBufferedSize — it keeps dict pages roughly
+       // the same raw-byte footprint as the PLAIN pages they'd otherwise be,
+       // which also pulls the first-page compression check into the same
+       // cadence parquet-mr uses and avoids committing dict pages that only
+       // look cheap because their RLE indices are tiny.
+       var bufferedSize int64
+       if w.hasDict && !w.fallbackToNonDict {
+               bufferedSize = 
w.currentEncoder.(encoding.DictEncoder).ObservedRawSize()
+       } else {
+               bufferedSize = w.currentEncoder.EstimatedDataEncodedSize()
+       }
+       if bufferedSize >= w.props.DataPageSize() {
                return w.FlushCurrentPage()
        }
        return nil
@@ -427,7 +440,14 @@ func (w *columnWriter) FlushBufferedDataPages() (err 
error) {
                        return err
                }
        }
+       return w.drainBufferedDataPages()
+}
 
+// drainBufferedDataPages writes out and releases any pages buffered while
+// dictionary encoding was active. Unlike FlushBufferedDataPages, it does not
+// touch the current encoder's unflushed values, so the caller can re-encode
+// them as PLAIN during a dictionary fallback.
+func (w *columnWriter) drainBufferedDataPages() (err error) {
        for i, p := range w.pages {
                defer p.Release()
                if err = w.WriteDataPage(p); err != nil {
@@ -502,6 +522,7 @@ func (w *columnWriter) WriteDictionaryPage() error {
        page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()), 
w.props.DictionaryPageEncoding())
        written, err := w.pager.WriteDictionaryPage(page)
        w.totalBytesWritten += written
+       w.dictPageWritten = err == nil
        return err
 }
 
@@ -620,7 +641,14 @@ func (w *columnWriter) Close() (err error) {
                if w.rowsWritten > 0 && chunkStats.IsSet() {
                        w.metaData.SetStats(chunkStats)
                }
-               err = w.pager.Close(w.hasDict, w.fallbackToNonDict)
+               // Only advertise PLAIN_DICTIONARY / fallback encodings in the 
column
+               // chunk's encoding list when a dictionary page was actually 
written.
+               // When fallback discards the dictionary before any 
dict-encoded page
+               // is flushed, the column contains only PLAIN data and the list 
should
+               // reflect that unambiguously.
+               advertiseDict := w.hasDict && w.dictPageWritten
+               advertiseFallback := w.fallbackToNonDict && w.dictPageWritten
+               err = w.pager.Close(advertiseDict, advertiseFallback)
        }
        return err
 }
diff --git a/parquet/file/column_writer_test.go 
b/parquet/file/column_writer_test.go
index 03dbd25e..65d5022f 100644
--- a/parquet/file/column_writer_test.go
+++ b/parquet/file/column_writer_test.go
@@ -427,39 +427,20 @@ func (p *PrimitiveWriterTestSuite) 
testDictionaryFallbackEncoding(version parque
        p.EqualValues(VeryLargeSize, valuesRead)
        p.Equal(p.Values, p.ValuesOut)
 
+       // With the parquet-mr-aligned fallback behavior, a dictionary that 
overflows
+       // before any dict-encoded data page has been flushed is discarded 
entirely.
+       // With the default page-size parameters used here the dict-encoded
+       // EstimatedDataEncodedSize (RLE indices) stays under DataPageSize 
right up
+       // to the overflow point, so no dict page and no dict-encoded data pages
+       // appear in the output — the column is pure PLAIN.
        encodings := p.metadataEncodings()
-       if p.Typ.Kind() == reflect.Bool || p.Typ == 
reflect.TypeOf(parquet.Int96{}) {
-               // dictionary encoding is not allowed for booleans
-               // there are 2 encodings (PLAIN, RLE) in a non dictionary 
encoding case
-               p.Equal([]parquet.Encoding{parquet.Encodings.Plain, 
parquet.Encodings.RLE}, encodings)
-       } else if version == parquet.V1_0 {
-               // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) 
in a fallback case
-               // for version 1.0
-               p.Equal([]parquet.Encoding{parquet.Encodings.PlainDict, 
parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, 
encodings)
-       } else {
-               // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in 
a fallback case for
-               // version 2.0
-               p.Equal([]parquet.Encoding{parquet.Encodings.RLEDict, 
parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, 
encodings)
-       }
+       p.Equal([]parquet.Encoding{parquet.Encodings.Plain, 
parquet.Encodings.RLE}, encodings)
 
        encodingStats := p.metadataEncodingStats()
-       if p.Typ.Kind() == reflect.Bool || p.Typ == 
reflect.TypeOf(parquet.Int96{}) {
-               p.Equal(parquet.Encodings.Plain, encodingStats[0].Encoding)
-               p.Equal(format.PageType_DATA_PAGE, encodingStats[0].PageType)
-       } else if version == parquet.V1_0 {
-               expected := []metadata.PageEncodingStats{
-                       {Encoding: parquet.Encodings.PlainDict, PageType: 
format.PageType_DICTIONARY_PAGE},
-                       {Encoding: parquet.Encodings.Plain, PageType: 
format.PageType_DATA_PAGE},
-                       {Encoding: parquet.Encodings.PlainDict, PageType: 
format.PageType_DATA_PAGE}}
-               p.Equal(expected[0], encodingStats[0])
-               p.ElementsMatch(expected[1:], encodingStats[1:])
-       } else {
-               expected := []metadata.PageEncodingStats{
-                       {Encoding: parquet.Encodings.Plain, PageType: 
format.PageType_DICTIONARY_PAGE},
-                       {Encoding: parquet.Encodings.Plain, PageType: 
format.PageType_DATA_PAGE},
-                       {Encoding: parquet.Encodings.RLEDict, PageType: 
format.PageType_DATA_PAGE}}
-               p.Equal(expected[0], encodingStats[0])
-               p.ElementsMatch(expected[1:], encodingStats[1:])
+       p.NotEmpty(encodingStats)
+       for _, es := range encodingStats {
+               p.Equal(parquet.Encodings.Plain, es.Encoding)
+               p.Equal(format.PageType_DATA_PAGE, es.PageType)
        }
 }
 
diff --git a/parquet/file/column_writer_types.gen.go 
b/parquet/file/column_writer_types.gen.go
index e7f8c341..15a6d1e5 100644
--- a/parquet/file/column_writer_types.gen.go
+++ b/parquet/file/column_writer_types.gen.go
@@ -212,20 +212,63 @@ func (w *Int32ColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *Int32ColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem).(encoding.Int32Encoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // Int64ColumnChunkWriter is the typed interface for writing columns to a 
parquet
@@ -410,20 +453,63 @@ func (w *Int64ColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *Int64ColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem).(encoding.Int64Encoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // Int96ColumnChunkWriter is the typed interface for writing columns to a 
parquet
@@ -608,20 +694,63 @@ func (w *Int96ColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *Int96ColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem).(encoding.Int96Encoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // Float32ColumnChunkWriter is the typed interface for writing columns to a 
parquet
@@ -806,20 +935,63 @@ func (w *Float32ColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *Float32ColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem).(encoding.Float32Encoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // Float64ColumnChunkWriter is the typed interface for writing columns to a 
parquet
@@ -1004,20 +1176,63 @@ func (w *Float64ColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *Float64ColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem).(encoding.Float64Encoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // BooleanColumnChunkWriter is the typed interface for writing columns to a 
parquet
@@ -1327,20 +1542,63 @@ func (w *BooleanColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *BooleanColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), 
false, w.descr, w.mem).(encoding.BooleanEncoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // ByteArrayColumnChunkWriter is the typed interface for writing columns to a 
parquet
@@ -1598,20 +1856,63 @@ func (w *ByteArrayColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *ByteArrayColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
 false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
 false, w.descr, w.mem).(encoding.ByteArrayEncoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // FixedLenByteArrayColumnChunkWriter is the typed interface for writing 
columns to a parquet
@@ -1877,20 +2178,63 @@ func (w *FixedLenByteArrayColumnChunkWriter) 
checkDictionarySizeLimit() {
                return
        }
 
-       if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
                w.FallbackToPlain()
+               return
        }
-}
 
+       // Before any dict-encoded data page has been cut, check whether the
+       // dictionary is actually saving space against a PLAIN baseline. Mirrors
+       // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+       // plus the encoded indices meet or exceed the raw input bytes, fall 
back
+       // to PLAIN now and discard the dictionary — avoiding the 
mid-cardinality
+       // case where a dict page stays in the file alongside PLAIN pages 
without
+       // any net compression win.
+       if !w.dictPageWritten && len(w.pages) == 0 {
+               rawSize := dictEnc.ObservedRawSize()
+               encodedSize := dictEnc.EstimatedDataEncodedSize()
+               dictSize := int64(dictEnc.DictEncodedSize())
+               // For an all-nulls batch rawSize, dictSize, and encodedSize 
are all 0,
+               // so this 0 >= 0 path triggers. Harmless: nothing is encoded 
yet, so
+               // PLAIN vs dict doesn't matter and we avoid an empty 
dictionary.
+               if dictSize+encodedSize >= rawSize {
+                       w.FallbackToPlain()
+               }
+       }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *FixedLenByteArrayColumnChunkWriter) FallbackToPlain() {
-       if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-               w.WriteDictionaryPage()
-               w.FlushBufferedDataPages()
-               w.fallbackToNonDict = true
-               w.currentEncoder.Release()
-               w.currentEncoder = 
encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
 false, w.descr, w.mem)
-               w.encoding = parquet.Encodings.Plain
+       if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+               return
        }
+
+       dictEnc := w.currentEncoder.(encoding.DictEncoder)
+       plainEnc := 
encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
 false, w.descr, w.mem).(encoding.FixedLenByteArrayEncoder)
+
+       if len(w.pages) > 0 {
+               if err := w.WriteDictionaryPage(); err != nil {
+                       panic(err)
+               }
+               if err := w.drainBufferedDataPages(); err != nil {
+                       panic(err)
+               }
+       }
+
+       if err := dictEnc.FallBackTo(plainEnc); err != nil {
+               panic(err)
+       }
+
+       dictEnc.Release()
+       w.currentEncoder = plainEnc
+       w.encoding = parquet.Encodings.Plain
+       w.fallbackToNonDict = true
 }
 
 // NewColumnChunkWriter constructs a column writer of the appropriate type by 
using the metadata builder
diff --git a/parquet/file/column_writer_types.gen.go.tmpl 
b/parquet/file/column_writer_types.gen.go.tmpl
index 9a22bb71..6cee1bad 100644
--- a/parquet/file/column_writer_types.gen.go.tmpl
+++ b/parquet/file/column_writer_types.gen.go.tmpl
@@ -481,20 +481,63 @@ func (w *{{.Name}}ColumnChunkWriter) 
checkDictionarySizeLimit() {
     return
   }
 
-  if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= 
int(w.props.DictionaryPageSizeLimit()) {
+  dictEnc := w.currentEncoder.(encoding.DictEncoder)
+  if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
     w.FallbackToPlain()
+    return
+  }
+
+  // Before any dict-encoded data page has been cut, check whether the
+  // dictionary is actually saving space against a PLAIN baseline. Mirrors
+  // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+  // plus the encoded indices meet or exceed the raw input bytes, fall back
+  // to PLAIN now and discard the dictionary — avoiding the mid-cardinality
+  // case where a dict page stays in the file alongside PLAIN pages without
+  // any net compression win.
+  if !w.dictPageWritten && len(w.pages) == 0 {
+    rawSize := dictEnc.ObservedRawSize()
+    encodedSize := dictEnc.EstimatedDataEncodedSize()
+    dictSize := int64(dictEnc.DictEncodedSize())
+    // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0,
+    // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so
+    // PLAIN vs dict doesn't matter and we avoid an empty dictionary.
+    if dictSize+encodedSize >= rawSize {
+      w.FallbackToPlain()
+    }
   }
 }
 
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
 func (w *{{.Name}}ColumnChunkWriter) FallbackToPlain() {
-  if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-    w.WriteDictionaryPage()
-    w.FlushBufferedDataPages()
-    w.fallbackToNonDict = true
-    w.currentEncoder.Release()
-    w.currentEncoder = 
encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
 false, w.descr, w.mem)
-    w.encoding = parquet.Encodings.Plain
+  if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+    return
   }
+
+  dictEnc := w.currentEncoder.(encoding.DictEncoder)
+  plainEnc := 
encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
 false, w.descr, w.mem).(encoding.{{.Name}}Encoder)
+
+  if len(w.pages) > 0 {
+    if err := w.WriteDictionaryPage(); err != nil {
+      panic(err)
+    }
+    if err := w.drainBufferedDataPages(); err != nil {
+      panic(err)
+    }
+  }
+
+  if err := dictEnc.FallBackTo(plainEnc); err != nil {
+    panic(err)
+  }
+
+  dictEnc.Release()
+  w.currentEncoder = plainEnc
+  w.encoding = parquet.Encodings.Plain
+  w.fallbackToNonDict = true
 }
 {{end}}
 
diff --git a/parquet/file/dict_fallback_repro_test.go 
b/parquet/file/dict_fallback_repro_test.go
new file mode 100644
index 00000000..f82b3253
--- /dev/null
+++ b/parquet/file/dict_fallback_repro_test.go
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file_test
+
+import (
+       "bytes"
+       "fmt"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/compress"
+       "github.com/apache/arrow-go/v18/parquet/file"
+       format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow-go/v18/parquet/schema"
+       "github.com/stretchr/testify/require"
+)
+
+// TestDictFallbackDiscardsOrphanDict verifies parquet-mr parity: when dict
+// encoding overflows before any dict-encoded data page has been flushed, the
+// dictionary is discarded and the buffered indices are re-encoded as PLAIN.
+// The resulting chunk must therefore contain zero dict bytes and match a
+// dict-disabled baseline to within the tiny delta from differences in how
+// rows are batched across pages.
+func TestDictFallbackDiscardsOrphanDict(t *testing.T) {
+       cases := []struct {
+               name    string
+               version parquet.Version
+               codec   compress.Compression
+       }{
+               {"v1/uncompressed", parquet.V1_0, compress.Codecs.Uncompressed},
+               {"v2/uncompressed", parquet.V2_LATEST, 
compress.Codecs.Uncompressed},
+               {"v1/snappy", parquet.V1_0, compress.Codecs.Snappy},
+               {"v2/snappy", parquet.V2_LATEST, compress.Codecs.Snappy},
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       runDictFallbackNoDictPageCheck(t, tc.version, tc.codec)
+               })
+       }
+}
+
+func runDictFallbackNoDictPageCheck(t *testing.T, version parquet.Version, 
codec compress.Compression) {
+       const (
+               numValues         = 30000
+               valueWidth        = 32
+               dictPageSizeLimit = 256 * 1024
+               dataPageSize      = 128 * 1024
+       )
+
+       values := highCardinalityStrings(numValues, valueWidth)
+       knobs := writerKnobs{
+               dictPageSizeLimit: dictPageSizeLimit,
+               dataPageSize:      dataPageSize,
+               codec:             codec,
+       }
+
+       knobsOn := knobs
+       knobsOn.dictEnabled = true
+       _, dictOn := writeByteArrayColumn(t, values, version, knobsOn)
+
+       knobsOff := knobs
+       knobsOff.dictEnabled = false
+       _, dictOff := writeByteArrayColumn(t, values, version, knobsOff)
+
+       t.Logf("version=%s codec=%s numValues=%d valueWidth=%d (raw≈%d KiB)",
+               version, codec, numValues, valueWidth, 
numValues*valueWidth/1024)
+       t.Logf("dict=ON  chunk.compressed=%d  chunk.uncompressed=%d", 
dictOn.totalCompressed, dictOn.totalUncompressed)
+       t.Logf("dict=OFF chunk.compressed=%d  chunk.uncompressed=%d", 
dictOff.totalCompressed, dictOff.totalUncompressed)
+       t.Logf("dict=ON encodings: %v", dictOn.encodings)
+       t.Logf("dict=ON per-page breakdown:")
+       for _, p := range dictOn.pages {
+               t.Logf("  %-18s encoding=%-16s numValues=%-6d compressed=%d B",
+                       p.pageType, p.encoding, p.numValues, p.compressed)
+       }
+
+       require.Falsef(t, dictOn.hasDictionaryPage,
+               "dictionary page should have been discarded; chunk=%d", 
dictOn.totalCompressed)
+       require.Zero(t,
+               dictOn.countDataPagesByEncoding(parquet.Encodings.PlainDict)+
+                       
dictOn.countDataPagesByEncoding(parquet.Encodings.RLEDict),
+               "no dict-encoded data pages expected after discard; 
encodings=%v", dictOn.encodings)
+       require.NotContains(t, dictOn.encodings, parquet.Encodings.PlainDict,
+               "encoding list should not advertise PLAIN_DICTIONARY when no 
dict page exists")
+       require.NotContains(t, dictOn.encodings, parquet.Encodings.RLEDict,
+               "encoding list should not advertise RLE_DICTIONARY when no dict 
page exists")
+
+       // With the fix, dict=on should match dict=off to within the rounding 
from
+       // first-page size differences (FallbackToPlain re-seeds the plain 
encoder
+       // with all buffered values, producing one oversized first page).
+       diff := dictOn.totalCompressed - dictOff.totalCompressed
+       t.Logf("size delta dict=on vs dict=off: %d B", diff)
+       require.LessOrEqualf(t, absInt64(diff), int64(valueWidth*2048),
+               "post-fix: dict=on should closely match dict=off, got 
delta=%d", diff)
+}
+
+// TestDictFallbackKeepsDictWhenAlreadyFlushed covers the case where some
+// dict-encoded data pages have already been emitted before the overflow —
+// those must be preserved alongside the dict page, and the remainder of the
+// column must switch to PLAIN. Matches parquet-mr's 
"initialUsedAndHadDictionary"
+// path.
+func TestDictFallbackKeepsDictWhenAlreadyFlushed(t *testing.T) {
+       const (
+               valueWidth        = 32
+               dictPageSizeLimit = 8 * 1024
+               dataPageSize      = 4 * 1024
+       )
+
+       // Mixed-cardinality data: a low-card prefix big enough that several
+       // dict-encoded data pages are flushed (and pass the first-page
+       // compression check) before a high-card tail overflows the dict and
+       // triggers fallback. The committed dict pages must be kept and the
+       // dictionary page preserved.
+       lowCard := make([]parquet.ByteArray, 5000)
+       for i := range lowCard {
+               lowCard[i] = parquet.ByteArray(fmt.Sprintf("cat_%02d%*s", i%16, 
valueWidth-8, ""))
+       }
+       highCard := highCardinalityStrings(5000, valueWidth)
+       values := append(lowCard, highCard...)
+
+       _, chunk := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{
+               dictEnabled:       true,
+               dictPageSizeLimit: dictPageSizeLimit,
+               dataPageSize:      dataPageSize,
+               codec:             compress.Codecs.Uncompressed,
+       })
+
+       t.Logf("encodings: %v  hasDictPage=%v  totalCompressed=%d",
+               chunk.encodings, chunk.hasDictionaryPage, chunk.totalCompressed)
+
+       require.True(t, chunk.hasDictionaryPage,
+               "dict page must be kept when dict-encoded data pages were 
already flushed")
+
+       dictPages := 
chunk.countDataPagesByEncoding(parquet.Encodings.PlainDict) +
+               chunk.countDataPagesByEncoding(parquet.Encodings.RLEDict)
+       plainPages := chunk.countDataPagesByEncoding(parquet.Encodings.Plain)
+       require.Greater(t, dictPages, 0, "expected dict-encoded data pages to 
survive fallback")
+       require.Greater(t, plainPages, 0, "expected PLAIN data pages after 
fallback")
+
+       // All dict-encoded data pages must come before any PLAIN page — 
otherwise
+       // the dictionary offset in the file footer would reference data pages
+       // that can't be decoded against it.
+       sawPlain := false
+       for _, p := range chunk.pages {
+               if p.pageType != format.PageType_DATA_PAGE && p.pageType != 
format.PageType_DATA_PAGE_V2 {
+                       continue
+               }
+               switch p.encoding {
+               case parquet.Encodings.Plain:
+                       sawPlain = true
+               case parquet.Encodings.PlainDict, parquet.Encodings.RLEDict:
+                       require.False(t, sawPlain,
+                               "dict-encoded data page appeared after a PLAIN 
page — wrong ordering")
+               }
+       }
+}
+
+func absInt64(v int64) int64 {
+       if v < 0 {
+               return -v
+       }
+       return v
+}
+
+// TestDictFallbackMidCardinality exercises the case the iceberg-go TPC-DS
+// benchmark flagged: mid-cardinality columns (ss_list_price etc.) where the
+// dictionary grows slowly enough to pass the first-page compression check,
+// eventually overflows, and leaves a dict page + dict-encoded pages + PLAIN
+// pages stranded in one chunk. With dict-mode pages sized by raw bytes
+// (matching parquet-mr), overflow happens at a similar cadence to plain
+// encoding, so dict=on either stays competitive with dict=off or the
+// committed dict pages are genuinely earning their keep.
+func TestDictFallbackMidCardinality(t *testing.T) {
+       const (
+               numRows            = 200000
+               valueWidth         = 8
+               distinctValueCount = 20000
+               dictPageSizeLimit  = 128 * 1024
+               dataPageSize       = 128 * 1024
+       )
+
+       values := midCardinalityStrings(numRows, distinctValueCount, valueWidth)
+
+       _, dictOn := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{
+               dictEnabled:       true,
+               dictPageSizeLimit: dictPageSizeLimit,
+               dataPageSize:      dataPageSize,
+               codec:             compress.Codecs.Snappy,
+       })
+       _, dictOff := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{
+               dictEnabled:  false,
+               dataPageSize: dataPageSize,
+               codec:        compress.Codecs.Snappy,
+       })
+
+       t.Logf("mid-card: distinct=%d rows=%d width=%d", distinctValueCount, 
numRows, valueWidth)
+       t.Logf("dict=ON  compressed=%d encodings=%v", dictOn.totalCompressed, 
dictOn.encodings)
+       t.Logf("dict=OFF compressed=%d encodings=%v", dictOff.totalCompressed, 
dictOff.encodings)
+
+       // The specific assertion: dict=on must not regress against dict=off by
+       // more than a small constant. Before the raw-byte page cadence, this
+       // scenario produced the 4-entry encoding layout and 20-30% bloat.
+       require.LessOrEqualf(t,
+               dictOn.totalCompressed, 
dictOff.totalCompressed+int64(numRows)/10,
+               "dict=on (%d) must not balloon against dict=off (%d) on 
mid-card data",
+               dictOn.totalCompressed, dictOff.totalCompressed)
+}
+
+func midCardinalityStrings(n, distinct, width int) []parquet.ByteArray {
+       pool := make([]parquet.ByteArray, distinct)
+       for i := range pool {
+               b := make([]byte, width)
+               for j := range width {
+                       b[j] = byte('A' + (i+j*7)%26)
+               }
+               tail := fmt.Appendf(nil, "-%05d", i)
+               copy(b[width-len(tail):], tail)
+               pool[i] = parquet.ByteArray(b)
+       }
+       out := make([]parquet.ByteArray, n)
+       for i := range out {
+               // Deterministic, roughly uniform distribution across the pool.
+               out[i] = pool[int64(i)*2654435761%int64(distinct)]
+       }
+       return out
+}
+
+type writerKnobs struct {
+       dictEnabled       bool
+       dictPageSizeLimit int64
+       dataPageSize      int64
+       codec             compress.Compression
+}
+
+type pageInfo struct {
+       pageType   format.PageType
+       encoding   parquet.Encoding
+       numValues  int32
+       compressed int
+}
+
+type chunkSummary struct {
+       totalCompressed   int64
+       totalUncompressed int64
+       encodings         []parquet.Encoding
+       encodingStats     []pageEncodingStatsEntry
+       pages             []pageInfo
+       hasDictionaryPage bool
+}
+
+type pageEncodingStatsEntry struct {
+       PageType format.PageType
+       Encoding parquet.Encoding
+}
+
+func (c *chunkSummary) countDataPagesByEncoding(enc parquet.Encoding) int {
+       n := 0
+       for _, p := range c.pages {
+               if p.encoding == enc && (p.pageType == 
format.PageType_DATA_PAGE || p.pageType == format.PageType_DATA_PAGE_V2) {
+                       n++
+               }
+       }
+       return n
+}
+
+func writeByteArrayColumn(t *testing.T, values []parquet.ByteArray, version 
parquet.Version, knobs writerKnobs) ([]byte, *chunkSummary) {
+       t.Helper()
+
+       root := mustByteArraySchema(t)
+       opts := []parquet.WriterProperty{
+               parquet.WithVersion(version),
+               parquet.WithDictionaryDefault(knobs.dictEnabled),
+               parquet.WithCompression(knobs.codec),
+       }
+       if knobs.dictPageSizeLimit > 0 {
+               opts = append(opts, 
parquet.WithDictionaryPageSizeLimit(knobs.dictPageSizeLimit))
+       }
+       if knobs.dataPageSize > 0 {
+               opts = append(opts, 
parquet.WithDataPageSize(knobs.dataPageSize))
+       }
+       if version == parquet.V2_LATEST {
+               opts = append(opts, 
parquet.WithDataPageVersion(parquet.DataPageV2))
+       }
+       props := parquet.NewWriterProperties(opts...)
+
+       var buf bytes.Buffer
+       w := file.NewParquetWriter(&buf, root, file.WithWriterProps(props))
+       rgw := w.AppendRowGroup()
+       cw, err := rgw.NextColumn()
+       require.NoError(t, err)
+       _, err = cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(values, nil, 
nil)
+       require.NoError(t, err)
+       require.NoError(t, cw.Close())
+       require.NoError(t, rgw.Close())
+       require.NoError(t, w.Close())
+
+       return buf.Bytes(), summarizeFirstColumnChunk(t, buf.Bytes())
+}
+
+func summarizeFirstColumnChunk(t *testing.T, raw []byte) *chunkSummary {
+       t.Helper()
+
+       r, err := file.NewParquetReader(bytes.NewReader(raw))
+       require.NoError(t, err)
+       defer r.Close()
+
+       md, err := r.MetaData().RowGroup(0).ColumnChunk(0)
+       require.NoError(t, err)
+       s := &chunkSummary{
+               totalCompressed:   md.TotalCompressedSize(),
+               totalUncompressed: md.TotalUncompressedSize(),
+               encodings:         md.Encodings(),
+               hasDictionaryPage: md.HasDictionaryPage(),
+       }
+       for _, es := range md.EncodingStats() {
+               s.encodingStats = append(s.encodingStats, 
pageEncodingStatsEntry{
+                       PageType: es.PageType,
+                       Encoding: es.Encoding,
+               })
+       }
+
+       rg := r.RowGroup(0)
+       pr, err := rg.GetColumnPageReader(0)
+       require.NoError(t, err)
+       for pr.Next() {
+               page := pr.Page()
+               s.pages = append(s.pages, pageInfo{
+                       pageType:   format.PageType(page.Type()),
+                       encoding:   parquet.Encoding(page.Encoding()),
+                       numValues:  page.NumValues(),
+                       compressed: len(page.Data()),
+               })
+       }
+       require.NoError(t, pr.Err())
+       return s
+}
+
+func mustByteArraySchema(t *testing.T) *schema.GroupNode {
+       t.Helper()
+       root, err := schema.NewGroupNode("schema", 
parquet.Repetitions.Required, schema.FieldList{
+               schema.NewByteArrayNode("v", parquet.Repetitions.Required, -1),
+       }, -1)
+       require.NoError(t, err)
+       return root
+}
+
+func highCardinalityStrings(n, width int) []parquet.ByteArray {
+       out := make([]parquet.ByteArray, n)
+       for i := range out {
+               b := make([]byte, width)
+               for j := range width {
+                       b[j] = byte('A' + (i+j*31)%26)
+               }
+               tail := fmt.Appendf(nil, "-%07d", i)
+               copy(b[width-len(tail):], tail)
+               out[i] = parquet.ByteArray(b)
+       }
+       return out
+}
diff --git a/parquet/internal/encoding/byte_array_encoder.go 
b/parquet/internal/encoding/byte_array_encoder.go
index d0439faf..397a18a7 100644
--- a/parquet/internal/encoding/byte_array_encoder.go
+++ b/parquet/internal/encoding/byte_array_encoder.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/internal/bitutils"
+       "github.com/apache/arrow-go/v18/internal/hashing"
        "github.com/apache/arrow-go/v18/internal/utils"
        "github.com/apache/arrow-go/v18/parquet"
 )
@@ -103,6 +104,7 @@ func (enc *DictByteArrayEncoder) PutByteArray(in 
parquet.ByteArray) {
                enc.dictEncodedSize += in.Len() + arrow.Uint32SizeBytes
        }
        enc.addIndex(memoIdx)
+       enc.AddRawSize(int64(in.Len() + arrow.Uint32SizeBytes))
 }
 
 // Put takes a slice of ByteArrays to add and encode.
@@ -127,6 +129,23 @@ func (enc *DictByteArrayEncoder) NormalizeDict(values 
arrow.Array) (arrow.Array,
        return values, nil
 }
 
+// FallBackTo drains buffered indices through the dictionary into the
+// fallback plain encoder and clears the buffered indices (idxValues).
+func (enc *DictByteArrayEncoder) FallBackTo(fallback TypedEncoder) error {
+       target, ok := fallback.(ByteArrayEncoder)
+       if !ok {
+               return fmt.Errorf("parquet: dict fallback target encoder has 
wrong element type")
+       }
+       bm := enc.memo.(*hashing.BinaryMemoTable)
+       vals := make([]parquet.ByteArray, len(enc.idxValues))
+       for i, idx := range enc.idxValues {
+               vals[i] = parquet.ByteArray(bm.Value(int(idx)))
+       }
+       target.Put(vals)
+       enc.idxValues = enc.idxValues[:0]
+       return nil
+}
+
 // PutDictionary allows pre-seeding a dictionary encoder with
 // a dictionary from an Arrow Array.
 //
diff --git a/parquet/internal/encoding/encoder.go 
b/parquet/internal/encoding/encoder.go
index 2f1d698f..64a992e7 100644
--- a/parquet/internal/encoding/encoder.go
+++ b/parquet/internal/encoding/encoder.go
@@ -114,6 +114,13 @@ type dictEncoder struct {
        idxValues       []int32
        memo            MemoTable
 
+       // rawDataSize is the number of bytes of input values observed since
+       // the last page flush. Mirrors parquet-mr's rawDataByteSize and is
+       // consulted by FlushCurrentPage to decide whether the dictionary is
+       // actually compressing (dict + indices < raw) before committing the
+       // first dict-encoded data page.
+       rawDataSize int64
+
        preservedDict arrow.Array
 }
 
@@ -134,6 +141,7 @@ func (d *dictEncoder) Reset() {
        d.dictEncodedSize = 0
        d.idxValues = d.idxValues[:0]
        d.idxBuffer.ResizeNoShrink(0)
+       d.rawDataSize = 0
        d.memo.Reset()
        if d.preservedDict != nil {
                d.preservedDict.Release()
@@ -141,6 +149,17 @@ func (d *dictEncoder) Reset() {
        }
 }
 
+// ObservedRawSize returns the number of raw input bytes accumulated since
+// the last data page flush. Used with DictEncodedSize and
+// EstimatedDataEncodedSize to evaluate whether dictionary encoding is
+// actually saving space on the first page.
+func (d *dictEncoder) ObservedRawSize() int64 { return d.rawDataSize }
+
+// AddRawSize is used by per-type dict encoders to accumulate the raw-bytes
+// total as values are written. It is called from the typed encoders' Put
+// paths, which live in different files within this package.
+func (d *dictEncoder) AddRawSize(n int64) { d.rawDataSize += n }
+
 func (d *dictEncoder) Release() {
        d.encoder.Release()
        d.idxBuffer.Release()
@@ -285,6 +304,7 @@ func (d *dictEncoder) WriteIndices(out []byte) (int, error) 
{
        nbytes := enc.Flush()
 
        d.idxValues = d.idxValues[:0]
+       d.rawDataSize = 0
        return nbytes + 1, nil
 }
 
diff --git a/parquet/internal/encoding/fixed_len_byte_array_encoder.go 
b/parquet/internal/encoding/fixed_len_byte_array_encoder.go
index 0f4345d2..854b8b8a 100644
--- a/parquet/internal/encoding/fixed_len_byte_array_encoder.go
+++ b/parquet/internal/encoding/fixed_len_byte_array_encoder.go
@@ -21,6 +21,7 @@ import (
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/internal/bitutils"
+       "github.com/apache/arrow-go/v18/internal/hashing"
        "github.com/apache/arrow-go/v18/parquet"
 )
 
@@ -136,6 +137,7 @@ func (enc *DictFixedLenByteArrayEncoder) Put(in 
[]parquet.FixedLenByteArray) {
                }
                enc.addIndex(memoIdx)
        }
+       enc.AddRawSize(int64(len(in)) * int64(enc.typeLen))
 }
 
 // PutSpaced is like Put but leaves space for nulls
@@ -151,6 +153,23 @@ func (enc *DictFixedLenByteArrayEncoder) 
NormalizeDict(values arrow.Array) (arro
        return values, nil
 }
 
+// FallBackTo drains buffered indices through the dictionary into the
+// fallback plain encoder and clears the buffered indices (idxValues).
+func (enc *DictFixedLenByteArrayEncoder) FallBackTo(fallback TypedEncoder) 
error {
+       target, ok := fallback.(FixedLenByteArrayEncoder)
+       if !ok {
+               return fmt.Errorf("parquet: dict fallback target encoder has 
wrong element type")
+       }
+       bm := enc.memo.(*hashing.BinaryMemoTable)
+       vals := make([]parquet.FixedLenByteArray, len(enc.idxValues))
+       for i, idx := range enc.idxValues {
+               vals[i] = parquet.FixedLenByteArray(bm.Value(int(idx)))
+       }
+       target.Put(vals)
+       enc.idxValues = enc.idxValues[:0]
+       return nil
+}
+
 // PutDictionary allows pre-seeding a dictionary encoder with
 // a dictionary from an Arrow Array.
 //
diff --git a/parquet/internal/encoding/typed_encoder.go 
b/parquet/internal/encoding/typed_encoder.go
index 1cba18da..82bb6fc9 100644
--- a/parquet/internal/encoding/typed_encoder.go
+++ b/parquet/internal/encoding/typed_encoder.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/compute"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/internal/bitutils"
+       "github.com/apache/arrow-go/v18/internal/hashing"
        shared_utils "github.com/apache/arrow-go/v18/internal/utils"
        "github.com/apache/arrow-go/v18/parquet"
        format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
@@ -144,6 +145,7 @@ func (enc *typedDictEncoder[T]) Put(in []T) {
        for _, val := range in {
                enc.dictEncoder.Put(val)
        }
+       enc.AddRawSize(int64(len(in)) * int64(unsafe.Sizeof(T(0))))
 }
 
 func (enc *typedDictEncoder[T]) PutSpaced(in []T, validBits []byte, validBitsOffset int64) {
@@ -158,6 +160,22 @@ type arrvalues[T arrow.ValueType] interface {
        Values() []T
 }
 
+func (enc *typedDictEncoder[T]) FallBackTo(fallback TypedEncoder) error {
+       target, ok := fallback.(Encoder[T])
+       if !ok {
+               return fmt.Errorf("parquet: dict fallback target encoder has wrong element type")
+       }
+       dict := make([]T, enc.memo.Size())
+       enc.memo.CopyValues(dict)
+       vals := make([]T, len(enc.idxValues))
+       for i, idx := range enc.idxValues {
+               vals[i] = dict[idx]
+       }
+       target.Put(vals)
+       enc.idxValues = enc.idxValues[:0]
+       return nil
+}
+
 func (enc *typedDictEncoder[T]) NormalizeDict(values arrow.Array) (arrow.Array, error) {
        if _, ok := values.(arrvalues[T]); ok {
                values.Retain()
@@ -427,6 +445,7 @@ func (enc *DictInt96Encoder) Put(in []parquet.Int96) {
                }
                enc.addIndex(memoIdx)
        }
+       enc.AddRawSize(int64(len(in)) * int64(parquet.Int96SizeBytes))
 }
 
 // PutSpaced is like Put but assumes space for nulls
@@ -446,6 +465,24 @@ func (enc *DictInt96Encoder) PutDictionary(arrow.Array) error {
        return fmt.Errorf("%w: direct PutDictionary to Int96", arrow.ErrNotImplemented)
 }
 
+// FallBackTo drains buffered indices through the dictionary into the
+// fallback plain encoder and clears the index buffer.
+func (enc *DictInt96Encoder) FallBackTo(fallback TypedEncoder) error {
+       target, ok := fallback.(Int96Encoder)
+       if !ok {
+               return fmt.Errorf("parquet: dict fallback target encoder has wrong element type")
+       }
+       bm := enc.memo.(*hashing.BinaryMemoTable)
+       vals := make([]parquet.Int96, len(enc.idxValues))
+       for i, idx := range enc.idxValues {
+               v := bm.Value(int(idx))
+               copy(vals[i][:], v)
+       }
+       target.Put(vals)
+       enc.idxValues = enc.idxValues[:0]
+       return nil
+}
+
 // the boolEncoderTraits struct is used to make it easy to create encoders and decoders based on type
 type boolEncoderTraits struct{}
 
diff --git a/parquet/internal/encoding/types.go b/parquet/internal/encoding/types.go
index a2f1a5b2..7c4015df 100644
--- a/parquet/internal/encoding/types.go
+++ b/parquet/internal/encoding/types.go
@@ -119,6 +119,20 @@ type DictEncoder interface {
        //
        // The returned array must always be released by the caller.
        NormalizeDict(arrow.Array) (arrow.Array, error)
+       // FallBackTo translates the buffered indices back through the dictionary
+       // and puts the raw values into the fallback encoder, clearing the dict
+       // encoder's index buffer. Mirrors parquet-mr's
+       // RequiresFallback.fallBackAllValuesTo: used by column writers when the
+       // dictionary overflows mid-chunk so already-buffered values can be
+       // re-encoded with the fallback (PLAIN) encoder instead of being emitted
+       // as a stranded dict-encoded page.
+       FallBackTo(fallback TypedEncoder) error
+       // ObservedRawSize returns the raw input byte count accumulated since
+       // the last page flush. Used alongside DictEncodedSize and
+       // EstimatedDataEncodedSize to decide whether dictionary encoding is
+       // actually saving space before committing the first dict data page —
+       // mirrors parquet-mr's rawDataByteSize.
+       ObservedRawSize() int64
 }
 
 var bufferPool = sync.Pool{
diff --git a/parquet/pqarrow/encode_dictionary_test.go b/parquet/pqarrow/encode_dictionary_test.go
index 79d3147d..e92b2587 100644
--- a/parquet/pqarrow/encode_dictionary_test.go
+++ b/parquet/pqarrow/encode_dictionary_test.go
@@ -188,7 +188,6 @@ func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() {
                {0, 1},
                {0, 0},
                {3}}
-       expectedDictCounts := []int32{4, 4, 4, 3}
        // pairs of (min, max)
        expectedMinMax := [][2]string{
                {"a", "b"},
@@ -266,22 +265,23 @@ func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() {
                        for rowGroup := 0; rowGroup < 2; rowGroup++ {
                                pr, err := rdr.RowGroup(0).GetColumnPageReader(0)
                                ad.Require().NoError(err)
-                               ad.True(pr.Next())
-                               page := pr.Page()
-                               ad.NotNil(page)
-                               ad.NoError(pr.Err())
-                               ad.Require().IsType((*file.DictionaryPage)(nil), page)
-                               dictPage := page.(*file.DictionaryPage)
-                               ad.EqualValues(expectedDictCounts[caseIndex], dictPage.NumValues())
+
+                               // pqarrow falls back to PLAIN whenever the arrow dict has
+                               // duplicates, and does so before any dict-encoded data page
+                               // is flushed. Matching parquet-mr, arrow-go now discards the
+                               // dictionary in that case — so no DICTIONARY page appears and
+                               // all data pages are PLAIN.
 
                                for pageIdx := 0; pageIdx < expectedNumDataPages[caseIndex]; pageIdx++ {
                                        ad.True(pr.Next())
-                                       page = pr.Page()
+                                       page := pr.Page()
                                        ad.NotNil(page)
                                        ad.NoError(pr.Err())
 
                                        dataPage, ok := page.(file.DataPage)
                                        ad.Require().True(ok)
+                                       ad.EqualValues(parquet.Encodings.Plain, dataPage.Encoding(),
+                                               "page %d should be PLAIN after dict fallback", pageIdx)
                                        stats := dataPage.Statistics()
                                        
ad.EqualValues(expectedNullByPage[caseIndex][pageIdx], stats.NullCount)
 
diff --git a/parquet/pqarrow/file_writer_test.go b/parquet/pqarrow/file_writer_test.go
index 0c771931..7b98b212 100644
--- a/parquet/pqarrow/file_writer_test.go
+++ b/parquet/pqarrow/file_writer_test.go
@@ -171,8 +171,8 @@ func TestFileWriterTotalBytes(t *testing.T) {
        require.NoError(t, writer.Close())
 
        // Verify total bytes & compressed bytes are correct
-       assert.Equal(t, int64(408), writer.TotalCompressedBytes())
-       assert.Equal(t, int64(910), writer.TotalBytesWritten())
+       assert.Equal(t, int64(340), writer.TotalCompressedBytes())
+       assert.Equal(t, int64(799), writer.TotalBytesWritten())
 }
 
 func TestFileWriterTotalBytesBuffered(t *testing.T) {
@@ -205,8 +205,8 @@ func TestFileWriterTotalBytesBuffered(t *testing.T) {
        require.NoError(t, writer.Close())
 
        // Verify total bytes & compressed bytes are correct
-       assert.Equal(t, int64(596), writer.TotalCompressedBytes())
-       assert.Equal(t, int64(1306), writer.TotalBytesWritten())
+       assert.Equal(t, int64(494), writer.TotalCompressedBytes())
+       assert.Equal(t, int64(1139), writer.TotalBytesWritten())
 }
 
 func TestWriteOnClosedFileWriter(t *testing.T) {

Reply via email to