laskoviymishka commented on code in PR #948:
URL: https://github.com/apache/iceberg-go/pull/948#discussion_r3159517840


##########
table/clustered_writer.go:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "iter"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+)
+
+// clusteredPartitionedWrite writes records to partitioned data files,
+// keeping at most one partition writer open at a time. When the
+// partition changes, the current writer is closed before opening a
+// new one. This is the memory-efficient write path for pre-clustered
+// input (e.g. compaction reads where each source file belongs to a
+// single partition).
+//
+// The input must be strictly clustered by partition: once a partition's
+// writer has been closed, encountering further records for that
+// partition is treated as a violation of the clustering assumption and
+// returns an error. Use the fanout writer if the input is not
+// clustered.
+func clusteredPartitionedWrite(
+       ctx context.Context,
+       spec iceberg.PartitionSpec,
+       schema *iceberg.Schema,
+       factory *writerFactory,
+       records iter.Seq2[arrow.RecordBatch, error],
+) iter.Seq2[iceberg.DataFile, error] {
+       outputCh := make(chan iceberg.DataFile, 1)
+       errCh := make(chan error, 1)
+
+       go func() {
+               defer close(outputCh)
+               defer factory.stopCount()
+
+               var (
+                       currentKey          string
+                       currentWriter       *RollingDataWriter
+                       completedPartitions = make(map[string]struct{})
+               )
+
+               closeCurrentWriter := func() error {
+                       if currentWriter == nil {
+                               return nil
+                       }
+                       w := currentWriter
+                       currentWriter = nil
+                       completedPartitions[currentKey] = struct{}{}
+                       close(w.recordCh)
+                       w.wg.Wait()
+
+                       // stream's deferred close(errorCh) runs before its
+                       // deferred wg.Done, so by the time Wait returns the
+                       // channel is closed; this read never blocks and yields
+                       // either the buffered error or nil.
+                       return <-w.errorCh
+               }
+
+               fail := func(err error) {
+                       errCh <- errors.Join(err, closeCurrentWriter())
+                       close(errCh)
+               }
+
+               takeFn := partitionBatchByKey(ctx)
+
+               for rec, err := range records {
+                       if err != nil {
+                               fail(err)
+
+                               return
+                       }
+
+                       partitions, err := getRecordPartitions(spec, schema, 
rec)
+                       if err != nil {
+                               fail(err)
+
+                               return
+                       }
+
+                       for _, part := range partitions {
+                               select {
+                               case <-ctx.Done():
+                                       fail(context.Cause(ctx))
+
+                                       return
+                               default:
+                               }
+
+                               subBatch, err := takeFn(rec, part.rows)
+                               if err != nil {
+                                       fail(err)
+
+                                       return
+                               }
+
+                               partitionPath := 
spec.PartitionToPath(part.partitionRec, schema)

Review Comment:
   Java's `ClusteredDataWriter` keys its revisit check on the `StructLike` 
partition tuple via `StructLikeWrapper`. 
   
   Here `currentKey`/`completedPartitions` use the human-readable path from 
`PartitionToPath`, which goes through `Transform.ToHumanStr` — and 
identity-string renders SQL `NULL` and the literal string `"null"` both as 
`partition_col=null`. So a clustered stream with `null → "null" → null` 
silently passes the revisit check on the second `null` and corrupts the second 
writer's data. 
   
   Could we key on a structural hash of `partitionRec` (the same key 
`newPartitionMapNode` already builds) and keep the path purely for the on-disk 
location?
   



##########
table/write_records.go:
##########
@@ -52,6 +54,37 @@ func WithWriteUUID(id uuid.UUID) WriteRecordOption {
        }
 }
 
+// WithMaxWriteWorkers overrides the default number of fanout workers
+// used for partitioned writes. Each worker processes record batches,
+// partitions them, and writes to the appropriate partition files.
+// Fewer workers means fewer concurrent parquet writers compressing
+// pages simultaneously, which reduces peak memory. A value of 0
+// (the default) uses [config.EnvConfig.MaxWorkers].
+//
+// This option is ignored when [WithClusteredWrite] is set.
+func WithMaxWriteWorkers(n int) WriteRecordOption {

Review Comment:
   The doc on `WithMaxWriteWorkers` reads as if it controls write concurrency, 
but `args.clustered` short-circuits in `recordsToDataFiles` before 
`maxWriteWorkers` is read, and a caller only learns about the interaction by 
also reading `WithClusteredWrite`'s docstring. 
   
   Since the clustered writer is single-threaded by design, the two options are 
genuinely incompatible rather than just-overridden. 
   
   Would we rather return an error from `WriteRecords` when both are set 
instead of dropping one silently?
   



##########
table/write_records.go:
##########
@@ -52,6 +54,37 @@ func WithWriteUUID(id uuid.UUID) WriteRecordOption {
        }
 }
 
+// WithMaxWriteWorkers overrides the default number of fanout workers
+// used for partitioned writes. Each worker processes record batches,
+// partitions them, and writes to the appropriate partition files.
+// Fewer workers means fewer concurrent parquet writers compressing
+// pages simultaneously, which reduces peak memory. A value of 0
+// (the default) uses [config.EnvConfig.MaxWorkers].
+//
+// This option is ignored when [WithClusteredWrite] is set.
+func WithMaxWriteWorkers(n int) WriteRecordOption {
+       return func(c *writeRecordConfig) {
+               c.maxWriteWorkers = n
+       }
+}
+
+// WithClusteredWrite enables the memory-efficient clustered write
+// path for partitioned tables. It keeps at most one partition writer
+// open at a time: when a record arrives for a new partition, the
+// current writer is flushed and closed before a new one is opened.
+//
+// The input must be strictly clustered by partition: once a
+// partition's writer has been closed, encountering further records
+// for that partition returns an error. This is the natural order for
+// compaction, where each source data file typically belongs to a
+// single partition. If the input is not clustered, use the fanout
+// writer (the default) instead.
+func WithClusteredWrite() WriteRecordOption {

Review Comment:
   The doc reads row-level — "once a partition's writer has been closed, 
encountering further records for that partition returns an error" — which 
sounds like Java's `ClusteredWriter`. 
   
   In practice the writer sorts `collectPartitions()` by first-row index and 
writes each partition's rows contiguously, so a single batch interleaved like 
`[a,b,a,b]` is silently reclustered into `[a,a,b,b]` and accepted; the strict 
check only fires across batches. 
   
   Useful relaxation for the RewriteDataFiles caller, but it diverges from what 
the docstring says — worth either tightening the doc to "clustered across 
batches; within-batch reordering is tolerated", or pinning the interleave 
behavior with a test so the contract is unambiguous?



##########
table/clustered_writer.go:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "iter"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+)
+
+// clusteredPartitionedWrite writes records to partitioned data files,
+// keeping at most one partition writer open at a time. When the
+// partition changes, the current writer is closed before opening a
+// new one. This is the memory-efficient write path for pre-clustered
+// input (e.g. compaction reads where each source file belongs to a
+// single partition).
+//
+// The input must be strictly clustered by partition: once a partition's
+// writer has been closed, encountering further records for that
+// partition is treated as a violation of the clustering assumption and
+// returns an error. Use the fanout writer if the input is not
+// clustered.
+func clusteredPartitionedWrite(
+       ctx context.Context,
+       spec iceberg.PartitionSpec,
+       schema *iceberg.Schema,
+       factory *writerFactory,
+       records iter.Seq2[arrow.RecordBatch, error],
+) iter.Seq2[iceberg.DataFile, error] {
+       outputCh := make(chan iceberg.DataFile, 1)
+       errCh := make(chan error, 1)
+
+       go func() {
+               defer close(outputCh)
+               defer factory.stopCount()
+
+               var (
+                       currentKey          string
+                       currentWriter       *RollingDataWriter
+                       completedPartitions = make(map[string]struct{})
+               )
+
+               closeCurrentWriter := func() error {
+                       if currentWriter == nil {
+                               return nil
+                       }
+                       w := currentWriter
+                       currentWriter = nil
+                       completedPartitions[currentKey] = struct{}{}
+                       close(w.recordCh)
+                       w.wg.Wait()
+
+                       // stream's deferred close(errorCh) runs before its
+                       // deferred wg.Done, so by the time Wait returns the
+                       // channel is closed; this read never blocks and yields
+                       // either the buffered error or nil.
+                       return <-w.errorCh
+               }
+
+               fail := func(err error) {
+                       errCh <- errors.Join(err, closeCurrentWriter())
+                       close(errCh)

Review Comment:
   Two related issues.
   
   - First, `errCh` is closed inline on every path, never via `defer`, so a 
panic anywhere in the body (`getRecordPartitions`, `partitionBatchByKey`, an 
Arrow op inside `Add`, the `records` iter itself) closes `outputCh` but leaves 
`errCh` unclosed and unwritten — the consumer drains `outputCh` then blocks on 
`<-errCh` forever. 
   
   - Second, the producer only checks `ctx.Done()` inside the inner partition 
loop, so if the caller's `for ... range` returns early, nothing tears the 
producer down: it keeps writing files into a discarded channel and any 
post-break error gets buffered into a channel nobody reads, so a failed write 
looks successful from the caller's seat. 
    
   The cleaner shape is probably a derived ctx that the iterator cancels in a 
`defer` when `yield` returns false (or the caller breaks), so the producer 
notices and exits.
   I would say it is worth covering both in this PR.
   Plus, we need a test that verifies this pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to