Thanks for letting us know. The netty issue will be fixed in Flink 1.4.1.

For case classes there is also a dedicated Cassandra sink (every case class is a Product):

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraScalaProductSink.java
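As a minimal plain-Scala sketch (no Flink or Cassandra dependencies; the class name is illustrative), this shows the property that the dedicated sink relies on: every case class automatically mixes in Product, so its fields can be accessed generically.

```scala
// Illustrative case class: the compiler mixes in Product automatically,
// which is what lets a product-based sink handle it generically.
case class Order(name: String, userEmail: String)

val o = Order("book", "a@b.com")
println(o.isInstanceOf[Product]) // true: a case class is a Product
println(o.productArity)          // 2: number of constructor fields
println(o.productElement(0))     // "book": fields accessible by index
```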

Regards,
Timo



On 12/21/17 at 1:39 PM, shashank agarwal wrote:
Hi,

I have added netty-all 4.0 as a dependency and now it's working fine. The only thing is that I had to create the POJO class in Scala like this:

import com.datastax.driver.mapping.annotations.Table
import scala.beans.BeanProperty

@SerialVersionUID(507L)
@Table(keyspace = "twtt", name = "order")
class OrderFinal(
    @BeanProperty var name: String,
    @BeanProperty var userEmail: String) extends Serializable {
  def this() {
    this("NA", "NA")
  }
}


If I remove @BeanProperty or convert the vars to vals, it gives an error that no setters or getters were found, or that multiple were found. This is the final workaround I found.
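To see why @BeanProperty on vars matters here, a plain-Scala sketch (no Flink or Cassandra dependencies; the class name is illustrative): the annotation makes the compiler emit JavaBean-style getters and setters, which the DataStax object mapper looks up reflectively. With a val, only a getter would be generated.

```scala
import scala.beans.BeanProperty

// Illustrative class: @BeanProperty on a var generates both getX and setX.
class OrderDemo(@BeanProperty var name: String,
                @BeanProperty var userEmail: String) {
  def this() = this("NA", "NA") // no-arg constructor the mapper needs
}

val methods = classOf[OrderDemo].getMethods.map(_.getName).toSet
println(methods("getName") && methods("setName"))           // true
println(methods("getUserEmail") && methods("setUserEmail")) // true
```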
On Tue, Dec 19, 2017 at 6:11 PM, shashank agarwal <shashank...@gmail.com <mailto:shashank...@gmail.com>> wrote:

    I have tried that by creating the class with a companion (static) object:

    @SerialVersionUID(507L)
    @Table(keyspace = "neofp", name = "order_detail")
    class OrderFinal(
        @BeanProperty var order_name: String,
        @BeanProperty var user: String) extends Serializable {
      def this() {
        this("NA", "NA")
      }
    }

    object OrderFinal {
    }



    When running with 1.4.0 it gives the following error:


    java.lang.NoClassDefFoundError: Could not initialize class com.datastax.driver.core.NettyUtil
        at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
        at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
        at com.datastax.driver.core.Cluster.init(Cluster.java:162)
        at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
        at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
        at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
        at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
        at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)

    12/19/2017 18:04:33 Job execution switched to status FAILING.
    (same NoClassDefFoundError stack trace as above)

    On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal
    <shashank...@gmail.com <mailto:shashank...@gmail.com>> wrote:

        HI,

        I have upgraded Flink from 1.3.2 to 1.4.0. I am using
        the Cassandra sink in my Scala application. Before the sink, I was
        converting my Scala DataStream to a Java stream and sinking into
        Cassandra. I have created the POJO class in Scala like this:

        @SerialVersionUID(507L)
        @Table(keyspace = "neofp", name = "order_detail")
        case class OrderFinal(
            @BeanProperty var order_name: String,
            @BeanProperty var user: String) extends Serializable {
          def this() {
            this("NA", "NA")
          }
        }

        This was working fine with the sink. After upgrading to 1.4.0
        it gives the error "Query must not be null or empty."

        After digging into the CassandraSink code, I found that it
        treats the class as a case class and runs
        CassandraScalaProductSinkBuilder, which does a sanity check
        that a query exists.
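The distinction can be checked in plain Scala (a sketch, independent of Flink; class names are illustrative): a case class mixes in Product, while an ordinary hand-written class does not, and that is the property the sink builder selection keys on.

```scala
// A case class is a Product; a hand-written class is not.
case class OrderCase(name: String)
class OrderPlain(var name: String) {
  def this() = this("NA")
}

println(OrderCase("x").isInstanceOf[Product])      // true  -> Scala product sink path
println(new OrderPlain("x").isInstanceOf[Product]) // false -> POJO sink path
```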

        So how can I create a POJO class in Scala so that CassandraSink
        treats it with CassandraPojoSinkBuilder?

        As a workaround, for now I have downgraded only the connector to 1.3.2.


        Thanks
        Shashank








--
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....

