zeroshade commented on a change in pull request #9671:
URL: https://github.com/apache/arrow/pull/9671#discussion_r595609841



##########
File path: go/parquet/internal/utils/rle.go
##########
@@ -0,0 +1,555 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+       "bytes"
+       "encoding/binary"
+       "io"
+       "math"
+
+       "github.com/apache/arrow/go/arrow/bitutil"
+       "github.com/apache/arrow/go/parquet"
+       "golang.org/x/xerrors"
+)
+
+//go:generate go run ../../../arrow/_tools/tmpl/main.go -i 
-data=physical_types.tmpldata typed_rle_dict.gen.go.tmpl
+
+const (
+       MaxValuesPerLiteralRun = (1 << 6) * 8
+)
+
+func MinBufferSize(bitWidth int) int {
+       maxLiteralRunSize := 1 + 
bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth))
+       maxRepeatedRunSize := binary.MaxVarintLen32 + 
bitutil.BytesForBits(int64(bitWidth))
+       return int(Max(maxLiteralRunSize, maxRepeatedRunSize))
+}
+
+func MaxBufferSize(width, numValues int) int {
+       bytesPerRun := width
+       numRuns := int(bitutil.BytesForBits(int64(numValues)))
+       literalMaxSize := numRuns + (numRuns * bytesPerRun)
+
+       minRepeatedRunSize := 1 + int(bitutil.BytesForBits(int64(width)))
+       repeatedMaxSize := int(bitutil.BytesForBits(int64(numValues))) * 
minRepeatedRunSize
+
+       return MaxInt(literalMaxSize, repeatedMaxSize)
+}
+
+// Utility classes to do run length encoding (RLE) for fixed bit width values. 
 If runs
