Without a TTL, data piles up in Cassandra. Is there a workaround for this problem? Is there a way to specify my own query and still use a POJO?
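In case it helps anyone on the list: until the connector exposes TTL, one workaround along the lines Chesnay mentions below (going through the datastax MappingManager) is to skip CassandraSink and use the object mapper directly inside a custom RichSinkFunction. This is an untested sketch; `MyPojo`, `buildCassandraCluster()` and the TTL value are placeholders for your own code.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class TtlCassandraSink extends RichSinkFunction<MyPojo> {
    private transient Cluster cluster;
    private transient Session session;
    private transient Mapper<MyPojo> mapper;

    @Override
    public void open(Configuration parameters) {
        // called once per subtask, so the connection is created once, not per record
        cluster = buildCassandraCluster();                 // your existing cluster setup
        session = cluster.connect();
        mapper = new MappingManager(session).mapper(MyPojo.class);
    }

    @Override
    public void invoke(MyPojo value) {
        // the mapper lets you attach a per-insert TTL (seconds), which the
        // POJO-based CassandraSink currently does not expose
        mapper.saveAsync(value, Mapper.Option.ttl(86400));
    }

    @Override
    public void close() {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}
```

You would then attach it with `messageStream.addSink(new TtlCassandraSink())`.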
Thanks,

On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler <[email protected]> wrote:

> Regarding 2) I don't think so. That would require access to the datastax
> MappingManager. We could add something similar to the ClusterBuilder for
> that, though.
>
> Regards,
> Chesnay
>
> On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
>
> Hi Till,
>
> Thanks for the information.
>
> 1. What do you mean by 'subtask'? Is it every partition or every message
> in the stream?
>
> 2. I tried using CassandraSink with a POJO. Is there a way to specify a
> TTL, given that I can't use a query when I have a datastream of POJOs?
>
>     CassandraSink.addSink(messageStream)
>         .setClusterBuilder(new ClusterBuilder() {
>             @Override
>             protected Cluster buildCluster(Cluster.Builder builder) {
>                 return buildCassandraCluster();
>             }
>         })
>         .build();
>
> Thanks,
>
> On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <[email protected]> wrote:
>
>> Hi Meghashyam,
>>
>> 1. You can perform initializations in the open method of the
>> RichSinkFunction interface. The open method will be called once for
>> every subtask when it is initialized. If you want to share the resource
>> across multiple subtasks running in the same JVM, you can also store the
>> dbSession in a class variable.
>>
>> 2. The Flink community is currently working on adding security support,
>> including SSL encryption, to Flink. So maybe in the future you can use
>> Flink's Cassandra sink again.
>>
>> Cheers,
>> Till
>>
>> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V
>> <[email protected]> wrote:
>>
>>> Thanks a lot for the quick reply, Shannon.
>>>
>>> 1. I will create a class that extends SinkFunction and write my
>>> connection logic there. My only question here is: will a dbSession be
>>> created for each message/partition, which might affect performance?
>>> That's the reason why I added this line, to create a connection once
>>> and reuse it along the datastream:
>>>
>>>     if (dbSession == null && store != null) { dbSession = getSession(); }
>>>
>>> 2. I couldn't use flink-connector-cassandra as I have SSL enabled for
>>> my C* cluster and I couldn't get it to work with all my SSL config
>>> (truststore, keystore etc.) added to the cluster building. I didn't
>>> find a proper example of flink-connector-cassandra with SSL enabled.
>>>
>>> Thanks
>>>
>>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <[email protected]> wrote:
>>>
>>>> You haven't really added a sink in Flink terminology; you're just
>>>> performing a side effect within a map operator. So while it may work,
>>>> if you want to add a sink proper, you need an object that extends
>>>> SinkFunction or RichSinkFunction. The method call on the stream should
>>>> be ".addSink(…)".
>>>>
>>>> Also, the dbSession isn't really Flink state, as it will not vary
>>>> based on the position in or content of the stream. It's a necessary
>>>> helper object, yes, but you don't need Flink to checkpoint it.
>>>>
>>>> You can still use the sinks provided with flink-connector-cassandra
>>>> and customize the cluster building by passing your own ClusterBuilder
>>>> into the constructor.
>>>>
>>>> -Shannon
>>>>
>>>> From: Meghashyam Sandeep V <[email protected]>
>>>> Date: Friday, December 9, 2016 at 12:26 PM
>>>> To: <[email protected]>, <[email protected]>
>>>> Subject: Reg. custom sinks in Flink
>>>>
>>>> Hi there,
>>>>
>>>> I have a flink streaming app where my source is Kafka and a custom
>>>> sink to Cassandra (I can't use the standard C* sink that comes with
>>>> flink as I have customized auth to C*).
>>>> I currently have the following:
>>>>
>>>>     messageStream
>>>>         .rebalance()
>>>>         .map(s -> {
>>>>             return mapper.readValue(s, JsonNode.class);
>>>>         })
>>>>         .filter(/* filter some messages */)
>>>>         .map((MapFunction<JsonNode, String>) message -> {
>>>>             getDbSession().execute("QUERY_TO_EXEC");
>>>>             return message.toString();
>>>>         });
>>>>
>>>>     private static Session getDbSession() {
>>>>         if (dbSession == null && store != null) {
>>>>             dbSession = getSession();
>>>>         }
>>>>         return dbSession;
>>>>     }
>>>>
>>>> 1. Is this the right way to add a custom sink? As you can see, I have
>>>> dbSession as a class variable here and I'm storing its state.
>>>>
>>>> 2. This setup works fine in standalone flink (java -jar MyJar.jar).
>>>> When I run flink with YARN on EMR, I get an NPE at the session, which
>>>> is kind of weird.
>>>>
>>>> Thanks
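Since the SSL configuration was the blocker for using flink-connector-cassandra in the first place, here is a rough, untested sketch of how the stock sink might still be kept by wiring an SSLContext into the cluster through a custom ClusterBuilder, as Shannon suggests. The truststore path, password, and contact point are placeholders, and the details will depend on your driver version and C* setup.

```java
import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

CassandraSink.addSink(messageStream)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            try {
                // load the truststore containing the C* cluster's certificate
                KeyStore trustStore = KeyStore.getInstance("JKS");
                try (FileInputStream in =
                         new FileInputStream("/path/to/truststore.jks")) {
                    trustStore.load(in, "changeit".toCharArray());
                }
                TrustManagerFactory tmf = TrustManagerFactory
                    .getInstance(TrustManagerFactory.getDefaultAlgorithm());
                tmf.init(trustStore);

                SSLContext ctx = SSLContext.getInstance("TLS");
                ctx.init(null, tmf.getTrustManagers(), null);

                // hand the SSLContext to the driver via JdkSSLOptions
                return builder
                    .addContactPoint("cassandra-host")
                    .withSSL(JdkSSLOptions.builder().withSSLContext(ctx).build())
                    .build();
            } catch (Exception e) {
                throw new RuntimeException("SSL setup for Cassandra failed", e);
            }
        }
    })
    .build();
```

If client authentication (keystore) is also required, the SSLContext would additionally need a KeyManagerFactory initialized from the keystore and passed as the first argument to `ctx.init(...)`.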
