This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f17879c  Avro: Add row position reader (#1222)
f17879c is described below

commit f17879c426c2e3c5fa40f17dfe58633bc866f1c9
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Jul 23 14:18:50 2020 -0700

    Avro: Add row position reader (#1222)
---
 .../main/java/org/apache/iceberg/avro/AvroIO.java  |  68 ++++++++
 .../java/org/apache/iceberg/avro/AvroIterable.java |   6 +
 .../apache/iceberg/avro/BuildAvroProjection.java   |   6 +-
 .../apache/iceberg/avro/ProjectionDatumReader.java |  10 +-
 .../apache/iceberg/avro/SupportsRowPosition.java   |  29 ++++
 .../java/org/apache/iceberg/avro/ValueReaders.java |  42 ++++-
 .../org/apache/iceberg/data/avro/DataReader.java   |  11 +-
 .../org/apache/iceberg/avro/TestAvroFileSplit.java | 187 +++++++++++++++++++++
 8 files changed, 355 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java 
b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
index 523ee6d..c569d93 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
@@ -19,15 +19,29 @@
 
 package org.apache.iceberg.avro;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.avro.InvalidAvroMagicException;
 import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
 import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.DelegatingInputStream;
 import org.apache.iceberg.io.SeekableInputStream;
 
 class AvroIO {
+  private static final byte[] AVRO_MAGIC = new byte[] { 'O', 'b', 'j', 1 };
+  private static final ValueReader<byte[]> MAGIC_READER = 
ValueReaders.fixed(AVRO_MAGIC.length);
+  private static final ValueReader<Map<String, String>> META_READER = 
ValueReaders.map(
+      ValueReaders.strings(), ValueReaders.strings());
+  private static final ValueReader<byte[]> SYNC_READER = 
ValueReaders.fixed(16);
+
   private AvroIO() {
   }
 
@@ -131,4 +145,58 @@ class AvroIO {
       return stream.markSupported();
     }
   }
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long 
start) {
+    long totalRows = 0;
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the 
input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, 
null);
+
+      // an Avro file's layout looks like this:
+      //   header|block|block|...
+      // the header contains:
+      //   magic|string-map|sync
+      // each block consists of:
+      //   row-count|compressed-size-in-bytes|block-bytes|sync
+
+      // it is necessary to read the header here because this is the only way 
to get the expected file sync bytes
+      byte[] magic = MAGIC_READER.read(decoder, null);
+      if (!Arrays.equals(AVRO_MAGIC, magic)) {
+        throw new InvalidAvroMagicException("Not an Avro file");
+      }
+
+      META_READER.read(decoder, null); // ignore the file metadata, it isn't 
needed
+      byte[] fileSync = SYNC_READER.read(decoder, null);
+
+      // the while loop reads row counts and seeks past the block bytes until 
the next sync pos is >= start, which
+      // indicates that the next sync is the start of the split.
+      byte[] blockSync = new byte[16];
+      long nextSyncPos = in.getPos();
+
+      while (nextSyncPos < start) {
+        if (nextSyncPos != in.getPos()) {
+          in.seek(nextSyncPos);
+          SYNC_READER.read(decoder, blockSync);
+
+          if (!Arrays.equals(fileSync, blockSync)) {
+            throw new RuntimeIOException("Invalid sync at %s", nextSyncPos);
+          }
+        }
+
+        long rowCount = decoder.readLong();
+        long compressedBlockSize = decoder.readLong();
+
+        totalRows += rowCount;
+        nextSyncPos = in.getPos() + compressedBlockSize;
+      }
+
+      return totalRows;
+
+    } catch (EOFException e) {
+      return totalRows;
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to read stream while finding 
starting row position");
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java 
b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
index b69a950..fd63379 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
@@ -77,7 +77,13 @@ public class AvroIterable<D> extends CloseableGroup 
implements CloseableIterable
     FileReader<D> fileReader = initMetadata(newFileReader());
 
     if (start != null) {
+      if (reader instanceof SupportsRowPosition) {
+        ((SupportsRowPosition) reader).setRowPositionSupplier(
+            () -> AvroIO.findStartingRowPos(file::newStream, start));
+      }
       fileReader = new AvroRangeIterator<>(fileReader, start, end);
+    } else if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(() -> 0L);
     }
 
     addCloseable(fileReader);
