This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 06a5e67 [BEAM-11936] Fix errorprone warnings (#15890) 06a5e67 is described below commit 06a5e67332aae53ea90dedb4ef6421c2a7d65035 Author: Benjamin Gonzalez <74670721+benw...@users.noreply.github.com> AuthorDate: Wed Dec 8 15:26:11 2021 -0600 [BEAM-11936] Fix errorprone warnings (#15890) * [BEAM-11936] Fix errorprone warning: FloatingPointAssertionWithinEpsilon * [BEAM-11936] Fix errorprone warning: LockNotBeforeTry * [BEAM-11936] Fix errorprone warning: PreferJavaTimeOverload * [BEAM-11936] Fix errorprone warning: ModifiedButNotUsed * [BEAM-11936] Fix errorprone warning: UnusedNestedClass * [BEAM-11936] Remove suppresswarnings * [BEAM-11936] Fix suppressed warnings * [BEAM-11936] Fix errorprone warnings after merge master * [BEAM-11936] Remove suppressWarnings errorprone: PreferJavaTimeOverload * [BEAM-11936] Remove suppressWarnings errorprone: PreferJavaTimeOverload * [BEAM-11936] Fix leftover warning errorprone: PreferJavaTimeOverload * [BEAM-11936] Fix leftover warning errorprone: PreferJavaTimeOverload * [BEAM-11936] Add suppresswarning * [BEAM-11936] Remove unused inner class --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 6 - .../apache/beam/examples/snippets/Snippets.java | 8 ++ .../core/construction/PTransformMatchersTest.java | 21 ---- .../beam/runners/direct/NanosOffsetClock.java | 6 +- .../runners/direct/TransformEvaluatorRegistry.java | 16 --- .../runners/direct/UnboundedReadDeduplicator.java | 3 +- .../runners/direct/CloningBundleFactoryTest.java | 67 ----------- .../flink/FlinkStreamingPipelineTranslator.java | 4 +- .../flink/FlinkStreamingTransformTranslators.java | 17 --- .../wrappers/streaming/DoFnOperator.java | 2 +- .../state/FlinkBroadcastStateInternals.java | 131 --------------------- .../streaming/ExecutableStageDoFnOperatorTest.java | 1 + .../beam/runners/dataflow/DataflowRunner.java | 110 ----------------- .../runners/dataflow/DataflowPipelineJobTest.java | 25 
---- .../beam/runners/dataflow/worker/ReaderCache.java | 3 +- .../beam/runners/dataflow/worker/StateFetcher.java | 4 +- .../dataflow/worker/StreamingDataflowWorker.java | 2 +- .../fn/data/RemoteGrpcPortWriteOperation.java | 6 +- .../common/worker/CachingShuffleBatchReader.java | 4 +- .../control/DefaultJobBundleFactory.java | 16 +-- .../fnexecution/control/RemoteExecutionTest.java | 2 - .../beam/runners/spark/io/MicrobatchSource.java | 2 +- .../translation/utils/SideInputStorage.java | 4 +- .../runners/spark/util/GlobalWatermarkHolder.java | 4 +- .../beam/runners/spark/util/SideInputStorage.java | 4 +- .../src/main/java/org/apache/beam/sdk/io/Read.java | 3 +- .../org/apache/beam/sdk/schemas/SchemaCoder.java | 20 ---- .../apache/beam/sdk/values/PCollectionViews.java | 59 ---------- .../apache/beam/sdk/coders/CoderRegistryTest.java | 4 - .../apache/beam/sdk/testing/ExpectedLogsTest.java | 10 +- .../beam/sdk/testing/SystemNanoTimeSleeper.java | 4 +- .../sdk/transforms/reflect/DoFnSignaturesTest.java | 9 -- .../GrowableOffsetRangeTrackerTest.java | 2 +- .../core/translate/TimestampExtractTransform.java | 8 -- .../sql/meta/provider/kafka/BeamKafkaTable.java | 3 +- .../org/apache/beam/sdk/fn/CancellableQueue.java | 4 +- .../java/org/apache/beam/fn/harness/FnHarness.java | 4 +- .../harness/state/StateFetchingIteratorsTest.java | 2 +- .../bigquery/StorageApiWriteUnshardedRecords.java | 3 +- .../bigquery/StorageApiWritesShardedRecords.java | 3 +- .../internal/LimitingTopicBacklogReader.java | 6 +- .../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 24 ---- .../sdk/io/hadoop/format/TestRowDBWritable.java | 10 -- .../beam/sdk/io/kafka/KafkaExactlyOnceSink.java | 3 +- .../beam/sdk/io/kafka/KafkaUnboundedReader.java | 4 +- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 5 +- .../org/apache/beam/sdk/io/xml/XmlSourceTest.java | 4 +- 47 files changed, 66 insertions(+), 596 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 001abca..7eafafc 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1140,18 +1140,12 @@ class BeamModulePlugin implements Plugin<Project> { options.errorprone.errorproneArgs.add("-Xep:EqualsGetClass:OFF") options.errorprone.errorproneArgs.add("-Xep:EqualsUnsafeCast:OFF") options.errorprone.errorproneArgs.add("-Xep:ExtendsAutoValue:OFF") - options.errorprone.errorproneArgs.add("-Xep:FloatingPointAssertionWithinEpsilon:OFF") options.errorprone.errorproneArgs.add("-Xep:JavaTimeDefaultTimeZone:OFF") - options.errorprone.errorproneArgs.add("-Xep:LockNotBeforeTry:OFF") options.errorprone.errorproneArgs.add("-Xep:MixedMutabilityReturnType:OFF") - options.errorprone.errorproneArgs.add("-Xep:PreferJavaTimeOverload:OFF") - options.errorprone.errorproneArgs.add("-Xep:ModifiedButNotUsed:OFF") options.errorprone.errorproneArgs.add("-Xep:ThreadPriorityCheck:OFF") options.errorprone.errorproneArgs.add("-Xep:TimeUnitConversionChecker:OFF") options.errorprone.errorproneArgs.add("-Xep:UndefinedEquals:OFF") options.errorprone.errorproneArgs.add("-Xep:UnnecessaryLambda:OFF") - options.errorprone.errorproneArgs.add("-Xep:UnusedVariable:OFF") - options.errorprone.errorproneArgs.add("-Xep:UnusedNestedClass:OFF") options.errorprone.errorproneArgs.add("-Xep:UnsafeReflectiveConstructionCast:OFF") } diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java index d2e78fe..1bde081 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java @@ -191,6 +191,7 @@ public class Snippets { } { + @SuppressWarnings("ModifiedButNotUsed") // [START BigQueryDataTypes] TableRow row = new 
TableRow(); row.set("string", "abc"); @@ -1174,6 +1175,7 @@ public class Snippets { } } + @SuppressWarnings("unused") private static class BundleFinalization { private static class BundleFinalizationDoFn extends DoFn<String, Integer> { // [START BundleFinalize] @@ -1191,6 +1193,7 @@ public class Snippets { } } + @SuppressWarnings("unused") private static class SplittableDoFn { private static void seekToNextRecordBoundaryInFile( @@ -1230,6 +1233,7 @@ public class Snippets { } // [END SDF_BasicExample] + @SuppressWarnings("unused") private static class BasicExampleWithInitialSplitting extends FileToWordsFn { // [START SDF_BasicExampleWithSplitting] void splitRestriction( @@ -1248,6 +1252,7 @@ public class Snippets { // [END SDF_BasicExampleWithSplitting] } + @SuppressWarnings("unused") private static class BasicExampleWithBadTryClaimLoop extends DoFn<String, Integer> { // [START SDF_BadTryClaimLoop] @ProcessElement @@ -1271,6 +1276,7 @@ public class Snippets { // [END SDF_BadTryClaimLoop] } + @SuppressWarnings("unused") private static class CustomWatermarkEstimatorExample extends DoFn<String, Integer> { private static Instant currentWatermark = Instant.now(); @@ -1336,6 +1342,7 @@ public class Snippets { } // [END SDF_CustomWatermarkEstimator] + @SuppressWarnings("unused") private static class UserInitiatedCheckpointExample extends DoFn<String, Integer> { public static class ThrottlingException extends Exception {} @@ -1398,6 +1405,7 @@ public class Snippets { // [END SDF_Truncate] } + @SuppressWarnings("unused") private static class GetSizeExample extends DoFn<String, Integer> { // [START SDF_GetSize] @GetSize diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 35762bb..185b52d 100644 --- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -57,9 +57,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -74,7 +72,6 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Rule; @@ -598,22 +595,4 @@ public class PTransformMatchersTest implements Serializable { ResourceHints.create(), p); } - - private static class FakeFilenamePolicy extends FilenamePolicy { - @Override - public ResourceId windowedFilename( - int shardNumber, - int numShards, - BoundedWindow window, - PaneInfo paneInfo, - FileBasedSink.OutputFileHints outputFileHints) { - throw new UnsupportedOperationException("should not be called"); - } - - @Override - public @Nullable ResourceId unwindowedFilename( - int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) { - throw new UnsupportedOperationException("should not be called"); - } - } } diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java index f26c907..286d3d9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.direct; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import org.joda.time.Instant; /** A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time. */ @@ -37,8 +37,6 @@ class NanosOffsetClock implements Clock { @Override public Instant now() { return new Instant( - baseMillis - + TimeUnit.MILLISECONDS.convert( - System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)); + baseMillis + Duration.ofNanos(System.nanoTime() - nanosAtBaseMillis).toMillis()); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 3d96fc7..6f750d4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -39,7 +39,6 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; -import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; @@ -140,21 +139,6 @@ class TransformEvaluatorRegistry { } } - /** 
- * A translator just to vend the URN. This will need to be moved to runners-core-construction-java - * once SDF is reorganized appropriately. - */ - private static class SplittableParDoProcessElementsTranslator - extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, ?, ?, ?>> { - - private SplittableParDoProcessElementsTranslator() {} - - @Override - public String getUrn(ProcessElements<?, ?, ?, ?, ?> transform) { - return SPLITTABLE_PROCESS_URN; - } - } - // the TransformEvaluatorFactories can construct instances of all generic types of transform, // so all instances of a primitive can be handled with the same evaluator factory. private final Map<String, TransformEvaluatorFactory> factories; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java index b222698..125c026 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.core.construction.SplittableParDo.PrimitiveUnboundedRead; import org.apache.beam.runners.local.StructuralKey; @@ -74,7 +73,7 @@ interface UnboundedReadDeduplicator { private CachedIdDeduplicator() { ids = CacheBuilder.newBuilder() - .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS) + .expireAfterAccess(java.time.Duration.ofMillis(MAX_RETENTION_SINCE_ACCESS)) .maximumSize(100_000L) .build(new TrueBooleanLoader()); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index 
6a6d7df..6d7a975 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -36,9 +36,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -191,69 +189,4 @@ public class CloningBundleFactoryTest { throw new CoderException("Decode not allowed"); } } - - private static class RecordStructuralValueCoder extends AtomicCoder<Record> { - @Override - public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} - - @Override - public Record decode(InputStream inStream) throws CoderException, IOException { - return new Record() { - @Override - public String toString() { - return "DecodedRecord"; - } - }; - } - - @Override - public boolean consistentWithEquals() { - return true; - } - - @Override - public Object structuralValue(Record value) { - return value; - } - } - - private static class RecordNotConsistentWithEqualsStructuralValueCoder - extends AtomicCoder<Record> { - @Override - public void encode(Record value, OutputStream outStream) throws CoderException, IOException {} - - @Override - public Record decode(InputStream inStream) throws CoderException, IOException { - return new Record() { - @Override - public String toString() { - return "DecodedRecord"; - } - }; - } - - @Override - public boolean consistentWithEquals() { - return false; - } - - @Override - public Object structuralValue(Record value) { - return value; - } - } - - private static 
class IdentityDoFn extends DoFn<Record, Record> { - @ProcessElement - public void proc(ProcessContext ctxt) { - ctxt.output(ctxt.element()); - } - } - - private static class SimpleIdentity extends SimpleFunction<Record, Record> { - @Override - public Record apply(Record input) { - return input; - } - } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index b7f99b8..bcb3883 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -21,13 +21,13 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.WR import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; @@ -368,7 +368,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { cache = CacheBuilder.newBuilder() .maximumSize(CACHE_MAX_SIZE) - .expireAfterAccess(CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS) + .expireAfterAccess(Duration.ofSeconds(CACHE_EXPIRE_SECONDS)) .build(); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 9379ef5..18ece76 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -1417,23 +1417,6 @@ class FlinkStreamingTransformTranslators { } } - /** - * A translator just to vend the URN. This will need to be moved to runners-core-construction-java - * once SDF is reorganized appropriately. - */ - private static class SplittableParDoProcessElementsTranslator - extends PTransformTranslation.TransformPayloadTranslator.NotSerializable< - SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?>> { - - private SplittableParDoProcessElementsTranslator() {} - - @Override - public String getUrn( - SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?> transform) { - return SPLITTABLE_PROCESS_URN; - } - } - /** Registers classes specialized to the Flink runner. */ @AutoService(TransformPayloadTranslatorRegistrar.class) public static class FlinkTransformsRegistrar implements TransformPayloadTranslatorRegistrar { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 60abddd..4f4f6c1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -1125,8 +1125,8 @@ public class DoFnOperator<InputT, OutputT> } private void buffer(KV<Integer, WindowedValue<?>> taggedValue) { + bufferLock.lock(); try { - bufferLock.lock(); pushedBackElementsHandler.pushBack(taggedValue); } catch (Exception e) { throw new RuntimeException("Couldn't pushback element.", e); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index e6c47ed..7b89e01 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -553,137 +553,6 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals { } } - private class FlinkKeyedCombiningState<K2, InputT, AccumT, OutputT> - extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> { - - private final StateNamespace namespace; - private final StateTag<CombiningState<InputT, AccumT, OutputT>> address; - private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn; - - FlinkKeyedCombiningState( - OperatorStateBackend flinkStateBackend, - StateTag<CombiningState<InputT, AccumT, OutputT>> address, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn, - StateNamespace namespace, - Coder<AccumT> accumCoder, - FlinkBroadcastStateInternals<K2> flinkStateInternals, - PipelineOptions pipelineOptions) { - super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions); - - this.namespace = namespace; - this.address = address; - this.combineFn = combineFn; - } - - @Override - public CombiningState<InputT, AccumT, OutputT> readLater() { - return this; - } - - @Override - public void add(InputT value) { - try { - AccumT current = readInternal(); - if (current == null) { - current = combineFn.createAccumulator(); - } - current = combineFn.addInput(current, value); - writeInternal(current); - } catch (Exception e) { - throw new RuntimeException("Error adding to state.", e); - } - } - - @Override - public void addAccum(AccumT accum) { - try { - AccumT current = readInternal(); - if (current == null) { - writeInternal(accum); - } else { - current = 
combineFn.mergeAccumulators(Arrays.asList(current, accum)); - writeInternal(current); - } - } catch (Exception e) { - throw new RuntimeException("Error adding to state.", e); - } - } - - @Override - public AccumT getAccum() { - try { - AccumT accum = readInternal(); - return accum != null ? accum : combineFn.createAccumulator(); - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(accumulators); - } - - @Override - public OutputT read() { - try { - AccumT accum = readInternal(); - if (accum != null) { - return combineFn.extractOutput(accum); - } else { - return combineFn.extractOutput(combineFn.createAccumulator()); - } - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public Boolean read() { - try { - return readInternal() == null; - } catch (Exception e) { - throw new RuntimeException("Error reading state.", e); - } - } - - @Override - public ReadableState<Boolean> readLater() { - return this; - } - }; - } - - @Override - public void clear() { - clearInternal(); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FlinkKeyedCombiningState<?, ?, ?, ?> that = (FlinkKeyedCombiningState<?, ?, ?, ?>) o; - - return namespace.equals(that.namespace) && address.equals(that.address); - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT> extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> { diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index 4d27af6..af609e9 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -637,6 +637,7 @@ public class ExecutableStageDoFnOperatorTest { assertThat(statefulDoFnRunner, instanceOf(StatefulDoFnRunner.class)); } + @SuppressWarnings("LockNotBeforeTry") @Test public void testEnsureStateCleanupWithKeyedInputCleanupTimer() { InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 0f23d61..212e03a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage; -import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; @@ -49,7 +48,6 @@ import java.io.PrintWriter; import java.nio.channels.Channels; import java.util.ArrayList; import 
java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -63,7 +61,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.BeamUrns; -import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.Environments; @@ -71,7 +68,6 @@ import org.apache.beam.runners.core.construction.External; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PipelineTranslation; -import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; @@ -130,7 +126,6 @@ import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.GroupedValues; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.Impulse; @@ -145,7 +140,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.InstanceBuilder; import 
org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.NameUtils; @@ -1783,110 +1777,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // ================================================================================ - /** - * A PTranform override factory which maps Create.Values PTransforms for streaming pipelines into - * a Dataflow specific variant. - */ - private static class StreamingFnApiCreateOverrideFactory<T> - implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> { - - @Override - public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform( - AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) { - Create.Values<T> original = transform.getTransform(); - PCollection<T> output = - (PCollection) Iterables.getOnlyElement(transform.getOutputs().values()); - return PTransformReplacement.of( - transform.getPipeline().begin(), new StreamingFnApiCreate<>(original, output)); - } - - @Override - public Map<PCollection<?>, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PCollection<?>> outputs, PCollection<T> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } - } - - /** - * Specialized implementation for {@link org.apache.beam.sdk.transforms.Create.Values - * Create.Values} for the Dataflow runner in streaming mode over the Fn API. 
- */ - private static class StreamingFnApiCreate<T> extends PTransform<PBegin, PCollection<T>> { - - private final Create.Values<T> transform; - private final transient PCollection<T> originalOutput; - - private StreamingFnApiCreate(Create.Values<T> transform, PCollection<T> originalOutput) { - this.transform = transform; - this.originalOutput = originalOutput; - } - - @Override - public final PCollection<T> expand(PBegin input) { - try { - PCollection<T> pc = - Pipeline.applyTransform(input, Impulse.create()) - .apply( - ParDo.of( - DecodeAndEmitDoFn.fromIterable( - transform.getElements(), originalOutput.getCoder()))); - pc.setCoder(originalOutput.getCoder()); - return pc; - } catch (IOException e) { - throw new IllegalStateException("Unable to encode elements.", e); - } - } - - /** - * A DoFn which stores encoded versions of elements and a representation of a Coder capable of - * decoding those elements. - * - * <p>TODO: BEAM-2422 - Make this a SplittableDoFn. - */ - private static class DecodeAndEmitDoFn<T> extends DoFn<byte[], T> { - - public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder) - throws IOException { - ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder(); - for (T element : elements) { - byte[] bytes = encodeToByteArray(elemCoder, element); - allElementsBytes.add(bytes); - } - return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder); - } - - private final Collection<byte[]> elements; - private final RunnerApi.MessageWithComponents coderSpec; - - // lazily initialized by parsing coderSpec - private transient Coder<T> coder; - - private Coder<T> getCoder() throws IOException { - if (coder == null) { - coder = - (Coder) - CoderTranslation.fromProto( - coderSpec.getCoder(), - RehydratedComponents.forComponents(coderSpec.getComponents()), - CoderTranslation.TranslationContext.DEFAULT); - } - return coder; - } - - private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) 
throws IOException { - this.elements = elements; - this.coderSpec = CoderTranslation.toProto(coder); - } - - @ProcessElement - public void processElement(ProcessContext context) throws IOException { - for (byte[] element : elements) { - context.output(CoderUtils.decodeFromByteArray(getCoder(), element)); - } - } - } - } - private static class SingleOutputExpandableTransformTranslator implements TransformTranslator<External.SingleOutputExpandableTransform> { @Override diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index a9e8976..d45b3c6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -40,8 +40,6 @@ import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.JobMessage; import java.io.IOException; import java.net.SocketTimeoutException; -import java.util.List; -import java.util.NavigableMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -55,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -526,28 +523,6 @@ public class DataflowPipelineJobTest { return message; } - private class FakeMonitor extends MonitoringUtil { - // Messages in timestamp order - 
private final NavigableMap<Long, JobMessage> timestampedMessages; - - public FakeMonitor(JobMessage... messages) { - // The client should never be used; this Fake is intended to intercept relevant methods - super(mockDataflowClient); - - NavigableMap<Long, JobMessage> timestampedMessages = Maps.newTreeMap(); - for (JobMessage message : messages) { - timestampedMessages.put(Long.parseLong(message.getTime()), message); - } - - this.timestampedMessages = timestampedMessages; - } - - @Override - public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) { - return ImmutableList.copyOf(timestampedMessages.headMap(startTimestampMs).values()); - } - } - private static class ZeroSleeper implements Sleeper { @Override public void sleep(long l) throws InterruptedException {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java index 58f3abf..3329c1e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker; import java.io.IOException; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; @@ -71,7 +70,7 @@ class ReaderCache { this.invalidationExecutor = invalidationExecutor; this.cache = CacheBuilder.newBuilder() - .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS) + .expireAfterWrite(java.time.Duration.ofMillis(cacheDuration.getMillis())) .removalListener( (RemovalNotification<WindmillComputationKey, CacheEntry> 
notification) -> { if (notification.getCause() != RemovalCause.EXPLICIT) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java index db5256a..aeb3f62 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java @@ -20,11 +20,11 @@ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import java.io.Closeable; +import java.time.Duration; import java.util.Collections; import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.InMemoryMultimapSideInputView; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.sdk.coders.Coder; @@ -71,7 +71,7 @@ class StateFetcher { server, CacheBuilder.newBuilder() .maximumWeight(100000000 /* 100 MB */) - .expireAfterWrite(1, TimeUnit.MINUTES) + .expireAfterWrite(Duration.ofMinutes(1)) .weigher((Weigher<SideInputId, SideInputCacheEntry>) (id, entry) -> entry.size()) .build()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 72fc234..5418e4f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -412,7 
+412,7 @@ public class StreamingDataflowWorker { // Using Cache with time eviction policy helps us to prevent memory leak when callback ids are // discarded by Dataflow service and calling commitCallback is best-effort. private final Cache<Long, Runnable> commitCallbacks = - CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build(); + CacheBuilder.newBuilder().expireAfterWrite(java.time.Duration.ofMinutes(5L)).build(); // Map of user state names to system state names. // TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily in diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java index aef324e..ad8a071 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java @@ -153,8 +153,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation { public Consumer<Integer> processedElementsConsumer() { usingElementsProcessed = true; return elementsProcessed -> { + lock.lock(); try { - lock.lock(); this.elementsProcessed.set(elementsProcessed); condition.signal(); } finally { @@ -168,8 +168,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation { private void maybeWait() throws Exception { if (shouldWait()) { + lock.lock(); try { - lock.lock(); while (shouldWait()) { LOG.debug( "Throttling elements at {} until more than {} elements been processed.", @@ -185,8 +185,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation { public void abortWait() { usingElementsProcessed = false; + lock.lock(); try { - lock.lock(); 
condition.signal(); } finally { lock.unlock(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java index 4dd13c6..33dfbc2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.dataflow.worker.util.common.worker; import java.io.IOException; +import java.time.Duration; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; @@ -49,7 +49,7 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader { this.cache = CacheBuilder.newBuilder() .maximumSize(maximumBatches) - .expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS) + .expireAfterAccess(Duration.ofMillis(expireAfterAccessMillis)) .<BatchRange, Batch>build( new CacheLoader<BatchRange, Batch>() { @Override diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java index b9851ca..6a3f9a7 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java +++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.fnexecution.control; import com.google.auto.value.AutoValue; import java.io.IOException; +import java.time.Duration; import java.util.IdentityHashMap; import java.util.Map; import java.util.Set; @@ -26,7 +27,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -204,11 +204,11 @@ public class DefaultJobBundleFactory implements JobBundleFactory { notification -> { WrappedSdkHarnessClient client = notification.getValue(); final int refCount; + // We need to use a lock here to ensure we are not causing the environment to + // be removed if beforehand a StageBundleFactory has retrieved it but not yet + // issued ref() on it. + refLock.lock(); try { - // We need to use a lock here to ensure we are not causing the environment to - // be removed if beforehand a StageBundleFactory has retrieved it but not yet - // issued ref() on it. 
- refLock.lock(); refCount = client.unref(); } finally { refLock.unlock(); @@ -223,7 +223,7 @@ public class DefaultJobBundleFactory implements JobBundleFactory { }); if (environmentExpirationMillis > 0) { - cacheBuilder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS); + cacheBuilder.expireAfterWrite(Duration.ofMillis(environmentExpirationMillis)); } LoadingCache<Environment, WrappedSdkHarnessClient> cache = @@ -474,8 +474,8 @@ public class DefaultJobBundleFactory implements JobBundleFactory { currentCache = availableCaches.take(); // Lock because the environment expiration can remove the ref for the client // which would close the underlying environment before we can ref it. + currentCache.lock.lock(); try { - currentCache.lock.lock(); client = currentCache.cache.getUnchecked(executableStage.getEnvironment()); client.ref(); } finally { @@ -494,8 +494,8 @@ public class DefaultJobBundleFactory implements JobBundleFactory { currentCache = environmentCaches.get(environmentIndex); // Lock because the environment expiration can remove the ref for the client which would // close the underlying environment before we can ref it. 
+ currentCache.lock.lock(); try { - currentCache.lock.lock(); client = currentCache.cache.getUnchecked(executableStage.getEnvironment()); client.ref(); } finally { diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 7248b4e..62f78f2 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -799,11 +799,9 @@ public class RemoteExecutionTest implements Serializable { stateDelegator); Map<String, Coder> remoteOutputCoders = descriptor.getRemoteOutputCoders(); - Map<String, Collection<WindowedValue<?>>> outputValues = new HashMap<>(); Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>(); for (Entry<String, Coder> remoteOutputCoder : remoteOutputCoders.entrySet()) { List<WindowedValue<?>> outputContents = Collections.synchronizedList(new ArrayList<>()); - outputValues.put(remoteOutputCoder.getKey(), outputContents); outputReceivers.put( remoteOutputCoder.getKey(), RemoteOutputReceiver.of( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index 685c1a7..d07416e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -88,7 +88,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo LOG.info("Creating reader cache. 
Cache interval = {} ms.", readerCacheInterval); readerCache = CacheBuilder.newBuilder() - .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS) + .expireAfterAccess(java.time.Duration.ofMillis(readerCacheInterval)) .removalListener(new ReaderCacheRemovalListener()) .build(); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java index acf4e05..dadc18e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.utils; +import java.time.Duration; import java.util.Objects; -import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; @@ -36,7 +36,7 @@ class SideInputStorage { /** JVM deserialized side input cache. 
*/ private static final Cache<Key<?>, Value<?>> materializedSideInputs = - CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build(); + CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build(); static Cache<Key<?>, Value<?>> getMaterializedSideInputs() { return materializedSideInputs; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java index 5468a27..f7c1eab 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java @@ -20,12 +20,12 @@ package org.apache.beam.runners.spark.util; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import java.io.Serializable; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; @@ -137,7 +137,7 @@ public class GlobalWatermarkHolder { createWatermarkCache(final Long batchDuration) { return CacheBuilder.newBuilder() // expire watermarks every half batch duration to ensure they update in every batch. 
- .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS) + .expireAfterWrite(Duration.ofMillis(batchDuration / 2)) .build(new WatermarksLoader()); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java index 08fefe2..da796b5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.spark.util; +import java.time.Duration; import java.util.Objects; -import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; @@ -36,7 +36,7 @@ class SideInputStorage { /** JVM deserialized side input cache. */ private static final Cache<Key<?>, Value<?>> materializedSideInputs = - CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build(); + CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build(); static Cache<Key<?>, Value<?>> getMaterializedSideInputs() { return materializedSideInputs; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 52e87d5..0be7626 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -28,7 +28,6 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; -import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.InstantCoder; @@ -471,7 +470,7 @@ public class Read { restrictionCoder = restrictionCoder(); 
cachedReaders = CacheBuilder.newBuilder() - .expireAfterWrite(1, TimeUnit.MINUTES) + .expireAfterWrite(java.time.Duration.ofMinutes(1)) .maximumSize(100) .removalListener( (RemovalListener<Object, UnboundedReader>) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java index db64718..959c863 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java @@ -219,26 +219,6 @@ public class SchemaCoder<T> extends CustomCoder<T> { return Objects.hash(schema, typeDescriptor, toRowFunction, fromRowFunction); } - private static class RowIdentity implements SerializableFunction<Row, Row> { - @Override - public Row apply(Row input) { - return input; - } - - @Override - public int hashCode() { - return Objects.hash(getClass()); - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - return o != null && getClass() == o.getClass(); - } - } - @Override public TypeDescriptor<T> getEncodedTypeDescriptor() { return this.typeDescriptor; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java index 360c1af..ca2b85e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java @@ -680,65 +680,6 @@ public class PCollectionViews { public ListIterator<T> listIterator() { return super.listIterator(); } - - /** A {@link ListIterator} over {@link MultimapView} adapter. 
*/ - private class ListIteratorOverMultimapView implements ListIterator<T> { - private int position; - - @Override - public boolean hasNext() { - return position < size(); - } - - @Override - public T next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - T rval = get(position); - position += 1; - return rval; - } - - @Override - public boolean hasPrevious() { - return position > 0; - } - - @Override - public T previous() { - if (!hasPrevious()) { - throw new NoSuchElementException(); - } - position -= 1; - return get(position); - } - - @Override - public int nextIndex() { - return position; - } - - @Override - public int previousIndex() { - return position - 1; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public void set(T e) { - throw new UnsupportedOperationException(); - } - - @Override - public void add(T e) { - throw new UnsupportedOperationException(); - } - } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 954eb7a..cd50db4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -62,10 +62,6 @@ public class CoderRegistryTest { @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(CoderRegistry.class); - private static class SerializableClass implements Serializable {} - - private static class NotSerializableClass {} - @Test public void testRegisterInstantiatedCoder() throws Exception { CoderRegistry registry = CoderRegistry.createDefault(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java index 896ca69..14dffa2 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java @@ -21,13 +21,13 @@ import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.logging.LogRecord; import org.hamcrest.TypeSafeMatcher; import org.junit.Rule; @@ -143,8 +143,7 @@ public class ExpectedLogsTest { public void testThreadSafetyOfLogSaver() throws Throwable { CompletionService<Void> completionService = new ExecutorCompletionService<>(Executors.newCachedThreadPool()); - final long scheduledLogTime = - TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS) + 500L; + final long scheduledLogTime = Duration.ofNanos(System.nanoTime()).toMillis() + 500L; List<String> expectedStrings = new ArrayList<>(); for (int i = 0; i < 100; i++) { @@ -154,10 +153,7 @@ public class ExpectedLogsTest { () -> { // Have all threads started and waiting to log at about the same moment. 
sleepMillis( - Math.max( - 1, - scheduledLogTime - - TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS))); + Math.max(1, scheduledLogTime - Duration.ofNanos(System.nanoTime()).toMillis())); LOG.trace(expected); return null; }); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java index 47a28ef..fd716b9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.testing; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import java.util.concurrent.locks.LockSupport; import org.apache.beam.sdk.util.Sleeper; @@ -41,7 +41,7 @@ public class SystemNanoTimeSleeper implements Sleeper { @Override public void sleep(long millis) throws InterruptedException { long currentTime; - long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS); + long endTime = System.nanoTime() + Duration.ofMillis(millis).toNanos(); while ((currentTime = System.nanoTime()) < endTime) { if (Thread.interrupted()) { throw new InterruptedException(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index 90e0106..9166a33 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -1602,15 +1602,6 @@ public class DoFnSignaturesTest { ProcessContext context, @StateId(STATE_ID) ValueState<String> state); } - private abstract static class DoFnDeclaringMyTimerId extends DoFn<KV<String, Integer>, Long> { - - @TimerId("my-timer-id") - private final 
TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @ProcessElement - public void foo(ProcessContext context) {} - } - private abstract static class DoFnDeclaringTimerAndCallback extends DoFn<KV<String, Integer>, Long> { public static final String TIMER_ID = "my-timer-id"; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java index 084cb5c..eb7d8ca 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java @@ -183,7 +183,7 @@ public class GrowableOffsetRangeTrackerTest { tracker.checkDone(); simpleEstimator.setEstimateRangeEnd(0L); Progress currentProgress = tracker.getProgress(); - assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0.001); + assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0); assertEquals(0, currentProgress.getWorkRemaining(), 0.001); } diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java index 3ed5083..999ece6 100644 --- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java +++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java @@ -86,14 +86,6 @@ public class TimestampExtractTransform<InputT, OutputT> } } - private static class Unwrap<T> extends DoFn<KV<Long, T>, T> { - - @ProcessElement - public void processElement(ProcessContext ctx) { - 
ctx.output(ctx.element().getValue()); - } - } - private final PCollectionTransform<InputT, OutputT> timestampedTransform; private TimestampExtractTransform( diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java index 7d9d61b..a333ba9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.kafka; import static org.apache.beam.vendor.calcite.v1_28_0.com.google.common.base.Preconditions.checkArgument; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -222,7 +223,7 @@ public abstract class BeamKafkaTable extends SchemaBaseBeamTable { throw new NoEstimationException("There is no partition with messages in it."); } - ConsumerRecords<T, T> records = consumer.poll(1000); + ConsumerRecords<T, T> records = consumer.poll(Duration.ofSeconds(1)); // Kafka guarantees the delivery of messages in order they arrive to each partition. // Therefore the first message seen from each partition is the first message arrived to that. diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java index e58f09b..3d1cded 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java @@ -112,8 +112,8 @@ public class CancellableQueue<T extends @NonNull Object> { * queue clears the exception. 
*/ public void cancel(Exception exception) { + lock.lock(); try { - lock.lock(); cancellationException = exception; notEmpty.signalAll(); notFull.signalAll(); @@ -124,8 +124,8 @@ public class CancellableQueue<T extends @NonNull Object> { /** Enables the queue to be re-used after it has been cancelled. */ public void reset() { + lock.lock(); try { - lock.lock(); cancellationException = null; addIndex = 0; takeIndex = 0; diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 62a6180..a9636ff 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -17,12 +17,12 @@ */ package org.apache.beam.fn.harness; +import java.time.Duration; import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -253,7 +253,7 @@ public class FnHarness { LoadingCache<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors = CacheBuilder.newBuilder() .maximumSize(1000) - .expireAfterAccess(10, TimeUnit.MINUTES) + .expireAfterAccess(Duration.ofMinutes(10)) .build( new CacheLoader<String, BeamFnApi.ProcessBundleDescriptor>() { @Override diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java index ae897cb..c9ab166 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java @@ -272,8 +272,8 @@ public class 
StateFetchingIteratorsTest { } assertFalse(valuesIter2.hasNext()); assertTrue(valuesIter2.isReady()); - // The contents agree. + assertArrayEquals(expected, Iterables.toArray(results, Object.class)); assertArrayEquals(expected, Iterables.toArray(values, Object.class)); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 21c7a46..c214fda 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -83,7 +82,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> private static final Cache<String, StreamAppendClient> APPEND_CLIENTS = CacheBuilder.newBuilder() - .expireAfterAccess(5, TimeUnit.MINUTES) + .expireAfterAccess(java.time.Duration.ofMinutes(5)) .removalListener( (RemovalNotification<String, StreamAppendClient> removal) -> { @Nullable final StreamAppendClient streamAppendClient = removal.getValue(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 3d6e33c..ede5129 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -102,7 +101,7 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT> private static final Cache<String, StreamAppendClient> APPEND_CLIENTS = CacheBuilder.newBuilder() - .expireAfterAccess(5, TimeUnit.MINUTES) + .expireAfterAccess(java.time.Duration.ofMinutes(5)) .removalListener( (RemovalNotification<String, StreamAppendClient> removal) -> { @Nullable final StreamAppendClient streamAppendClient = removal.getValue(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java index 8108d09..cd2adcc1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java @@ -24,7 +24,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; import com.google.errorprone.annotations.concurrent.GuardedBy; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import javax.annotation.Nullable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; @@ -46,8 +46,8 @@ final class LimitingTopicBacklogReader implements 
TopicBacklogReader { CacheBuilder.newBuilder() .ticker(ticker) .maximumSize(1) - .expireAfterWrite(1, TimeUnit.MINUTES) - .refreshAfterWrite(10, TimeUnit.SECONDS) + .expireAfterWrite(Duration.ofMinutes(1)) + .refreshAfterWrite(Duration.ofSeconds(10)) .build( new CacheLoader<String, ComputeMessageStatsResponse>() { @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index faff06e..e34863e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -33,13 +33,7 @@ import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.PartialResultSet; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.MethodDescriptor; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.ReleaseInfo; @@ -206,22 +200,4 @@ public class SpannerAccessor implements AutoCloseable { } } } - - private static class CommitDeadlineSettingInterceptor implements ClientInterceptor { - private final long commitDeadlineMilliseconds; - - private CommitDeadlineSettingInterceptor(Duration commitDeadline) { - this.commitDeadlineMilliseconds = commitDeadline.getMillis(); - } - - @Override - public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( - MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { - if (method.getFullMethodName().equals("google.spanner.v1.Spanner/Commit")) { - callOptions 
= - callOptions.withDeadlineAfter(commitDeadlineMilliseconds, TimeUnit.MILLISECONDS); - } - return next.newCall(method, callOptions); - } - } } diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java index f4e3677..2d10bdb 100644 --- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java +++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java @@ -26,7 +26,6 @@ import java.sql.SQLException; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.common.TestRow; -import org.apache.beam.sdk.io.jdbc.JdbcIO; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; @@ -80,13 +79,4 @@ class TestRowDBWritable extends TestRow implements DBWritable, Writable { id = in.readInt(); name = in.readUTF(); } - - private static class PrepareStatementFromTestRow - implements JdbcIO.PreparedStatementSetter<TestRow> { - @Override - public void setParameters(TestRow element, PreparedStatement statement) throws SQLException { - statement.setLong(1, element.id()); - statement.setString(2, element.name()); - } - } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java index 75c8270..8ad0aff 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java @@ -24,6 +24,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import 
java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -641,7 +642,7 @@ class KafkaExactlyOnceSink<K, V> ShardWriterCache() { this.cache = CacheBuilder.newBuilder() - .expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .expireAfterWrite(Duration.ofMillis(IDLE_TIMEOUT_MS)) .<Integer, ShardWriter<K, V>>removalListener( notification -> { if (notification.getCause() != RemovalCause.EXPLICIT) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index aaeb1b4..de70ac0 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -329,7 +329,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { * good to experiment. Often multiple marks would be finalized in a batch, it it reduce * finalization overhead to wait a short while and finalize only the last checkpoint mark. 
*/ - private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); + private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1); private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10); private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100); @@ -520,7 +520,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { while (!closed.get()) { try { if (records.isEmpty()) { - records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); + records = consumer.poll(KAFKA_POLL_TIMEOUT); } else if (availableRecordsQueue.offer( records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) { records = ConsumerRecords.empty(); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index b7f8e7a..7ce49a9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -58,7 +58,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; -import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,7 +176,8 @@ class ReadFromKafkaDoFn<K, V> private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize; - private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); + private static final org.joda.time.Duration KAFKA_POLL_TIMEOUT = + org.joda.time.Duration.millis(1000); @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @VisibleForTesting final DeserializerProvider valueDeserializerProvider; @@ -290,6 +290,7 @@ class ReadFromKafkaDoFn<K, 
V> return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller); } + @SuppressWarnings("PreferJavaTimeOverload") @ProcessElement public ProcessContinuation processElement( @Element KafkaSourceDescriptor kafkaSourceDescriptor, diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java index d45942e..98af830 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java @@ -540,10 +540,8 @@ public class XmlSourceTest { exception.expectMessage("MyCustomValidationEventHandler failure mesage"); try (Reader<WrongTrainType> reader = source.createReader(null)) { - List<WrongTrainType> results = new ArrayList<>(); for (boolean available = reader.start(); available; available = reader.advance()) { - WrongTrainType train = reader.getCurrent(); - results.add(train); + reader.getCurrent(); } } }