Hi Sumit,

I created a PR containing Cassandra IO with a sink:

https://github.com/apache/incubator-beam/pull/592

Maybe it can help you.

Regards
JB

On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
Hi Kenneth

Thanks for looking into it. I am currently trying to implement Sinks for
writing data into Cassandra/Titan DB.  My immediate goal is to run it on
Flink Runner.



Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <k...@google.com.invalid>
wrote:

Hi Sumit,

I see what has happened here, from that snippet you pasted from the Flink
runner's code [1]. Thanks for looking into it!

The Flink runner today appears to reject Write.Bounded transforms in
streaming mode if the sink is not an instance of UnboundedFlinkSink. The
intent of that code, I believe, was to special case UnboundedFlinkSink to
make it easy to use an existing Flink sink, not to disable all other Write
transforms. What do you think, Max?

Until we fix this issue, you should use ParDo transforms to do the writing.
If you can share a little about your sink, we may be able to suggest
patterns for implementing it. Like Eugene said, the Write.of(Sink)
transform is just a specialized pattern of ParDo's, not a Beam primitive.

Kenn

[1]

https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203


On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

Thanks Sumit. Looks like your question is, indeed, specific to the Flink
runner, and I'll then defer to somebody familiar with it.

On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <sumitkcha...@gmail.com>
wrote:

Thanks a lot Eugene.

My immediate requirement is to run this Sink on FlinkRunner. Which
mandates that my implementation must also implement SinkFunction<>.  In
that >>>case, none of the Sink<> methods get called anyway.

I am using FlinkRunner. The Sink implementation that i was writing by
extending Sink<> class had to implement Flink Specific SinkFunction for
the
correct translation.

private static class WriteSinkStreamingTranslator<T> implements


FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
{

   @Override
   public void translateNode(Write.Bound<T> transform,
FlinkStreamingTranslationContext context) {
     String name = transform.getName();
     PValue input = context.getInput(transform);

     Sink<T> sink = transform.getSink();
     if (!(sink instanceof UnboundedFlinkSink)) {
       throw new UnsupportedOperationException("At the time, only
unbounded Flink sinks are supported.");
     }

     DataStream<WindowedValue<T>> inputDataSet =
context.getInputDataStream(input);

     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>,
Object>()
{
       @Override
       public void flatMap(WindowedValue<T> value, Collector<Object>
out) throws Exception {
         out.collect(value.getValue());
       }
     }).addSink(((UnboundedFlinkSink<Object>)
sink).getFlinkSource()).name(name);
   }
}




Regards
Sumit Chawla


On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

Hi Sumit,

All reusable parts of a pipeline, including connectors to storage
systems,
should be packaged as PTransform's.

Sink is an advanced API that you can use under the hood to implement
the
transform, if this particular connector benefits from this API - but
you
don't have to, and many connectors indeed don't need it, and are
simpler
to
implement just as wrappers around a couple of ParDo's writing the
data.

Even if the connector is implemented using a Sink, packaging the
connector
as a PTransform is important because it's easier to apply in a
pipeline
and
because it's more future-proof (the author of the connector may later
change it to use something else rather than Sink under the hood
without
breaking existing users).

Sink is, currently, useful in the following case:
- You're writing a bounded amount of data (we do not yet have an
unbounded
Sink analogue)
- The location you're writing to is known at pipeline construction
time,
and does not depend on the data itself (support for "data-dependent"
sinks
is on the radar https://issues.apache.org/jira/browse/BEAM-92)
- The storage system you're writing to has a distinct
"initialization"
and
"finalization" step, allowing the write operation to appear atomic
(either
all data is written or none). This mostly applies to files (where
writing
is done by first writing to a temporary directory, and then renaming
all
files to their final location), but there can be other cases too.

Here's an example GCP connector using the Sink API under the hood:




https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
Most other non-file-based connectors, indeed, don't (KafkaIO,
DatastoreIO,
BigtableIO etc.)

I'm not familiar with the Flink API, however I'm a bit confused by
your
last paragraph: the Beam programming model is intentionally
runner-agnostic, so that you can run exactly the same code on
different
runners.

On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <sumitkcha...@gmail.com

wrote:

Hi

Please suggest me on what is the best way to write a Sink in
Beam.  I
see
that there is a Sink<T> abstract class which is in experimental
state.
What is the expected outcome of this one? Do we have the api
frozen,
or
this could still change?  Most of the existing Sink implementations
like
KafkaIO.Write are not using this interface, and instead extends
PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to
extend
Sink<>.


My immediate requirement is to run this Sink on FlinkRunner. Which
mandates
that my implementation must also implement SinkFunction<>.  In that
case,
none of the Sink<> methods get called anyway.

Regards
Sumit Chawla







--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to