Repository: beam Updated Branches: refs/heads/master 732411a4c -> e8865843b
Utility class to convert PCollection<T> to PCollection<String> Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fc92be81 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fc92be81 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fc92be81 Branch: refs/heads/master Commit: fc92be818ef345965873413de4682fecc84d492b Parents: 732411a Author: Vikas Kedigehalli <vika...@google.com> Authored: Tue Dec 27 16:37:03 2016 -0800 Committer: Vikas Kedigehalli <vika...@google.com> Committed: Thu Dec 29 18:19:23 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/ToString.java | 68 ++++++++++++++++++++ .../beam/sdk/transforms/ToStringTest.java | 59 +++++++++++++++++ 2 files changed, 127 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fc92be81/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java new file mode 100644 index 0000000..dfaad94 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link PTransform PTransforms} for converting a {@link PCollection PCollection<T>} to a + * {@link PCollection PCollection<String>}. + * + * <p>Example of use: + * <pre> {@code + * PCollection<Long> longs = ...; + * PCollection<String> strings = longs.apply(ToString.<Long>element()); + * } </pre> + * + * + * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own + * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + */ +public final class ToString { + + /** + * Returns a {@code PTransform<PCollection<T>, PCollection<String>>} which transforms each + * element of the input {@link PCollection} to a {@link String} using the + * {@link Object#toString} method. + */ + public static <T> PTransform<PCollection<T>, PCollection<String>> element() { + return new Default<>(); + } + + private ToString() { + } + + /** + * A {@link PTransform} that converts a {@code PCollection<T>} to a {@code PCollection<String>} + * using the {@link Object#toString} method. + */ + private static final class Default<T> extends PTransform<PCollection<T>, PCollection<String>> { + @Override + public PCollection<String> expand(PCollection<T> input) { + return input.apply(MapElements.via(new ToStringFunction<T>())); + } + + private static class ToStringFunction<T> extends SimpleFunction<T, String> { + @Override + public String apply(T input) { + return input.toString(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/fc92be81/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java new file mode 100644 index 0000000..e5c9f05 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ToString} transform. + */ +@RunWith(JUnit4.class) +public class ToStringTest { + @Rule + public final TestPipeline p = TestPipeline.create(); + + @Test + @Category(RunnableOnService.class) + public void testToStringElement() { + Integer[] ints = {1, 2, 3, 4, 5}; + PCollection<Integer> input = p.apply(Create.of(Arrays.asList(ints))); + PCollection<String> output = input.apply(ToString.<Integer>element()); + PAssert.that(output).containsInAnyOrder(toStringList(ints)); + p.run(); + } + + private List<String> toStringList(Object[] ints) { + List<String> ll = new ArrayList<>(ints.length); + for (Object i : ints) { + ll.add(i.toString()); + } + return ll; + } +}