Repository: beam Updated Branches: refs/heads/master f7d4583bd -> 80b9cf9c2
Moves Match into FileIO.match()/matchAll(). FileIO will later gain other methods, such as read()/write(). Also introduces FileIO.MatchConfiguration - a common type for various file-based IOs to use to reduce boilerplate, and uses it in TextIO and AvroIO. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58e8a01b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58e8a01b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58e8a01b Branch: refs/heads/master Commit: 58e8a01b351a80ca233983c8ccfd4b2699c86a3a Parents: f7d4583 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 24 16:31:41 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Sun Sep 3 16:14:54 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 232 +++++++--------- .../java/org/apache/beam/sdk/io/FileIO.java | 265 +++++++++++++++++++ .../main/java/org/apache/beam/sdk/io/Match.java | 156 ----------- .../beam/sdk/io/ReadAllViaFileBasedSource.java | 2 +- .../java/org/apache/beam/sdk/io/TextIO.java | 147 ++++------ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 4 +- .../org/apache/beam/sdk/io/TextIOReadTest.java | 5 +- 7 files changed, 418 insertions(+), 393 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 6eeeac9..c4711e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.io; import static
com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -40,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; +import org.apache.beam.sdk.io.FileIO.MatchConfiguration; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; @@ -266,7 +266,7 @@ public class AvroIO { */ public static <T> Read<T> read(Class<T> recordClass) { return new AutoValue_AvroIO_Read.Builder<T>() - .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) .setRecordClass(recordClass) .setSchema(ReflectData.get().getSchema(recordClass)) .setHintMatchesManyFiles(false) @@ -276,7 +276,7 @@ public class AvroIO { /** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */ public static <T> ReadAll<T> readAll(Class<T> recordClass) { return new AutoValue_AvroIO_ReadAll.Builder<T>() - .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .setRecordClass(recordClass) .setSchema(ReflectData.get().getSchema(recordClass)) // 64MB is a reasonable value that allows to amortize the cost of opening files, @@ -289,7 +289,7 @@ public class AvroIO { /** Reads Avro file(s) containing records of the specified schema. 
*/ public static Read<GenericRecord> readGenericRecords(Schema schema) { return new AutoValue_AvroIO_Read.Builder<GenericRecord>() - .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) .setRecordClass(GenericRecord.class) .setSchema(schema) .setHintMatchesManyFiles(false) @@ -302,7 +302,7 @@ public class AvroIO { */ public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) { return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>() - .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .setRecordClass(GenericRecord.class) .setSchema(schema) .setDesiredBundleSizeBytes(64 * 1024 * 1024L) @@ -331,7 +331,7 @@ public class AvroIO { */ public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) { return new AutoValue_AvroIO_Parse.Builder<T>() - .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) .setParseFn(parseFn) .setHintMatchesManyFiles(false) .build(); @@ -344,7 +344,7 @@ public class AvroIO { public static <T> ParseAll<T> parseAllGenericRecords( SerializableFunction<GenericRecord, T> parseFn) { return new AutoValue_AvroIO_ParseAll.Builder<T>() - .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .setParseFn(parseFn) .setDesiredBundleSizeBytes(64 * 1024 * 1024L) .build(); @@ -425,9 +425,7 @@ public class AvroIO { @AutoValue public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { @Nullable abstract ValueProvider<String> getFilepattern(); - abstract EmptyMatchTreatment getEmptyMatchTreatment(); - @Nullable abstract Duration getWatchForNewFilesInterval(); - @Nullable abstract TerminationCondition<?, ?> 
getWatchForNewFilesTerminationCondition(); + abstract MatchConfiguration getMatchConfiguration(); @Nullable abstract Class<T> getRecordClass(); @Nullable abstract Schema getSchema(); abstract boolean getHintMatchesManyFiles(); @@ -437,10 +435,7 @@ public class AvroIO { @AutoValue.Builder abstract static class Builder<T> { abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); - abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment); - abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval); - abstract Builder<T> setWatchForNewFilesTerminationCondition( - TerminationCondition<?, ?> condition); + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder<T> setRecordClass(Class<T> recordClass); abstract Builder<T> setSchema(Schema schema); abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles); @@ -463,11 +458,15 @@ public class AvroIO { return from(StaticValueProvider.of(filepattern)); } - /** - * Configures whether or not a filepattern matching no files is allowed. - */ + + /** Sets the {@link MatchConfiguration}. */ + public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) { + return toBuilder().setMatchConfiguration(matchConfiguration).build(); + } + + /** Configures whether or not a filepattern matching no files is allowed. */ public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { - return toBuilder().setEmptyMatchTreatment(treatment).build(); + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } /** @@ -476,16 +475,12 @@ public class AvroIO { * is unbounded. * * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}. 
- * - * @see TerminationCondition */ @Experimental(Kind.SPLITTABLE_DO_FN) public Read<T> watchForNewFiles( - Duration pollInterval, TerminationCondition<?, ?> terminationCondition) { - return toBuilder() - .setWatchForNewFilesInterval(pollInterval) - .setWatchForNewFilesTerminationCondition(terminationCondition) - .build(); + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); } /** @@ -506,12 +501,15 @@ public class AvroIO { checkNotNull(getFilepattern(), "filepattern"); checkNotNull(getSchema(), "schema"); - if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) { + if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) { return input.apply( "Read", org.apache.beam.sdk.io.Read.from( createSource( - getFilepattern(), getEmptyMatchTreatment(), getRecordClass(), getSchema()))); + getFilepattern(), + getMatchConfiguration().getEmptyMatchTreatment(), + getRecordClass(), + getSchema()))); } // All other cases go through ReadAll. @@ -519,12 +517,7 @@ public class AvroIO { (getRecordClass() == GenericRecord.class) ? 
(ReadAll<T>) readAllGenericRecords(getSchema()) : readAll(getRecordClass()); - readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment()); - if (getWatchForNewFilesInterval() != null) { - TerminationCondition<String, ?> readAllCondition = - ignoreInput(getWatchForNewFilesTerminationCondition()); - readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition); - } + readAll = readAll.withMatchConfiguration(getMatchConfiguration()); return input .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) .apply("Via ReadAll", readAll); @@ -536,12 +529,7 @@ public class AvroIO { builder .addIfNotNull( DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) - .add( - DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) - .withLabel("Treatment of filepatterns that match no files")) - .addIfNotNull( - DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval()) - .withLabel("Interval to watch for new files")); + .include("matchConfiguration", getMatchConfiguration()); } @SuppressWarnings("unchecked") @@ -563,9 +551,7 @@ public class AvroIO { /** Implementation of {@link #readAll}. 
*/ @AutoValue public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> { - abstract EmptyMatchTreatment getEmptyMatchTreatment(); - @Nullable abstract Duration getWatchForNewFilesInterval(); - @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition(); + abstract MatchConfiguration getMatchConfiguration(); @Nullable abstract Class<T> getRecordClass(); @Nullable abstract Schema getSchema(); abstract long getDesiredBundleSizeBytes(); @@ -574,10 +560,7 @@ public class AvroIO { @AutoValue.Builder abstract static class Builder<T> { - abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment); - abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval); - abstract Builder<T> setWatchForNewFilesTerminationCondition( - TerminationCondition<String, ?> condition); + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder<T> setRecordClass(Class<T> recordClass); abstract Builder<T> setSchema(Schema schema); abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); @@ -585,19 +568,23 @@ public class AvroIO { abstract ReadAll<T> build(); } + + /** Sets the {@link MatchConfiguration}. */ + public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + /** Like {@link Read#withEmptyMatchTreatment}. */ public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { - return toBuilder().setEmptyMatchTreatment(treatment).build(); + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } /** Like {@link Read#watchForNewFiles}. 
*/ @Experimental(Kind.SPLITTABLE_DO_FN) public ReadAll<T> watchForNewFiles( Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { - return toBuilder() - .setWatchForNewFilesInterval(pollInterval) - .setWatchForNewFilesTerminationCondition(terminationCondition) - .build(); + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); } @VisibleForTesting @@ -608,48 +595,30 @@ public class AvroIO { @Override public PCollection<T> expand(PCollection<String> input) { checkNotNull(getSchema(), "schema"); - Match.Filepatterns matchFilepatterns = - Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment()); - if (getWatchForNewFilesInterval() != null) { - matchFilepatterns = - matchFilepatterns.continuously( - getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition()); - } - return input - .apply(matchFilepatterns) + .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) .apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */, getDesiredBundleSizeBytes(), - new CreateSourceFn<>( - getEmptyMatchTreatment(), getRecordClass(), getSchema().toString()))) + new CreateSourceFn<>(getRecordClass(), getSchema().toString()))) .setCoder(AvroCoder.of(getRecordClass(), getSchema())); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .add( - DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) - .withLabel("Treatment of filepatterns that match no files")) - .addIfNotNull( - DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval()) - .withLabel("Interval to watch for new files")); + builder.include("matchConfiguration", getMatchConfiguration()); } } private static class CreateSourceFn<T> implements SerializableFunction<String, FileBasedSource<T>> { - private final 
EmptyMatchTreatment emptyMatchTreatment; private final Class<T> recordClass; private final Supplier<Schema> schemaSupplier; - public CreateSourceFn( - EmptyMatchTreatment emptyMatchTreatment, Class<T> recordClass, String jsonSchema) { - this.emptyMatchTreatment = emptyMatchTreatment; + public CreateSourceFn(Class<T> recordClass, String jsonSchema) { this.recordClass = recordClass; this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema); } @@ -657,7 +626,10 @@ public class AvroIO { @Override public FileBasedSource<T> apply(String input) { return Read.createSource( - StaticValueProvider.of(input), emptyMatchTreatment, recordClass, schemaSupplier.get()); + StaticValueProvider.of(input), + EmptyMatchTreatment.DISALLOW, + recordClass, + schemaSupplier.get()); } } @@ -667,9 +639,7 @@ public class AvroIO { @AutoValue public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> { @Nullable abstract ValueProvider<String> getFilepattern(); - abstract EmptyMatchTreatment getEmptyMatchTreatment(); - @Nullable abstract Duration getWatchForNewFilesInterval(); - @Nullable abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition(); + abstract MatchConfiguration getMatchConfiguration(); abstract SerializableFunction<GenericRecord, T> getParseFn(); @Nullable abstract Coder<T> getCoder(); abstract boolean getHintMatchesManyFiles(); @@ -679,10 +649,7 @@ public class AvroIO { @AutoValue.Builder abstract static class Builder<T> { abstract Builder<T> setFilepattern(ValueProvider<String> filepattern); - abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment); - abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval); - abstract Builder<T> setWatchForNewFilesTerminationCondition( - TerminationCondition<?, ?> condition); + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); 
abstract Builder<T> setCoder(Coder<T> coder); abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles); @@ -700,19 +667,22 @@ public class AvroIO { return toBuilder().setFilepattern(filepattern).build(); } + /** Sets the {@link MatchConfiguration}. */ + public Parse<T> withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + /** Like {@link Read#withEmptyMatchTreatment}. */ public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { - return toBuilder().setEmptyMatchTreatment(treatment).build(); + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } /** Like {@link Read#watchForNewFiles}. */ @Experimental(Kind.SPLITTABLE_DO_FN) public Parse<T> watchForNewFiles( - Duration pollInterval, TerminationCondition<?, ?> terminationCondition) { - return toBuilder() - .setWatchForNewFilesInterval(pollInterval) - .setWatchForNewFilesTerminationCondition(terminationCondition) - .build(); + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); } /** Sets a coder for the result of the parse function. */ @@ -730,24 +700,19 @@ public class AvroIO { checkNotNull(getFilepattern(), "filepattern"); Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); - if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) { + if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) { return input.apply( org.apache.beam.sdk.io.Read.from( AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder))); } // All other cases go through ParseAllGenericRecords. 
- ParseAll<T> parseAll = - parseAllGenericRecords(getParseFn()) - .withCoder(coder) - .withEmptyMatchTreatment(getEmptyMatchTreatment()); - if (getWatchForNewFilesInterval() != null) { - TerminationCondition<String, ?> parseAllCondition = - ignoreInput(getWatchForNewFilesTerminationCondition()); - parseAll = parseAll.watchForNewFiles(getWatchForNewFilesInterval(), parseAllCondition); - } return input .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) - .apply("Via ParseAll", parseAll); + .apply( + "Via ParseAll", + parseAllGenericRecords(getParseFn()) + .withCoder(coder) + .withMatchConfiguration(getMatchConfiguration())); } private static <T> Coder<T> inferCoder( @@ -776,12 +741,7 @@ public class AvroIO { .addIfNotNull( DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern")) .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")) - .add( - DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) - .withLabel("Treatment of filepatterns that match no files")) - .addIfNotNull( - DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval()) - .withLabel("Interval to watch for new files")); + .include("matchConfiguration", getMatchConfiguration()); } } @@ -790,9 +750,7 @@ public class AvroIO { /** Implementation of {@link #parseAllGenericRecords}. 
*/ @AutoValue public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> { - abstract EmptyMatchTreatment getEmptyMatchTreatment(); - @Nullable abstract Duration getWatchForNewFilesInterval(); - @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition(); + abstract MatchConfiguration getMatchConfiguration(); abstract SerializableFunction<GenericRecord, T> getParseFn(); @Nullable abstract Coder<T> getCoder(); abstract long getDesiredBundleSizeBytes(); @@ -801,10 +759,7 @@ public class AvroIO { @AutoValue.Builder abstract static class Builder<T> { - abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment); - abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval); - abstract Builder<T> setWatchForNewFilesTerminationCondition( - TerminationCondition<String, ?> condition); + abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn); abstract Builder<T> setCoder(Coder<T> coder); abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes); @@ -812,19 +767,22 @@ public class AvroIO { abstract ParseAll<T> build(); } + /** Sets the {@link MatchConfiguration}. */ + public ParseAll<T> withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + /** Like {@link Read#withEmptyMatchTreatment}. */ public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) { - return toBuilder().setEmptyMatchTreatment(treatment).build(); + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } /** Like {@link Read#watchForNewFiles}. 
*/ @Experimental(Kind.SPLITTABLE_DO_FN) public ParseAll<T> watchForNewFiles( Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { - return toBuilder() - .setWatchForNewFilesInterval(pollInterval) - .setWatchForNewFilesTerminationCondition(terminationCondition) - .build(); + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); } /** Specifies the coder for the result of the {@code parseFn}. */ @@ -842,25 +800,10 @@ public class AvroIO { final Coder<T> coder = Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry()); final SerializableFunction<GenericRecord, T> parseFn = getParseFn(); - final EmptyMatchTreatment emptyMatchTreatment = getEmptyMatchTreatment(); final SerializableFunction<String, FileBasedSource<T>> createSource = - new SerializableFunction<String, FileBasedSource<T>>() { - @Override - public FileBasedSource<T> apply(String input) { - return AvroSource.from(input) - .withParseFn(parseFn, coder) - .withEmptyMatchTreatment(emptyMatchTreatment); - } - }; - Match.Filepatterns matchFilepatterns = - Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment); - if (getWatchForNewFilesInterval() != null) { - matchFilepatterns = - matchFilepatterns.continuously( - getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition()); - } + new CreateParseSourceFn<>(parseFn, coder); return input - .apply(matchFilepatterns) + .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) .apply( "Parse all via FileBasedSource", new ReadAllViaFileBasedSource<>( @@ -875,12 +818,23 @@ public class AvroIO { super.populateDisplayData(builder); builder .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function")) - .add( - DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) - .withLabel("Treatment of filepatterns that match no files")) - .addIfNotNull( - DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval()) - .withLabel("Interval to watch for new files")); + .include("matchConfiguration", getMatchConfiguration()); + } + + private static class CreateParseSourceFn<T> + implements SerializableFunction<String, FileBasedSource<T>> { + private final SerializableFunction<GenericRecord, T> parseFn; + private final Coder<T> coder; + + public CreateParseSourceFn(SerializableFunction<GenericRecord, T> parseFn, Coder<T> coder) { + this.parseFn = parseFn; + this.coder = coder; + } + + @Override + public FileBasedSource<T> apply(String input) { + return AvroSource.from(input).withParseFn(parseFn, coder); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java new file mode 100644 index 0000000..1eb81df --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Transforms for working with files. Currently includes matching of filepatterns via {@link #match} + * and {@link #matchAll}. + */ +public class FileIO { + private static final Logger LOG = LoggerFactory.getLogger(FileIO.class); + + /** + * Matches a filepattern using {@link FileSystems#match} and produces a collection of matched + * resources (both files and directories) as {@link MatchResult.Metadata}. + * + * <p>By default, matches the filepattern once and produces a bounded {@link PCollection}. To + * continuously watch the filepattern for new matches, use {@link MatchAll#continuously(Duration, + * TerminationCondition)} - this will produce an unbounded {@link PCollection}. + * + * <p>By default, a filepattern matching no resources is treated according to {@link + * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link + * Match#withEmptyMatchTreatment}. 
+ */ + public static Match match() { + return new AutoValue_FileIO_Match.Builder() + .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) + .build(); + } + + /** + * Like {@link #match}, but matches each filepattern in a collection of filepatterns. + * + * <p>Resources are not deduplicated between filepatterns, i.e. if the same resource matches + * multiple filepatterns, it will be produced multiple times. + * + * <p>By default, a filepattern matching no resources is treated according to {@link + * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link + * MatchAll#withEmptyMatchTreatment}. + */ + public static MatchAll matchAll() { + return new AutoValue_FileIO_MatchAll.Builder() + .setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .build(); + } + + /** + * Describes configuration for matching filepatterns, such as {@link EmptyMatchTreatment} + * and continuous watching for matching files. + */ + @AutoValue + public abstract static class MatchConfiguration implements HasDisplayData, Serializable { + /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */ + public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) { + return new AutoValue_FileIO_MatchConfiguration.Builder() + .setEmptyMatchTreatment(emptyMatchTreatment) + .build(); + } + + abstract EmptyMatchTreatment getEmptyMatchTreatment(); + @Nullable abstract Duration getWatchInterval(); + @Nullable abstract TerminationCondition<String, ?> getWatchTerminationCondition(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); + abstract Builder setWatchInterval(Duration watchInterval); + abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition); + abstract MatchConfiguration build(); + } + + /** Sets the {@link EmptyMatchTreatment}. 
*/ + public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return toBuilder().setEmptyMatchTreatment(treatment).build(); + } + + /** + * Continuously watches for new files at the given interval until the given termination + * condition is reached, where the input to the condition is the filepattern. + */ + public MatchConfiguration continuously( + Duration interval, TerminationCondition<String, ?> condition) { + return toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add( + DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) + .withLabel("Treatment of filepatterns that match no files")) + .addIfNotNull( + DisplayData.item("watchForNewFilesInterval", getWatchInterval()) + .withLabel("Interval to watch for new files")); + } + } + + /** Implementation of {@link #match}. */ + @AutoValue + public abstract static class Match extends PTransform<PBegin, PCollection<MatchResult.Metadata>> { + abstract ValueProvider<String> getFilepattern(); + abstract MatchConfiguration getConfiguration(); + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFilepattern(ValueProvider<String> filepattern); + abstract Builder setConfiguration(MatchConfiguration configuration); + abstract Match build(); + } + + /** Matches the given filepattern. */ + public Match filepattern(String filepattern) { + return this.filepattern(ValueProvider.StaticValueProvider.of(filepattern)); + } + + /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */ + public Match filepattern(ValueProvider<String> filepattern) { + return toBuilder().setFilepattern(filepattern).build(); + } + + /** Sets the {@link MatchConfiguration}. 
*/ + public Match withConfiguration(MatchConfiguration configuration) { + return toBuilder().setConfiguration(configuration).build(); + } + + /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */ + public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** + * See {@link MatchConfiguration#continuously}. The returned {@link PCollection} is unbounded. + * + * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}. + */ + @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) + public Match continuously( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition)); + } + + @Override + public PCollection<MatchResult.Metadata> expand(PBegin input) { + return input + .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) + .apply("Via MatchAll", matchAll().withConfiguration(getConfiguration())); + } + } + + /** Implementation of {@link #matchAll}. */ + @AutoValue + public abstract static class MatchAll + extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>> { + abstract MatchConfiguration getConfiguration(); + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConfiguration(MatchConfiguration configuration); + abstract MatchAll build(); + } + + /** Like {@link Match#withConfiguration}. */ + public MatchAll withConfiguration(MatchConfiguration configuration) { + return toBuilder().setConfiguration(configuration).build(); + } + + /** Like {@link Match#withEmptyMatchTreatment}. */ + public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** Like {@link Match#continuously}. 
*/ + @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) + public MatchAll continuously( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return withConfiguration(getConfiguration().continuously(pollInterval, terminationCondition)); + } + + @Override + public PCollection<MatchResult.Metadata> expand(PCollection<String> input) { + if (getConfiguration().getWatchInterval() == null) { + return input.apply( + "Match filepatterns", + ParDo.of(new MatchFn(getConfiguration().getEmptyMatchTreatment()))); + } else { + return input + .apply( + "Continuously match filepatterns", + Watch.growthOf(new MatchPollFn()) + .withPollInterval(getConfiguration().getWatchInterval()) + .withTerminationPerInput(getConfiguration().getWatchTerminationCondition())) + .apply(Values.<MatchResult.Metadata>create()); + } + } + + private static class MatchFn extends DoFn<String, MatchResult.Metadata> { + private final EmptyMatchTreatment emptyMatchTreatment; + + public MatchFn(EmptyMatchTreatment emptyMatchTreatment) { + this.emptyMatchTreatment = emptyMatchTreatment; + } + + @ProcessElement + public void process(ProcessContext c) throws Exception { + String filepattern = c.element(); + MatchResult match = FileSystems.match(filepattern, emptyMatchTreatment); + LOG.info("Matched {} files for pattern {}", match.metadata().size(), filepattern); + for (MatchResult.Metadata metadata : match.metadata()) { + c.output(metadata); + } + } + } + + private static class MatchPollFn implements Watch.Growth.PollFn<String, MatchResult.Metadata> { + @Override + public Watch.Growth.PollResult<MatchResult.Metadata> apply(String input, Instant timestamp) + throws Exception { + return Watch.Growth.PollResult.incomplete( + Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java deleted file mode 100644 index bb44fac..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io; - -import com.google.auto.value.AutoValue; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.Watch; -import org.apache.beam.sdk.transforms.Watch.Growth.PollResult; -import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Matches each filepattern in a collection of filepatterns using {@link FileSystems#match}, and - * produces a collection of matched resources (both files and directories) as {@link Metadata}. - * Resources are not deduplicated between filepatterns, i.e. if the same resource matches multiple - * filepatterns, it will be produced multiple times. - * - * <p>By default, this transform matches each filepattern once and produces a bounded {@link - * PCollection}. To continuously watch each filepattern for new matches, use {@link - * Filepatterns#continuously(Duration, TerminationCondition)} - this will produce an unbounded - * {@link PCollection}. - * - * <p>By default, filepatterns matching no resources are treated according to {@link - * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link - * Filepatterns#withEmptyMatchTreatment}. - */ -public class Match { - private static final Logger LOG = LoggerFactory.getLogger(Match.class); - - /** See {@link Match}. 
*/ - public static Filepatterns filepatterns() { - return new AutoValue_Match_Filepatterns.Builder() - .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD) - .build(); - } - - /** Implementation of {@link #filepatterns}. */ - @AutoValue - public abstract static class Filepatterns - extends PTransform<PCollection<String>, PCollection<Metadata>> { - abstract EmptyMatchTreatment getEmptyMatchTreatment(); - - @Nullable - abstract Duration getWatchInterval(); - - @Nullable - abstract TerminationCondition<String, ?> getWatchTerminationCondition(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); - - abstract Builder setWatchInterval(Duration watchInterval); - - abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition); - - abstract Filepatterns build(); - } - - /** - * Sets whether or not filepatterns matching no files are allowed. When using {@link - * #continuously}, they are always allowed, and this parameter is ignored. - */ - public Filepatterns withEmptyMatchTreatment(EmptyMatchTreatment treatment) { - return toBuilder().setEmptyMatchTreatment(treatment).build(); - } - - /** - * Continuously watches for new resources matching the filepattern, repeatedly matching it at - * the given interval, until the given termination condition is reached. The returned {@link - * PCollection} is unbounded. - * - * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}. 
- * - * @see TerminationCondition - */ - @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) - public Filepatterns continuously( - Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { - return toBuilder() - .setWatchInterval(pollInterval) - .setWatchTerminationCondition(terminationCondition) - .build(); - } - - @Override - public PCollection<Metadata> expand(PCollection<String> input) { - if (getWatchInterval() == null) { - return input.apply("Match filepatterns", ParDo.of(new MatchFn(getEmptyMatchTreatment()))); - } else { - return input - .apply( - "Continuously match filepatterns", - Watch.growthOf(new MatchPollFn()) - .withPollInterval(getWatchInterval()) - .withTerminationPerInput(getWatchTerminationCondition())) - .apply(Values.<Metadata>create()); - } - } - - private static class MatchFn extends DoFn<String, Metadata> { - private final EmptyMatchTreatment emptyMatchTreatment; - - public MatchFn(EmptyMatchTreatment emptyMatchTreatment) { - this.emptyMatchTreatment = emptyMatchTreatment; - } - - @ProcessElement - public void process(ProcessContext c) throws Exception { - String filepattern = c.element(); - MatchResult match = FileSystems.match(filepattern, emptyMatchTreatment); - LOG.info("Matched {} files for pattern {}", match.metadata().size(), filepattern); - for (Metadata metadata : match.metadata()) { - c.output(metadata); - } - } - } - - private static class MatchPollFn implements Watch.Growth.PollFn<String, Metadata> { - @Override - public PollResult<Metadata> apply(String input, Instant timestamp) throws Exception { - return PollResult.incomplete( - Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java index 990f508..0cd7105 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.values.PCollection; * input {@link PCollection} must not contain {@link ResourceId#isDirectory directories}. * * <p>To obtain the collection of {@link Metadata} from a filepattern, use {@link - * Match#filepatterns()}. + * FileIO#match} or {@link FileIO#matchAll}. */ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, PCollection<T>> { private final SerializableFunction<String, Boolean> isSplittable; http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index eba5ab5..57bfaa9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -39,6 +38,7 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import 
org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; +import org.apache.beam.sdk.io.FileIO.MatchConfiguration; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; @@ -193,7 +193,7 @@ public class TextIO { return new AutoValue_TextIO_Read.Builder() .setCompression(Compression.AUTO) .setHintMatchesManyFiles(false) - .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) .build(); } @@ -214,7 +214,7 @@ public class TextIO { // but is not so large as to exhaust a typical runner's maximum amount of output per // ProcessElement call. .setDesiredBundleSizeBytes(64 * 1024 * 1024L) - .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD) + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .build(); } @@ -259,32 +259,19 @@ public class TextIO { /** Implementation of {@link #read}. 
*/ @AutoValue public abstract static class Read extends PTransform<PBegin, PCollection<String>> { - @Nullable - abstract ValueProvider<String> getFilepattern(); - abstract Compression getCompression(); - - @Nullable - abstract Duration getWatchForNewFilesInterval(); - - @Nullable - abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition(); - + @Nullable abstract ValueProvider<String> getFilepattern(); + abstract MatchConfiguration getMatchConfiguration(); abstract boolean getHintMatchesManyFiles(); - abstract EmptyMatchTreatment getEmptyMatchTreatment(); - @Nullable - abstract byte[] getDelimiter(); - + abstract Compression getCompression(); + @Nullable abstract byte[] getDelimiter(); abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { abstract Builder setFilepattern(ValueProvider<String> filepattern); - abstract Builder setCompression(Compression compression); - abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval); - abstract Builder setWatchForNewFilesTerminationCondition( - TerminationCondition<?, ?> condition); + abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder setHintMatchesManyFiles(boolean hintManyFiles); - abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); + abstract Builder setCompression(Compression compression); abstract Builder setDelimiter(byte[] delimiter); abstract Read build(); @@ -314,6 +301,11 @@ public class TextIO { return toBuilder().setFilepattern(filepattern).build(); } + /** Sets the {@link MatchConfiguration}. */ + public Read withMatchConfiguration(MatchConfiguration matchConfiguration) { + return toBuilder().setMatchConfiguration(matchConfiguration).build(); + } + /** @deprecated Use {@link #withCompression}. 
*/ @Deprecated public Read withCompressionType(TextIO.CompressionType compressionType) { @@ -330,21 +322,15 @@ public class TextIO { } /** - * Continuously watches for new files matching the filepattern, polling it at the given - * interval, until the given termination condition is reached. The returned {@link PCollection} - * is unbounded. + * See {@link MatchConfiguration#continuously}. * * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}. - * - * @see TerminationCondition */ @Experimental(Kind.SPLITTABLE_DO_FN) public Read watchForNewFiles( - Duration pollInterval, TerminationCondition<?, ?> terminationCondition) { - return toBuilder() - .setWatchForNewFilesInterval(pollInterval) - .setWatchForNewFilesTerminationCondition(terminationCondition) - .build(); + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); } /** @@ -360,12 +346,9 @@ public class TextIO { return toBuilder().setHintMatchesManyFiles(true).build(); } - /** - * Configures whether or not a filepattern matching no files is allowed. When using {@link - * #watchForNewFiles}, it is always allowed and this parameter is ignored. - */ + /** See {@link MatchConfiguration#withEmptyMatchTreatment}. 
*/ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) { - return toBuilder().setEmptyMatchTreatment(treatment).build(); + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } /** @@ -390,29 +373,27 @@ public class TextIO { @Override public PCollection<String> expand(PBegin input) { checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform"); - if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) { + if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) { return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource())); } // All other cases go through ReadAll. - ReadAll readAll = - readAll() - .withCompression(getCompression()) - .withEmptyMatchTreatment(getEmptyMatchTreatment()) - .withDelimiter(getDelimiter()); - if (getWatchForNewFilesInterval() != null) { - TerminationCondition<String, ?> readAllCondition = - ignoreInput(getWatchForNewFilesTerminationCondition()); - readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition); - } return input .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) - .apply("Via ReadAll", readAll); + .apply( + "Via ReadAll", + readAll() + .withCompression(getCompression()) + .withMatchConfiguration(getMatchConfiguration()) + .withDelimiter(getDelimiter())); } // Helper to create a source specific to the requested compression type. 
protected FileBasedSource<String> getSource() { - return CompressedSource - .from(new TextSource(getFilepattern(), getEmptyMatchTreatment(), getDelimiter())) + return CompressedSource.from( + new TextSource( + getFilepattern(), + getMatchConfiguration().getEmptyMatchTreatment(), + getDelimiter())) .withCompression(getCompression()); } @@ -425,16 +406,10 @@ public class TextIO { .withLabel("Compression Type")) .addIfNotNull( DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern")) - .add( - DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) - .withLabel("Treatment of filepatterns that match no files")) - .addIfNotNull( - DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval()) - .withLabel("Interval to watch for new files")) + .include("matchConfiguration", getMatchConfiguration()) .addIfNotNull( DisplayData.item("delimiter", Arrays.toString(getDelimiter())) .withLabel("Custom delimiter to split records")); - } } @@ -444,15 +419,8 @@ public class TextIO { @AutoValue public abstract static class ReadAll extends PTransform<PCollection<String>, PCollection<String>> { + abstract MatchConfiguration getMatchConfiguration(); abstract Compression getCompression(); - - @Nullable - abstract Duration getWatchForNewFilesInterval(); - - @Nullable - abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition(); - - abstract EmptyMatchTreatment getEmptyMatchTreatment(); abstract long getDesiredBundleSizeBytes(); @Nullable abstract byte[] getDelimiter(); @@ -461,16 +429,18 @@ public class TextIO { @AutoValue.Builder abstract static class Builder { + abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder setCompression(Compression compression); - abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval); - abstract Builder setWatchForNewFilesTerminationCondition( - TerminationCondition<String, ?> condition); - abstract Builder 
setEmptyMatchTreatment(EmptyMatchTreatment treatment); abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); abstract Builder setDelimiter(byte[] delimiter); abstract ReadAll build(); } + /** Sets the {@link MatchConfiguration}. */ + public ReadAll withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + /** @deprecated Use {@link #withCompression}. */ @Deprecated public ReadAll withCompressionType(TextIO.CompressionType compressionType) { @@ -488,17 +458,15 @@ public class TextIO { /** Same as {@link Read#withEmptyMatchTreatment}. */ public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { - return toBuilder().setEmptyMatchTreatment(treatment).build(); + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */ @Experimental(Kind.SPLITTABLE_DO_FN) public ReadAll watchForNewFiles( Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { - return toBuilder() - .setWatchForNewFilesInterval(pollInterval) - .setWatchForNewFilesTerminationCondition(terminationCondition) - .build(); + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); } @VisibleForTesting @@ -512,22 +480,15 @@ public class TextIO { @Override public PCollection<String> expand(PCollection<String> input) { - Match.Filepatterns matchFilepatterns = - Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment()); - if (getWatchForNewFilesInterval() != null) { - matchFilepatterns = - matchFilepatterns.continuously( - getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition()); - } return input - .apply(matchFilepatterns) + .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) .apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( new 
IsSplittableFn(getCompression()), getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment(), - getDelimiter()))).setCoder(StringUtf8Coder.of()); + new CreateTextSourceFn(getCompression(), getDelimiter()))) + .setCoder(StringUtf8Coder.of()); } @Override @@ -536,30 +497,30 @@ public class TextIO { builder .add( - DisplayData.item("compressionType", getCompression().toString()) - .withLabel("Compression Type")) + DisplayData.item("compressionType", getCompression().toString()) + .withLabel("Compression Type")) .addIfNotNull( - DisplayData.item("delimiter", Arrays.toString(getDelimiter())) - .withLabel("Custom delimiter to split records")); + DisplayData.item("delimiter", Arrays.toString(getDelimiter())) + .withLabel("Custom delimiter to split records")) + .include("matchConfiguration", getMatchConfiguration()); } private static class CreateTextSourceFn implements SerializableFunction<String, FileBasedSource<String>> { private final Compression compression; - private final EmptyMatchTreatment emptyMatchTreatment; private byte[] delimiter; private CreateTextSourceFn( - Compression compression, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { + Compression compression, byte[] delimiter) { this.compression = compression; - this.emptyMatchTreatment = emptyMatchTreatment; this.delimiter = delimiter; } @Override public FileBasedSource<String> apply(String input) { return CompressedSource.from( - new TextSource(StaticValueProvider.of(input), emptyMatchTreatment, delimiter)) + new TextSource( + StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter)) .withCompression(compression); } } http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 8c68f57..695e196 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -335,7 +335,7 @@ public class AvroIOTest { .from(tmpFolder.getRoot().getAbsolutePath() + "/first*") .watchForNewFiles( Duration.millis(100), - Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3))))) + Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))))) .containsInAnyOrder(firstValues); PAssert.that( readPipeline.apply( @@ -344,7 +344,7 @@ public class AvroIOTest { .from(tmpFolder.getRoot().getAbsolutePath() + "/first*") .watchForNewFiles( Duration.millis(100), - Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3))))) + Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))))) .containsInAnyOrder(firstValues); PCollection<String> paths = http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index e55a820..e708b46 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -26,7 +26,6 @@ import static org.apache.beam.sdk.io.Compression.DEFLATE; import static org.apache.beam.sdk.io.Compression.GZIP; import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED; import static org.apache.beam.sdk.io.Compression.ZIP; -import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -78,6 +77,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesSplittableParDo; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; @@ -951,7 +951,8 @@ public class TextIOReadTest { // Make sure that compression type propagates into readAll() .withCompression(ZIP) .watchForNewFiles( - Duration.millis(100), afterTimeSinceNewOutput(Duration.standardSeconds(3)))); + Duration.millis(100), + Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))); Thread writer = new Thread() {