Data piles up in Cassandra without a TTL. Is there a workaround for this
problem? Is there a way to specify my own query and still use a Pojo?
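One workaround, if the Pojo can be flattened to a tuple, is to put the TTL into the CQL insert itself, since the query-based Cassandra sink accepts an arbitrary statement. A minimal sketch, assuming hypothetical keyspace, table, and column names and a 24-hour TTL:

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class TtlSinkSketch {
    // Attach a Cassandra sink whose insert statement carries the TTL.
    // Keyspace, table, and column names here are placeholders.
    public static void attachSink(DataStream<Tuple2<String, String>> tuples) throws Exception {
        CassandraSink.addSink(tuples)
                // Every row written through this sink expires after 86400 s.
                .setQuery("INSERT INTO my_keyspace.messages (id, body) VALUES (?, ?) USING TTL 86400;")
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder.addContactPoint("127.0.0.1").build();
                    }
                })
                .build();
    }
}
```

The trade-off is giving up the Pojo mapping: the stream has to be mapped to tuples first so the positional `?` placeholders line up with the columns.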

Thanks,

On Mon, Dec 12, 2016 at 7:26 AM, Chesnay Schepler <[email protected]>
wrote:

> Regarding 2) I don't think so. That would require access to the datastax
> MappingManager.
> We could add something similar as the ClusterBuilder for that though.
>
> Regards,
> Chesnay
>
>
> On 12.12.2016 16:15, Meghashyam Sandeep V wrote:
>
> Hi Till,
>
> Thanks for the information.
>
> 1. What do you mean by 'subtask'? Is it every partition or every message
> in the stream?
>
> 2. I tried using CassandraSink with a Pojo. Is there a way to specify a
> TTL, as I can't use a query when I have a datastream with a Pojo?
>
> CassandraSink.addSink(messageStream)
>         .setClusterBuilder(new ClusterBuilder() {
>             @Override
>             protected Cluster buildCluster(Cluster.Builder builder) {
>                 return buildCassandraCluster();
>             }
>         })
>         .build();
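> The Pojo variant of the sink goes through the DataStax object mapper, so
> the class itself needs mapping annotations. A hypothetical sketch
> (keyspace, table, and column names are placeholders):

```java
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

// Hypothetical Pojo for the DataStax object mapper. Because the mapper
// generates the INSERT internally, there is no query string on which to
// hang a USING TTL clause; that is the limitation discussed above.
@Table(keyspace = "my_keyspace", name = "messages")
public class Message {
    @Column(name = "id")
    private String id;

    @Column(name = "body")
    private String body;

    public Message() { }  // the mapper needs a no-arg constructor

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getBody() { return body; }
    public void setBody(String body) { this.body = body; }
}
```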
>
> Thanks,
>
>
> On Mon, Dec 12, 2016 at 3:17 AM, Till Rohrmann <[email protected]>
> wrote:
>
>> Hi Meghashyam,
>>
>>    1. You can perform initializations in the open method of the
>>       RichSinkFunction interface. The open method will be called once
>>       for every subtask when it is initialized. If you want to share
>>       the resource across multiple subtasks running in the same JVM,
>>       you can also store the dbSession in a class variable.
>>
>>    2. The Flink community is currently working on adding security
>>       support, including SSL encryption, to Flink. So maybe in the
>>       future you can use Flink's Cassandra sink again.
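>> A minimal sketch of that pattern (class name, query, and contact point
>> are hypothetical; the session is built once per subtask in open and
>> released in close):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical custom sink. The fields are transient because neither
// Cluster nor Session is serializable; they are built on the TaskManager
// in open(), not shipped with the job graph.
public class CassandraMessageSink extends RichSinkFunction<String> {

    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) {
        // Called once per subtask before any records arrive.
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect();
    }

    @Override
    public void invoke(String message) {
        // Placeholder statement; the value is bound by the driver.
        session.execute("INSERT INTO my_keyspace.messages (body) VALUES (?)",
                message);
    }

    @Override
    public void close() {
        if (session != null) { session.close(); }
        if (cluster != null) { cluster.close(); }
    }
}
```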
>>
>> Cheers,
>> Till
>>
>>
>> On Fri, Dec 9, 2016 at 8:05 PM, Meghashyam Sandeep V <[email protected]>
>> wrote:
>>
>>> Thanks a lot for the quick reply Shannon.
>>>
>>> 1. I will create a class that extends SinkFunction and write my
>>> connection logic there. My only question here is: will a dbSession be
>>> created for each message/partition, which might affect performance?
>>> That's the reason I added this line, to create the connection once and
>>> reuse it along the datastream:
>>>
>>>     if (dbSession == null && store != null) { dbSession = getSession(); }
>>>
>>> 2. I couldn't use flink-connector-cassandra because I have SSL enabled
>>> for my C* cluster and couldn't get it to work with my SSL config
>>> (truststore, keystore, etc.) added to the cluster builder. I didn't
>>> find a proper example of flink-connector-cassandra with SSL enabled.
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> On Fri, Dec 9, 2016 at 10:54 AM, Shannon Carey <[email protected]>
>>> wrote:
>>>
>>>> You haven't really added a sink in Flink terminology; you're just
>>>> performing a side effect within a map operator. So while it may work,
>>>> if you want a proper sink you need to have an object that extends
>>>> SinkFunction or RichSinkFunction. The method call on the stream should
>>>> be ".addSink(…)".
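>>>> Concretely, the tail of the pipeline would look something like this
>>>> (a sketch; MyCassandraSink stands in for whatever SinkFunction
>>>> implementation holds the connection logic):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;

public class PipelineSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical wiring: parse, filter, then hand records to a real
    // sink via addSink() instead of executing queries inside a map().
    public static void wire(DataStream<String> messageStream) {
        messageStream
                .rebalance()
                .map(s -> MAPPER.readValue(s, JsonNode.class))
                .filter(node -> node != null)       // placeholder filter
                .map(JsonNode::toString)
                .addSink(new MyCassandraSink());    // a (Rich)SinkFunction
    }
}
```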
>>>>
>>>> Also, the dbSession isn't really Flink state as it will not vary based
>>>> on the position in or content in the stream. It's a necessary helper
>>>> object, yes, but you don't need Flink to checkpoint it.
>>>>
>>>> You can still use the sinks provided with flink-connector-cassandra and
>>>> customize the cluster building by passing your own ClusterBuilder into the
>>>> constructor.
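>>>> For the SSL setup mentioned earlier in the thread, that ClusterBuilder
>>>> can also carry the SSL options; a sketch, assuming the truststore and
>>>> keystore are supplied through the standard javax.net.ssl.* system
>>>> properties on each TaskManager (contact point hypothetical):

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class SslClusterBuilderSketch {
    // Hypothetical builder: JdkSSLOptions with no explicit SSLContext
    // falls back to the JVM default, so -Djavax.net.ssl.trustStore=...
    // style properties configure the handshake.
    public static ClusterBuilder sslClusterBuilder() {
        return new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder
                        .addContactPoint("127.0.0.1")
                        .withSSL(JdkSSLOptions.builder().build())
                        .build();
            }
        };
    }
}
```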
>>>>
>>>> -Shannon
>>>>
>>>> From: Meghashyam Sandeep V <[email protected]>
>>>> Date: Friday, December 9, 2016 at 12:26 PM
>>>> To: <[email protected]>, <[email protected]>
>>>> Subject: Reg. custom sinks in Flink
>>>>
>>>> Hi there,
>>>>
>>>> I have a Flink streaming app where my source is Kafka and I use a
>>>> custom sink to Cassandra (I can't use the standard C* sink that comes
>>>> with Flink as I have customized auth to C*). I currently have the
>>>> following:
>>>>
>>>> messageStream
>>>>         .rebalance()
>>>>         .map(s -> mapper.readValue(s, JsonNode.class))
>>>>         .filter(/* filter some messages */)
>>>>         .map((MapFunction<JsonNode, String>) message -> {
>>>>             getDbSession().execute("QUERY_TO_EXEC");
>>>>             return message.toString();
>>>>         });
>>>>
>>>> private static Session getDbSession() {
>>>>     if(dbSession == null && store!=null) {
>>>>         dbSession = getSession();
>>>>     }
>>>>
>>>>     return dbSession;
>>>> }
>>>>
>>>> 1. Is this the right way to add a custom sink? As you can see, I have
>>>> dbSession as a class variable here and I'm storing its state.
>>>>
>>>> 2. This setup works fine in standalone Flink (java -jar MyJar.jar).
>>>> When I run it with YARN on EMR I get an NPE at the session, which is
>>>> kind of weird.
>>>>
>>>> Thanks
>>>>
>>>>
