Replace OldDoFn with DoFn in DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/28720191
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/28720191
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/28720191

Branch: refs/heads/apex-runner
Commit: 287201916776cb51b98b9ddd27c169f87bb89e1a
Parents: 3d08685
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Oct 25 11:18:10 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Oct 25 13:12:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/WriteWithShardingFactory.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/28720191/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index d74cd56..8727cb5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -25,9 +25,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.io.Write.Bound;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
@@ -102,7 +102,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
   }

   @VisibleForTesting
-  static class KeyBasedOnCountFn<T> extends OldDoFn<T, KV<Integer, T>> {
+  static class KeyBasedOnCountFn<T> extends DoFn<T, KV<Integer, T>> {
     @VisibleForTesting
     static final int MIN_SHARDS_FOR_LOG = 3;

@@ -116,7 +116,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
       this.randomExtraShards = extraShards;
     }

-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       if (maxShards == 0) {
         maxShards = calculateShards(c.sideInput(numRecords));