Repository: beam
Updated Branches:
  refs/heads/master 732411a4c -> e8865843b


Utility class to convert PCollection<T> to PCollection<String>


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fc92be81
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fc92be81
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fc92be81

Branch: refs/heads/master
Commit: fc92be818ef345965873413de4682fecc84d492b
Parents: 732411a
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Tue Dec 27 16:37:03 2016 -0800
Committer: Vikas Kedigehalli <vika...@google.com>
Committed: Thu Dec 29 18:19:23 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ToString.java    | 68 ++++++++++++++++++++
 .../beam/sdk/transforms/ToStringTest.java       | 59 +++++++++++++++++
 2 files changed, 127 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fc92be81/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
new file mode 100644
index 0000000..dfaad94
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link PTransform PTransforms} for converting a {@link PCollection 
PCollection&lt;T&gt;} to a
+ * {@link PCollection PCollection&lt;String&gt;}.
+ *
+ * <p>Example of use:
+ * <pre> {@code
+ * PCollection<Long> longs = ...;
+ * PCollection<String> strings = longs.apply(ToString.<Long>element());
+ * } </pre>
+ *
+ *
+ * <p><b>Note</b>: For any custom string conversion and formatting, we 
recommend applying your own
+ * {@link SerializableFunction} using {@link 
MapElements#via(SerializableFunction)}
+ */
+public final class ToString {
+
+  /**
+   * Returns a {@code PTransform<PCollection<T>, PCollection<String>>} which 
transforms each
+   * element of the input {@link PCollection} to a {@link String} using the
+   * {@link Object#toString} method.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<String>> element() {
+    return new Default<>();
+  }
+
+  private ToString() {
+  }
+
+  /**
+   * A {@link PTransform} that converts a {@code PCollection<T>} to a {@code 
PCollection<String>}
+   * using the {@link  Object#toString} method.
+   */
+  private static final class Default<T> extends PTransform<PCollection<T>, 
PCollection<String>> {
+    @Override
+    public PCollection<String> expand(PCollection<T> input) {
+      return input.apply(MapElements.via(new ToStringFunction<T>()));
+    }
+
+    private static class ToStringFunction<T> extends SimpleFunction<T, String> 
{
+      @Override
+      public String apply(T input) {
+        return input.toString();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fc92be81/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
new file mode 100644
index 0000000..e5c9f05
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ToString} transform.
+ */
+@RunWith(JUnit4.class)
+public class ToStringTest {
+  @Rule
+  public final TestPipeline p = TestPipeline.create();
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testToStringElement() {
+    Integer[] ints = {1, 2, 3, 4, 5};
+    PCollection<Integer> input = p.apply(Create.of(Arrays.asList(ints)));
+    PCollection<String> output = input.apply(ToString.<Integer>element());
+    PAssert.that(output).containsInAnyOrder(toStringList(ints));
+    p.run();
+  }
+
+  private List<String> toStringList(Object[] ints) {
+    List<String> ll = new ArrayList<>(ints.length);
+    for (Object i : ints) {
+      ll.add(i.toString());
+    }
+    return ll;
+  }
+}

Reply via email to