diff --git 
a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java 
b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index a1e0d05..df9e972 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -48,6 +49,7 @@ class BuildAvroProjection extends 
AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Schema record(Schema record, List<String> names, 
Iterable<Schema.Field> schemaIterable) {
     Preconditions.checkArgument(
         current.isNestedType() && current.asNestedType().isStructType(),
@@ -93,7 +95,9 @@ class BuildAvroProjection extends 
AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
         updatedFields.add(avroField);
 
       } else {
-        Preconditions.checkArgument(field.isOptional(), "Missing required 
field: %s", field.name());
+        Preconditions.checkArgument(
+            field.isOptional() || field.fieldId() == 
MetadataColumns.ROW_POSITION.fieldId(),
+            "Missing required field: %s", field.name());
         // Create a field that will be defaulted to null. We assign a unique 
suffix to the field
         // to make sure that even if records in the file have the field it is 
not projected.
         Schema.Field newField = new Schema.Field(
diff --git 
a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java 
b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
index 5f627bb..1ee77cb 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
@@ -30,7 +31,7 @@ import org.apache.iceberg.mapping.MappingUtil;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.types.TypeUtil;
 
-public class ProjectionDatumReader<D> implements DatumReader<D> {
+public class ProjectionDatumReader<D> implements DatumReader<D>, 
SupportsRowPosition {
   private final Function<Schema, DatumReader<?>> getReader;
   private final org.apache.iceberg.Schema expectedSchema;
   private final Map<String, String> renames;
@@ -50,6 +51,13 @@ public class ProjectionDatumReader<D> implements 
DatumReader<D> {
   }
 
   @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (wrapped instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) wrapped).setRowPositionSupplier(posSupplier);
+    }
+  }
+
+  @Override
   public void setSchema(Schema newFileSchema) {
     this.fileSchema = newFileSchema;
     if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
diff --git 
a/core/src/main/java/org/apache/iceberg/avro/SupportsRowPosition.java 
b/core/src/main/java/org/apache/iceberg/avro/SupportsRowPosition.java
new file mode 100644
index 0000000..1113cd4
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/SupportsRowPosition.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.util.function.Supplier;
+
+/**
+ * Interface for readers that accept a callback to determine the starting row 
position of an Avro split.
+ */
+public interface SupportsRowPosition {
+  void setRowPositionSupplier(Supplier<Long> posSupplier);
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java 
b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 81c5949..a51580f 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -31,12 +31,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Supplier;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.util.Utf8;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -560,10 +562,11 @@ public class ValueReaders {
     }
   }
 
-  public abstract static class StructReader<S> implements ValueReader<S> {
+  public abstract static class StructReader<S> implements ValueReader<S>, 
SupportsRowPosition {
     private final ValueReader<?>[] readers;
     private final int[] positions;
     private final Object[] constants;
+    private int posField = -1;
 
     protected StructReader(List<ValueReader<?>> readers) {
       this.readers = readers.toArray(new ValueReader[0]);
@@ -582,6 +585,9 @@ public class ValueReaders {
         if (idToConstant.containsKey(field.fieldId())) {
           positionList.add(pos);
           constantList.add(idToConstant.get(field.fieldId()));
+        } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+          // track where the _pos field is located for setRowPositionSupplier
+          this.posField = pos;
         }
       }
 
@@ -589,6 +595,26 @@ public class ValueReaders {
       this.constants = constantList.toArray();
     }
 
+    @Override
+    public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+      if (posField > 0) {
+        long startingPos = posSupplier.get();
+        this.readers[posField] = new PositionReader(startingPos);
+        for (ValueReader<?> reader : readers) {
+          if (reader instanceof SupportsRowPosition) {
+            ((SupportsRowPosition) reader).setRowPositionSupplier(() -> 
startingPos);
+          }
+        }
+
+      } else {
+        for (ValueReader<?> reader : readers) {
+          if (reader instanceof SupportsRowPosition) {
+            ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+          }
+        }
+      }
+    }
+
     protected abstract S reuseOrCreate(Object reuse);
 
     protected abstract Object get(S struct, int pos);
@@ -687,4 +713,18 @@ public class ValueReaders {
       struct.put(pos, value);
     }
   }
+
+  static class PositionReader implements ValueReader<Long> {
+    private long currentPosition;
+
+    PositionReader(long rowPosition) {
+      this.currentPosition = rowPosition - 1;
+    }
+
+    @Override
+    public Long read(Decoder ignored, Object reuse) throws IOException {
+      currentPosition += 1;
+      return currentPosition;
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java 
b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index c13bd25..613b4b8 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -32,6 +33,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.ResolvingDecoder;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -40,7 +42,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-public class DataReader<T> implements DatumReader<T> {
+public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
 
   private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> 
DECODER_CACHES =
       ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
@@ -78,6 +80,13 @@ public class DataReader<T> implements DatumReader<T> {
     return value;
   }
 
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
   private ResolvingDecoder resolve(Decoder decoder) throws IOException {
     Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
     Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java 
b/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
new file mode 100644
index 0000000..135fe9f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroFileSplit {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.required(2, "data", Types.StringType.get()));
+
+  private static final int NUM_RECORDS = 100_000;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public List<Record> expected = null;
+  public InputFile file = null;
+
+  @Before
+  public void writeDataFile() throws IOException {
+    this.expected = Lists.newArrayList();
+
+    OutputFile out = Files.localOutput(temp.newFile());
+
+    try (FileAppender<Object> writer = Avro.write(out)
+        .set(TableProperties.AVRO_COMPRESSION, "uncompressed")
+        .createWriterFunc(DataWriter::create)
+        .schema(SCHEMA)
+        .overwrite()
+        .build()) {
+
+      Record record = GenericRecord.create(SCHEMA);
+      for (long i = 0; i < NUM_RECORDS; i += 1) {
+        Record next = record.copy(ImmutableMap.of(
+            "id", i,
+            "data", UUID.randomUUID().toString()));
+        expected.add(next);
+        writer.add(next);
+      }
+    }
+
+    this.file = out.toInputFile();
+  }
+
+  @Test
+  public void testSplitDataSkipping() throws IOException {
+    long end = file.getLength();
+    long splitLocation = end / 2;
+
+    List<Record> firstHalf = readAvro(file, SCHEMA, 0, splitLocation);
+    Assert.assertNotEquals("First split should not be empty", 0, 
firstHalf.size());
+
+    List<Record> secondHalf = readAvro(file, SCHEMA, splitLocation + 1, end - 
splitLocation - 1);
+    Assert.assertNotEquals("Second split should not be empty", 0, 
secondHalf.size());
+
+    Assert.assertEquals("Total records should match expected",
+        expected.size(), firstHalf.size() + secondHalf.size());
+
+    for (int i = 0; i < firstHalf.size(); i += 1) {
+      Assert.assertEquals(expected.get(i), firstHalf.get(i));
+    }
+
+    for (int i = 0; i < secondHalf.size(); i += 1) {
+      Assert.assertEquals(expected.get(firstHalf.size() + i), 
secondHalf.get(i));
+    }
+  }
+
+  @Test
+  public void testPosField() throws IOException {
+    Schema projection = new Schema(
+        SCHEMA.columns().get(0),
+        MetadataColumns.ROW_POSITION,
+        SCHEMA.columns().get(1));
+
+    List<Record> records = readAvro(file, projection, 0, file.getLength());
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      Assert.assertEquals("Field _pos should match",
+          (long) i, 
records.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+      Assert.assertEquals("Field id should match",
+          expected.get(i).getField("id"), records.get(i).getField("id"));
+      Assert.assertEquals("Field data should match",
+          expected.get(i).getField("data"), records.get(i).getField("data"));
+    }
+  }
+
+  @Test
+  public void testPosFieldWithSplits() throws IOException {
+    Schema projection = new Schema(
+        SCHEMA.columns().get(0),
+        MetadataColumns.ROW_POSITION,
+        SCHEMA.columns().get(1));
+
+    long end = file.getLength();
+    long splitLocation = end / 2;
+
+    List<Record> secondHalf = readAvro(file, projection, splitLocation + 1, 
end - splitLocation - 1);
+    Assert.assertNotEquals("Second split should not be empty", 0, 
secondHalf.size());
+
+    List<Record> firstHalf = readAvro(file, projection, 0, splitLocation);
+    Assert.assertNotEquals("First split should not be empty", 0, 
firstHalf.size());
+
+    Assert.assertEquals("Total records should match expected",
+        expected.size(), firstHalf.size() + secondHalf.size());
+
+    for (int i = 0; i < firstHalf.size(); i += 1) {
+      Assert.assertEquals("Field _pos should match",
+          (long) i, 
firstHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+      Assert.assertEquals("Field id should match",
+          expected.get(i).getField("id"), firstHalf.get(i).getField("id"));
+      Assert.assertEquals("Field data should match",
+          expected.get(i).getField("data"), firstHalf.get(i).getField("data"));
+    }
+
+    for (int i = 0; i < secondHalf.size(); i += 1) {
+      Assert.assertEquals("Field _pos should match",
+          (long) (firstHalf.size() + i), 
secondHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+      Assert.assertEquals("Field id should match",
+          expected.get(firstHalf.size() + i).getField("id"), 
secondHalf.get(i).getField("id"));
+      Assert.assertEquals("Field data should match",
+          expected.get(firstHalf.size() + i).getField("data"), 
secondHalf.get(i).getField("data"));
+    }
+  }
+
+  @Test
+  public void testPosWithEOFSplit() throws IOException {
+    Schema projection = new Schema(
+        SCHEMA.columns().get(0),
+        MetadataColumns.ROW_POSITION,
+        SCHEMA.columns().get(1));
+
+    long end = file.getLength();
+
+    List<Record> records = readAvro(file, projection, end - 10, 10);
+    Assert.assertEquals("Should not read any records", 0, records.size());
+  }
+
+  public List<Record> readAvro(InputFile in, Schema projection, long start, 
long length) throws IOException {
+    try (AvroIterable<Record> reader = Avro.read(in)
+        .createReaderFunc(DataReader::create)
+        .split(start, length)
+        .project(projection)
+        .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+}

Reply via email to