Yes but when CassandraScalaProductSinkBuilder called after identifying case
class in CassandraSink class it will do sanityCheck and will throw the
exception cause It won’t pass any query in that case.

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java

On Thu, 21 Dec 2017 at 7:36 PM, Timo Walther <twal...@apache.org> wrote:

> Thanks for letting us know. The netty issue will be fixed in Flink 1.4.1.
>
> For case classes there is also a dedicated cassandra sink (every case
> class is a Product):
>
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
>
> Regards,
> Timo
>
>
>
> Am 12/21/17 um 1:39 PM schrieb shashank agarwal:
>
> Hi,
>
> I have added netty-all 4.0 as dependency now it's working fine. Only thing
> I had to create POJO class ion scala like this.
>
> @SerialVersionUID(507L)
> @Table(keyspace = "twtt", name = "order")
> class OrderFinal(
>                        @BeanProperty var name: String,
>                        @BeanProperty var userEmail: String)extends
> Serializable
> {
>   def this() {
>     this("NA", "NA")
>   }
> }
>
>
> If I am removing @BeanProperty or converting var to Val. It's giving error
> of no setters or getters found or multiple found. This is the final
> workaround i found.
>
>
>
> ‌
>
> On Tue, Dec 19, 2017 at 6:11 PM, shashank agarwal <shashank...@gmail.com>
> wrote:
>
>> I have tried that by creating class with companion static object:
>>
>> @SerialVersionUID(507L)
>> @Table(keyspace = "neofp", name = "order_detail")
>> class OrderFinal(
>>                        @BeanProperty var order_name: String,
>>                        @BeanProperty var user: String )extends
>> Serializable
>> {
>>   def this() {
>>     this("NA", "NA",)
>>   }
>> }
>> object OrderFinal
>> {
>>
>> }
>>
>>
>>
>> When running with 1.4.0 it's giving following error :
>>
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> com.datastax.driver.core.NettyUtil
>> at
>> com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
>> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
>> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
>> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
>> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 12/19/2017 18:04:33 Job execution switched to status FAILING.
>> java.lang.NoClassDefFoundError: Could not initialize class
>> com.datastax.driver.core.NettyUtil
>> at
>> com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
>> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
>> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
>> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
>> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
>> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
>> at
>> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal <shashank...@gmail.com>
>> wrote:
>>
>>> HI,
>>>
>>> I have upgraded flink from 1.3.2 to 1.4.0. I am using cassandra sink in
>>> my scala application. Before sink, i was converting my scala datastream to
>>> java stream and sinking in Cassandra. I have created pojo class in scala
>>> liked that :
>>>
>>> @SerialVersionUID(507L)
>>> @Table(keyspace = "neofp", name = "order_detail")
>>> case class OrderFinal(
>>>                        @BeanProperty var order_name: String,
>>>                        @BeanProperty var user: String )extends
>>> Serializable
>>> {
>>>   def this() {
>>>     this("NA", "NA",)
>>>   }
>>> }
>>>
>>> and this was working fine with sink. after upgrading to 1.4.0 it's
>>> giving error "Query must not be null or empty."
>>>
>>> After dig into the CassandraSink code, I have found it's treating it as
>>> case class and running CassandraScalaProductSinkBuilder which
>>> do sanityCheck of query existence.
>>>
>>> So how I can create POJO class in scala so CassandraSink treats it
>>> as CassandraPojoSinkBuilder?
>>>
>>> For workaround now I have downgraded the only connector to 1.3.2
>>>
>>>
>>> Thanks
>>> Shashank
>>>
>>>
>>
>>
>> --
>> Thanks Regards
>>
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things....
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....
>
>
> --
Sent from iPhone 5

Reply via email to