Thanks a lot for the quick reply, Shannon.

1. I will create a class that extends SinkFunction and put my connection
logic there. My only question is: will a dbSession be created for each
message/partition, which might hurt performance? That's why I added this
check, to create the connection once and reuse it across the
datastream: if(dbSession == null && store!=null) { dbSession =
getSession();}
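
To make the question concrete, here is a minimal sketch of the lifecycle I
have in mind. Flink's RichSinkFunction calls open() once per parallel sink
instance, invoke() once per record, and close() once at shutdown, so a session
created in open() should not be rebuilt for each message. The sketch models
that with plain Java so it runs on its own: DbSession, RecordingSession and
CassandraLikeSink are hypothetical stand-ins, not Flink or Cassandra driver
classes. In a real job the class would extend RichSinkFunction and the
session would come from the driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Cassandra driver's Session.
interface DbSession {
    void execute(String query);
    void close();
}

// Hypothetical stand-in that records queries and counts created sessions.
class RecordingSession implements DbSession {
    static int sessionsOpened = 0;
    final List<String> executed = new ArrayList<>();
    boolean closed = false;

    RecordingSession() {
        sessionsOpened++;
    }

    @Override
    public void execute(String query) {
        executed.add(query);
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Mirrors the RichSinkFunction lifecycle: open() once, invoke() per record,
// close() once. This is a model of the pattern, not a Flink class.
class CassandraLikeSink {
    // transient: the session is created on the worker in open() and is
    // never serialized along with the sink object.
    private transient RecordingSession session;

    void open() {
        session = new RecordingSession();
    }

    void invoke(String message) {
        session.execute("QUERY_TO_EXEC for " + message);
    }

    void close() {
        if (session != null) {
            session.close();
        }
    }

    RecordingSession session() {
        return session;
    }
}

class SinkLifecycleDemo {
    public static void main(String[] args) {
        CassandraLikeSink sink = new CassandraLikeSink();
        sink.open();
        for (String message : new String[] {"a", "b", "c"}) {
            sink.invoke(message);
        }
        sink.close();
        System.out.println("sessions opened: " + RecordingSession.sessionsOpened);
    }
}
```

With this shape the null check on a static field is no longer needed, since
each parallel sink instance owns exactly one session for its lifetime.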

2. I couldn't use flink-connector-cassandra because SSL is enabled on my C*
cluster and I couldn't get it to work with all my SSL config (truststore,
keystore, etc.) added to the cluster building. I couldn't find a proper
example of flink-connector-cassandra with SSL enabled.
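
For reference, this is roughly the SSL part I am trying to plug in: a sketch
that builds a javax.net.ssl.SSLContext from a keystore and a truststore using
only JDK classes. The class name SslContextSketch and its method names are
made up for illustration. My assumption is that the resulting context would
then be handed to the driver inside a custom ClusterBuilder (for example
through the builder's withSSL option), but that wiring is not shown here and
depends on the driver version.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper, not part of Flink or the Cassandra driver.
class SslContextSketch {

    // Loads a keystore or truststore file of the JVM's default store type.
    static KeyStore load(String path, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = new FileInputStream(path)) {
            store.load(in, password);
        }
        return store;
    }

    // Builds a TLS context: the keystore supplies the client certificate,
    // the truststore supplies the CA certificates used to verify the server.
    static SSLContext build(KeyStore keyStore, char[] keyPassword, KeyStore trustStore)
            throws Exception {
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyPassword);

        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext context = SSLContext.getInstance("TLS");
        // null SecureRandom lets the JDK pick its default source of randomness.
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return context;
    }
}
```

Since the context is built from files, it would have to be created on the
worker side (for instance inside the ClusterBuilder), and the store files
would need to be readable on every node that runs the sink.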


Thanks




On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sca...@expedia.com> wrote:

> You haven't really added a sink in Flink terminology, you're just
> performing a side effect within a map operator. So while it may work, if
> you want to add a sink proper you need to have an object that extends
> SinkFunction or RichSinkFunction. The method call on the stream should be
> ".addSink(…)".
>
> Also, the dbSession isn't really Flink state as it will not vary based on
> the position in or content in the stream. It's a necessary helper object,
> yes, but you don't need Flink to checkpoint it.
>
> You can still use the sinks provided with flink-connector-cassandra and
> customize the cluster building by passing your own ClusterBuilder into the
> constructor.
>
> -Shannon
>
> From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
> Date: Friday, December 9, 2016 at 12:26 PM
> To: <user@flink.apache.org>, <d...@flink.apache.org>
> Subject: Reg. custom sinks in Flink
>
> Hi there,
>
> I have a Flink streaming app where my source is Kafka and my sink is a
> custom sink to Cassandra (I can't use the standard C* sink that comes with
> Flink as I have customized auth to C*). I currently have the following:
>
> messageStream
>         .rebalance()
>         .map(s -> {
>             return mapper.readValue(s, JsonNode.class);
>         })
>         .filter(//filter some messages)
>         .map(
>          (MapFunction<JsonNode, String>) message -> {
>              getDbSession().execute("QUERY_TO_EXEC");
>          })
>
> private static Session getDbSession() {
>     if(dbSession == null && store!=null) {
>         dbSession = getSession();
>     }
>
>     return dbSession;
> }
>
> 1. Is this the right way to add a custom sink? As you can see, I have 
> dbSession as a class variable here and I'm storing its state.
>
> 2. This setup works fine in a standalone flink (java -jar MyJar.jar). When I 
> run using flink with YARN on EMR I get a NPE at the session which is kind of 
> weird.
>
>
> Thanks
>
>
