http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java index 1525c80..9f602fd 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java @@ -25,103 +25,103 @@ import java.util.concurrent.TimeUnit; public class StateCheckpointWriter { - private final AbstractStateBackend.CheckpointStateOutputView output; - - public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) { - return new StateCheckpointWriter(output); - } - - private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) { - this.output = output; - } - - ///////// Creating the serialized versions of the different types of state held by dataflow /////// - - public StateCheckpointWriter addValueBuilder() throws IOException { - validate(); - StateType.serialize(StateType.VALUE, this); - return this; - } - - public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException { - validate(); - StateType.serialize(StateType.WATERMARK, this); - return this; - } - - public StateCheckpointWriter addListUpdatesBuilder() throws IOException { - validate(); - StateType.serialize(StateType.LIST, this); - return this; - } - - public StateCheckpointWriter addAccumulatorBuilder() throws IOException { - validate(); - StateType.serialize(StateType.ACCUMULATOR, this); - return this; - } - - ///////// Setting the tag for a given state element /////// - - public StateCheckpointWriter setTag(ByteString stateKey) throws IOException { - return writeData(stateKey.toByteArray()); - } - - public StateCheckpointWriter setTag(String stateKey) throws IOException { - output.writeUTF(stateKey); - return this; - } - - - public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException { - return serializeObject(key, keySerializer); - } - - public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException { - objectSerializer.serialize(object, output); - return this; - } - - ///////// Write the actual serialized data ////////// - - public StateCheckpointWriter setData(ByteString data) throws IOException { - return writeData(data.toByteArray()); - } - - public StateCheckpointWriter setData(byte[] data) throws IOException { - return writeData(data); - } - - public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException { - validate(); - output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis())); - return this; - } - - public StateCheckpointWriter writeInt(int number) throws IOException { - validate(); - output.writeInt(number); - return this; - } - - public StateCheckpointWriter writeByte(byte b) throws IOException { - validate(); - output.writeByte(b); - return this; - } - - ///////// Helper Methods /////// - - private StateCheckpointWriter writeData(byte[] data) throws IOException { - validate(); - output.writeInt(data.length); - output.write(data); - return this; - } - - private void validate() { - if (this.output == null) { - throw new RuntimeException("StateBackend not initialized yet."); - } - } + private final AbstractStateBackend.CheckpointStateOutputView output; + + public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) { + return new StateCheckpointWriter(output); + } + + private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) { + this.output = output; + } + + ///////// Creating the serialized versions of the different types of state held by dataflow /////// + + public StateCheckpointWriter addValueBuilder() throws IOException { + validate(); + StateType.serialize(StateType.VALUE, this); + return this; + } + + public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException { + validate(); + StateType.serialize(StateType.WATERMARK, this); + return this; + } + + public StateCheckpointWriter addListUpdatesBuilder() throws IOException { + validate(); + StateType.serialize(StateType.LIST, this); + return this; + } + + public StateCheckpointWriter addAccumulatorBuilder() throws IOException { + validate(); + StateType.serialize(StateType.ACCUMULATOR, this); + return this; + } + + ///////// Setting the tag for a given state element /////// + + public StateCheckpointWriter setTag(ByteString stateKey) throws IOException { + return writeData(stateKey.toByteArray()); + } + + public StateCheckpointWriter setTag(String stateKey) throws IOException { + output.writeUTF(stateKey); + return this; + } + + + public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException { + return serializeObject(key, keySerializer); + } + + public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException { + objectSerializer.serialize(object, output); + return this; + } + + ///////// Write the actual serialized data ////////// + + public StateCheckpointWriter setData(ByteString data) throws IOException { + return writeData(data.toByteArray()); + } + + public StateCheckpointWriter setData(byte[] data) throws IOException { + return writeData(data); + } + + public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException { + validate(); + output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis())); + return this; + } + + public StateCheckpointWriter writeInt(int number) throws IOException { + validate(); + output.writeInt(number); + return this; + } + + public StateCheckpointWriter writeByte(byte b) throws IOException { + validate(); + output.writeByte(b); + return this; + } + + ///////// Helper Methods /////// + + private StateCheckpointWriter writeData(byte[] data) throws IOException { + validate(); + output.writeInt(data.length); + output.write(data); + return this; + } + + private void validate() { + if (this.output == null) { + throw new RuntimeException("StateBackend not initialized yet."); + } + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java index aa049ef..9e2c9f8 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java @@ -23,49 +23,49 @@ import java.io.IOException; * */ public enum StateType { - VALUE(0), + VALUE(0), - WATERMARK(1), + WATERMARK(1), - LIST(2), + LIST(2), - ACCUMULATOR(3); + ACCUMULATOR(3); - private final int numVal; + private final int numVal; - StateType(int value) { - this.numVal = value; - } + StateType(int value) { + this.numVal = value; + } - public static void serialize(StateType type, StateCheckpointWriter output) throws IOException { - if (output == null) { - throw new IllegalArgumentException("Cannot write to a null output."); - } + public static void serialize(StateType type, StateCheckpointWriter output) throws IOException { + if (output == null) { + throw new IllegalArgumentException("Cannot write to a null output."); + } - if(type.numVal < 0 || type.numVal > 3) { - throw new RuntimeException("Unknown State Type " + type + "."); - } + if(type.numVal < 0 || type.numVal > 3) { + throw new RuntimeException("Unknown State Type " + type + "."); + } - output.writeByte((byte) type.numVal); - } + output.writeByte((byte) type.numVal); + } - public static StateType deserialize(StateCheckpointReader input) throws IOException { - if (input == null) { - throw new IllegalArgumentException("Cannot read from a null input."); - } + public static StateType deserialize(StateCheckpointReader input) throws IOException { + if (input == null) { + throw new IllegalArgumentException("Cannot read from a null input."); + } - int typeInt = (int) input.getByte(); - if(typeInt < 0 || typeInt > 3) { - throw new RuntimeException("Unknown State Type " + typeInt + "."); - } + int typeInt = (int) input.getByte(); + if(typeInt < 0 || typeInt > 3) { + throw new RuntimeException("Unknown State Type " + typeInt + "."); + } - StateType resultType = null; - for(StateType st: values()) { - if(st.numVal == typeInt) { - resultType = st; - break; - } - } - return resultType; - } + StateType resultType = null; + for(StateType st: values()) { + if(st.numVal == typeInt) { + resultType = st; + break; + } + } + return resultType; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java index ce53d44..3272975 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java @@ -30,70 +30,70 @@ import org.apache.flink.test.util.JavaProgramTestBase; public class AvroITCase extends JavaProgramTestBase { - protected String resultPath; - protected String tmpPath; - - public AvroITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "Joe red 3", - "Mary blue 4", - "Mark green 1", - "Julia purple 5" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - tmpPath = getTempDirPath("tmp"); - - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(tmpPath, resultPath); - } - - private static void runProgram(String tmpPath, String resultPath) { - Pipeline p = FlinkTestPipeline.createForBatch(); - - p - .apply(Create.of( - new User("Joe", 3, "red"), - new User("Mary", 4, "blue"), - new User("Mark", 1, "green"), - new User("Julia", 5, "purple")) - .withCoder(AvroCoder.of(User.class))) - - .apply(AvroIO.Write.to(tmpPath) - .withSchema(User.class)); - - p.run(); - - p = FlinkTestPipeline.createForBatch(); - - p - .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation()) - - .apply(ParDo.of(new DoFn<User, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - User u = c.element(); - String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber(); - c.output(result); - } - })) - - .apply(TextIO.Write.to(resultPath)); - - p.run(); - } + protected String resultPath; + protected String tmpPath; + + public AvroITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "Joe red 3", + "Mary blue 4", + "Mark green 1", + "Julia purple 5" + }; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + tmpPath = getTempDirPath("tmp"); + + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + @Override + protected void testProgram() throws Exception { + runProgram(tmpPath, resultPath); + } + + private static void runProgram(String tmpPath, String resultPath) { + Pipeline p = FlinkTestPipeline.createForBatch(); + + p + .apply(Create.of( + new User("Joe", 3, "red"), + new User("Mary", 4, "blue"), + new User("Mark", 1, "green"), + new User("Julia", 5, "purple")) + .withCoder(AvroCoder.of(User.class))) + + .apply(AvroIO.Write.to(tmpPath) + .withSchema(User.class)); + + p.run(); + + p = FlinkTestPipeline.createForBatch(); + + p + .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation()) + + .apply(ParDo.of(new DoFn<User, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + User u = c.element(); + String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber(); + c.output(result); + } + })) + + .apply(TextIO.Write.to(resultPath)); + + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java index 928388c..e65e497 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java @@ -26,47 +26,47 @@ import org.apache.flink.test.util.JavaProgramTestBase; public class FlattenizeITCase extends JavaProgramTestBase { - private String resultPath; - private String resultPath2; + private String resultPath; + private String resultPath2; - private static final String[] words = {"hello", "this", "is", "a", "DataSet!"}; - private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"}; - private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"}; + private static final String[] words = {"hello", "this", "is", "a", "DataSet!"}; + private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"}; + private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"}; - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - resultPath2 = getTempDirPath("result2"); - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + resultPath2 = getTempDirPath("result2"); + } - @Override - protected void postSubmit() throws Exception { - String join = Joiner.on('\n').join(words); - String join2 = Joiner.on('\n').join(words2); - String join3 = Joiner.on('\n').join(words3); - compareResultsByLinesInMemory(join + "\n" + join2, resultPath); - compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2); - } + @Override + protected void postSubmit() throws Exception { + String join = Joiner.on('\n').join(words); + String join2 = Joiner.on('\n').join(words2); + String join3 = Joiner.on('\n').join(words3); + compareResultsByLinesInMemory(join + "\n" + join2, resultPath); + compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2); + } - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); + @Override + protected void testProgram() throws Exception { + Pipeline p = FlinkTestPipeline.createForBatch(); - PCollection<String> p1 = p.apply(Create.of(words)); - PCollection<String> p2 = p.apply(Create.of(words2)); + PCollection<String> p1 = p.apply(Create.of(words)); + PCollection<String> p2 = p.apply(Create.of(words2)); - PCollectionList<String> list = PCollectionList.of(p1).and(p2); + PCollectionList<String> list = PCollectionList.of(p1).and(p2); - list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath)); + list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath)); - PCollection<String> p3 = p.apply(Create.of(words3)); + PCollection<String> p3 = p.apply(Create.of(words3)); - PCollectionList<String> list2 = list.and(p3); + PCollectionList<String> list2 = list.and(p3); - list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2)); + list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2)); - p.run(); - } + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java index 59c3b69..578e0e1 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java @@ -26,45 +26,45 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner; */ public class FlinkTestPipeline extends Pipeline { - /** - * Creates and returns a new test pipeline for batch execution. - * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - */ - public static FlinkTestPipeline createForBatch() { - return create(false); - } + /** + * Creates and returns a new test pipeline for batch execution. + * + * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call + * {@link Pipeline#run} to execute the pipeline and check the tests. + */ + public static FlinkTestPipeline createForBatch() { + return create(false); + } - /** - * Creates and returns a new test pipeline for streaming execution. - * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - * - * @return The Test Pipeline - */ - public static FlinkTestPipeline createForStreaming() { - return create(true); - } + /** + * Creates and returns a new test pipeline for streaming execution. + * + * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call + * {@link Pipeline#run} to execute the pipeline and check the tests. + * + * @return The Test Pipeline + */ + public static FlinkTestPipeline createForStreaming() { + return create(true); + } - /** - * Creates and returns a new test pipeline for streaming or batch execution. - * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call - * {@link Pipeline#run} to execute the pipeline and check the tests. - * - * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch. - * @return The Test Pipeline. - */ - private static FlinkTestPipeline create(boolean streaming) { - FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming); - return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions()); - } + /** + * Creates and returns a new test pipeline for streaming or batch execution. + * + * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call + * {@link Pipeline#run} to execute the pipeline and check the tests. + * + * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch. + * @return The Test Pipeline. + */ + private static FlinkTestPipeline create(boolean streaming) { + FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming); + return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions()); + } - private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, - PipelineOptions options) { - super(runner, options); - } + private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, + PipelineOptions options) { + super(runner, options); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java index af0f217..28861ea 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java @@ -34,66 +34,66 @@ import java.util.List; */ public class JoinExamplesITCase extends JavaProgramTestBase { - protected String resultPath; - - public JoinExamplesITCase(){ - } - - private static final TableRow row1 = new TableRow() - .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") - .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com"); - private static final TableRow row2 = new TableRow() - .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") - .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com"); - private static final TableRow row3 = new TableRow() - .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213") - .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com"); - static final TableRow[] EVENTS = new TableRow[] { - row1, row2, row3 - }; - static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS); - - private static final TableRow cc1 = new TableRow() - .set("FIPSCC", "VM").set("HumanName", "Vietnam"); - private static final TableRow cc2 = new TableRow() - .set("FIPSCC", "BE").set("HumanName", "Belgium"); - static final TableRow[] CCS = new TableRow[] { - cc1, cc2 - }; - static final List<TableRow> CC_ARRAY = Arrays.asList(CCS); - - static final String[] JOINED_EVENTS = new String[] { - "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, " - + "url: http://www.chicagotribune.com", - "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, " - + "url: http://cnn.com", - "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, " - + "url: http://cnn.com" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY)); - PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY)); - - PCollection<String> output = JoinExamples.joinEvents(input1, input2); - - output.apply(TextIO.Write.to(resultPath)); - - p.run(); - } + protected String resultPath; + + public JoinExamplesITCase(){ + } + + private static final TableRow row1 = new TableRow() + .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") + .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com"); + private static final TableRow row2 = new TableRow() + .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") + .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com"); + private static final TableRow row3 = new TableRow() + .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213") + .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com"); + static final TableRow[] EVENTS = new TableRow[] { + row1, row2, row3 + }; + static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS); + + private static final TableRow cc1 = new TableRow() + .set("FIPSCC", "VM").set("HumanName", "Vietnam"); + private static final TableRow cc2 = new TableRow() + .set("FIPSCC", "BE").set("HumanName", "Belgium"); + static final TableRow[] CCS = new TableRow[] { + cc1, cc2 + }; + static final List<TableRow> CC_ARRAY = Arrays.asList(CCS); + + static final String[] JOINED_EVENTS = new String[] { + "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, " + + "url: http://www.chicagotribune.com", + "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, " + + "url: http://cnn.com", + "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, " + + "url: http://cnn.com" + }; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath); + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY)); + PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY)); + + PCollection<String> output = JoinExamples.joinEvents(input1, input2); + + output.apply(TextIO.Write.to(resultPath)); + + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java index 35f2eaf..d1652e7 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java @@ -27,37 +27,37 @@ import java.io.Serializable; public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable { - protected String resultPath; - - protected final String expected = "test"; - - public MaybeEmptyTestITCase() { - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - p.apply(Create.of((Void) null)).setCoder(VoidCoder.of()) - .apply(ParDo.of( - new DoFn<Void, String>() { - @Override - public void processElement(DoFn<Void, String>.ProcessContext c) { - c.output(expected); - } - })).apply(TextIO.Write.to(resultPath)); - p.run(); - } + protected String resultPath; + + protected final String expected = "test"; + + public MaybeEmptyTestITCase() { + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + p.apply(Create.of((Void) null)).setCoder(VoidCoder.of()) + .apply(ParDo.of( + new DoFn<Void, String>() { + @Override + public void processElement(DoFn<Void, String>.ProcessContext c) { + c.output(expected); + } + })).apply(TextIO.Write.to(resultPath)); + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java index ccdbbf9..d8087d6 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java @@ -31,68 +31,68 @@ import java.io.Serializable; public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable { - private String resultPath; - - private static String[] expectedWords = {"MAAA", "MAAFOOO"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath); - } - - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO")); - - // Select words whose length is below a cut off, - // plus the lengths of words that are above the cut off. - // Also select words starting with "MARKER". - final int wordLengthCutOff = 3; - // Create tags to use for the main and side outputs. - final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; - final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; - final TupleTag<String> markedWordsTag = new TupleTag<String>(){}; - - PCollectionTuple results = - words.apply(ParDo - .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag) - .and(markedWordsTag)) - .of(new DoFn<String, String>() { - final TupleTag<String> specialWordsTag = new TupleTag<String>() { - }; - - public void processElement(ProcessContext c) { - String word = c.element(); - if (word.length() <= wordLengthCutOff) { - c.output(word); - } else { - c.sideOutput(wordLengthsAboveCutOffTag, word.length()); - } - if (word.startsWith("MAA")) { - c.sideOutput(markedWordsTag, word); - } - - if (word.startsWith("SPECIAL")) { - c.sideOutput(specialWordsTag, word); - } - } - })); - - // Extract the PCollection results, by tag. - PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag); - PCollection<Integer> wordLengthsAboveCutOff = results.get - (wordLengthsAboveCutOffTag); - PCollection<String> markedWords = results.get(markedWordsTag); - - markedWords.apply(TextIO.Write.to(resultPath)); - - p.run(); - } + private String resultPath; + + private static String[] expectedWords = {"MAAA", "MAAFOOO"}; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath); + } + + @Override + protected void testProgram() throws Exception { + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO")); + + // Select words whose length is below a cut off, + // plus the lengths of words that are above the cut off. + // Also select words starting with "MARKER". + final int wordLengthCutOff = 3; + // Create tags to use for the main and side outputs. + final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; + final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; + final TupleTag<String> markedWordsTag = new TupleTag<String>(){}; + + PCollectionTuple results = + words.apply(ParDo + .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag) + .and(markedWordsTag)) + .of(new DoFn<String, String>() { + final TupleTag<String> specialWordsTag = new TupleTag<String>() { + }; + + public void processElement(ProcessContext c) { + String word = c.element(); + if (word.length() <= wordLengthCutOff) { + c.output(word); + } else { + c.sideOutput(wordLengthsAboveCutOffTag, word.length()); + } + if (word.startsWith("MAA")) { + c.sideOutput(markedWordsTag, word); + } + + if (word.startsWith("SPECIAL")) { + c.sideOutput(specialWordsTag, word); + } + } + })); + + // Extract the PCollection results, by tag. + PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag); + PCollection<Integer> wordLengthsAboveCutOff = results.get + (wordLengthsAboveCutOffTag); + PCollection<String> markedWords = results.get(markedWordsTag); + + markedWords.apply(TextIO.Write.to(resultPath)); + + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java index 3569a78..5a46359 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java @@ -36,128 +36,128 @@ import java.util.List; public class ReadSourceITCase extends JavaProgramTestBase { - protected String resultPath; - - public ReadSourceITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "1", "2", "3", "4", "5", "6", "7", "8", "9"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(resultPath); - } - - private static void runProgram(String resultPath) { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> result = p - .apply(Read.from(new ReadSource(1, 10))) - .apply(ParDo.of(new DoFn<Integer, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element().toString()); - } - })); - - result.apply(TextIO.Write.to(resultPath)); - p.run(); - } - - - private static class ReadSource extends BoundedSource<Integer> { - final int from; - final int to; - - ReadSource(int from, int to) { - this.from = from; - this.to = to; - } - - @Override - public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options) - throws Exception { - List<ReadSource> res = new ArrayList<>(); - FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class); - int numWorkers = flinkOptions.getParallelism(); - Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0."); - - float step = 1.0f * (to - from) / numWorkers; - for (int i = 0; i < numWorkers; ++i) { - res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step))); - } - return res; - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return 8 * (to - from); - } - - @Override - public boolean producesSortedKeys(PipelineOptions options) throws Exception { - return true; - } - - @Override - public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException { - return new RangeReader(this); - } - - @Override - public void validate() {} - - @Override - public Coder<Integer> getDefaultOutputCoder() { - return BigEndianIntegerCoder.of(); - } - - private class RangeReader extends BoundedReader<Integer> { - private int current; - - public RangeReader(ReadSource source) { - this.current = source.from - 1; - } - - @Override - public boolean start() throws IOException { - return true; - } - - @Override - public boolean advance() throws IOException { - current++; - return (current < to); - } - - @Override - public Integer getCurrent() { - return current; - } - - @Override - public void close() throws IOException { - // Nothing - } - - @Override - public BoundedSource<Integer> getCurrentSource() { - return ReadSource.this; - } - } - } + protected String resultPath; + + public ReadSourceITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "1", "2", "3", "4", "5", "6", "7", "8", "9"}; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + @Override + protected void testProgram() throws Exception { + runProgram(resultPath); + } + + private static void runProgram(String resultPath) { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<String> result = p + .apply(Read.from(new ReadSource(1, 10))) + .apply(ParDo.of(new DoFn<Integer, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element().toString()); + } + })); + + result.apply(TextIO.Write.to(resultPath)); + p.run(); + } + + + private static class ReadSource extends BoundedSource<Integer> { + final int from; + final int to; + + ReadSource(int from, int to) { + this.from = from; + this.to = to; + } + + @Override + public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options) + throws Exception { + List<ReadSource> res = new ArrayList<>(); + FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class); + int numWorkers = flinkOptions.getParallelism(); + Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0."); + + float step = 1.0f * (to - from) / numWorkers; + for (int i = 0; i < numWorkers; ++i) { + res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step))); + } + return res; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 8 * (to - from); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return true; + } + + @Override + public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException { + return new RangeReader(this); + } + + @Override + public void validate() {} + + @Override + public Coder<Integer> getDefaultOutputCoder() { + return BigEndianIntegerCoder.of(); + } + + private class RangeReader extends BoundedReader<Integer> { + private int current; + + public RangeReader(ReadSource source) { + this.current = source.from - 1; + } + + @Override + public boolean start() throws IOException { + return true; + } + + @Override + public boolean advance() throws IOException { + current++; + return (current < to); + } + + @Override + public Integer getCurrent() { + return current; + } + + @Override + public void close() throws IOException { + // Nothing + } + + @Override + public BoundedSource<Integer> getCurrentSource() { + return ReadSource.this; + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java index db794f7..615f194 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java @@ -30,39 +30,39 @@ import java.util.List; public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase { - protected String resultPath; + protected String resultPath; - public RemoveDuplicatesEmptyITCase(){ - } + public RemoveDuplicatesEmptyITCase(){ + } - static final String[] EXPECTED_RESULT = new String[] {}; + static final String[] EXPECTED_RESULT = new String[] {}; - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } - @Override - protected void testProgram() throws Exception { + @Override + protected void testProgram() throws Exception { - List<String> strings = Collections.emptyList(); + List<String> strings = Collections.emptyList(); - Pipeline p = FlinkTestPipeline.createForBatch(); + Pipeline p = FlinkTestPipeline.createForBatch(); - PCollection<String> input = - p.apply(Create.of(strings)) - .setCoder(StringUtf8Coder.of()); + PCollection<String> input = + p.apply(Create.of(strings)) + .setCoder(StringUtf8Coder.of()); - PCollection<String> output = - input.apply(RemoveDuplicates.<String>create()); + PCollection<String> output = + input.apply(RemoveDuplicates.<String>create()); - output.apply(TextIO.Write.to(resultPath)); - p.run(); - } + output.apply(TextIO.Write.to(resultPath)); + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java index 04e06b8..8c19f2c 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java @@ -30,40 +30,40 @@ import java.util.List; public class RemoveDuplicatesITCase extends JavaProgramTestBase { - protected String resultPath; + protected String resultPath; - public RemoveDuplicatesITCase(){ - } + public RemoveDuplicatesITCase(){ + } - static final String[] EXPECTED_RESULT = new String[] { - "k1", "k5", "k2", "k3"}; + static final String[] EXPECTED_RESULT = new String[] { + "k1", "k5", "k2", "k3"}; - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } - @Override - protected void testProgram() throws Exception { + @Override + protected void testProgram() throws Exception { - List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3"); + List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3"); - Pipeline p = FlinkTestPipeline.createForBatch(); + Pipeline p = FlinkTestPipeline.createForBatch(); - PCollection<String> input = - p.apply(Create.of(strings)) - .setCoder(StringUtf8Coder.of()); + PCollection<String> input = + p.apply(Create.of(strings)) + .setCoder(StringUtf8Coder.of()); - PCollection<String> output = - input.apply(RemoveDuplicates.<String>create()); + PCollection<String> output = + input.apply(RemoveDuplicates.<String>create()); - output.apply(TextIO.Write.to(resultPath)); - p.run(); - } + output.apply(TextIO.Write.to(resultPath)); + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java index ee8843c..7c3d6f9 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java @@ -28,40 +28,40 @@ import java.io.Serializable; public class SideInputITCase extends JavaProgramTestBase implements Serializable { - private static final String expected = "Hello!"; + private static final String expected = "Hello!"; - protected String resultPath; + protected String resultPath; - @Override - protected void testProgram() throws Exception { + @Override + protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); + Pipeline p = FlinkTestPipeline.createForBatch(); - final PCollectionView<String> sidesInput = p - .apply(Create.of(expected)) - .apply(View.<String>asSingleton()); + final PCollectionView<String> sidesInput = p + .apply(Create.of(expected)) + .apply(View.<String>asSingleton()); - p.apply(Create.of("bli")) - .apply(ParDo.of(new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - String s = c.sideInput(sidesInput); - c.output(s); - } - }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath)); + p.apply(Create.of("bli")) + .apply(ParDo.of(new DoFn<String, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + String s = c.sideInput(sidesInput); + c.output(s); + } + }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath)); - p.run(); - } + p.run(); + } - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java index 07c1294..715d0be 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java @@ -32,45 +32,45 @@ import java.net.URI; public class TfIdfITCase extends JavaProgramTestBase { - protected String resultPath; + protected String resultPath; - public TfIdfITCase(){ - } + public TfIdfITCase(){ + } - static final String[] EXPECTED_RESULT = new String[] { - "a", "m", "n", "b", "c", "d"}; + static final String[] EXPECTED_RESULT = new String[] { + "a", "m", "n", "b", "c", "d"}; - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } - @Override - protected void testProgram() throws Exception { + @Override + protected void testProgram() throws Exception { - Pipeline pipeline = FlinkTestPipeline.createForBatch(); + Pipeline pipeline = FlinkTestPipeline.createForBatch(); - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); + pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline - .apply(Create.of( - KV.of(new URI("x"), "a b c d"), - KV.of(new URI("y"), "a b c"), - KV.of(new URI("z"), "a m n"))) - .apply(new TFIDF.ComputeTfIdf()); + PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline + .apply(Create.of( + KV.of(new URI("x"), "a b c d"), + KV.of(new URI("y"), "a b c"), + KV.of(new URI("z"), "a m n"))) + .apply(new TFIDF.ComputeTfIdf()); - PCollection<String> words = wordToUriAndTfIdf - .apply(Keys.<String>create()) - .apply(RemoveDuplicates.<String>create()); + PCollection<String> words = wordToUriAndTfIdf + .apply(Keys.<String>create()) + .apply(RemoveDuplicates.<String>create()); - words.apply(TextIO.Write.to(resultPath)); + words.apply(TextIO.Write.to(resultPath)); - pipeline.run(); - } + pipeline.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java index 9188097..f1a2454 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java @@ -32,43 +32,43 @@ import java.util.List; public class WordCountITCase extends JavaProgramTestBase { - protected String resultPath; + protected String resultPath; - public WordCountITCase(){ - } + public WordCountITCase(){ + } - static final String[] WORDS_ARRAY = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; + static final String[] WORDS_ARRAY = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; - static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); + static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - static final String[] COUNTS_ARRAY = new String[] { - "hi: 5", "there: 1", "sue: 2", "bob: 2"}; + static final String[] COUNTS_ARRAY = new String[] { + "hi: 5", "there: 1", "sue: 2", "bob: 2"}; - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath); - } + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath); + } - @Override - protected void testProgram() throws Exception { + @Override + protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); + Pipeline p = FlinkTestPipeline.createForBatch(); - PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); + PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - input - .apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())) - .apply(TextIO.Write.to(resultPath)); + input + .apply(new WordCount.CountWords()) + .apply(MapElements.via(new WordCount.FormatAsTextFn())) + .apply(TextIO.Write.to(resultPath)); - p.run(); - } + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java index ccc52c4..1cac036 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java @@ -33,104 +33,104 @@ import org.apache.flink.test.util.JavaProgramTestBase; public class WordCountJoin2ITCase extends JavaProgramTestBase { - static final String[] WORDS_1 = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final String[] WORDS_2 = new String[] { - "hi tim", "beauty", "hooray sue bob", - "hi there", "", "please say hi"}; - - static final String[] RESULTS = new String[] { - "beauty -> Tag1: Tag2: 1", - "bob -> Tag1: 2 Tag2: 1", - "hi -> Tag1: 5 Tag2: 3", - "hooray -> Tag1: Tag2: 1", - "please -> Tag1: Tag2: 1", - "say -> Tag1: Tag2: 1", - "sue -> Tag1: 2 Tag2: 1", - "there -> Tag1: 1 Tag2: 1", - "tim -> Tag1: Tag2: 1" - }; - - static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); - static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); - - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); - - /* Create two PCollections and join them */ - PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - /* CoGroup the two collections */ - PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple - .of(tag1, occurences1) - .and(tag2, occurences2) - .apply(CoGroupByKey.<String>create()); - - /* Format output */ - mergedOccurences.apply(ParDo.of(new FormatCountsFn())) - .apply(TextIO.Write.named("test").to(resultPath)); - - p.run(); - } - - - static class ExtractWordsFn extends DoFn<String, String> { - - @Override - public void startBundle(Context c) { - } - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { - @Override - public void processElement(ProcessContext c) { - CoGbkResult value = c.element().getValue(); - String key = c.element().getKey(); - String countTag1 = tag1.getId() + ": "; - String countTag2 = tag2.getId() + ": "; - for (Long count : value.getAll(tag1)) { - countTag1 += count + " "; - } - for (Long count : value.getAll(tag2)) { - countTag2 += count; - } - c.output(key + " -> " + countTag1 + countTag2); - } - } + static final String[] WORDS_1 = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final String[] WORDS_2 = new String[] { + "hi tim", "beauty", "hooray sue bob", + "hi there", "", "please say hi"}; + + static final String[] RESULTS = new String[] { + "beauty -> Tag1: Tag2: 1", + "bob -> Tag1: 2 Tag2: 1", + "hi -> Tag1: 5 Tag2: 3", + "hooray -> Tag1: Tag2: 1", + "please -> Tag1: Tag2: 1", + "say -> Tag1: Tag2: 1", + "sue -> Tag1: 2 Tag2: 1", + "there -> Tag1: 1 Tag2: 1", + "tim -> Tag1: Tag2: 1" + }; + + static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); + static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); + } + + @Override + protected void testProgram() throws Exception { + Pipeline p = FlinkTestPipeline.createForBatch(); + + /* Create two PCollections and join them */ + PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + /* CoGroup the two collections */ + PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple + .of(tag1, occurences1) + .and(tag2, occurences2) + .apply(CoGroupByKey.<String>create()); + + /* Format output */ + mergedOccurences.apply(ParDo.of(new FormatCountsFn())) + .apply(TextIO.Write.named("test").to(resultPath)); + + p.run(); + } + + + static class ExtractWordsFn extends DoFn<String, String> { + + @Override + public void startBundle(Context c) { + } + + @Override + public void processElement(ProcessContext c) { + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { + @Override + public void processElement(ProcessContext c) { + CoGbkResult value = c.element().getValue(); + String key = c.element().getKey(); + String countTag1 = tag1.getId() + ": "; + String countTag2 = tag2.getId() + ": "; + for (Long count : value.getAll(tag1)) { + countTag1 += count + " "; + } + for (Long count : value.getAll(tag2)) { + countTag2 += count; + } + c.output(key + " -> " + countTag1 + countTag2); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java index e6eddc0..4c8b99b 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java @@ -33,122 +33,122 @@ import org.apache.flink.test.util.JavaProgramTestBase; public class WordCountJoin3ITCase extends JavaProgramTestBase { - static final String[] WORDS_1 = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final String[] WORDS_2 = new String[] { - "hi tim", "beauty", "hooray sue bob", - "hi there", "", "please say hi"}; - - static final String[] WORDS_3 = new String[] { - "hi stephan", "beauty", "hooray big fabian", - "hi yo", "", "please say hi"}; - - static final String[] RESULTS = new String[] { - "beauty -> Tag1: Tag2: 1 Tag3: 1", - "bob -> Tag1: 2 Tag2: 1 Tag3: ", - "hi -> Tag1: 5 Tag2: 3 Tag3: 3", - "hooray -> Tag1: Tag2: 1 Tag3: 1", - "please -> Tag1: Tag2: 1 Tag3: 1", - "say -> Tag1: Tag2: 1 Tag3: 1", - "sue -> Tag1: 2 Tag2: 1 Tag3: ", - "there -> Tag1: 1 Tag2: 1 Tag3: ", - "tim -> Tag1: Tag2: 1 Tag3: ", - "stephan -> Tag1: Tag2: Tag3: 1", - "yo -> Tag1: Tag2: Tag3: 1", - "fabian -> Tag1: Tag2: Tag3: 1", - "big -> Tag1: Tag2: Tag3: 1" - }; - - static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); - static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); - static final TupleTag<Long> tag3 = new TupleTag<>("Tag3"); - - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - /* Create two PCollections and join them */ - PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - /* CoGroup the two collections */ - PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple - .of(tag1, occurences1) - .and(tag2, occurences2) - .and(tag3, occurences3) - .apply(CoGroupByKey.<String>create()); - - /* Format output */ - mergedOccurences.apply(ParDo.of(new FormatCountsFn())) - .apply(TextIO.Write.named("test").to(resultPath)); - - p.run(); - } - - - static class ExtractWordsFn extends DoFn<String, String> { - - @Override - public void startBundle(Context c) { - } - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { - @Override - public void processElement(ProcessContext c) { - CoGbkResult value = c.element().getValue(); - String key = c.element().getKey(); - String countTag1 = tag1.getId() + ": "; - String countTag2 = tag2.getId() + ": "; - String countTag3 = tag3.getId() + ": "; - for (Long count : value.getAll(tag1)) { - countTag1 += count + " "; - } - for (Long count : value.getAll(tag2)) { - countTag2 += count + " "; - } - for (Long count : value.getAll(tag3)) { - countTag3 += count; - } - c.output(key + " -> " + countTag1 + countTag2 + countTag3); - } - } + static final String[] WORDS_1 = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final String[] WORDS_2 = new String[] { + "hi tim", "beauty", "hooray sue bob", + "hi there", "", "please say hi"}; + + static final String[] WORDS_3 = new String[] { + "hi stephan", "beauty", "hooray big fabian", + "hi yo", "", "please say hi"}; + + static final String[] RESULTS = new String[] { + "beauty -> Tag1: Tag2: 1 Tag3: 1", + "bob -> Tag1: 2 Tag2: 1 Tag3: ", + "hi -> Tag1: 5 Tag2: 3 Tag3: 3", + "hooray -> Tag1: Tag2: 1 Tag3: 1", + "please -> Tag1: Tag2: 1 Tag3: 1", + "say -> Tag1: Tag2: 1 Tag3: 1", + "sue -> Tag1: 2 Tag2: 1 Tag3: ", + "there -> Tag1: 1 Tag2: 1 Tag3: ", + "tim -> Tag1: Tag2: 1 Tag3: ", + "stephan -> Tag1: Tag2: Tag3: 1", + "yo -> Tag1: Tag2: Tag3: 1", + "fabian -> Tag1: Tag2: Tag3: 1", + "big -> Tag1: Tag2: Tag3: 1" + }; + + static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); + static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); + static final TupleTag<Long> tag3 = new TupleTag<>("Tag3"); + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + /* Create two PCollections and join them */ + PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + /* CoGroup the two collections */ + PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple + .of(tag1, occurences1) + .and(tag2, occurences2) + .and(tag3, occurences3) + .apply(CoGroupByKey.<String>create()); + + /* Format output */ + mergedOccurences.apply(ParDo.of(new FormatCountsFn())) + .apply(TextIO.Write.named("test").to(resultPath)); + + p.run(); + } + + + static class ExtractWordsFn extends DoFn<String, String> { + + @Override + public void startBundle(Context c) { + } + + @Override + public void processElement(ProcessContext c) { + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { + @Override + public void processElement(ProcessContext c) { + CoGbkResult value = c.element().getValue(); + String key = c.element().getKey(); + String countTag1 = tag1.getId() + ": "; + String countTag2 = tag2.getId() + ": "; + String countTag3 = tag3.getId() + ": "; + for (Long count : value.getAll(tag1)) { + countTag1 += count + " "; + } + for (Long count : value.getAll(tag2)) { + countTag2 += count + " "; + } + for (Long count : value.getAll(tag3)) { + countTag3 += count; + } + c.output(key + " -> " + countTag1 + countTag2 + countTag3); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java index 865fc5f..a61bf52 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java @@ -39,118 +39,118 @@ import static org.junit.Assert.*; */ public class WriteSinkITCase extends JavaProgramTestBase { - protected String resultPath; + protected String resultPath; - public WriteSinkITCase(){ - } + public WriteSinkITCase(){ + } - static final String[] EXPECTED_RESULT = new String[] { - "Joe red 3", "Mary blue 4", "Max yellow 23"}; + static final String[] EXPECTED_RESULT = new String[] { + "Joe red 3", "Mary blue 4", "Max yellow 23"}; - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(resultPath); - } - - private static void runProgram(String resultPath) { - Pipeline p = FlinkTestPipeline.createForBatch(); - - p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of()) - .apply("CustomSink", Write.to(new MyCustomSink(resultPath))); - - p.run(); - } - - /** - * Simple custom sink which writes to a file. - */ - private static class MyCustomSink extends Sink<String> { - - private final String resultPath; - - public MyCustomSink(String resultPath) { - this.resultPath = resultPath; - } - - @Override - public void validate(PipelineOptions options) { - assertNotNull(options); - } - - @Override - public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) { - return new MyWriteOperation(); - } - - private class MyWriteOperation extends WriteOperation<String, String> { - - @Override - public Coder<String> getWriterResultCoder() { - return StringUtf8Coder.of(); - } - - @Override - public void initialize(PipelineOptions options) throws Exception { - - } - - @Override - public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception { - - } - - @Override - public Writer<String, String> createWriter(PipelineOptions options) throws Exception { - return new MyWriter(); - } - - @Override - public Sink<String> getSink() { - return MyCustomSink.this; - } - - /** - * Simple Writer which writes to a file. - */ - private class MyWriter extends Writer<String, String> { - - private PrintWriter internalWriter; - - @Override - public void open(String uId) throws Exception { - Path path = new Path(resultPath + "/" + uId); - FileSystem.get(new URI("file:///")).create(path, false); - internalWriter = new PrintWriter(new File(path.toUri())); - } - - @Override - public void write(String value) throws Exception { - internalWriter.println(value); - } - - @Override - public String close() throws Exception { - internalWriter.close(); - return resultPath; - } - - @Override - public WriteOperation<String, String> getWriteOperation() { - return MyWriteOperation.this; - } - } - } - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + @Override + protected void testProgram() throws Exception { + runProgram(resultPath); + } + + private static void runProgram(String resultPath) { + Pipeline p = FlinkTestPipeline.createForBatch(); + + p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of()) + .apply("CustomSink", Write.to(new MyCustomSink(resultPath))); + + p.run(); + } + + /** + * Simple custom sink which writes to a file. + */ + private static class MyCustomSink extends Sink<String> { + + private final String resultPath; + + public MyCustomSink(String resultPath) { + this.resultPath = resultPath; + } + + @Override + public void validate(PipelineOptions options) { + assertNotNull(options); + } + + @Override + public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) { + return new MyWriteOperation(); + } + + private class MyWriteOperation extends WriteOperation<String, String> { + + @Override + public Coder<String> getWriterResultCoder() { + return StringUtf8Coder.of(); + } + + @Override + public void initialize(PipelineOptions options) throws Exception { + + } + + @Override + public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception { + + } + + @Override + public Writer<String, String> createWriter(PipelineOptions options) throws Exception { + return new MyWriter(); + } + + @Override + public Sink<String> getSink() { + return MyCustomSink.this; + } + + /** + * Simple Writer which writes to a file. + */ + private class MyWriter extends Writer<String, String> { + + private PrintWriter internalWriter; + + @Override + public void open(String uId) throws Exception { + Path path = new Path(resultPath + "/" + uId); + FileSystem.get(new URI("file:///")).create(path, false); + internalWriter = new PrintWriter(new File(path.toUri())); + } + + @Override + public void write(String value) throws Exception { + internalWriter.println(value); + } + + @Override + public String close() throws Exception { + internalWriter.close(); + return resultPath; + } + + @Override + public WriteOperation<String, String> getWriteOperation() { + return MyWriteOperation.this; + } + } + } + } }