Thanks a lot for the quick reply, Shannon.

1. I will create a class that extends SinkFunction and write my connection logic there. My only question here: will a dbSession be created for each message/partition, which might affect performance? That's why I added this line, to create the connection once and reuse it along the datastream:

    if (dbSession == null && store != null) { dbSession = getSession(); }
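For point 1, a minimal sketch of what such a sink could look like (a RichSinkFunction that opens one session per parallel sink instance in open() rather than per message; class names and the buildSslCluster() helper are placeholders, assuming the DataStax 3.x driver):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch only: the session is created once per parallel sink instance
// in open(), not once per record, so there is no per-message overhead.
public class CassandraJsonSink extends RichSinkFunction<String> {

    // transient: driver objects are not serializable and must be created
    // on the task manager, not on the client that submits the job
    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        cluster = buildSslCluster();   // your own SSL-enabled cluster building
        session = cluster.connect();
    }

    @Override
    public void invoke(String message) throws Exception {
        // runs once per record, reusing the session opened above
        session.execute("QUERY_TO_EXEC");
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }

    private Cluster buildSslCluster() {
        // placeholder: plug in your truststore/keystore configuration here
        return Cluster.builder().addContactPoint("127.0.0.1").build();
    }
}
```

It would then be attached with messageStream.addSink(new CassandraJsonSink()); instead of performing the write inside a map.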
2. I couldn't use flink-connector-cassandra because I have SSL enabled for my C* cluster, and I couldn't get it to work with all my SSL config (truststore, keystore, etc.) added to the cluster building. I didn't find a proper example of flink-connector-cassandra with SSL enabled.

Thanks

On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <sca...@expedia.com> wrote:

> You haven't really added a sink in Flink terminology; you're just
> performing a side effect within a map operator. So while it may work, if
> you want to add a proper sink you need an object that extends
> SinkFunction or RichSinkFunction. The method call on the stream should be
> ".addSink(…)".
>
> Also, the dbSession isn't really Flink state, as it will not vary based
> on the position in or content of the stream. It's a necessary helper
> object, yes, but you don't need Flink to checkpoint it.
>
> You can still use the sinks provided with flink-connector-cassandra and
> customize the cluster building by passing your own ClusterBuilder into
> the constructor.
>
> -Shannon
>
> From: Meghashyam Sandeep V <vr1meghash...@gmail.com>
> Date: Friday, December 9, 2016 at 12:26 PM
> To: <user@flink.apache.org>, <d...@flink.apache.org>
> Subject: Reg. custom sinks in Flink
>
> Hi there,
>
> I have a Flink streaming app where my source is Kafka and the sink is a
> custom sink to Cassandra (I can't use the standard C* sink that comes
> with Flink, as I have customized auth to C*). I currently have the
> following:
>
>     messageStream
>         .rebalance()
>         .map(s -> mapper.readValue(s, JsonNode.class))
>         .filter(/* filter some messages */)
>         .map((MapFunction<JsonNode, String>) message -> {
>             getDbSession().execute("QUERY_TO_EXEC");
>             return message.toString();
>         });
>
>     private static Session getDbSession() {
>         if (dbSession == null && store != null) {
>             dbSession = getSession();
>         }
>         return dbSession;
>     }
>
> 1. Is this the right way to add a custom sink? As you can see, I have
> dbSession as a class variable here and I'm storing its state.
>
> 2.
> This setup works fine in standalone Flink (java -jar MyJar.jar). When I
> run Flink with YARN on EMR, I get an NPE at the session, which is kind
> of weird.
>
> Thanks
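On the NPE from point 2 above: a likely cause is that Flink serializes the user function on the client and deserializes it on the task managers, so a session held in a static or transient field that was initialized on the client arrives as null on the workers. A stdlib-only illustration of the mechanism (class names here are hypothetical; FakeSink stands in for a function holding a non-serializable session):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Demonstrates why a field initialized before job submission can be null
// after the function is shipped to a remote worker: transient fields are
// not written by Java serialization.
public class TransientFieldDemo {

    static class FakeSink implements Serializable {
        // stands in for a Cassandra Session, which is not serializable
        transient Object session = new Object();
    }

    // simulates shipping the function from client to task manager
    static FakeSink roundTrip(FakeSink sink) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FakeSink) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeSink local = new FakeSink();
        System.out.println(local.session != null);   // true on the "client"
        FakeSink remote = roundTrip(local);
        System.out.println(remote.session == null);  // true on the "worker"
    }
}
```

This is why the RichSinkFunction pattern creates the session inside open(), which runs on the task manager after deserialization.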
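And for the SSL question, the ClusterBuilder approach Shannon suggests in the quoted reply could be sketched roughly as follows (paths, passwords, and host names are placeholders; assumes the DataStax 3.x driver's JdkSSLOptions and a JKS truststore):

```java
import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// Sketch of an SSL-enabled ClusterBuilder for flink-connector-cassandra.
public class SslClusterBuilder extends ClusterBuilder {

    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        try {
            // load the truststore (placeholder path and password)
            KeyStore trustStore = KeyStore.getInstance("JKS");
            try (FileInputStream in = new FileInputStream("/path/to/truststore.jks")) {
                trustStore.load(in, "changeit".toCharArray());
            }
            TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);

            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(null, tmf.getTrustManagers(), null);

            return builder
                .addContactPoint("cassandra-host")   // placeholder host
                .withSSL(JdkSSLOptions.builder().withSSLContext(ctx).build())
                .build();
        } catch (Exception e) {
            throw new RuntimeException("Failed to build SSL-enabled cluster", e);
        }
    }
}
```

The builder would then be handed to the connector's sink builder (adding a keystore to the SSLContext works the same way via KeyManagerFactory if the cluster requires client certificates).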