zeroshade commented on a change in pull request #10379:
URL: https://github.com/apache/arrow/pull/10379#discussion_r664709759



##########
File path: go/parquet/internal/encoding/delta_byte_array.go
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package encoding
+
+import (
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "golang.org/x/xerrors"
+)
+
+// DeltaByteArrayEncoder is an encoder for writing byte arrays using delta
+// encoding, also known as incremental encoding or front compression. For each
+// element in a sequence of byte arrays we store the length of the prefix it
+// shares with the previous element, plus the remaining suffix. See
+// https://en.wikipedia.org/wiki/Incremental_encoding for a longer description.
+//
+// The encoded form is the sequence of delta-encoded prefix lengths followed by
+// the suffixes, which are themselves encoded as delta length byte arrays.
+type DeltaByteArrayEncoder struct {
+       encoder
+
+       // prefixEncoder receives the shared-prefix length of every value; it is
+       // nil until Put or FlushValues lazily creates both child encoders.
+       prefixEncoder *DeltaBitPackInt32Encoder
+       // suffixEncoder receives the bytes of every value after its shared prefix.
+       suffixEncoder *DeltaLengthByteArrayEncoder
+
+       // lastVal is a private copy of the most recently encoded value, used to
+       // compute the shared prefix of the next one.
+       lastVal parquet.ByteArray
+}
+
+// initEncoders lazily builds the child encoders: a delta bit-packed int32
+// encoder for the prefix lengths, and a delta length byte array encoder (with
+// its own delta bit-packed int32 length encoder) for the suffixes. Every child
+// is created with this encoder's encoding and mem.
+func (enc *DeltaByteArrayEncoder) initEncoders() {
+       newInt32Enc := func() *DeltaBitPackInt32Encoder {
+               base := newEncoderBase(enc.encoding, nil, enc.mem)
+               return &DeltaBitPackInt32Encoder{deltaBitPackEncoder: &deltaBitPackEncoder{encoder: base}}
+       }
+
+       enc.prefixEncoder = newInt32Enc()
+       enc.suffixEncoder = &DeltaLengthByteArrayEncoder{
+               encoder:       newEncoderBase(enc.encoding, nil, enc.mem),
+               lengthEncoder: newInt32Enc(),
+       }
+}
+
+// Type reports the physical parquet type this encoder accepts, which is always
+// ByteArray.
+func (DeltaByteArrayEncoder) Type() parquet.Type {
+       return parquet.Types.ByteArray
+}
+
+// Put writes a slice of ByteArrays to the encoder
+func (enc *DeltaByteArrayEncoder) Put(in []parquet.ByteArray) {
+       if len(in) == 0 {
+               return
+       }
+
+       var suf parquet.ByteArray
+       if enc.prefixEncoder == nil { // initialize our encoders if we haven't 
yet
+               enc.initEncoders()
+               enc.prefixEncoder.Put([]int32{0})
+               suf = in[0]
+               enc.lastVal = append([]byte(nil), in[0]...)
+               enc.suffixEncoder.Put([]parquet.ByteArray{suf})
+               in = in[1:]
+       }
+
+       // for each value, figure out the common prefix with the previous value
+       // and then write the prefix length and the suffix.
+       for _, val := range in {
+               l1 := enc.lastVal.Len()
+               l2 := val.Len()
+               j := 0
+               for j < l1 && j < l2 {
+                       if enc.lastVal[j] != val[j] {
+                               break
+                       }
+                       j++
+               }
+               enc.prefixEncoder.Put([]int32{int32(j)})
+               suf = val[j:]
+               enc.suffixEncoder.Put([]parquet.ByteArray{suf})
+               enc.lastVal = append([]byte(nil), val...)
+       }
+}
+
+// PutSpaced is like Put, but the input is spaced out with slots for nulls.
+// When validBits is non-nil, the bitmap (starting at validBitsOffset) is used
+// to compact the non-null values into a new slice, which is then passed to
+// Put; when validBits is nil the input is passed to Put unchanged.
+func (enc *DeltaByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
+       if validBits == nil {
+               enc.Put(in)
+               return
+       }
+
+       compacted := make([]parquet.ByteArray, len(in))
+       nvalid := spacedCompress(in, compacted, validBits, validBitsOffset)
+       enc.Put(compacted[:nvalid])
+}
+
+// FlushValues flushes any remaining data out and returns the finished encoded
+// buffer: the encoded prefix lengths immediately followed by the encoded
+// suffixes. It also forgets the last value written, so the first value Put
+// after a flush gets a prefix length of 0 and each flushed buffer can be
+// decoded without the data that preceded it.
+func (enc *DeltaByteArrayEncoder) FlushValues() Buffer {
+       if enc.prefixEncoder == nil {
+               enc.initEncoders()
+       }
+       prefixBuf := enc.prefixEncoder.FlushValues()
+       defer prefixBuf.Release()
+
+       suffixBuf := enc.suffixEncoder.FlushValues()
+       defer suffixBuf.Release()
+
+       // Without this reset the first value after a flush would be encoded
+       // relative to a value that lives in the previous buffer.
+       enc.lastVal = enc.lastVal[:0]
+
+       ret := bufferPool.Get().(*memory.Buffer)
+       ret.ResizeNoShrink(prefixBuf.Len() + suffixBuf.Len())
+       copy(ret.Bytes(), prefixBuf.Bytes())
+       copy(ret.Bytes()[prefixBuf.Len():], suffixBuf.Bytes())
+       return poolBuffer{ret}
+}
+
+// DeltaByteArrayDecoder is a decoder for a column of data encoded using
+// incremental or prefix encoding: delta-encoded prefix lengths followed by the
+// suffixes encoded as delta length byte arrays.
+type DeltaByteArrayDecoder struct {
+       *DeltaLengthByteArrayDecoder
+
+       // prefixLengths holds the prefix lengths decoded by SetData that have not
+       // been consumed yet.
+       prefixLengths []int32
+
+       // lastVal is the most recently decoded value; the next value's shared
+       // prefix is sliced from it.
+       lastVal parquet.ByteArray
+}
+
+// Type returns the underlying physical type this decoder operates on, which is
+// always ByteArray.
+func (DeltaByteArrayDecoder) Type() parquet.Type {
+       return parquet.Types.ByteArray
+}
+
+// Allocator returns the memory allocator stored in the embedded
+// DeltaLengthByteArrayDecoder.
+func (d *DeltaByteArrayDecoder) Allocator() memory.Allocator { return d.mem }
+
+// SetData expects the data passed in to be the prefix lengths, followed by the
+// blocks of suffix data, and initializes the decoder for nvalues values. All
+// nvalues prefix lengths are decoded eagerly so the number of bytes they
+// occupy is known; the remainder of data is handed to the embedded
+// DeltaLengthByteArrayDecoder. Any value remembered from previously set data
+// is discarded, so decoding restarts from the first value in data.
+func (d *DeltaByteArrayDecoder) SetData(nvalues int, data []byte) error {
+       prefixLenDec := DeltaBitPackInt32Decoder{
+               deltaBitPackDecoder: &deltaBitPackDecoder{
+                       decoder: newDecoderBase(d.encoding, d.descr),
+                       mem:     d.mem}}
+
+       if err := prefixLenDec.SetData(nvalues, data); err != nil {
+               return err
+       }
+
+       d.prefixLengths = make([]int32, nvalues)
+       // decode all the prefix lengths first so we know how many bytes it took
+       // to get the prefix lengths for nvalues
+       if _, err := prefixLenDec.Decode(d.prefixLengths); err != nil {
+               return xerrors.Errorf("parquet: decoding delta byte array prefix lengths: %w", err)
+       }
+
+       // new data starts over: its first value must not borrow a prefix from
+       // whatever was decoded before this call
+       d.lastVal = nil
+
+       // now that we know how many bytes we needed for the prefix lengths, the
+       // rest are the delta length byte array encoding.
+       return d.DeltaLengthByteArrayDecoder.SetData(nvalues, data[int(prefixLenDec.bytesRead()):])
+}
+
+// Decode decodes byte arrays into the slice provided and returns the number 
of values actually decoded
+func (d *DeltaByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
+       max := utils.MinInt(len(out), d.nvals)
+       if max == 0 {
+               return 0, nil
+       }
+       out = out[:max]
+
+       var err error
+       if d.lastVal == nil {
+               _, err = d.DeltaLengthByteArrayDecoder.Decode(out[:1])
+               if err != nil {
+                       return 0, err
+               }
+               d.lastVal = out[0]
+               out = out[1:]
+               d.prefixLengths = d.prefixLengths[1:]
+       }
+
+       var prefixLen int32
+       suffixHolder := make([]parquet.ByteArray, 1)
+       for len(out) > 0 {
+               prefixLen, d.prefixLengths = d.prefixLengths[0], 
d.prefixLengths[1:]
+
+               prefix := d.lastVal[:prefixLen]
+               _, err = d.DeltaLengthByteArrayDecoder.Decode(suffixHolder)
+               if err != nil {
+                       return 0, err
+               }
+
+               d.lastVal = make([]byte, 0, int(prefixLen)+len(suffixHolder[0]))

Review comment:
       updated.

##########
File path: go/parquet/internal/encoding/delta_length_byte_array.go
##########
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package encoding
+
+import (
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "golang.org/x/xerrors"
+)
+
+// DeltaLengthByteArrayEncoder encodes data by taking all of the byte array
+// lengths and encoding them up front using delta encoding, followed by all of
+// the binary data concatenated back to back. The expected savings come from
+// the cheap encoding of the lengths and possibly better compression of the
+// data, which is no longer interleaved with the lengths.
+//
+// This encoding is always preferred over PLAIN for byte array columns where
+// possible.
+//
+// For example, if the data was "Hello", "World", "Foobar", "ABCDEF" the
+// encoded data would be:
+// DeltaEncoding(5, 5, 6, 6) "HelloWorldFoobarABCDEF"
+type DeltaLengthByteArrayEncoder struct {
+       encoder
+
+       // lengthEncoder receives the length of every value passed to Put.
+       lengthEncoder *DeltaBitPackInt32Encoder
+}
+
+// Put writes the provided slice of byte arrays to the encoder
+func (enc *DeltaLengthByteArrayEncoder) Put(in []parquet.ByteArray) {
+       lengths := make([]int32, len(in))
+       totalLen := int(0)
+       for idx, val := range in {
+               lengths[idx] = int32(val.Len())
+               totalLen += val.Len()
+       }
+
+       enc.lengthEncoder.Put(lengths)
+       enc.sink.Reserve(totalLen)
+       for _, val := range in {
+               enc.sink.UnsafeWrite(val)
+       }
+}
+
+// PutSpaced is like Put, but the input is spaced out with slots for nulls.
+// When validBits is non-nil, the bitmap (starting at validBitsOffset) is used
+// to compact the non-null values into a new slice before it is passed to Put;
+// when validBits is nil the input is passed to Put unchanged.
+func (enc *DeltaLengthByteArrayEncoder) PutSpaced(in []parquet.ByteArray, validBits []byte, validBitsOffset int64) {
+       if validBits == nil {
+               enc.Put(in)
+               return
+       }
+
+       compacted := make([]parquet.ByteArray, len(in))
+       nvalid := spacedCompress(in, compacted, validBits, validBitsOffset)
+       enc.Put(compacted[:nvalid])
+}
+
+// Type reports the physical parquet type handled by this encoder, which is
+// always ByteArray.
+func (DeltaLengthByteArrayEncoder) Type() parquet.Type { return parquet.Types.ByteArray }
+
+// FlushValues flushes any remaining data and returns the final encoded buffer:
+// the delta-encoded lengths immediately followed by the concatenated values.
+// Both intermediate buffers are released before returning.
+func (enc *DeltaLengthByteArrayEncoder) FlushValues() Buffer {
+       lengthsBuf := enc.lengthEncoder.FlushValues()
+       defer lengthsBuf.Release()
+
+       valuesBuf := enc.sink.Finish()
+       defer valuesBuf.Release()
+
+       nlen := lengthsBuf.Len()
+       out := bufferPool.Get().(*memory.Buffer)
+       out.ResizeNoShrink(nlen + valuesBuf.Len())
+       dst := out.Bytes()
+       copy(dst, lengthsBuf.Bytes())
+       copy(dst[nlen:], valuesBuf.Bytes())
+       return poolBuffer{out}
+}
+
+// DeltaLengthByteArrayDecoder is a decoder for handling data produced by the
+// corresponding encoder, which expects delta packed lengths followed by the
+// bytes of data. SetData decodes all of the lengths up front into lengths;
+// mem is the allocator returned by Allocator and used for that length decoder.
+type DeltaLengthByteArrayDecoder struct {
+       decoder
+
+       mem     memory.Allocator
+       lengths []int32
+}
+
+// Type returns the underlying type which is handled by this decoder,
+// ByteArrays only.
+func (DeltaLengthByteArrayDecoder) Type() parquet.Type {
+       return parquet.Types.ByteArray
+}
+
+// Allocator returns the memory allocator stored in this decoder.
+func (d *DeltaLengthByteArrayDecoder) Allocator() memory.Allocator { return d.mem }
+
+// SetData sets the expected data on the decoder, which should be nvalues delta
+// packed lengths followed immediately by the rest of the byte array data. All
+// nvalues lengths are decoded eagerly so the number of bytes they occupy is
+// known, and the remaining bytes are handed to the embedded decoder. Errors
+// from setting up or decoding the lengths are returned to the caller.
+func (d *DeltaLengthByteArrayDecoder) SetData(nvalues int, data []byte) error {
+       dec := DeltaBitPackInt32Decoder{
+               deltaBitPackDecoder: &deltaBitPackDecoder{
+                       decoder: newDecoderBase(d.encoding, d.descr),
+                       mem:     d.mem}}
+
+       if err := dec.SetData(nvalues, data); err != nil {
+               return err
+       }
+       d.lengths = make([]int32, nvalues)
+       if _, err := dec.Decode(d.lengths); err != nil {
+               return xerrors.Errorf("parquet: decoding delta length byte array lengths: %w", err)
+       }
+
+       return d.decoder.SetData(nvalues, data[int(dec.bytesRead()):])
+}
+
+// Decode populates the passed in slice with data decoded until it hits the 
length of out
+// or runs out of values in the column to decode, then returns the number of 
values actually decoded.
+func (d *DeltaLengthByteArrayDecoder) Decode(out []parquet.ByteArray) (int, 
error) {
+       max := utils.MinInt(len(out), d.nvals)
+       for i := 0; i < max; i++ {
+               out[i] = d.data[:d.lengths[i]]

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to