Thanks, Jean. This is an interesting pattern. I see that it's implemented as a PTransform, with constructs (WriteOperation/Writer) pretty similar to the Sink<T> interface. Would love to hear more about the pros and cons of this pattern :). It definitely gives more control over connection initialization and cleanup.
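For comparison, here is a minimal sketch of the plain-ParDo flavor of such a
write, with the connection opened and closed inside the DoFn. WriteToStore,
WriteFn, and the Client stub are made-up names, and the annotation/method
names follow later Beam SDKs, so details may differ in the incubator version:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public class ParDoWriteSketch {

  /** Hypothetical reusable transform packaging the write as a PTransform. */
  public static class WriteToStore extends PTransform<PCollection<String>, PDone> {
    private final String host;

    public WriteToStore(String host) {
      this.host = host;
    }

    @Override
    public PDone expand(PCollection<String> input) {
      input.apply("WriteRecords", ParDo.of(new WriteFn(host)));
      return PDone.in(input.getPipeline());
    }
  }

  /** The DoFn owns the connection: opened once per instance, closed on teardown. */
  static class WriteFn extends DoFn<String, Void> {
    private final String host;
    private transient Client client;

    WriteFn(String host) {
      this.host = host;
    }

    @Setup
    public void setup() {
      // Connection initialization happens once per DoFn instance, not per element.
      client = Client.open(host);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      client.write(c.element());
    }

    @Teardown
    public void teardown() {
      // Connection cleanup when the runner discards the instance.
      client.close();
    }
  }

  /** Stub standing in for a real Cassandra/Titan client; purely illustrative. */
  static class Client {
    static Client open(String host) { return new Client(); }
    void write(String record) { /* send the record to the store */ }
    void close() { /* release the connection */ }
  }
}

A pipeline would then just do input.apply(new WriteToStore("cassandra-host")).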
Regards
Sumit Chawla

On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Sumit,
>
> I created a PR containing Cassandra IO with a sink:
>
> https://github.com/apache/incubator-beam/pull/592
>
> Maybe it can help you.
>
> Regards
> JB
>
> On 07/28/2016 09:00 PM, Chawla,Sumit wrote:
>
>> Hi Kenneth
>>
>> Thanks for looking into it. I am currently trying to implement Sinks for
>> writing data into Cassandra/Titan DB. My immediate goal is to run it on
>> Flink Runner.
>>
>> Regards
>> Sumit Chawla
>>
>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
>>
>>> Hi Sumit,
>>>
>>> I see what has happened here, from that snippet you pasted from the Flink
>>> runner's code [1]. Thanks for looking into it!
>>>
>>> The Flink runner today appears to reject Write.Bounded transforms in
>>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
>>> intent of that code, I believe, was to special case UnboundedFlinkSink to
>>> make it easy to use an existing Flink sink, not to disable all other
>>> Write transforms. What do you think, Max?
>>>
>>> Until we fix this issue, you should use ParDo transforms to do the writing.
>>> If you can share a little about your sink, we may be able to suggest
>>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
>>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
>>>
>>> Kenn
>>>
>>> [1] https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
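(For reference, a rough sketch of the lifecycle that Write.of(Sink) expresses
as that "specialized pattern of ParDo's". These interfaces are hypothetical,
written out only to show the shape; they are not the actual Sink<T> signatures.)

import java.util.List;

// Stage 1 (a ParDo over a single token) calls initialize(); stage 2 (a ParDo
// over the data) writes each bundle through its own Writer; stage 3 collects
// the bundle results and finalizes the whole write.
interface WriteOperation<T, ResultT> {
  void initialize();                          // once, before any data is written
  Writer<T, ResultT> createWriter();          // one writer per bundle of input
  void finalizeWrite(List<ResultT> results);  // once, after all bundles succeed
}

interface Writer<T, ResultT> {
  void open(String bundleId);  // e.g. open a temp location named after the bundle
  void write(T element);
  ResultT close();             // report what was written, for finalization
}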
>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>
>>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
>>>> runner, and I'll then defer to somebody familiar with it.
>>>>
>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkcha...@gmail.com> wrote:
>>>>
>>>>> Thanks a lot Eugene.
>>>>>
>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>> mandates that my implementation must also implement SinkFunction<>. In
>>>>>> that case, none of the Sink<> methods get called anyway.
>>>>>
>>>>> I am using FlinkRunner. The Sink implementation that I was writing by
>>>>> extending the Sink<> class had to implement the Flink-specific
>>>>> SinkFunction for the correct translation:
>>>>>
>>>>> private static class WriteSinkStreamingTranslator<T> implements
>>>>>     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
>>>>>
>>>>>   @Override
>>>>>   public void translateNode(Write.Bound<T> transform,
>>>>>       FlinkStreamingTranslationContext context) {
>>>>>     String name = transform.getName();
>>>>>     PValue input = context.getInput(transform);
>>>>>
>>>>>     Sink<T> sink = transform.getSink();
>>>>>     if (!(sink instanceof UnboundedFlinkSink)) {
>>>>>       throw new UnsupportedOperationException(
>>>>>           "At the time, only unbounded Flink sinks are supported.");
>>>>>     }
>>>>>
>>>>>     DataStream<WindowedValue<T>> inputDataSet =
>>>>>         context.getInputDataStream(input);
>>>>>
>>>>>     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
>>>>>       @Override
>>>>>       public void flatMap(WindowedValue<T> value, Collector<Object> out)
>>>>>           throws Exception {
>>>>>         out.collect(value.getValue());
>>>>>       }
>>>>>     }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
>>>>>   }
>>>>> }
>>>>>
>>>>> Regards
>>>>> Sumit Chawla
>>>>>
>>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>
>>>>>> Hi Sumit,
>>>>>>
>>>>>> All reusable parts of a pipeline, including connectors to storage
>>>>>> systems, should be packaged as PTransform's.
>>>>>>
>>>>>> Sink is an advanced API that you can use under the hood to implement the
>>>>>> transform, if this particular connector benefits from this API - but you
>>>>>> don't have to, and many connectors indeed don't need it, and are simpler
>>>>>> to implement just as wrappers around a couple of ParDo's writing the data.
>>>>>>
>>>>>> Even if the connector is implemented using a Sink, packaging the
>>>>>> connector as a PTransform is important because it's easier to apply in a
>>>>>> pipeline and because it's more future-proof (the author of the connector
>>>>>> may later change it to use something else rather than Sink under the
>>>>>> hood without breaking existing users).
>>>>>>
>>>>>> Sink is, currently, useful in the following cases:
>>>>>> - You're writing a bounded amount of data (we do not yet have an
>>>>>>   unbounded Sink analogue)
>>>>>> - The location you're writing to is known at pipeline construction time,
>>>>>>   and does not depend on the data itself (support for "data-dependent"
>>>>>>   sinks is on the radar: https://issues.apache.org/jira/browse/BEAM-92)
>>>>>> - The storage system you're writing to has a distinct "initialization"
>>>>>>   and "finalization" step, allowing the write operation to appear atomic
>>>>>>   (either all data is written or none). This mostly applies to files
>>>>>>   (where writing is done by first writing to a temporary directory, and
>>>>>>   then renaming all files to their final location), but there can be
>>>>>>   other cases too.
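(To make that last point concrete: a minimal, Beam-free sketch of the
write-to-temp-then-rename idea. The output path and shard names are
illustrative.)

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

public class AtomicFinalizeSketch {
  public static void main(String[] args) throws IOException {
    Path finalDir = Files.createDirectories(Paths.get("output"));
    // Keep the temp dir on the same filesystem so the renames stay atomic.
    Path tempDir = Files.createTempDirectory(finalDir, "tmp-");

    // Each "bundle" writes its own temp file; nothing is visible to readers yet.
    Files.write(tempDir.resolve("shard-0"), "record-a\n".getBytes());
    Files.write(tempDir.resolve("shard-1"), "record-b\n".getBytes());

    // Finalization: rename every temp file into place. Each rename is atomic,
    // so a reader never observes a partially written shard.
    List<Path> shards = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(tempDir)) {
      stream.forEach(shards::add);
    }
    for (Path shard : shards) {
      Files.move(shard, finalDir.resolve(shard.getFileName().toString()),
          StandardCopyOption.ATOMIC_MOVE);
    }
    Files.delete(tempDir);  // empty once all shards are moved
  }
}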
>>>>>> Here's an example GCP connector using the Sink API under the hood:
>>>>>>
>>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
>>>>>>
>>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
>>>>>> DatastoreIO, BigtableIO, etc.).
>>>>>>
>>>>>> I'm not familiar with the Flink API; however, I'm a bit confused by your
>>>>>> last paragraph: the Beam programming model is intentionally
>>>>>> runner-agnostic, so that you can run exactly the same code on different
>>>>>> runners.
>>>>>>
>>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkcha...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Please suggest the best way to write a Sink in Beam. I see that there
>>>>>>> is a Sink<T> abstract class which is in an experimental state. What is
>>>>>>> the expected outcome of this one? Is the API frozen, or could it still
>>>>>>> change? Most of the existing Sink implementations like KafkaIO.Write
>>>>>>> are not using this interface, and instead extend
>>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
>>>>>>> extend Sink<>?
>>>>>>>
>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
>>>>>>> mandates that my implementation must also implement SinkFunction<>. In
>>>>>>> that case, none of the Sink<> methods get called anyway.
>>>>>>>
>>>>>>> Regards
>>>>>>> Sumit Chawla

> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com