[ https://issues.apache.org/jira/browse/BEAM-4313?focusedWorklogId=111304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111304 ]
ASF GitHub Bot logged work on BEAM-4313: ---------------------------------------- Author: ASF GitHub Bot Created on: 12/Jun/18 22:07 Start Date: 12/Jun/18 22:07 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #5540: [BEAM-4313] Fix and enforce FindBugs and ErrorProne violations in Dataflow Runner URL: https://github.com/apache/beam/pull/5540 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java index 295e7c86fe2..ff258030635 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java @@ -218,7 +218,7 @@ private MetricsContainerStepMapMetricResults( } @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { + public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { return new QueryResults(filter); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index 85d9acc7cc0..42d72ebcbac 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.construction.metrics.MetricFiltering; import org.apache.beam.runners.core.construction.metrics.MetricKey; @@ -257,7 +258,7 @@ public static MetricQueryResults create( } @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { + public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder(); for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) { maybeExtractResult(filter, counterResults, counter); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java index f8f7cbd95bf..cb217e26e39 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectMetrics.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.construction.metrics.MetricFiltering; import org.apache.beam.runners.core.construction.metrics.MetricKey; @@ -258,7 +259,7 @@ public static MetricQueryResults create( } @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { + public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder(); for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) { maybeExtractResult(filter, counterResults, counter); diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index c5deb9c0a90..9195ccec42c 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -19,7 +19,7 @@ import groovy.json.JsonOutput apply from: project(":").file("build_rules.gradle") -applyJavaNature(enableFindbugs: false /* BEAM-925 */) +applyJavaNature(failOnWarning: true) description = "Apache Beam :: Runners :: Google Cloud Dataflow" diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 16087bdb56c..145a6a23e1b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -25,7 +25,6 @@ import com.google.common.collect.ForwardingMap; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -192,7 +191,7 @@ public void processElement(ProcessContext c) throws Exception { } } - private final DataflowRunner runner; + private final transient DataflowRunner runner; private final PCollectionView<Map<K, V>> view; /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() @@ -676,7 +675,7 @@ public void processElement(ProcessContext c) throws Exception { } } - private final DataflowRunner runner; + private final transient DataflowRunner runner; private final PCollectionView<Map<K, Iterable<V>>> view; /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() @@ -724,11 +723,6 @@ public BatchViewAsMultimap( IterableCoder.of( FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder)))); - TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>> defaultValue = - new TransformedMap<>( - IterableWithWindowedValuesToIterable.of(), - ImmutableMap.<K, Iterable<WindowedValue<V>>>of()); - return BatchViewAsSingleton.applyForSingleton( runner, input, new ToMultimapDoFn<>(windowCoder), finalValueCoder, view); } @@ -900,7 +894,7 @@ public void processElement(ProcessContext c) throws Exception { } } - private final DataflowRunner runner; + private final transient DataflowRunner runner; private final PCollectionView<T> view; private final CombineFn<T, ?, T> combineFn; private final int fanout; @@ -1054,7 +1048,7 @@ public void processElement(ProcessContext c) throws Exception { } } - private final DataflowRunner runner; + private final transient DataflowRunner runner; private final PCollectionView<List<T>> view; /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() @@ -1134,7 +1128,7 @@ protected String getKindString() { */ static class BatchViewAsIterable<T> extends PTransform<PCollection<T>, PCollection<?>> { - private final DataflowRunner runner; + private final transient DataflowRunner runner; private final PCollectionView<Iterable<T>> view; /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() @@ -1377,6 +1371,11 @@ public String toString() { return MoreObjects.toStringHelper(getClass()).add("value", getValue()).toString(); } + @Override + public int hashCode() { + return Objects.hash(getValue()); + } + @Override public boolean equals(Object o) { if (o instanceof ValueInEmptyWindows) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index bea9a75deb1..42c8aa59adb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -89,7 +89,7 @@ private MetricQueryResults populateMetricQueryResults( } @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { + public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { List<MetricUpdate> metricUpdates; ImmutableList<MetricResult<Long>> counters = ImmutableList.of(); ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of(); @@ -345,12 +345,19 @@ public static MetricQueryResults create( abstract static class DataflowMetricResult<T> implements MetricResult<T> { // need to define these here so they appear in the correct order // and the generated constructor is usable and consistent + @Override public abstract MetricName getName(); + + @Override public abstract String getStep(); + @Nullable protected abstract T committedInternal(); + + @Override public abstract T getAttempted(); + @Override public T getCommitted() { T committed = committedInternal(); if (committed == null) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index fd6c6411788..2ce503af7a2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -48,7 +48,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -380,7 +379,7 @@ public Job translate(List<DataflowPackage> packages) { settings.setMaxNumWorkers(options.getMaxNumWorkers()); workerPool.setAutoscalingSettings(settings); - List<WorkerPool> workerPools = new LinkedList<>(); + List<WorkerPool> workerPools = new ArrayList<>(); workerPools.add(workerPool); environment.setWorkerPools(workerPools); @@ -488,7 +487,7 @@ public StepTranslator addStep(PTransform<?, ?> transform, String type) { // Start the next "steps" list item. List<Step> steps = job.getSteps(); if (steps == null) { - steps = new LinkedList<>(); + steps = new ArrayList<>(); job.setSteps(steps); } @@ -504,6 +503,7 @@ public StepTranslator addStep(PTransform<?, ?> transform, String type) { return stepContext; } + @Override public OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer) { String stepName = stepNames.get(producer); checkArgument(stepName != null, "%s doesn't have a name specified", producer); @@ -794,7 +794,7 @@ public void translate( Flatten.PCollections<T> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "Flatten"); - List<OutputReference> inputs = new LinkedList<>(); + List<OutputReference> inputs = new ArrayList<>(); for (PValue input : context.getInputs(transform).values()) { inputs.add( context.asOutputReference( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index b800eb69a19..6932627409e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage; import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; @@ -43,7 +44,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.nio.channels.Channels; import java.util.ArrayList; @@ -749,7 +752,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { String workSpecJson = DataflowPipelineTranslator.jobToString(newJob); try (PrintWriter printWriter = new PrintWriter( - Channels.newOutputStream(FileSystems.create(fileResource, MimeTypes.TEXT)))) { + new BufferedWriter(new OutputStreamWriter(Channels.newOutputStream( + FileSystems.create(fileResource, MimeTypes.TEXT)), UTF_8)))) { printWriter.print(workSpecJson); LOG.info("Printed job specification to {}", fileLocation); } catch (IOException ex) { @@ -1211,7 +1215,7 @@ public void translate( */ private static class StreamingFnApiCreate<T> extends PTransform<PBegin, PCollection<T>> { private final Create.Values<T> transform; - private final PCollection<T> originalOutput; + private final transient PCollection<T> originalOutput; private StreamingFnApiCreate( Create.Values<T> transform, @@ -1604,7 +1608,7 @@ private String getJobIdFromName(String jobName) { } } - private class StreamingPubsubIOWriteOverrideFactory + private static class StreamingPubsubIOWriteOverrideFactory implements PTransformOverrideFactory< PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> { private final DataflowRunner runner; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java index 69e4f469343..935ccec102e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerInfo.java @@ -112,6 +112,7 @@ public String getContainerVersion() { return properties.get(CONTAINER_VERSION_KEY); } + @Override public Map<String, String> getProperties() { return ImmutableMap.copyOf((Map) properties); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java index 34bc38c0f94..1168a63e369 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -142,6 +142,7 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { /** * Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner. */ + @SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish private boolean waitForStreamingJobTermination( final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) { // In streaming, there are infinite retries, so rather than timeout @@ -318,14 +319,14 @@ public String toString() { private static class ErrorMonitorMessagesHandler implements JobMessagesHandler { private final DataflowPipelineJob job; private final JobMessagesHandler messageHandler; - private final StringBuffer errorMessage; + private final StringBuilder errorMessage; private volatile boolean hasSeenError; private ErrorMonitorMessagesHandler( DataflowPipelineJob job, JobMessagesHandler messageHandler) { this.job = job; this.messageHandler = messageHandler; - this.errorMessage = new StringBuffer(); + this.errorMessage = new StringBuilder(); this.hasSeenError = false; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index cefdf5acd43..3a259a881c1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -113,6 +113,8 @@ public abstract static class IsmRecord<V> { abstract List<?> keyComponents(); @Nullable abstract V value(); + + @SuppressWarnings("mutable") @Nullable abstract byte[] metadata(); IsmRecord() {} // Prevent public constructor @@ -279,7 +281,7 @@ public int getNumberOfShardKeyCoders(List<?> keyComponents) { * using {@code 1225801234} as the seed value. We ensure that shard ids for * metadata keys and normal keys do not overlap. */ - public <V, T> int hash(List<?> keyComponents) { + public int hash(List<?> keyComponents) { return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<>()); } @@ -289,7 +291,7 @@ public int getNumberOfShardKeyCoders(List<?> keyComponents) { * <p>Mutates {@code keyBytes} such that when returned, contains the encoded * version of the key components. */ - public <V, T> int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) { + public int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) { return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<>()); } @@ -301,7 +303,7 @@ public int getNumberOfShardKeyCoders(List<?> keyComponents) { * store the location where each key component's encoded byte representation ends within * {@code keyBytes}. */ - public <V, T> int encodeAndHash( + public int encodeAndHash( List<?> keyComponents, RandomAccessData keyBytesToMutate, List<Integer> keyComponentByteOffsetsToMutate) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java index ce23a1bd8b0..813fc3a9938 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudKnownType.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.util; +import com.google.common.collect.ImmutableList; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -82,11 +83,11 @@ }; private final String uri; - private final Class<?>[] classes; + private final ImmutableList<Class<?>> classes; CloudKnownType(String uri, Class<?>... classes) { this.uri = uri; - this.classes = classes; + this.classes = ImmutableList.copyOf(classes); } public String getUri() { @@ -96,7 +97,7 @@ public String getUri() { public abstract <T> T parse(Object value, Class<T> clazz); public Class<?> defaultClass() { - return classes[0]; + return classes.get(0); } private static final Map<String, CloudKnownType> typesByUri = diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java index b3680e94561..171dcd03d39 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObject.java @@ -36,7 +36,7 @@ * additional properties to be presented during deserialization, representing * child objects by building additional {@code CloudObject}s. */ -public final class CloudObject extends GenericJson { +public final class CloudObject extends GenericJson implements Cloneable { /** * Constructs a {@code CloudObject} by copying the supplied serialized object * spec, which must represent an SDK object serialized for transport via the diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index f04f7e0cbbc..e61f64eddcc 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -26,6 +26,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -125,7 +126,7 @@ public MonitoringUtil(DataflowClient dataflowClient) { /** * Comparator for sorting rows in increasing order based on timestamp. */ - public static class TimeStampComparator implements Comparator<JobMessage> { + public static class TimeStampComparator implements Comparator<JobMessage>, Serializable { @Override public int compare(JobMessage o1, JobMessage o2) { @Nullable Instant t1 = fromCloudTime(o1.getTime()); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 387b7e3a590..0374ac4304f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -38,6 +38,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; +import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; @@ -118,7 +119,7 @@ public void close() { /** Utility comparator used in uploading packages efficiently. */ - private static class PackageUploadOrder implements Comparator<PackageAttributes> { + private static class PackageUploadOrder implements Comparator<PackageAttributes>, Serializable { @Override public int compare(PackageAttributes o1, PackageAttributes o2) { // Smaller size compares high so that bigger packages are uploaded first. @@ -224,7 +225,8 @@ private StagingResult tryStagePackageWithRetry( "Upload failed, will NOT retry staging of package: {}", sourceDescription, ioException); - throw new RuntimeException("Could not stage %s to %s", ioException); + throw new RuntimeException( + String.format("Could not stage %s to %s", sourceDescription, target), ioException); } else { LOG.warn( "Upload attempt failed, sleeping before retrying staging of package: {}", @@ -507,6 +509,7 @@ public PackageAttributes withPackageName(String overridePackageName) { public abstract File getSource(); /** @return the bytes to be uploaded, if any */ + @SuppressWarnings("mutable") @Nullable public abstract byte[] getBytes(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java index 5259828eacd..94d54069175 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java @@ -27,8 +27,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Serializable; import java.util.Arrays; import java.util.Comparator; +import java.util.Objects; import javax.annotation.concurrent.NotThreadSafe; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -71,7 +73,7 @@ public void encode(RandomAccessData value, OutputStream outStream) @Override public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { - if (value == POSITIVE_INFINITY) { + if (Objects.equals(value, POSITIVE_INFINITY)) { throw new CoderException("Positive infinity can not be encoded."); } if (!context.isWholeStream) { @@ -134,7 +136,7 @@ protected long getEncodedElementByteSize(RandomAccessData value) * all other {@link RandomAccessData}. */ public static final class UnsignedLexicographicalComparator - implements Comparator<RandomAccessData> { + implements Comparator<RandomAccessData>, Serializable { // Do not instantiate private UnsignedLexicographicalComparator() { } @@ -147,6 +149,7 @@ public int compare(RandomAccessData o1, RandomAccessData o2) { /** * Compare the two sets of bytes starting at the given offset. */ + @SuppressWarnings("ReferenceEquality") // equals overload calls into this compare method public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) { if (o1 == o2) { return 0; @@ -321,6 +324,7 @@ public boolean equals(Object other) { if (!(other instanceof RandomAccessData)) { return false; } + return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0; } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index a385eec6b33..f8ff9933ec0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -29,7 +29,7 @@ import com.google.common.collect.ImmutableList; import java.io.IOException; import java.io.Serializable; -import java.util.LinkedList; +import java.util.ArrayList; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.BatchStatefulParDoOverrides.StatefulMultiOutputParDo; import org.apache.beam.runners.dataflow.BatchStatefulParDoOverrides.StatefulSingleOutputParDo; @@ -195,7 +195,7 @@ private static DataflowPipelineOptions buildPipelineOptions(String ... args) thr options.setProject("some-project"); options.setRegion("some-region"); options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString()); - options.setFilesToStage(new LinkedList<>()); + options.setFilesToStage(new ArrayList<>()); options.setGcsUtil(mockGcsUtil); return options; } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 58ac2734cdd..b4e07e50e02 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -49,7 +49,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -184,7 +183,7 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException options.setProject("some-project"); options.setRegion("some-region"); options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString()); - options.setFilesToStage(new LinkedList<>()); + options.setFilesToStage(new ArrayList<>()); options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest())); options.setGcsUtil(mockGcsUtil); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 396c4456f2d..ab5cc0b199c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -71,7 +71,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -301,7 +300,7 @@ private DataflowPipelineOptions buildPipelineOptions() throws IOException { options.setTempLocation(VALID_TEMP_BUCKET); options.setRegion(REGION_ID); // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. - options.setFilesToStage(new LinkedList<>()); + options.setFilesToStage(new ArrayList<>()); options.setDataflowClient(buildMockDataflow()); options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java index 4e6914d619c..bf7354abde9 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java @@ -55,7 +55,8 @@ public transient TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); - private PrimitiveParDoSingleFactory<Integer, Long> factory = new PrimitiveParDoSingleFactory<>(); + private transient PrimitiveParDoSingleFactory<Integer, Long> factory = + new PrimitiveParDoSingleFactory<>(); /** * A test that demonstrates that the replacement transform has the Display Data of the @@ -150,10 +151,12 @@ public void toLong(ProcessContext ctxt) { ctxt.output(ctxt.element().longValue()); } + @Override public boolean equals(Object other) { return other != null && other.getClass().equals(getClass()); } + @Override public int hashCode() { return getClass().hashCode(); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java index cf54556a093..3f082e02fae 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java @@ -615,7 +615,7 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements SerializableMatcher<PipelineResult> { - private final DataflowPipelineJob mockJob; + private final transient DataflowPipelineJob mockJob; private final int called; public TestSuccessMatcher(DataflowPipelineJob job, int times) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index cea44f083dc..df474b2d2b5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; +import com.google.common.base.Splitter; +import java.util.List; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -54,13 +56,13 @@ public void testUserNameIsNotSet() { System.getProperties().remove("user.name"); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setAppName("TestApplication"); - String[] nameComponents = options.getJobName().split("-"); - assertEquals(4, nameComponents.length); - assertEquals("testapplication", nameComponents[0]); - assertEquals("", nameComponents[1]); - assertEquals("1208190706", nameComponents[2]); + List<String> nameComponents = Splitter.on('-').splitToList(options.getJobName()); + assertEquals(4, nameComponents.size()); + assertEquals("testapplication", nameComponents.get(0)); + assertEquals("", nameComponents.get(1)); + assertEquals("1208190706", nameComponents.get(2)); // Verify the last component is a hex integer (unsigned). - Long.parseLong(nameComponents[3], 16); + Long.parseLong(nameComponents.get(3), 16); assertTrue(options.getJobName().length() <= 40); } @@ -70,13 +72,13 @@ public void testAppNameAndUserNameAreLong() { System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde"); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setAppName("1234567890123456789012345678901234567890"); - String[] nameComponents = options.getJobName().split("-"); - assertEquals(4, nameComponents.length); - assertEquals("a234567890123456789012345678901234567890", nameComponents[0]); - assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents[1]); - assertEquals("1208190706", nameComponents[2]); + List<String> nameComponents = Splitter.on('-').splitToList(options.getJobName()); + assertEquals(4, nameComponents.size()); + assertEquals("a234567890123456789012345678901234567890", nameComponents.get(0)); + assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents.get(1)); + assertEquals("1208190706", nameComponents.get(2)); // Verify the last component is a hex integer (unsigned). - Long.parseLong(nameComponents[3], 16); + Long.parseLong(nameComponents.get(3), 16); } @Test @@ -85,13 +87,13 @@ public void testAppNameIsLong() { System.getProperties().put("user.name", "abcde"); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setAppName("1234567890123456789012345678901234567890"); - String[] nameComponents = options.getJobName().split("-"); - assertEquals(4, nameComponents.length); - assertEquals("a234567890123456789012345678901234567890", nameComponents[0]); - assertEquals("abcde", nameComponents[1]); - assertEquals("1208190706", nameComponents[2]); + List<String> nameComponents = Splitter.on('-').splitToList(options.getJobName()); + assertEquals(4, nameComponents.size()); + assertEquals("a234567890123456789012345678901234567890", nameComponents.get(0)); + assertEquals("abcde", nameComponents.get(1)); + assertEquals("1208190706", nameComponents.get(2)); // Verify the last component is a hex integer (unsigned). - Long.parseLong(nameComponents[3], 16); + Long.parseLong(nameComponents.get(3), 16); } @Test @@ -100,13 +102,13 @@ public void testUserNameIsLong() { System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde"); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setAppName("1234567890"); - String[] nameComponents = options.getJobName().split("-"); - assertEquals(4, nameComponents.length); - assertEquals("a234567890", nameComponents[0]); - assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents[1]); - assertEquals("1208190706", nameComponents[2]); + List<String> nameComponents = Splitter.on('-').splitToList(options.getJobName()); + assertEquals(4, nameComponents.size()); + assertEquals("a234567890", nameComponents.get(0)); + assertEquals("abcdeabcdeabcdeabcdeabcdeabcde", nameComponents.get(1)); + assertEquals("1208190706", nameComponents.get(2)); // Verify the last component is a hex integer (unsigned). - Long.parseLong(nameComponents[3], 16); + Long.parseLong(nameComponents.get(3), 16); } @Test @@ -115,13 +117,13 @@ public void testUtf8UserNameAndApplicationNameIsNormalized() { System.getProperties().put("user.name", "ði ıntəˈnæʃənəl "); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn"); - String[] nameComponents = options.getJobName().split("-"); - assertEquals(4, nameComponents.length); - assertEquals("f00n0t0k00so0si0e00n", nameComponents[0]); - assertEquals("0i00nt00n000n0l0", nameComponents[1]); - assertEquals("1208190706", nameComponents[2]); + List<String> nameComponents = Splitter.on('-').splitToList(options.getJobName()); + assertEquals(4, nameComponents.size()); + assertEquals("f00n0t0k00so0si0e00n", nameComponents.get(0)); + assertEquals("0i00nt00n000n0l0", nameComponents.get(1)); + assertEquals("1208190706", nameComponents.get(2)); // Verify the last component is a hex integer (unsigned). - Long.parseLong(nameComponents[3], 16); + Long.parseLong(nameComponents.get(3), 16); } @Test diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java index 68c45d6d769..95bf003f156 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java @@ -61,7 +61,7 @@ public void testGetJobMessages() throws IOException { ListJobMessagesResponse firstResponse = new ListJobMessagesResponse(); firstResponse.setJobMessages(new ArrayList<>()); - for (int i = 0; i < 100; ++i) { + for (long i = 0; i < 100; ++i) { JobMessage message = new JobMessage(); message.setId("message_" + i); message.setTime(TimeUtil.toCloudTime(new Instant(i))); @@ -72,7 +72,7 @@ public void testGetJobMessages() throws IOException { ListJobMessagesResponse secondResponse = new ListJobMessagesResponse(); secondResponse.setJobMessages(new ArrayList<>()); - for (int i = 100; i < 150; ++i) { + for (long i = 100; i < 150; ++i) { JobMessage message = new JobMessage(); message.setId("message_" + i); message.setTime(TimeUtil.toCloudTime(new Instant(i))); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 7a6e2883aa7..4880b83cde0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; @@ -129,7 +130,7 @@ public void teardown() { private File makeFileWithContents(String name, String contents) throws Exception { File tmpFile = tmpFolder.newFile(name); Files.write(contents, tmpFile, StandardCharsets.UTF_8); - tmpFile.setLastModified(0); // required for determinism with directories + assertTrue(tmpFile.setLastModified(0)); // required for determinism with directories return tmpFile; } @@ -306,11 +307,12 @@ public void testPackageUploadWithDirectorySucceeds() throws Exception { verify(mockGcsUtil).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); - ZipInputStream inputStream = new ZipInputStream(Channels.newInputStream(pipe.source())); List<String> zipEntryNames = new ArrayList<>(); - for (ZipEntry entry = inputStream.getNextEntry(); entry != null; - entry = inputStream.getNextEntry()) { - zipEntryNames.add(entry.getName()); + try (ZipInputStream inputStream = new ZipInputStream(Channels.newInputStream(pipe.source()))) { + for (ZipEntry entry = inputStream.getNextEntry(); entry != null; + entry = inputStream.getNextEntry()) { + zipEntryNames.add(entry.getName()); + } } assertThat(zipEntryNames, @@ -338,7 +340,10 @@ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception { assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar")); assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); - assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry()); + try (ZipInputStream zipInputStream = + new ZipInputStream(Channels.newInputStream(pipe.source()))) { + assertNull(zipInputStream.getNextEntry()); + } } @Test(expected = RuntimeException.class) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java index 8efbd7f6b36..82827361a8f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/StructsTest.java @@ -74,7 +74,7 @@ addStringList(o, "multipleStringsKey", Arrays.asList("hi", "there", "bob")); addLongs(o, "multipleLongsKey", 47L, 1L << 42, -5L); addLong(o, "singletonLongKey", 42L); - addDouble(o, "singletonDoubleKey", 3.14); + addDouble(o, "singletonDoubleKey", Math.PI); addBoolean(o, "singletonBooleanKey", true); addNull(o, "noObjectsKey"); addList(o, "multipleObjectsKey", makeCloudObjects()); diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 4b15554e5a4..1b0af142e4f 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -139,6 +139,33 @@ <!-- Takes ownership of input buffer --> </Match> + <Match> + <Class name="org.apache.beam.runners.dataflow.util.MonitoringUtilTest" /> + <Field name="thrown" /> + <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" /> + <!-- TestRule used automatically by JUnit framework --> + </Match> + <Match> + <Class name="org.apache.beam.runners.dataflow.util.OutputReference" /> + <Field name="type" /> + <Bug pattern="SS_SHOULD_BE_STATIC" /> + <!-- Field read via reflection --> + </Match> + + <Match> + <Class name="org.apache.beam.runners.dataflow.options.DataflowPipelineOptionsTest" /> + <Field name="restoreSystemProperties" /> + <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" /> + <!-- TestRule used automatically by JUnit framework --> + </Match> + + <Match> + <Class name="org.apache.beam.runners.dataflow.TestDataflowRunner" /> + <Method name="waitForStreamingJobTermination" /> + <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" /> + <!-- waitForStreamingJobTermination checks status via job.waitUntilFinish() --> + </Match> + <Match> <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$QueueMessageReceiver" /> <Or> diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java index a745a6bef6e..7a710b98f2f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.metrics; +import javax.annotation.Nullable; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -57,5 +58,5 @@ * // applications. * }</pre> */ - public abstract MetricQueryResults queryMetrics(MetricsFilter filter); + public abstract MetricQueryResults queryMetrics(@Nullable MetricsFilter filter); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 111304) Time Spent: 3h 40m (was: 3.5h) > Enforce ErrorProne analysis in Dataflow runner project > ------------------------------------------------------ > > Key: BEAM-4313 > URL: https://issues.apache.org/jira/browse/BEAM-4313 > Project: Beam > Issue Type: Improvement > Components: runner-dataflow > Reporter: Scott Wegner > Assignee: Scott Wegner > Priority: Minor > Labels: errorprone, starter > Time Spent: 3h 40m > Remaining Estimate: 0h > > Java ErrorProne static analysis was [recently > enabled|https://github.com/apache/beam/pull/5161] in the Gradle build > process, but only as warnings. ErrorProne errors are generally useful and > easy to fix. Some work was done to [make sdks-java-core > ErrorProne-clean|https://github.com/apache/beam/pull/5319] and add > enforcement. This task is clean ErrorProne warnings and add enforcement in > {{beam-runners-google-cloud-dataflow-java}}. Additional context discussed on > the [dev > list|https://lists.apache.org/thread.html/95aae2785c3cd728c2d3378cbdff2a7ba19caffcd4faa2049d2e2f46@%3Cdev.beam.apache.org%3E]. > Fixing this issue will involve: > # Follow instructions in the [Contribution > Guide|https://beam.apache.org/contribute/] to set up a {{beam}} development > environment. > # Run the following command to compile and run ErrorProne analysis on the > project: {{./gradlew :beam-runners-google-cloud-dataflow-java:assemble}} > # Fix each ErrorProne warning from the {{runners/google-cloud-dataflow-java}} > project. > # In {{runners/google-cloud-dataflow-java/build.gradle}}, add > {{failOnWarning: true}} to the call the {{applyJavaNature()}} > ([example|https://github.com/apache/beam/pull/5319/files#diff-9390c20635aed5f42f83b97506a87333R20]). > This starter issue is sponsored by [~swegner]. Feel free to [reach > out|https://beam.apache.org/community/contact-us/] with questions or code > review: > * JIRA: [~swegner] > * GitHub: [@swegner|https://github.com/swegner] > * Slack: [@Scott Wegner|https://s.apache.org/beam-slack-channel] > * Email: swegner at google dot com -- This message was sent by Atlassian JIRA (v7.6.3#76005)