zeroshade commented on code in PR #674:
URL: https://github.com/apache/iceberg-go/pull/674#discussion_r2771166155
##########
table/table.go:
##########
@@ -129,6 +129,29 @@ func (t Table) Append(ctx context.Context, rdr
array.RecordReader, snapshotProps
return txn.Commit(ctx)
}
+// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then
committing the transaction.
+// The batchSize parameter refers to the batch size for reading the input
data, not the batch size for writes.
+// The concurrency parameter controls the level of parallelism. If concurrency
<= 0, defaults to runtime.GOMAXPROCS(0).
+func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize
int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int,
snapshotProps iceberg.Properties) (*Table, error) {
Review Comment:
Update the docstring to explain what the filter actually means (i.e., how it
affects the overwrite). Please add the same explanation to the function below.
##########
table/transaction.go:
##########
@@ -499,6 +500,324 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
concurrency int, snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+ return t.Overwrite(ctx, rdr, filter, caseSensitive, concurrency,
snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int,
snapshotProps iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Default concurrency if not specified
+ if concurrency <= 0 {
+ concurrency = runtime.GOMAXPROCS(0)
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
+
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter, concurrency)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter, concurrency); err != nil {
+ return err
+ }
+ }
+
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, concurrency int) (filesToDelete,
filesToRewrite []iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, concurrency)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete,
filesToRewrite []iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ }
+
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ type classifiedFiles struct {
+ toDelete []iceberg.DataFile
+ toRewrite []iceberg.DataFile
+ }
+
+ var (
+ mu sync.Mutex
+ allFilesToDel []iceberg.DataFile
+ allFilesToRewr []iceberg.DataFile
+ )
+
+ g, _ := errgroup.WithContext(ctx)
+ g.SetLimit(min(concurrency, len(manifests)))
+
+ for _, manifest := range manifests {
+ manifest := manifest // capture loop variable
+ g.Go(func() error {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+ return fmt.Errorf("failed to evaluate
manifest %s: %w", manifest.FilePath(), err)
+ }
+ if !match {
+ return nil
+ }
+ }
+
+ entries, err := manifest.FetchEntries(fs, false)
+ if err != nil {
+ return fmt.Errorf("failed to fetch manifest
entries: %w", err)
+ }
+
+ localDelete := make([]iceberg.DataFile, 0)
+ localRewrite := make([]iceberg.DataFile, 0)
+
+ for _, entry := range entries {
+ if entry.Status() == iceberg.EntryStatusDELETED
{
+ continue
+ }
+
+ df := entry.DataFile()
+ if df.ContentType() != iceberg.EntryContentData
{
+ continue
+ }
+
+ inclusive, err := inclusiveEvaluator(df)
+ if err != nil {
+ return fmt.Errorf("failed to evaluate
data file %s with inclusive evaluator: %w", df.FilePath(), err)
+ }
+
+ if !inclusive {
+ continue
+ }
+
+ strict, err := strictEvaluator(df)
+ if err != nil {
+ return fmt.Errorf("failed to evaluate
data file %s with strict evaluator: %w", df.FilePath(), err)
+ }
+
+ if strict {
+ localDelete = append(localDelete, df)
+ } else {
+ localRewrite = append(localRewrite, df)
+ }
+ }
+
+ if len(localDelete) > 0 || len(localRewrite) > 0 {
+ mu.Lock()
+ allFilesToDel = append(allFilesToDel,
localDelete...)
+ allFilesToRewr = append(allFilesToRewr,
localRewrite...)
+ mu.Unlock()
+ }
+
+ return nil
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ return nil, nil, err
+ }
+
+ return allFilesToDel, allFilesToRewr, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, concurrency int) error {
+ complementFilter := iceberg.NewNot(filter)
+
+ for _, originalFile := range files {
+ // Use a separate UUID for rewrite operations to avoid filename
collisions with new data files
+ rewriteUUID := uuid.New()
+ rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, rewriteUUID, concurrency)
+ if err != nil {
+ return fmt.Errorf("failed to rewrite file %s: %w",
originalFile.FilePath(), err)
+ }
+
+ updater.deleteDataFile(originalFile)
+ for _, rewrittenFile := range rewrittenFiles {
+ updater.appendDataFile(rewrittenFile)
+ }
+ }
+
+ return nil
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes
new files with filtered data
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO,
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID
uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+ scanTask := &FileScanTask{
+ File: originalFile,
+ Start: 0,
+ Length: originalFile.FileSizeBytes(),
+ }
+
+ boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
true)
+ if err != nil {
+ return nil, fmt.Errorf("failed to bind filter: %w", err)
+ }
+
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+
+ scanner := &arrowScan{
+ metadata: meta,
+ fs: fs,
+ projectedSchema: t.meta.CurrentSchema(),
+ boundRowFilter: boundFilter,
+ caseSensitive: true,
+ rowLimit: -1, // No limit
+ concurrency: concurrency,
+ }
+
+ _, recordIter, err := scanner.GetRecords(ctx, []FileScanTask{*scanTask})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get records from original
file: %w", err)
+ }
+
+ var records []arrow.RecordBatch
+ for record, err := range recordIter {
+ if err != nil {
+ return nil, fmt.Errorf("failed to read record: %w", err)
+ }
+ records = append(records, record)
+ }
+
+ // If no records remain after filtering, don't create any new files.
+ // This case is rare but possible given that the logic uses the file
stats to make a decision
+ if len(records) == 0 {
+ return nil, nil
+ }
+
+ arrowSchema, err := SchemaToArrowSchema(t.meta.CurrentSchema(), nil,
false, false)
Review Comment:
The first return value from `scanner.GetRecords` is the Arrow schema, so we
shouldn't need to build it again here, right?
##########
table/transaction.go:
##########
@@ -499,6 +500,324 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
concurrency int, snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+ return t.Overwrite(ctx, rdr, filter, caseSensitive, concurrency,
snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting.
+// If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int,
snapshotProps iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Default concurrency if not specified
+ if concurrency <= 0 {
+ concurrency = runtime.GOMAXPROCS(0)
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
+
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter, concurrency)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter, concurrency); err != nil {
+ return err
+ }
+ }
+
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, concurrency int) (filesToDelete,
filesToRewrite []iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter, concurrency)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, concurrency int) (filesToDelete,
filesToRewrite []iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ }
+
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ type classifiedFiles struct {
+ toDelete []iceberg.DataFile
+ toRewrite []iceberg.DataFile
+ }
+
+ var (
+ mu sync.Mutex
+ allFilesToDel []iceberg.DataFile
+ allFilesToRewr []iceberg.DataFile
+ )
+
+ g, _ := errgroup.WithContext(ctx)
+ g.SetLimit(min(concurrency, len(manifests)))
+
+ for _, manifest := range manifests {
+ manifest := manifest // capture loop variable
+ g.Go(func() error {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+ return fmt.Errorf("failed to evaluate
manifest %s: %w", manifest.FilePath(), err)
+ }
+ if !match {
+ return nil
+ }
+ }
+
+ entries, err := manifest.FetchEntries(fs, false)
+ if err != nil {
+ return fmt.Errorf("failed to fetch manifest
entries: %w", err)
+ }
+
+ localDelete := make([]iceberg.DataFile, 0)
+ localRewrite := make([]iceberg.DataFile, 0)
+
+ for _, entry := range entries {
+ if entry.Status() == iceberg.EntryStatusDELETED
{
+ continue
+ }
+
+ df := entry.DataFile()
+ if df.ContentType() != iceberg.EntryContentData
{
+ continue
+ }
+
+ inclusive, err := inclusiveEvaluator(df)
+ if err != nil {
+ return fmt.Errorf("failed to evaluate
data file %s with inclusive evaluator: %w", df.FilePath(), err)
+ }
+
+ if !inclusive {
+ continue
+ }
+
+ strict, err := strictEvaluator(df)
+ if err != nil {
+ return fmt.Errorf("failed to evaluate
data file %s with strict evaluator: %w", df.FilePath(), err)
+ }
+
+ if strict {
+ localDelete = append(localDelete, df)
+ } else {
+ localRewrite = append(localRewrite, df)
+ }
+ }
+
+ if len(localDelete) > 0 || len(localRewrite) > 0 {
+ mu.Lock()
+ allFilesToDel = append(allFilesToDel,
localDelete...)
+ allFilesToRewr = append(allFilesToRewr,
localRewrite...)
+ mu.Unlock()
+ }
+
+ return nil
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ return nil, nil, err
+ }
+
+ return allFilesToDel, allFilesToRewr, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, concurrency int) error {
+ complementFilter := iceberg.NewNot(filter)
+
+ for _, originalFile := range files {
+ // Use a separate UUID for rewrite operations to avoid filename
collisions with new data files
+ rewriteUUID := uuid.New()
+ rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, rewriteUUID, concurrency)
+ if err != nil {
+ return fmt.Errorf("failed to rewrite file %s: %w",
originalFile.FilePath(), err)
+ }
+
+ updater.deleteDataFile(originalFile)
+ for _, rewrittenFile := range rewrittenFiles {
+ updater.appendDataFile(rewrittenFile)
+ }
+ }
+
+ return nil
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes
new files with filtered data
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO,
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID
uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+ scanTask := &FileScanTask{
+ File: originalFile,
+ Start: 0,
+ Length: originalFile.FileSizeBytes(),
+ }
+
+ boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
true)
+ if err != nil {
+ return nil, fmt.Errorf("failed to bind filter: %w", err)
+ }
+
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+
+ scanner := &arrowScan{
+ metadata: meta,
+ fs: fs,
+ projectedSchema: t.meta.CurrentSchema(),
+ boundRowFilter: boundFilter,
+ caseSensitive: true,
+ rowLimit: -1, // No limit
+ concurrency: concurrency,
+ }
+
+ _, recordIter, err := scanner.GetRecords(ctx, []FileScanTask{*scanTask})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get records from original
file: %w", err)
+ }
+
+ var records []arrow.RecordBatch
+ for record, err := range recordIter {
+ if err != nil {
+ return nil, fmt.Errorf("failed to read record: %w", err)
+ }
+ records = append(records, record)
+ }
+
+ // If no records remain after filtering, don't create any new files.
+ // This case is rare but possible given that the logic uses the file
stats to make a decision
+ if len(records) == 0 {
+ return nil, nil
+ }
+
+ arrowSchema, err := SchemaToArrowSchema(t.meta.CurrentSchema(), nil,
false, false)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert schema to arrow: %w",
err)
+ }
+ table := array.NewTableFromRecords(arrowSchema, records)
+ defer table.Release()
+
+ rdr := array.NewTableReader(table, table.NumRows())
+ defer rdr.Release()
+
+ var result []iceberg.DataFile
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &commitUUID,
+ })
Review Comment:
You received a record iterator from `scanner.GetRecords`. Why create a table
and a reader from it instead of just passing in the same iterator that you
got back from `scanner.GetRecords`?
I.e., something like:
```go
sc, iter, err := scanner.GetRecords(ctx, []FileScanTask{*scanTask})
if err != nil { /* .... */ }
dfIter := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
sc: sc,
itr: iter,
fs: fs.(io.WriteFileIO),
writeUUID: &commitUUID,
})
```
In theory, if there are no records, you should just get an empty list of data
files at the end, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]