+// are sufficiently long, RLE is used, otherwise, the values are just 
bit-packed
+// (literal encoding).
+// For both types of runs, there is a byte-aligned indicator which encodes the 
length
+// of the run and the type of the run.
+// This encoding has the benefit that when there aren't any long enough runs, 
values
+// are always decoded at fixed (can be precomputed) bit offsets OR both the 
value and
+// the run length are byte aligned. This allows for very efficient decoding
+// implementations.
+// The encoding is:
+//    encoded-block := run*
+//    run := literal-run | repeated-run
+//    literal-run := literal-indicator < literal bytes >
+//    repeated-run := repeated-indicator < repeated value. padded to byte 
boundary >
+//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
+//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
+//
+// Each run is preceded by a varint. The varint's least significant bit is
+// used to indicate whether the run is a literal run or a repeated run. The 
rest
+// of the varint is used to determine the length of the run (eg how many times 
the
+// value repeats).
+//
+// In the case of literal runs, the run length is always a multiple of 8 (i.e. 
encode
+// in groups of 8), so that no matter the bit-width of the value, the sequence 
will end
+// on a byte boundary without padding.
+// Given that we know it is a multiple of 8, we store the number of 8-groups 
rather than
+// the actual number of encoded ints. (This means that the total number of 
encoded values
+// can not be determined from the encoded data, since the number of values in 
the last
+// group may not be a multiple of 8). For the last group of literal runs, we 
pad
+// the group to 8 with zeros. This allows for 8 at a time decoding on the read 
side
+// without the need for additional checks.
+//
+// There is a break-even point when it is more storage efficient to do run 
length
+// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 
bytes
+// for both the repeated encoding or the literal encoding.  This value can 
always
+// be computed based on the bit-width.
+//
+// Examples with bit-width 1 (eg encoding booleans):
+// ----------------------------------------
+// 100 1s followed by 100 0s:
+// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 
byte>
+//  - (total 4 bytes)
+//
+// alternating 1s and 0s (200 total):
+// 200 ints = 25 groups of 8
+// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
+// (total 26 bytes, 1 byte overhead)
+//
+
+type RleDecoder struct {
+       r *BitReader
+
+       buffered uint64
+       bitWidth int
+       curVal   uint64
+       repCount int32
+       litCount int32
+}
+
+func NewRleDecoder(data *bytes.Reader, width int) *RleDecoder {
+       return &RleDecoder{r: NewBitReader(data), bitWidth: width}
+}
+
+func (r *RleDecoder) Reset(data *bytes.Reader, width int) {
+       r.bitWidth = width
+       r.curVal = 0
+       r.repCount = 0
+       r.litCount = 0
+       r.r.Reset(data)
+}
+
+func (r *RleDecoder) Next() bool {
+       indicator, ok := r.r.GetVlqInt()
+       if !ok {
+               return false
+       }
+
+       literal := (indicator & 1) != 0
+       count := uint32(indicator >> 1)
+       if literal {
+               if count == 0 || count > uint32(math.MaxInt32/8) {
+                       return false
+               }
+               r.litCount = int32(count) * 8
+       } else {
+               if count == 0 || count > uint32(math.MaxInt32) {
+                       return false
+               }
+               r.repCount = int32(count)
+
+               nbytes := int(bitutil.BytesForBits(int64(r.bitWidth)))
+               switch {
+               case nbytes > 4:
+                       if !r.r.GetAligned(nbytes, &r.curVal) {
+                               return false
+                       }
+               case nbytes > 2:
+                       var val uint32
+                       if !r.r.GetAligned(nbytes, &val) {
+                               return false
+                       }
+                       r.curVal = uint64(val)
+               case nbytes > 1:
+                       var val uint16
+                       if !r.r.GetAligned(nbytes, &val) {
+                               return false
+                       }
+                       r.curVal = uint64(val)
+               default:
+                       var val uint8
+                       if !r.r.GetAligned(nbytes, &val) {
+                               return false
+                       }
+                       r.curVal = uint64(val)
+               }
+       }
+       return true
+}
+
+func (r *RleDecoder) GetValue() (uint64, bool) {
+       vals := make([]uint64, 1)
+       n := r.GetBatch(vals)
+       return vals[0], n == 1
+}
+
+func (r *RleDecoder) GetBatch(values []uint64) int {
+       read := 0
+       size := len(values)
+
+       out := values
+       for read < size {
+               remain := size - read
+
+               if r.repCount > 0 {
+                       repbatch := int(math.Min(float64(remain), 
float64(r.repCount)))
+                       for i := 0; i < repbatch; i++ {
+                               out[i] = r.curVal
+                       }
+
+                       r.repCount -= int32(repbatch)
+                       read += repbatch
+                       out = out[repbatch:]
+               } else if r.litCount > 0 {
+                       litbatch := int(math.Min(float64(remain), 
float64(r.litCount)))
+                       n, _ := r.r.GetBatch(uint(r.bitWidth), out[:litbatch])
+                       if n != litbatch {
+                               return read
+                       }
+
+                       r.litCount -= int32(litbatch)
+                       read += litbatch
+                       out = out[litbatch:]
+               } else {
+                       if !r.Next() {
+                               return read
+                       }
+               }
+       }
+       return read
+}
+
+func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits 
[]byte, validBitsOffset int64) (int, error) {
+       if nullcount == 0 {
+               return r.GetBatch(vals), nil
+       }
+
+       converter := plainConverter{}
+       blockCounter := NewBitBlockCounter(validBits, validBitsOffset, 
int64(len(vals)))
+
+       var (
+               totalProcessed int
+               processed      int
+               block          BitBlockCount
+               err            error
+       )
+
+       for {
+               block = blockCounter.NextFourWords()
+               if block.Len == 0 {
+                       break
+               }
+
+               if block.AllSet() {
+                       processed = r.GetBatch(vals[:block.Len])
+               } else if block.NoneSet() {
+                       converter.FillZero(vals[:block.Len])
+                       processed = int(block.Len)
+               } else {
+                       processed, err = r.getspaced(converter, vals, 
int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset)
+                       if err != nil {
+                               return totalProcessed, err
+                       }
+               }
+
+               totalProcessed += processed
+               vals = vals[int(block.Len):]
+               validBitsOffset += int64(block.Len)
+
+               if processed != int(block.Len) {
+                       break
+               }
+       }
+       return totalProcessed, nil
+}
+
+func (r *RleDecoder) getspaced(dc DictionaryConverter, vals interface{}, 
batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) 
{
+       switch vals := vals.(type) {
+       case []int32:
+               return r.getspacedInt32(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
+       case []int64:
+               return r.getspacedInt64(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
+       case []float32:
+               return r.getspacedFloat32(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
+       case []float64:
+               return r.getspacedFloat64(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
+       case []parquet.ByteArray:
+               return r.getspacedByteArray(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
+       case []parquet.FixedLenByteArray:
+               return r.getspacedFixedLenByteArray(dc, vals, batchSize, 
nullCount, validBits, validBitsOffset)
+       case []parquet.Int96:
+               return r.getspacedInt96(dc, vals, batchSize, nullCount, 
validBits, validBitsOffset)
+       case []uint64:
+               if nullCount == batchSize {
+                       dc.FillZero(vals[:batchSize])
+                       return batchSize, nil
+               }
+
+               read := 0
+               remain := batchSize - nullCount
+
+               const bufferSize = 1024
+               var indexbuffer [bufferSize]IndexType
+
+               // assume no bits to start
+               bitReader := NewBitRunReader(validBits, validBitsOffset, 
int64(batchSize))
+               validRun := bitReader.NextRun()
+               for read < batchSize {

Review comment:
       Updated version has the split up into helper functions for this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to