Revert "[hotfix] [cassandra] Fix CassandraSinkBase serialization issue"

This reverts commit 5fa389014a3ce40534703c8a5731c8a9a955058a.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95d640b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95d640b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95d640b8

Branch: refs/heads/master
Commit: 95d640b832a91b05d6031dbb09086206369666d3
Parents: cf78542
Author: Stephan Ewen <[email protected]>
Authored: Wed Nov 16 17:59:01 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/cassandra/CassandraSinkBase.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95d640b8/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 713a286..9c4c430 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -42,7 +42,7 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> {
        protected transient Cluster cluster;
        protected transient Session session;
 
-       protected transient AtomicReference<Throwable> exception;
+       protected transient final AtomicReference<Throwable> exception = new 
AtomicReference<>();
        protected transient FutureCallback<V> callback;
 
        private final ClusterBuilder builder;
@@ -56,7 +56,6 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> {
 
        @Override
        public void open(Configuration configuration) {
-               this.exception = new AtomicReference<>();
                this.callback = new FutureCallback<V>() {
                        @Override
                        public void onSuccess(V ignored) {

Reply via email to