(1) A subtask is a parallel instance of an operator and thus responsible for a
partition (possibly infinite) of the whole DataStream/DataSet.

(2) Maybe you can add this feature to Flink's Cassandra sink.

Cheers,
Till

On Mon, Dec 12, 2016 at 4:30 PM, Meghashyam Sandeep V <[email protected]> wrote:

> Data piles up in Cassandra without TTL. Is there a workaround for this
> problem? Is there a way to specify my query and still use a Pojo?
>
> Thanks,

On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler <[email protected]> wrote:

>> Regarding 2) I don't think so. That would require access to the datastax
>> MappingManager. We could add something similar to the ClusterBuilder for
>> that, though.
>>
>> Regards,
>> Chesnay

On 12.12.2016 16:15, Meghashyam Sandeep V wrote:

>> Hi Till,
>>
>> Thanks for the information.
>>
>> 1. What do you mean by 'subtask': is it every partition or every message
>> in the stream?
>>
>> 2. I tried using CassandraSink with a Pojo. Is there a way to specify a
>> TTL, given that I can't use a query when I have a datastream of Pojos?
>>
>>     CassandraSink.addSink(messageStream)
>>         .setClusterBuilder(new ClusterBuilder() {
>>             @Override
>>             protected Cluster buildCluster(Cluster.Builder builder) {
>>                 return buildCassandraCluster();
>>             }
>>         })
>>         .build();
>>
>> Thanks,

On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <[email protected]> wrote:

>>> Hi Meghashyam,
>>>
>>> 1. You can perform initializations in the open method of the
>>> RichSinkFunction interface. The open method will be called once for
>>> every subtask when it is initialized. If you want to share the resource
>>> across multiple subtasks running in the same JVM, you can also store the
>>> dbSession in a class variable.
>>>
>>> 2. The Flink community is currently working on adding security support,
>>> including SSL encryption, to Flink. So maybe in the future you can use
>>> Flink's Cassandra sink again.
>>>
>>> Cheers,
>>> Till

On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <[email protected]> wrote:

>>>> Thanks a lot for the quick reply, Shannon.
>>>>
>>>> 1. I will create a class that extends SinkFunction and write my
>>>> connection logic there. My only question here is: will a dbSession be
>>>> created for each message/partition, which might affect performance?
>>>> That's the reason I added this line, to create a connection once and
>>>> reuse it along the datastream:
>>>>
>>>>     if (dbSession == null && store != null) { dbSession = getSession(); }
>>>>
>>>> 2. I couldn't use flink-connector-cassandra as I have SSL enabled for
>>>> my C* cluster, and I couldn't get it to work with all my SSL config
>>>> (truststore, keystore etc.) added to the cluster building. I didn't
>>>> find a proper example of flink-connector-cassandra with SSL enabled.
>>>>
>>>> Thanks

On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <[email protected]> wrote:

>>>>> You haven't really added a sink in Flink terminology; you're just
>>>>> performing a side effect within a map operator. So while it may work,
>>>>> if you want to add a sink proper you need to have an object that
>>>>> extends SinkFunction or RichSinkFunction. The method call on the
>>>>> stream should be ".addSink(…)".
>>>>>
>>>>> Also, the dbSession isn't really Flink state, as it will not vary
>>>>> based on the position in or content of the stream. It's a necessary
>>>>> helper object, yes, but you don't need Flink to checkpoint it.
>>>>>
>>>>> You can still use the sinks provided with flink-connector-cassandra
>>>>> and customize the cluster building by passing your own ClusterBuilder
>>>>> into the constructor.
>>>>>
>>>>> -Shannon
>>>>>
>>>>> From: Meghashyam Sandeep V <[email protected]>
>>>>> Date: Friday, December 9, 2016 at 12:26 PM
>>>>> To: <[email protected]>, <[email protected]>
>>>>> Subject: Reg. custom sinks in Flink
>>>>>
>>>>> Hi there,
>>>>>
>>>>> I have a Flink streaming app where my source is Kafka and a custom
>>>>> sink to Cassandra (I can't use the standard C* sink that comes with
>>>>> Flink, as I have customized auth to C*). I currently have the
>>>>> following:
>>>>>
>>>>>     messageStream
>>>>>         .rebalance()
>>>>>         .map(s -> mapper.readValue(s, JsonNode.class))
>>>>>         .filter(/* filter some messages */)
>>>>>         .map((MapFunction<JsonNode, String>) message -> {
>>>>>             getDbSession().execute("QUERY_TO_EXEC");
>>>>>         });
>>>>>
>>>>>     private static Session getDbSession() {
>>>>>         if (dbSession == null && store != null) {
>>>>>             dbSession = getSession();
>>>>>         }
>>>>>         return dbSession;
>>>>>     }
>>>>>
>>>>> 1. Is this the right way to add a custom sink? As you can see, I have
>>>>> dbSession as a class variable here and I'm storing its state.
>>>>>
>>>>> 2. This setup works fine in standalone Flink (java -jar MyJar.jar).
>>>>> When I run using Flink with YARN on EMR, I get an NPE at the session,
>>>>> which is kind of weird.
>>>>>
>>>>> Thanks
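
[Editor's note] Shannon's and Till's suggestions combine into roughly the
following shape: a RichSinkFunction that builds its Cassandra session once
per subtask in open() and releases it in close(). This is only a sketch,
not code from the thread; it assumes the 2016-era Flink streaming API and
the DataStax Java driver on the classpath, and buildCassandraCluster() is a
placeholder for the application's own SSL-aware cluster construction.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch only: requires flink-streaming-java and the DataStax Java driver.
public class CassandraRowSink extends RichSinkFunction<String> {

    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per subtask, before any invoke() call, so the
        // session is created once rather than per message.
        cluster = buildCassandraCluster();
        session = cluster.connect();
    }

    @Override
    public void invoke(String value) throws Exception {
        session.execute("QUERY_TO_EXEC"); // placeholder query from the thread
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }

    private Cluster buildCassandraCluster() {
        // Placeholder: attach truststore/keystore/SSL options here.
        return Cluster.builder().addContactPoint("127.0.0.1").withSSL().build();
    }
}
```

With this in place, the pipeline ends in .addSink(new CassandraRowSink())
instead of a side-effecting map, and the NPE-prone lazy getDbSession()
initialization disappears.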

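[Editor's note] On the TTL question raised in the thread: when the stream's
elements can be written with an explicit query (tuples rather than Pojos),
the expiry can be pushed into the CQL statement itself via USING TTL. The
helper below only assembles such a statement string; the table and column
names are illustrative, not from the thread.

```java
public class TtlCql {
    /**
     * Builds an INSERT statement whose rows expire after ttlSeconds.
     * Column names (id, payload) are made up for illustration.
     */
    public static String insertWithTtl(String table, int ttlSeconds) {
        return "INSERT INTO " + table
                + " (id, payload) VALUES (?, ?) USING TTL " + ttlSeconds + ";";
    }
}
```

A statement built this way can then be handed to the query-based
CassandraSink variant instead of relying on Pojo mapping.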