This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2b2aa6bc fix(parquet): align dictionary fallback with parquet-mr (#786)
2b2aa6bc is described below
commit 2b2aa6bcc6efe1b197a98ced4c81c4a0e125ffbd
Author: Tobias Pütz <[email protected]>
AuthorDate: Thu Apr 30 19:40:44 2026 +0200
fix(parquet): align dictionary fallback with parquet-mr (#786)
### Rationale for this change
On dictionary overflow, arrow-go always flushed the dictionary page and
any buffered dict-encoded data pages before switching to PLAIN, even
when no dict-encoded data page had been cut. On mid-cardinality columns
the result was a 4-encoding chunk layout (PLAIN_DICTIONARY, PLAIN, RLE,
PLAIN) that bloated output by 20-30% versus parquet-mr.
This was noticed while testing iceberg-go's recently added compaction
feature, where some tables with particularly high-cardinality columns
saw a 30% size increase after compaction.
### What changes are included in this PR?
Mirror parquet-mr's FallbackValuesWriter:
- Discard the dictionary and re-encode the buffered values (translated
back from their dictionary indices) as PLAIN when no dict-encoded data
page has been flushed yet; emit the dictionary page only once a
dict-encoded data page has been committed.
- Before the first dict-encoded page is cut, fall back to PLAIN if the
dictionary size plus the encoded indices size is >= the raw input size
in bytes.
- Size dict-encoded pages by raw input bytes (not the RLE indices'
encoded size) so the page cadence matches PLAIN.
Adds DictEncoder.FallBackTo and DictEncoder.ObservedRawSize, and exposes
BinaryMemoTable.Value so buffered dictionary indices can be translated
back to values during fallback.
### Are these changes tested?
Yes: tests are included in this PR, and the change was also exercised
end-to-end while testing compaction in iceberg-go.
### Are there any user-facing changes?
No public API changes. The only observable difference should be the
dropped double encoding (and, correspondingly, the encodings reported in
the column chunk metadata).
---
parquet/file/column_writer.go | 34 +-
parquet/file/column_writer_test.go | 41 +-
parquet/file/column_writer_types.gen.go | 488 ++++++++++++++++++---
parquet/file/column_writer_types.gen.go.tmpl | 59 ++-
parquet/file/dict_fallback_repro_test.go | 372 ++++++++++++++++
parquet/internal/encoding/byte_array_encoder.go | 19 +
parquet/internal/encoding/encoder.go | 20 +
.../encoding/fixed_len_byte_array_encoder.go | 19 +
parquet/internal/encoding/typed_encoder.go | 37 ++
parquet/internal/encoding/types.go | 14 +
parquet/pqarrow/encode_dictionary_test.go | 18 +-
parquet/pqarrow/file_writer_test.go | 8 +-
12 files changed, 1003 insertions(+), 126 deletions(-)
diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go
index b9d2220c..e72c36d3 100644
--- a/parquet/file/column_writer.go
+++ b/parquet/file/column_writer.go
@@ -140,6 +140,7 @@ type columnWriter struct {
totalCompressedBytes int64
closed bool
fallbackToNonDict bool
+ dictPageWritten bool
pages []DataPage
@@ -264,8 +265,20 @@ func (w *columnWriter)
commitWriteAndCheckPageLimit(numLevels, numValues int64)
w.numBufferedValues += numLevels
w.numDataValues += numValues
- enc := w.currentEncoder.EstimatedDataEncodedSize()
- if enc >= w.props.DataPageSize() {
+ // While dictionary encoding is active we size pages by raw input bytes
+ // instead of the RLE-indices' encoded size. Mirrors parquet-mr's
+ // FallbackValuesWriter.getBufferedSize — it keeps dict pages roughly
+ // the same raw-byte footprint as the PLAIN pages they'd otherwise be,
+ // which also pulls the first-page compression check into the same
+ // cadence parquet-mr uses and avoids committing dict pages that only
+ // look cheap because their RLE indices are tiny.
+ var bufferedSize int64
+ if w.hasDict && !w.fallbackToNonDict {
+ bufferedSize =
w.currentEncoder.(encoding.DictEncoder).ObservedRawSize()
+ } else {
+ bufferedSize = w.currentEncoder.EstimatedDataEncodedSize()
+ }
+ if bufferedSize >= w.props.DataPageSize() {
return w.FlushCurrentPage()
}
return nil
@@ -427,7 +440,14 @@ func (w *columnWriter) FlushBufferedDataPages() (err
error) {
return err
}
}
+ return w.drainBufferedDataPages()
+}
+// drainBufferedDataPages writes out and releases any pages buffered while
+// dictionary encoding was active. Unlike FlushBufferedDataPages, it does not
+// touch the current encoder's unflushed values, so the caller can re-encode
+// them as PLAIN during a dictionary fallback.
+func (w *columnWriter) drainBufferedDataPages() (err error) {
for i, p := range w.pages {
defer p.Release()
if err = w.WriteDataPage(p); err != nil {
@@ -502,6 +522,7 @@ func (w *columnWriter) WriteDictionaryPage() error {
page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()),
w.props.DictionaryPageEncoding())
written, err := w.pager.WriteDictionaryPage(page)
w.totalBytesWritten += written
+ w.dictPageWritten = err == nil
return err
}
@@ -620,7 +641,14 @@ func (w *columnWriter) Close() (err error) {
if w.rowsWritten > 0 && chunkStats.IsSet() {
w.metaData.SetStats(chunkStats)
}
- err = w.pager.Close(w.hasDict, w.fallbackToNonDict)
+ // Only advertise PLAIN_DICTIONARY / fallback encodings in the
column
+ // chunk's encoding list when a dictionary page was actually
written.
+ // When fallback discards the dictionary before any
dict-encoded page
+ // is flushed, the column contains only PLAIN data and the list
should
+ // reflect that unambiguously.
+ advertiseDict := w.hasDict && w.dictPageWritten
+ advertiseFallback := w.fallbackToNonDict && w.dictPageWritten
+ err = w.pager.Close(advertiseDict, advertiseFallback)
}
return err
}
diff --git a/parquet/file/column_writer_test.go
b/parquet/file/column_writer_test.go
index 03dbd25e..65d5022f 100644
--- a/parquet/file/column_writer_test.go
+++ b/parquet/file/column_writer_test.go
@@ -427,39 +427,20 @@ func (p *PrimitiveWriterTestSuite)
testDictionaryFallbackEncoding(version parque
p.EqualValues(VeryLargeSize, valuesRead)
p.Equal(p.Values, p.ValuesOut)
+ // With the parquet-mr-aligned fallback behavior, a dictionary that
overflows
+ // before any dict-encoded data page has been flushed is discarded
entirely.
+ // With the default page-size parameters used here the dict-encoded
+ // EstimatedDataEncodedSize (RLE indices) stays under DataPageSize
right up
+ // to the overflow point, so no dict page and no dict-encoded data pages
+ // appear in the output — the column is pure PLAIN.
encodings := p.metadataEncodings()
- if p.Typ.Kind() == reflect.Bool || p.Typ ==
reflect.TypeOf(parquet.Int96{}) {
- // dictionary encoding is not allowed for booleans
- // there are 2 encodings (PLAIN, RLE) in a non dictionary
encoding case
- p.Equal([]parquet.Encoding{parquet.Encodings.Plain,
parquet.Encodings.RLE}, encodings)
- } else if version == parquet.V1_0 {
- // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN)
in a fallback case
- // for version 1.0
- p.Equal([]parquet.Encoding{parquet.Encodings.PlainDict,
parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain},
encodings)
- } else {
- // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in
a fallback case for
- // version 2.0
- p.Equal([]parquet.Encoding{parquet.Encodings.RLEDict,
parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain},
encodings)
- }
+ p.Equal([]parquet.Encoding{parquet.Encodings.Plain,
parquet.Encodings.RLE}, encodings)
encodingStats := p.metadataEncodingStats()
- if p.Typ.Kind() == reflect.Bool || p.Typ ==
reflect.TypeOf(parquet.Int96{}) {
- p.Equal(parquet.Encodings.Plain, encodingStats[0].Encoding)
- p.Equal(format.PageType_DATA_PAGE, encodingStats[0].PageType)
- } else if version == parquet.V1_0 {
- expected := []metadata.PageEncodingStats{
- {Encoding: parquet.Encodings.PlainDict, PageType:
format.PageType_DICTIONARY_PAGE},
- {Encoding: parquet.Encodings.Plain, PageType:
format.PageType_DATA_PAGE},
- {Encoding: parquet.Encodings.PlainDict, PageType:
format.PageType_DATA_PAGE}}
- p.Equal(expected[0], encodingStats[0])
- p.ElementsMatch(expected[1:], encodingStats[1:])
- } else {
- expected := []metadata.PageEncodingStats{
- {Encoding: parquet.Encodings.Plain, PageType:
format.PageType_DICTIONARY_PAGE},
- {Encoding: parquet.Encodings.Plain, PageType:
format.PageType_DATA_PAGE},
- {Encoding: parquet.Encodings.RLEDict, PageType:
format.PageType_DATA_PAGE}}
- p.Equal(expected[0], encodingStats[0])
- p.ElementsMatch(expected[1:], encodingStats[1:])
+ p.NotEmpty(encodingStats)
+ for _, es := range encodingStats {
+ p.Equal(parquet.Encodings.Plain, es.Encoding)
+ p.Equal(format.PageType_DATA_PAGE, es.PageType)
}
}
diff --git a/parquet/file/column_writer_types.gen.go
b/parquet/file/column_writer_types.gen.go
index e7f8c341..15a6d1e5 100644
--- a/parquet/file/column_writer_types.gen.go
+++ b/parquet/file/column_writer_types.gen.go
@@ -212,20 +212,63 @@ func (w *Int32ColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *Int32ColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.Int32Encoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// Int64ColumnChunkWriter is the typed interface for writing columns to a
parquet
@@ -410,20 +453,63 @@ func (w *Int64ColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *Int64ColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.Int64Encoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// Int96ColumnChunkWriter is the typed interface for writing columns to a
parquet
@@ -608,20 +694,63 @@ func (w *Int96ColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *Int96ColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.Int96Encoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// Float32ColumnChunkWriter is the typed interface for writing columns to a
parquet
@@ -806,20 +935,63 @@ func (w *Float32ColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *Float32ColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.Float32Encoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// Float64ColumnChunkWriter is the typed interface for writing columns to a
parquet
@@ -1004,20 +1176,63 @@ func (w *Float64ColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *Float64ColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.Float64Encoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// BooleanColumnChunkWriter is the typed interface for writing columns to a
parquet
@@ -1327,20 +1542,63 @@ func (w *BooleanColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *BooleanColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.BooleanEncoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// ByteArrayColumnChunkWriter is the typed interface for writing columns to a
parquet
@@ -1598,20 +1856,63 @@ func (w *ByteArrayColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *ByteArrayColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.ByteArrayEncoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// FixedLenByteArrayColumnChunkWriter is the typed interface for writing
columns to a parquet
@@ -1877,20 +2178,63 @@ func (w *FixedLenByteArrayColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
}
-}
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall
back
+ // to PLAIN now and discard the dictionary — avoiding the
mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages
without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize
are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded
yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty
dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
+ }
+}
+
+// FallbackToPlain switches this column from dictionary to PLAIN encoding when
+// the dictionary outgrows the configured page size limit. It mirrors
+// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded
+// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the
+// dictionary page is emitted only if a dict-encoded data page had already
+// been cut before the overflow — otherwise the dictionary is discarded.
func (w *FixedLenByteArrayColumnChunkWriter) FallbackToPlain() {
- if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
- w.WriteDictionaryPage()
- w.FlushBufferedDataPages()
- w.fallbackToNonDict = true
- w.currentEncoder.Release()
- w.currentEncoder =
encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem)
- w.encoding = parquet.Encodings.Plain
+ if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+ return
}
+
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ plainEnc :=
encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain),
false, w.descr, w.mem).(encoding.FixedLenByteArrayEncoder)
+
+ if len(w.pages) > 0 {
+ if err := w.WriteDictionaryPage(); err != nil {
+ panic(err)
+ }
+ if err := w.drainBufferedDataPages(); err != nil {
+ panic(err)
+ }
+ }
+
+ if err := dictEnc.FallBackTo(plainEnc); err != nil {
+ panic(err)
+ }
+
+ dictEnc.Release()
+ w.currentEncoder = plainEnc
+ w.encoding = parquet.Encodings.Plain
+ w.fallbackToNonDict = true
}
// NewColumnChunkWriter constructs a column writer of the appropriate type by
using the metadata builder
diff --git a/parquet/file/column_writer_types.gen.go.tmpl
b/parquet/file/column_writer_types.gen.go.tmpl
index 9a22bb71..6cee1bad 100644
--- a/parquet/file/column_writer_types.gen.go.tmpl
+++ b/parquet/file/column_writer_types.gen.go.tmpl
@@ -481,20 +481,63 @@ func (w *{{.Name}}ColumnChunkWriter)
checkDictionarySizeLimit() {
return
}
- if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >=
int(w.props.DictionaryPageSizeLimit()) {
+ dictEnc := w.currentEncoder.(encoding.DictEncoder)
+ if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) {
w.FallbackToPlain()
+ return
+ }
+
+ // Before any dict-encoded data page has been cut, check whether the
+ // dictionary is actually saving space against a PLAIN baseline. Mirrors
+ // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary
+ // plus the encoded indices meet or exceed the raw input bytes, fall back
+ // to PLAIN now and discard the dictionary — avoiding the mid-cardinality
+ // case where a dict page stays in the file alongside PLAIN pages without
+ // any net compression win.
+ if !w.dictPageWritten && len(w.pages) == 0 {
+ rawSize := dictEnc.ObservedRawSize()
+ encodedSize := dictEnc.EstimatedDataEncodedSize()
+ dictSize := int64(dictEnc.DictEncodedSize())
+ // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0,
+ // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so
+ // PLAIN vs dict doesn't matter and we avoid an empty dictionary.
+ if dictSize+encodedSize >= rawSize {
+ w.FallbackToPlain()
+ }
}
}
+// FallbackToPlain switches this column from dictionary to PLAIN encoding.
+// checkDictionarySizeLimit invokes it when the encoded dictionary reaches
+// DictionaryPageSizeLimit and also, while no dict-encoded data page has been
+// cut yet, when dictionary bytes plus estimated index bytes are no smaller
+// than the raw (PLAIN) bytes observed so far.
+//
+// It mirrors parquet-mr's FallbackValuesWriter: values still buffered in the
+// dictionary encoder are re-encoded as PLAIN, so the next data page is PLAIN
+// rather than dict-indexed. The dictionary page is written only when
+// dict-encoded data pages are already buffered (len(w.pages) > 0), and those
+// pages are drained right after it; otherwise the dictionary is discarded.
+//
+// It is a no-op unless the current encoder is PLAIN_DICTIONARY. Errors from
+// writing the dictionary page, draining the buffered pages, or translating
+// the buffered indices are raised via panic.
 func (w *{{.Name}}ColumnChunkWriter) FallbackToPlain() {
-	if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict {
-		w.WriteDictionaryPage()
-		w.FlushBufferedDataPages()
-		w.fallbackToNonDict = true
-		w.currentEncoder.Release()
-		w.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem)
-		w.encoding = parquet.Encodings.Plain
+	if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict {
+		return
 	}
+
+	dictEnc := w.currentEncoder.(encoding.DictEncoder)
+	plainEnc := encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.{{.Name}}Encoder)
+
+	// The dictionary page is only needed if an already-cut data page refers to it.
+	if len(w.pages) > 0 {
+		if err := w.WriteDictionaryPage(); err != nil {
+			panic(err)
+		}
+		if err := w.drainBufferedDataPages(); err != nil {
+			panic(err)
+		}
+	}
+
+	// Translate the still-buffered indices back to values and hand them to PLAIN.
+	if err := dictEnc.FallBackTo(plainEnc); err != nil {
+		panic(err)
+	}
+
+	dictEnc.Release()
+	w.currentEncoder = plainEnc
+	w.encoding = parquet.Encodings.Plain
+	w.fallbackToNonDict = true
 }
{{end}}
diff --git a/parquet/file/dict_fallback_repro_test.go
b/parquet/file/dict_fallback_repro_test.go
new file mode 100644
index 00000000..f82b3253
--- /dev/null
+++ b/parquet/file/dict_fallback_repro_test.go
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file_test
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/compress"
+ "github.com/apache/arrow-go/v18/parquet/file"
+ format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+ "github.com/apache/arrow-go/v18/parquet/schema"
+ "github.com/stretchr/testify/require"
+)
+
+// TestDictFallbackDiscardsOrphanDict checks parity with parquet-mr for the
+// case where dictionary encoding overflows before any dict-encoded data page
+// has been flushed. The writer must then drop the dictionary and re-encode
+// its buffered indices as PLAIN. The resulting chunk therefore holds no
+// dictionary bytes, and its size stays within a small delta of a
+// dictionary-disabled baseline; the delta comes only from rows being batched
+// into pages differently.
+//
+// Each codec is run under both V1 and V2 writer versions.
+func TestDictFallbackDiscardsOrphanDict(t *testing.T) {
+	codecs := []struct {
+		label string
+		codec compress.Compression
+	}{
+		{"uncompressed", compress.Codecs.Uncompressed},
+		{"snappy", compress.Codecs.Snappy},
+	}
+	versions := []struct {
+		label   string
+		version parquet.Version
+	}{
+		{"v1", parquet.V1_0},
+		{"v2", parquet.V2_LATEST},
+	}
+
+	for _, c := range codecs {
+		for _, v := range versions {
+			version, codec := v.version, c.codec
+			t.Run(v.label+"/"+c.label, func(t *testing.T) {
+				runDictFallbackNoDictPageCheck(t, version, codec)
+			})
+		}
+	}
+}
+
+// runDictFallbackNoDictPageCheck writes one high-cardinality column twice,
+// once with dictionary encoding enabled and once with it disabled. It then
+// asserts the following about the dictionary-enabled chunk:
+//   - it has no dictionary page;
+//   - it has no dict-encoded data pages;
+//   - its metadata advertises no dictionary encodings;
+//   - its compressed size is within valueWidth*2048 bytes of the
+//     dictionary-disabled chunk.
+func runDictFallbackNoDictPageCheck(t *testing.T, version parquet.Version, codec compress.Compression) {
+	const (
+		numValues         = 30000
+		valueWidth        = 32
+		dictPageSizeLimit = 256 * 1024
+		dataPageSize      = 128 * 1024
+	)
+
+	values := highCardinalityStrings(numValues, valueWidth)
+	base := writerKnobs{
+		dictPageSizeLimit: dictPageSizeLimit,
+		dataPageSize:      dataPageSize,
+		codec:             codec,
+	}
+	withDict, withoutDict := base, base
+	withDict.dictEnabled = true
+	withoutDict.dictEnabled = false
+
+	_, dictOn := writeByteArrayColumn(t, values, version, withDict)
+	_, dictOff := writeByteArrayColumn(t, values, version, withoutDict)
+
+	t.Logf("version=%s codec=%s numValues=%d valueWidth=%d (raw≈%d KiB)",
+		version, codec, numValues, valueWidth, numValues*valueWidth/1024)
+	t.Logf("dict=ON chunk.compressed=%d chunk.uncompressed=%d",
+		dictOn.totalCompressed, dictOn.totalUncompressed)
+	t.Logf("dict=OFF chunk.compressed=%d chunk.uncompressed=%d",
+		dictOff.totalCompressed, dictOff.totalUncompressed)
+	t.Logf("dict=ON encodings: %v", dictOn.encodings)
+	t.Logf("dict=ON per-page breakdown:")
+	for i := range dictOn.pages {
+		pg := dictOn.pages[i]
+		t.Logf(" %-18s encoding=%-16s numValues=%-6d compressed=%d B",
+			pg.pageType, pg.encoding, pg.numValues, pg.compressed)
+	}
+
+	// The dictionary must be gone entirely: no dictionary page, no
+	// dict-encoded data pages, and no dictionary encoding in the metadata.
+	require.Falsef(t, dictOn.hasDictionaryPage,
+		"dictionary page should have been discarded; chunk=%d", dictOn.totalCompressed)
+	dictEncodedPages := dictOn.countDataPagesByEncoding(parquet.Encodings.PlainDict) +
+		dictOn.countDataPagesByEncoding(parquet.Encodings.RLEDict)
+	require.Zero(t, dictEncodedPages,
+		"no dict-encoded data pages expected after discard; encodings=%v", dictOn.encodings)
+	require.NotContains(t, dictOn.encodings, parquet.Encodings.PlainDict,
+		"encoding list should not advertise PLAIN_DICTIONARY when no dict page exists")
+	require.NotContains(t, dictOn.encodings, parquet.Encodings.RLEDict,
+		"encoding list should not advertise RLE_DICTIONARY when no dict page exists")
+
+	// The two chunks should differ only because of first-page sizing:
+	// FallbackToPlain re-seeds the PLAIN encoder with every buffered value,
+	// which produces one oversized first page.
+	delta := dictOn.totalCompressed - dictOff.totalCompressed
+	t.Logf("size delta dict=on vs dict=off: %d B", delta)
+	require.LessOrEqualf(t, absInt64(delta), int64(valueWidth*2048),
+		"post-fix: dict=on should closely match dict=off, got delta=%d", delta)
+}
+
+// TestDictFallbackKeepsDictWhenAlreadyFlushed covers the case where some
+// dict-encoded data pages have already been emitted before the overflow.
+// Those pages must be kept alongside the dictionary page, and the rest of
+// the column must switch to PLAIN. This matches parquet-mr's
+// "initialUsedAndHadDictionary" path.
+func TestDictFallbackKeepsDictWhenAlreadyFlushed(t *testing.T) {
+	const (
+		valueWidth        = 32
+		dictPageSizeLimit = 8 * 1024
+		dataPageSize      = 4 * 1024
+	)
+
+	// The data is mixed-cardinality:
+	//   - a low-card prefix large enough that several dict-encoded data pages
+	//     are flushed and pass the first-page compression check;
+	//   - a high-card tail that then overflows the dictionary and triggers
+	//     fallback.
+	// The committed dict pages and the dictionary page must both be kept.
+	lowCard := make([]parquet.ByteArray, 5000)
+	for i := range lowCard {
+		// "cat_NN" is 6 bytes. Right-pad it with spaces so each low-card
+		// value is exactly valueWidth bytes, the same width as the tail.
+		lowCard[i] = parquet.ByteArray(fmt.Sprintf("cat_%02d%*s", i%16, valueWidth-6, ""))
+	}
+	highCard := highCardinalityStrings(5000, valueWidth)
+	values := append(lowCard, highCard...)
+
+	_, chunk := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{
+		dictEnabled:       true,
+		dictPageSizeLimit: dictPageSizeLimit,
+		dataPageSize:      dataPageSize,
+		codec:             compress.Codecs.Uncompressed,
+	})
+
+	t.Logf("encodings: %v hasDictPage=%v totalCompressed=%d",
+		chunk.encodings, chunk.hasDictionaryPage, chunk.totalCompressed)
+
+	require.True(t, chunk.hasDictionaryPage,
+		"dict page must be kept when dict-encoded data pages were already flushed")
+
+	dictPages := chunk.countDataPagesByEncoding(parquet.Encodings.PlainDict) +
+		chunk.countDataPagesByEncoding(parquet.Encodings.RLEDict)
+	plainPages := chunk.countDataPagesByEncoding(parquet.Encodings.Plain)
+	require.Greater(t, dictPages, 0, "expected dict-encoded data pages to survive fallback")
+	require.Greater(t, plainPages, 0, "expected PLAIN data pages after fallback")
+
+	// Every dict-encoded data page must come before the first PLAIN page.
+	// Otherwise the dictionary offset in the file footer would reference
+	// data pages that can't be decoded against it.
+	sawPlain := false
+	for _, p := range chunk.pages {
+		if p.pageType != format.PageType_DATA_PAGE && p.pageType != format.PageType_DATA_PAGE_V2 {
+			continue
+		}
+		switch p.encoding {
+		case parquet.Encodings.Plain:
+			sawPlain = true
+		case parquet.Encodings.PlainDict, parquet.Encodings.RLEDict:
+			require.False(t, sawPlain,
+				"dict-encoded data page appeared after a PLAIN page — wrong ordering")
+		}
+	}
+}
+
+// absInt64 returns the magnitude of v. As with plain negation,
+// math.MinInt64 wraps and is returned unchanged.
+func absInt64(v int64) int64 {
+	if v >= 0 {
+		return v
+	}
+	return -v
+}
+
+// TestDictFallbackMidCardinality reproduces the case flagged by the
+// iceberg-go TPC-DS benchmark: mid-cardinality columns (ss_list_price etc.).
+// There the dictionary grows slowly enough to pass the first-page
+// compression check, but it eventually overflows. The chunk is then left
+// with a dict page, dict-encoded pages and PLAIN pages side by side.
+//
+// Dict-mode pages are now sized by raw bytes, as in parquet-mr, so overflow
+// happens at roughly the same cadence as PLAIN encoding. As a result,
+// dict=on either stays competitive with dict=off, or the dict pages it
+// commits really do save space.
+func TestDictFallbackMidCardinality(t *testing.T) {
+	const (
+		numRows            = 200000
+		valueWidth         = 8
+		distinctValueCount = 20000
+		dictPageSizeLimit  = 128 * 1024
+		dataPageSize       = 128 * 1024
+	)
+
+	values := midCardinalityStrings(numRows, distinctValueCount, valueWidth)
+	onKnobs := writerKnobs{
+		dictEnabled:       true,
+		dictPageSizeLimit: dictPageSizeLimit,
+		dataPageSize:      dataPageSize,
+		codec:             compress.Codecs.Snappy,
+	}
+	offKnobs := writerKnobs{
+		dictEnabled:  false,
+		dataPageSize: dataPageSize,
+		codec:        compress.Codecs.Snappy,
+	}
+
+	_, dictOn := writeByteArrayColumn(t, values, parquet.V1_0, onKnobs)
+	_, dictOff := writeByteArrayColumn(t, values, parquet.V1_0, offKnobs)
+
+	t.Logf("mid-card: distinct=%d rows=%d width=%d", distinctValueCount, numRows, valueWidth)
+	t.Logf("dict=ON compressed=%d encodings=%v", dictOn.totalCompressed, dictOn.encodings)
+	t.Logf("dict=OFF compressed=%d encodings=%v", dictOff.totalCompressed, dictOff.encodings)
+
+	// dict=on may exceed dict=off by at most a small, row-proportional
+	// allowance. Before raw-byte page sizing, this data produced the
+	// four-encoding layout and 20-30% bloat.
+	budget := dictOff.totalCompressed + int64(numRows)/10
+	require.LessOrEqualf(t, dictOn.totalCompressed, budget,
+		"dict=on (%d) must not balloon against dict=off (%d) on mid-card data",
+		dictOn.totalCompressed, dictOff.totalCompressed)
+}
+
+// midCardinalityStrings builds a pool of `distinct` width-byte values and
+// returns n rows drawn from it.
+//
+// Each pool value is a rotating A-Z pattern whose tail is overwritten with
+// "-NNNNN", the zero-padded pool index. Rows select a pool entry with a
+// fixed multiplicative hash of the row number, so the output is
+// deterministic and spread roughly evenly across the pool.
+func midCardinalityStrings(n, distinct, width int) []parquet.ByteArray {
+	pool := make([]parquet.ByteArray, 0, distinct)
+	for id := 0; id < distinct; id++ {
+		buf := make([]byte, width)
+		for pos := range buf {
+			buf[pos] = byte('A' + (id+pos*7)%26)
+		}
+		suffix := []byte(fmt.Sprintf("-%05d", id))
+		copy(buf[width-len(suffix):], suffix)
+		pool = append(pool, parquet.ByteArray(buf))
+	}
+
+	rows := make([]parquet.ByteArray, n)
+	poolSize := int64(distinct)
+	for i := range rows {
+		rows[i] = pool[int64(i)*2654435761%poolSize]
+	}
+	return rows
+}
+
+// Fixtures shared by the dictionary-fallback tests.
+type (
+	// writerKnobs selects the writer properties for one test write. A
+	// non-positive dictPageSizeLimit or dataPageSize is not passed to the
+	// writer properties, so the library default applies.
+	writerKnobs struct {
+		dictEnabled       bool
+		dictPageSizeLimit int64
+		dataPageSize      int64
+		codec             compress.Compression
+	}
+
+	// pageInfo describes one page as returned by the column page reader.
+	// compressed holds len(page.Data()). NOTE(review): this may be the
+	// decompressed payload length, so confirm before relying on the name.
+	pageInfo struct {
+		pageType   format.PageType
+		encoding   parquet.Encoding
+		numValues  int32
+		compressed int
+	}
+
+	// chunkSummary collects column-chunk metadata plus a per-page breakdown.
+	chunkSummary struct {
+		totalCompressed   int64
+		totalUncompressed int64
+		encodings         []parquet.Encoding
+		encodingStats     []pageEncodingStatsEntry
+		pages             []pageInfo
+		hasDictionaryPage bool
+	}
+
+	// pageEncodingStatsEntry copies one encoding-stats record from the
+	// column-chunk metadata.
+	pageEncodingStatsEntry struct {
+		PageType format.PageType
+		Encoding parquet.Encoding
+	}
+)
+
+// countDataPagesByEncoding returns how many data pages (V1 or V2) in the
+// chunk use enc. Dictionary and other non-data pages are ignored.
+func (c *chunkSummary) countDataPagesByEncoding(enc parquet.Encoding) int {
+	count := 0
+	for i := range c.pages {
+		p := &c.pages[i]
+		switch p.pageType {
+		case format.PageType_DATA_PAGE, format.PageType_DATA_PAGE_V2:
+			if p.encoding == enc {
+				count++
+			}
+		}
+	}
+	return count
+}
+
+// writeByteArrayColumn writes values into the single required BYTE_ARRAY
+// column "v" of a one-row-group Parquet file. The file is configured from
+// version and knobs. It returns the encoded file bytes together with a
+// summary of that column chunk.
+func writeByteArrayColumn(t *testing.T, values []parquet.ByteArray, version parquet.Version, knobs writerKnobs) ([]byte, *chunkSummary) {
+	t.Helper()
+
+	root := mustByteArraySchema(t)
+
+	// Size limits are only applied when set, so zero keeps the defaults.
+	opts := append(make([]parquet.WriterProperty, 0, 6),
+		parquet.WithVersion(version),
+		parquet.WithDictionaryDefault(knobs.dictEnabled),
+		parquet.WithCompression(knobs.codec),
+	)
+	if limit := knobs.dictPageSizeLimit; limit > 0 {
+		opts = append(opts, parquet.WithDictionaryPageSizeLimit(limit))
+	}
+	if size := knobs.dataPageSize; size > 0 {
+		opts = append(opts, parquet.WithDataPageSize(size))
+	}
+	if version == parquet.V2_LATEST {
+		opts = append(opts, parquet.WithDataPageVersion(parquet.DataPageV2))
+	}
+
+	var sink bytes.Buffer
+	fw := file.NewParquetWriter(&sink, root, file.WithWriterProps(parquet.NewWriterProperties(opts...)))
+	rgw := fw.AppendRowGroup()
+
+	col, err := rgw.NextColumn()
+	require.NoError(t, err)
+	_, err = col.(*file.ByteArrayColumnChunkWriter).WriteBatch(values, nil, nil)
+	require.NoError(t, err)
+
+	// Close innermost first: column, then row group, then file.
+	for _, c := range []interface{ Close() error }{col, rgw, fw} {
+		require.NoError(t, c.Close())
+	}
+
+	encoded := sink.Bytes()
+	return encoded, summarizeFirstColumnChunk(t, encoded)
+}
+
+// summarizeFirstColumnChunk reopens an in-memory Parquet file. It reports the
+// metadata of column 0 in row group 0 plus that column's pages, in reader
+// order.
+func summarizeFirstColumnChunk(t *testing.T, raw []byte) *chunkSummary {
+	t.Helper()
+
+	rdr, err := file.NewParquetReader(bytes.NewReader(raw))
+	require.NoError(t, err)
+	defer rdr.Close()
+
+	meta, err := rdr.MetaData().RowGroup(0).ColumnChunk(0)
+	require.NoError(t, err)
+
+	summary := &chunkSummary{
+		totalCompressed:   meta.TotalCompressedSize(),
+		totalUncompressed: meta.TotalUncompressedSize(),
+		encodings:         meta.Encodings(),
+		hasDictionaryPage: meta.HasDictionaryPage(),
+	}
+	for _, stat := range meta.EncodingStats() {
+		entry := pageEncodingStatsEntry{PageType: stat.PageType, Encoding: stat.Encoding}
+		summary.encodingStats = append(summary.encodingStats, entry)
+	}
+
+	pages, err := rdr.RowGroup(0).GetColumnPageReader(0)
+	require.NoError(t, err)
+	for pages.Next() {
+		pg := pages.Page()
+		summary.pages = append(summary.pages, pageInfo{
+			pageType:   format.PageType(pg.Type()),
+			encoding:   parquet.Encoding(pg.Encoding()),
+			numValues:  pg.NumValues(),
+			compressed: len(pg.Data()),
+		})
+	}
+	require.NoError(t, pages.Err())
+	return summary
+}
+
+// mustByteArraySchema returns a root group "schema" with one required
+// BYTE_ARRAY leaf named "v". The test fails immediately if the schema cannot
+// be built.
+func mustByteArraySchema(t *testing.T) *schema.GroupNode {
+	t.Helper()
+	fields := schema.FieldList{
+		schema.NewByteArrayNode("v", parquet.Repetitions.Required, -1),
+	}
+	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
+	require.NoError(t, err)
+	return root
+}
+
+// highCardinalityStrings returns n values, each width bytes long. Each value
+// is a rotating A-Z pattern whose tail is overwritten with "-NNNNNNN", the
+// zero-padded row index, which makes every row distinct. Assumes width is at
+// least the suffix length (8 bytes for n < 10^7).
+func highCardinalityStrings(n, width int) []parquet.ByteArray {
+	rows := make([]parquet.ByteArray, 0, n)
+	for row := 0; row < n; row++ {
+		cell := make([]byte, width)
+		for col := range cell {
+			cell[col] = byte('A' + (row+col*31)%26)
+		}
+		suffix := []byte(fmt.Sprintf("-%07d", row))
+		copy(cell[width-len(suffix):], suffix)
+		rows = append(rows, parquet.ByteArray(cell))
+	}
+	return rows
+}
diff --git a/parquet/internal/encoding/byte_array_encoder.go
b/parquet/internal/encoding/byte_array_encoder.go
index d0439faf..397a18a7 100644
--- a/parquet/internal/encoding/byte_array_encoder.go
+++ b/parquet/internal/encoding/byte_array_encoder.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/internal/bitutils"
+ "github.com/apache/arrow-go/v18/internal/hashing"
"github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
)
@@ -103,6 +104,7 @@ func (enc *DictByteArrayEncoder) PutByteArray(in
parquet.ByteArray) {
enc.dictEncodedSize += in.Len() + arrow.Uint32SizeBytes
}
enc.addIndex(memoIdx)
+ enc.AddRawSize(int64(in.Len() + arrow.Uint32SizeBytes))
}
// Put takes a slice of ByteArrays to add and encode.
@@ -127,6 +129,23 @@ func (enc *DictByteArrayEncoder) NormalizeDict(values
arrow.Array) (arrow.Array,
return values, nil
}
+// FallBackTo re-encodes every buffered dictionary index as its byte-array
+// value, read back from the binary memo table, and puts those values into
+// fallback. On success it clears both the index buffer and the raw-size
+// counter, as WriteIndices does after a page flush.
+//
+// It returns an error if fallback is not a ByteArrayEncoder or if the memo
+// table is not a *hashing.BinaryMemoTable.
+//
+// NOTE(review): the values passed to fallback may alias memo-table memory.
+// This assumes fallback.Put copies them before this encoder is released;
+// confirm.
+func (enc *DictByteArrayEncoder) FallBackTo(fallback TypedEncoder) error {
+	target, ok := fallback.(ByteArrayEncoder)
+	if !ok {
+		return fmt.Errorf("parquet: dict fallback target encoder has wrong element type")
+	}
+	memo, ok := enc.memo.(*hashing.BinaryMemoTable)
+	if !ok {
+		return fmt.Errorf("parquet: dict fallback expected a binary memo table, got %T", enc.memo)
+	}
+	vals := make([]parquet.ByteArray, len(enc.idxValues))
+	for i, idx := range enc.idxValues {
+		vals[i] = parquet.ByteArray(memo.Value(int(idx)))
+	}
+	target.Put(vals)
+	enc.idxValues = enc.idxValues[:0]
+	// The raw bytes behind these indices now belong to fallback.
+	enc.rawDataSize = 0
+	return nil
+}
+
// PutDictionary allows pre-seeding a dictionary encoder with
// a dictionary from an Arrow Array.
//
diff --git a/parquet/internal/encoding/encoder.go
b/parquet/internal/encoding/encoder.go
index 2f1d698f..64a992e7 100644
--- a/parquet/internal/encoding/encoder.go
+++ b/parquet/internal/encoding/encoder.go
@@ -114,6 +114,13 @@ type dictEncoder struct {
idxValues []int32
memo MemoTable
+ // rawDataSize is the number of bytes of input values observed since
+ // the last page flush. Mirrors parquet-mr's rawDataByteSize and is
+ // consulted by FlushCurrentPage to decide whether the dictionary is
+ // actually compressing (dict + indices < raw) before committing the
+ // first dict-encoded data page.
+ rawDataSize int64
+
preservedDict arrow.Array
}
@@ -134,6 +141,7 @@ func (d *dictEncoder) Reset() {
d.dictEncodedSize = 0
d.idxValues = d.idxValues[:0]
d.idxBuffer.ResizeNoShrink(0)
+ d.rawDataSize = 0
d.memo.Reset()
if d.preservedDict != nil {
d.preservedDict.Release()
@@ -141,6 +149,17 @@ func (d *dictEncoder) Reset() {
}
}
+// ObservedRawSize reports the raw input byte total accumulated through
+// AddRawSize. The counter is zeroed by Reset and by WriteIndices. Column
+// writers compare it with DictEncodedSize and EstimatedDataEncodedSize to
+// judge whether the dictionary is actually saving space.
+func (d *dictEncoder) ObservedRawSize() int64 {
+	return d.rawDataSize
+}
+
+// AddRawSize adds n bytes to the running raw-input total reported by
+// ObservedRawSize. The per-type dictionary encoders call it from their Put
+// paths with the PLAIN width of the values they receive.
+//
+// NOTE(review): Go visibility is per package, not per file, so same-package
+// callers do not need the capitalized name. Because dictEncoder is embedded
+// in exported encoder types, the method is promoted onto them. The name is
+// kept as-is to avoid changing their method sets.
+func (d *dictEncoder) AddRawSize(n int64) { d.rawDataSize += n }
+
func (d *dictEncoder) Release() {
d.encoder.Release()
d.idxBuffer.Release()
@@ -285,6 +304,7 @@ func (d *dictEncoder) WriteIndices(out []byte) (int, error)
{
nbytes := enc.Flush()
d.idxValues = d.idxValues[:0]
+ d.rawDataSize = 0
return nbytes + 1, nil
}
diff --git a/parquet/internal/encoding/fixed_len_byte_array_encoder.go
b/parquet/internal/encoding/fixed_len_byte_array_encoder.go
index 0f4345d2..854b8b8a 100644
--- a/parquet/internal/encoding/fixed_len_byte_array_encoder.go
+++ b/parquet/internal/encoding/fixed_len_byte_array_encoder.go
@@ -21,6 +21,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/internal/bitutils"
+ "github.com/apache/arrow-go/v18/internal/hashing"
"github.com/apache/arrow-go/v18/parquet"
)
@@ -136,6 +137,7 @@ func (enc *DictFixedLenByteArrayEncoder) Put(in
[]parquet.FixedLenByteArray) {
}
enc.addIndex(memoIdx)
}
+ enc.AddRawSize(int64(len(in)) * int64(enc.typeLen))
}
// PutSpaced is like Put but leaves space for nulls
@@ -151,6 +153,23 @@ func (enc *DictFixedLenByteArrayEncoder)
NormalizeDict(values arrow.Array) (arro
return values, nil
}
+// FallBackTo re-encodes every buffered dictionary index as its fixed-length
+// value, read back from the binary memo table, and puts those values into
+// fallback. On success it clears both the index buffer and the raw-size
+// counter, as WriteIndices does after a page flush.
+//
+// It returns an error if fallback is not a FixedLenByteArrayEncoder or if
+// the memo table is not a *hashing.BinaryMemoTable.
+//
+// NOTE(review): the values passed to fallback may alias memo-table memory.
+// This assumes fallback.Put copies them before this encoder is released;
+// confirm.
+func (enc *DictFixedLenByteArrayEncoder) FallBackTo(fallback TypedEncoder) error {
+	target, ok := fallback.(FixedLenByteArrayEncoder)
+	if !ok {
+		return fmt.Errorf("parquet: dict fallback target encoder has wrong element type")
+	}
+	memo, ok := enc.memo.(*hashing.BinaryMemoTable)
+	if !ok {
+		return fmt.Errorf("parquet: dict fallback expected a binary memo table, got %T", enc.memo)
+	}
+	vals := make([]parquet.FixedLenByteArray, len(enc.idxValues))
+	for i, idx := range enc.idxValues {
+		vals[i] = parquet.FixedLenByteArray(memo.Value(int(idx)))
+	}
+	target.Put(vals)
+	enc.idxValues = enc.idxValues[:0]
+	// The raw bytes behind these indices now belong to fallback.
+	enc.rawDataSize = 0
+	return nil
+}
+
// PutDictionary allows pre-seeding a dictionary encoder with
// a dictionary from an Arrow Array.
//
diff --git a/parquet/internal/encoding/typed_encoder.go
b/parquet/internal/encoding/typed_encoder.go
index 1cba18da..82bb6fc9 100644
--- a/parquet/internal/encoding/typed_encoder.go
+++ b/parquet/internal/encoding/typed_encoder.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/bitutils"
+ "github.com/apache/arrow-go/v18/internal/hashing"
shared_utils "github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
@@ -144,6 +145,7 @@ func (enc *typedDictEncoder[T]) Put(in []T) {
for _, val := range in {
enc.dictEncoder.Put(val)
}
+ enc.AddRawSize(int64(len(in)) * int64(unsafe.Sizeof(T(0))))
}
func (enc *typedDictEncoder[T]) PutSpaced(in []T, validBits []byte,
validBitsOffset int64) {
@@ -158,6 +160,22 @@ type arrvalues[T arrow.ValueType] interface {
Values() []T
}
+// FallBackTo re-encodes every buffered dictionary index as its value of type
+// T and puts those values into fallback. The dictionary is copied out once
+// via memo.CopyValues and then indexed by the buffered indices. On success
+// it clears both the index buffer and the raw-size counter, as WriteIndices
+// does after a page flush.
+//
+// It returns an error if fallback is not an Encoder[T].
+func (enc *typedDictEncoder[T]) FallBackTo(fallback TypedEncoder) error {
+	target, ok := fallback.(Encoder[T])
+	if !ok {
+		return fmt.Errorf("parquet: dict fallback target encoder has wrong element type")
+	}
+	dict := make([]T, enc.memo.Size())
+	enc.memo.CopyValues(dict)
+	vals := make([]T, len(enc.idxValues))
+	for i, idx := range enc.idxValues {
+		vals[i] = dict[idx]
+	}
+	target.Put(vals)
+	enc.idxValues = enc.idxValues[:0]
+	// The raw bytes behind these indices now belong to fallback.
+	enc.rawDataSize = 0
+	return nil
+}
+
func (enc *typedDictEncoder[T]) NormalizeDict(values arrow.Array)
(arrow.Array, error) {
if _, ok := values.(arrvalues[T]); ok {
values.Retain()
@@ -427,6 +445,7 @@ func (enc *DictInt96Encoder) Put(in []parquet.Int96) {
}
enc.addIndex(memoIdx)
}
+ enc.AddRawSize(int64(len(in)) * int64(parquet.Int96SizeBytes))
}
// PutSpaced is like Put but assumes space for nulls
@@ -446,6 +465,24 @@ func (enc *DictInt96Encoder) PutDictionary(arrow.Array)
error {
return fmt.Errorf("%w: direct PutDictionary to Int96",
arrow.ErrNotImplemented)
}
+// FallBackTo re-encodes every buffered dictionary index as its INT96 value,
+// copied out of the binary memo table, and puts those values into fallback.
+// On success it clears both the index buffer and the raw-size counter, as
+// WriteIndices does after a page flush.
+//
+// It returns an error if fallback is not an Int96Encoder or if the memo
+// table is not a *hashing.BinaryMemoTable.
+func (enc *DictInt96Encoder) FallBackTo(fallback TypedEncoder) error {
+	target, ok := fallback.(Int96Encoder)
+	if !ok {
+		return fmt.Errorf("parquet: dict fallback target encoder has wrong element type")
+	}
+	memo, ok := enc.memo.(*hashing.BinaryMemoTable)
+	if !ok {
+		return fmt.Errorf("parquet: dict fallback expected a binary memo table, got %T", enc.memo)
+	}
+	vals := make([]parquet.Int96, len(enc.idxValues))
+	for i, idx := range enc.idxValues {
+		copy(vals[i][:], memo.Value(int(idx)))
+	}
+	target.Put(vals)
+	enc.idxValues = enc.idxValues[:0]
+	// The raw bytes behind these indices now belong to fallback.
+	enc.rawDataSize = 0
+	return nil
+}
+
// the boolEncoderTraits struct is used to make it easy to create encoders and
decoders based on type
type boolEncoderTraits struct{}
diff --git a/parquet/internal/encoding/types.go
b/parquet/internal/encoding/types.go
index a2f1a5b2..7c4015df 100644
--- a/parquet/internal/encoding/types.go
+++ b/parquet/internal/encoding/types.go
@@ -119,6 +119,20 @@ type DictEncoder interface {
//
// The returned array must always be released by the caller.
NormalizeDict(arrow.Array) (arrow.Array, error)
+ // FallBackTo translates the buffered indices back through the
dictionary
+ // and puts the raw values into the fallback encoder, clearing the dict
+ // encoder's index buffer. Mirrors parquet-mr's
+ // RequiresFallback.fallBackAllValuesTo: used by column writers when the
+ // dictionary overflows mid-chunk so already-buffered values can be
+ // re-encoded with the fallback (PLAIN) encoder instead of being emitted
+ // as a stranded dict-encoded page.
+ FallBackTo(fallback TypedEncoder) error
+ // ObservedRawSize returns the raw input byte count accumulated since
+ // the last page flush. Used alongside DictEncodedSize and
+ // EstimatedDataEncodedSize to decide whether dictionary encoding is
+ // actually saving space before committing the first dict data page —
+ // mirrors parquet-mr's rawDataByteSize.
+ ObservedRawSize() int64
}
var bufferPool = sync.Pool{
diff --git a/parquet/pqarrow/encode_dictionary_test.go
b/parquet/pqarrow/encode_dictionary_test.go
index 79d3147d..e92b2587 100644
--- a/parquet/pqarrow/encode_dictionary_test.go
+++ b/parquet/pqarrow/encode_dictionary_test.go
@@ -188,7 +188,6 @@ func (ad *ArrowWriteDictionarySuite)
TestStatisticsWithFallback() {
{0, 1},
{0, 0},
{3}}
- expectedDictCounts := []int32{4, 4, 4, 3}
// pairs of (min, max)
expectedMinMax := [][2]string{
{"a", "b"},
@@ -266,22 +265,23 @@ func (ad *ArrowWriteDictionarySuite)
TestStatisticsWithFallback() {
for rowGroup := 0; rowGroup < 2; rowGroup++ {
pr, err :=
rdr.RowGroup(0).GetColumnPageReader(0)
ad.Require().NoError(err)
- ad.True(pr.Next())
- page := pr.Page()
- ad.NotNil(page)
- ad.NoError(pr.Err())
-
ad.Require().IsType((*file.DictionaryPage)(nil), page)
- dictPage := page.(*file.DictionaryPage)
- ad.EqualValues(expectedDictCounts[caseIndex],
dictPage.NumValues())
+
+ // pqarrow falls back to PLAIN whenever the
arrow dict has
+ // duplicates, and does so before any
dict-encoded data page
+ // is flushed. Matching parquet-mr, arrow-go
now discards the
+ // dictionary in that case — so no DICTIONARY
page appears and
+ // all data pages are PLAIN.
for pageIdx := 0; pageIdx <
expectedNumDataPages[caseIndex]; pageIdx++ {
ad.True(pr.Next())
- page = pr.Page()
+ page := pr.Page()
ad.NotNil(page)
ad.NoError(pr.Err())
dataPage, ok := page.(file.DataPage)
ad.Require().True(ok)
+ ad.EqualValues(parquet.Encodings.Plain,
dataPage.Encoding(),
+ "page %d should be PLAIN after
dict fallback", pageIdx)
stats := dataPage.Statistics()
ad.EqualValues(expectedNullByPage[caseIndex][pageIdx], stats.NullCount)
diff --git a/parquet/pqarrow/file_writer_test.go
b/parquet/pqarrow/file_writer_test.go
index 0c771931..7b98b212 100644
--- a/parquet/pqarrow/file_writer_test.go
+++ b/parquet/pqarrow/file_writer_test.go
@@ -171,8 +171,8 @@ func TestFileWriterTotalBytes(t *testing.T) {
require.NoError(t, writer.Close())
// Verify total bytes & compressed bytes are correct
- assert.Equal(t, int64(408), writer.TotalCompressedBytes())
- assert.Equal(t, int64(910), writer.TotalBytesWritten())
+ assert.Equal(t, int64(340), writer.TotalCompressedBytes())
+ assert.Equal(t, int64(799), writer.TotalBytesWritten())
}
func TestFileWriterTotalBytesBuffered(t *testing.T) {
@@ -205,8 +205,8 @@ func TestFileWriterTotalBytesBuffered(t *testing.T) {
require.NoError(t, writer.Close())
// Verify total bytes & compressed bytes are correct
- assert.Equal(t, int64(596), writer.TotalCompressedBytes())
- assert.Equal(t, int64(1306), writer.TotalBytesWritten())
+ assert.Equal(t, int64(494), writer.TotalCompressedBytes())
+ assert.Equal(t, int64(1139), writer.TotalBytesWritten())
}
func TestWriteOnClosedFileWriter(t *testing.T) {