Revert "[hotfix] [cassandra] Fix CassandraSinkBase serialization issue"
This reverts commit 5fa389014a3ce40534703c8a5731c8a9a955058a. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95d640b8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95d640b8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95d640b8 Branch: refs/heads/master Commit: 95d640b832a91b05d6031dbb09086206369666d3 Parents: cf78542 Author: Stephan Ewen <[email protected]> Authored: Wed Nov 16 17:59:01 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Nov 16 19:08:07 2016 +0100 ---------------------------------------------------------------------- .../flink/streaming/connectors/cassandra/CassandraSinkBase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/95d640b8/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 713a286..9c4c430 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -42,7 +42,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { protected transient Cluster cluster; protected transient Session session; - protected transient AtomicReference<Throwable> exception; + protected transient final AtomicReference<Throwable> exception = new AtomicReference<>(); protected transient FutureCallback<V> callback; private final ClusterBuilder builder; @@ -56,7 +56,6 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { @Override public void open(Configuration configuration) { - this.exception = new AtomicReference<>(); this.callback = new FutureCallback<V>() { @Override public void onSuccess(V ignored) {
