Repository: flink
Updated Branches:
  refs/heads/master 611412c6b -> 02c10d312


Revert "[FLINK-4177] Harden CassandraConnectorITCase"

This reverts commit 62523acbe175cf159fe1b4ab6cf5c0412fc4d232.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02c10d31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02c10d31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02c10d31

Branch: refs/heads/master
Commit: 02c10d312371aaad12ed8961cdf96288fe78a983
Parents: 95d640b
Author: Stephan Ewen <[email protected]>
Authored: Wed Nov 16 17:59:15 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../flink-connector-cassandra/pom.xml           |  11 +-
 .../connectors/cassandra/CassandraSinkBase.java |  39 +-
 .../cassandra/CassandraConnectorITCase.java     | 374 +++++++++++--------
 .../connectors/cassandra/CassandraService.java  | 118 ------
 .../src/test/resources/cassandra.yaml           |  41 +-
 5 files changed, 232 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
index 07cdc09..3a1731c 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
@@ -37,8 +37,8 @@ under the License.
 
        <!-- Allow users to pass custom connector versions -->
        <properties>
-               <cassandra.version>2.2.7</cassandra.version>
-               <driver.version>3.0.3</driver.version>
+               <cassandra.version>2.2.5</cassandra.version>
+               <driver.version>3.0.0</driver.version>
        </properties>
 
        <build>
@@ -159,13 +159,6 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
-               <!-- we need this dependency for the EmbeddedCassandraService-->
-               <dependency>
-                       <groupId>org.caffinitas.ohc</groupId>
-                       <artifactId>ohc-core</artifactId>
-                       <version>0.4.5</version>
-                       <scope>test</scope>
-               </dependency>
                <dependency>
                        <groupId>org.apache.cassandra</groupId>
                        <artifactId>cassandra-all</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 9c4c430..49b1efa 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,8 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
 * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
@@ -42,13 +40,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
        protected transient Cluster cluster;
        protected transient Session session;
 
-       protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
+       protected transient Throwable exception = null;
        protected transient FutureCallback<V> callback;
 
        private final ClusterBuilder builder;
 
-       protected final AtomicInteger updatesPending = new AtomicInteger();
-
        protected CassandraSinkBase(ClusterBuilder builder) {
                this.builder = builder;
                ClosureCleaner.clean(builder, true);
@@ -59,24 +55,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
                this.callback = new FutureCallback<V>() {
                        @Override
                        public void onSuccess(V ignored) {
-                               int pending = updatesPending.decrementAndGet();
-                               if (pending == 0) {
-                                       synchronized (updatesPending) {
-                                               updatesPending.notifyAll();
-                                       }
-                               }
                        }
 
                        @Override
                        public void onFailure(Throwable t) {
-                               int pending = updatesPending.decrementAndGet();
-                               if (pending == 0) {
-                                       synchronized (updatesPending) {
-                                               updatesPending.notifyAll();
-                                       }
-                               }
-                               exception.set(t);
-                               
+                               exception = t;
                                LOG.error("Error while sending value.", t);
                        }
                };
@@ -86,12 +69,10 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
        @Override
        public void invoke(IN value) throws Exception {
-               Throwable e = exception.get();
-               if (e != null) {
-                       throw new IOException("Error while sending value.", e);
+               if (exception != null) {
+                       throw new IOException("invoke() failed", exception);
                }
                ListenableFuture<V> result = send(value);
-               updatesPending.incrementAndGet();
                Futures.addCallback(result, callback);
        }
 
@@ -99,14 +80,6 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
        @Override
        public void close() {
-               while (updatesPending.get() > 0) {
-                       synchronized (updatesPending) {
-                               try {
-                                       updatesPending.wait();
-                               } catch (InterruptedException e) {
-                               }
-                       }
-               }
                try {
                        if (session != null) {
                                session.close();
@@ -121,9 +94,5 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
                } catch (Exception e) {
                        LOG.error("Error while closing cluster.", e);
                }
-               Throwable e = exception.get();
-               if (e != null) {
-                       LOG.error("Error while sending value.", e);
-               }
        }
 }
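
For context, the reverted CassandraSinkBase counted in-flight writes so that close() could block until every callback had fired, and it recorded asynchronous failures through an AtomicReference. A minimal, self-contained sketch of that acknowledgement pattern (class and method names here are illustrative, not the Flink API):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch of the flush-on-close pattern removed by this revert: each async
    // send increments a counter, each completion callback decrements it, and
    // a waiter blocks until the counter drains to zero.
    class PendingWritesTracker {
        private final AtomicInteger updatesPending = new AtomicInteger();
        private final AtomicReference<Throwable> failure = new AtomicReference<>();

        void onSendStarted() {
            updatesPending.incrementAndGet();
        }

        void onSendCompleted(Throwable error) {
            if (error != null) {
                failure.compareAndSet(null, error);
            }
            if (updatesPending.decrementAndGet() == 0) {
                synchronized (updatesPending) {
                    updatesPending.notifyAll();
                }
            }
        }

        void awaitPendingWrites() throws InterruptedException {
            synchronized (updatesPending) {
                // re-check inside the lock so a notify cannot be missed
                while (updatesPending.get() > 0) {
                    updatesPending.wait();
                }
            }
            Throwable t = failure.get();
            if (t != null) {
                throw new RuntimeException("asynchronous write failed", t);
            }
        }
    }

After the revert, the plain (non-volatile) exception field and the unconditional close() restore the simpler pre-FLINK-4177 behavior: an asynchronous failure only surfaces on the next invoke() call.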

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 258ef52..2bb6fd1 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -20,138 +20,192 @@ package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.HostDistance;
-import com.datastax.driver.core.PoolingOptions;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SocketOptions;
+
+import org.apache.cassandra.service.CassandraDaemon;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.TestEnvironment;
 
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
+import java.util.Scanner;
 import java.util.UUID;
 
+import static org.junit.Assert.*;
+
 @SuppressWarnings("serial")
-public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraConnectorITCase.TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
        private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
-
-       private static final String TABLE_NAME_PREFIX = "flink_";
-       private static final String TABLE_NAME_VARIABLE = "$TABLE";
-       private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
-       private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS flink." + TABLE_NAME_VARIABLE + " (id text PRIMARY KEY, counter int, batch_id int);";
-       private static final String INSERT_DATA_QUERY = "INSERT INTO flink." + TABLE_NAME_VARIABLE + " (id, counter, batch_id) VALUES (?, ?, ?)";
-       private static final String SELECT_DATA_QUERY = "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
+       private static File tmpDir;
 
        private static final boolean EMBEDDED = true;
 
-       private static final ClusterBuilder builder = new ClusterBuilder() {
+       private static EmbeddedCassandraService cassandra;
+
+       private static ClusterBuilder builder = new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder
                                .addContactPoint("127.0.0.1")
-                               .withSocketOptions(new SocketOptions().setConnectTimeoutMillis(30000))
                                .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
-                               .withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 32, 32).setMaxRequestsPerConnection(HostDistance.LOCAL, 2048).setPoolTimeoutMillis(15000))
                                .withoutJMXReporting()
                                .withoutMetrics().build();
                }
        };
 
-       private static final List<Tuple3<String, Integer, Integer>> collection = new ArrayList<>();
-
-       private static CassandraService cassandra;
        private static Cluster cluster;
        private static Session session;
 
-       private static final Random random = new Random();
-       private int tableID;
+       private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
+       private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;";
+       private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);";
+       private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;";
+       private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)";
+       private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;";
 
-       @BeforeClass
-       public static void generateCollection() {
+       private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+
+       static {
                for (int i = 0; i < 20; i++) {
                        collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
                }
        }
 
+       private static class EmbeddedCassandraService {
+               CassandraDaemon cassandraDaemon;
+
+               public void start() throws IOException {
+                       this.cassandraDaemon = new CassandraDaemon();
+                       this.cassandraDaemon.init(null);
+                       this.cassandraDaemon.start();
+               }
+
+               public void stop() {
+                       this.cassandraDaemon.stop();
+               }
+       }
+
+       private static LocalFlinkMiniCluster flinkCluster;
+
+       // ------------------------------------------------------------------------
+       //  Cluster Setup (Cassandra & Flink)
+       // ------------------------------------------------------------------------
+
        @BeforeClass
        public static void startCassandra() throws IOException {
 
                // check if we should run this test, current Cassandra version requires Java >= 1.8
-               CommonTestUtils.assumeJava8();
-
-               try {
-                       cassandra = new CassandraService();
-               } catch (Exception e) {
-                       LOG.error("Failed to instantiate cassandra service.", 
e);
-                       Assert.fail("Failed to instantiate cassandra service.");
+               org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
+
+               // generate temporary files
+               tmpDir = CommonTestUtils.createTempDirectory();
+               ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
+               File file = new File(classLoader.getResource("cassandra.yaml").getFile());
+               File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
+               
+               assertTrue(tmp.createNewFile());
+
+               try (
+                       BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
+
+                       //copy cassandra.yaml; inject absolute paths into cassandra.yaml
+                       Scanner scanner = new Scanner(file);
+               ) {
+                       while (scanner.hasNextLine()) {
+                               String line = scanner.nextLine();
+                               line = line.replace("$PATH", "'" + tmp.getParentFile());
+                               b.write(line + "\n");
+                               b.flush();
+                       }
                }
 
+
+               // Tell cassandra where the configuration files are.
+               // Use the test configuration file.
+               System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
+
                if (EMBEDDED) {
-                       cassandra.startProcess();
+                       cassandra = new EmbeddedCassandraService();
+                       cassandra.start();
                }
 
-               long start = System.currentTimeMillis();
-               long deadline = start + 1000 * 30;
-               while (true) {
-                       try {
-                               cluster = builder.getCluster();
-                               session = cluster.connect();
-                               break;
-                       } catch (Exception e) {
-                               if (System.currentTimeMillis() > deadline) {
-                                       throw e;
-                               }
-                               try {
-                                       Thread.sleep(1000);
-                               } catch (InterruptedException ignored) {
-                               }
-                       }
+               try {
+                       Thread.sleep(1000 * 10);
+               } catch (InterruptedException e) { //give cassandra a few seconds to start up
                }
-               LOG.debug("Connection established after {}ms.", 
System.currentTimeMillis() - start);
+
+               cluster = builder.getCluster();
+               session = cluster.connect();
 
                session.execute(CREATE_KEYSPACE_QUERY);
-               session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
+               session.execute(CREATE_TABLE_QUERY);
        }
 
-       @Before
-       public void createTable() {
-               tableID = random.nextInt(Integer.MAX_VALUE);
-               session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
+       @BeforeClass
+       public static void startFlink() throws Exception {
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+               flinkCluster = new LocalFlinkMiniCluster(config);
+               flinkCluster.start();
+       }
+
+       @AfterClass
+       public static void stopFlink() {
+               if (flinkCluster != null) {
+                       flinkCluster.stop();
+                       flinkCluster = null;
+               }
        }
 
        @AfterClass
        public static void closeCassandra() {
                if (session != null) {
+                       session.executeAsync(DROP_KEYSPACE_QUERY);
                        session.close();
                }
 
@@ -159,11 +213,29 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                        cluster.close();
                }
 
-               if (EMBEDDED) {
-                       if (cassandra != null) {
-                               cassandra.destroy();
-                       }
+               if (cassandra != null) {
+                       cassandra.stop();
                }
+
+               if (tmpDir != null) {
+                       //noinspection ResultOfMethodCallIgnored
+                       tmpDir.delete();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Test preparation & cleanup
+       // ------------------------------------------------------------------------
+
+       @Before
+       public void initializeExecutionEnvironment() {
+               TestStreamEnvironment.setAsContext(flinkCluster, 4);
+               new TestEnvironment(flinkCluster, 4, false).setAsContext();
+       }
+
+       @After
+       public void deleteSchema() throws Exception {
+               session.executeAsync(CLEAR_TABLE_QUERY);
        }
 
        // ------------------------------------------------------------------------
@@ -171,9 +243,9 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        // ------------------------------------------------------------------------
 
        @Override
-       protected TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
-               return new TestCassandraTupleWriteAheadSink<>(
-                       TABLE_NAME_PREFIX + tableID,
+       protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
+               return new CassandraTupleWriteAheadSink<>(
+                       INSERT_DATA_QUERY,
                        TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()),
                        builder,
                        new CassandraCommitter(builder));
@@ -192,42 +264,43 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        @Override
        protected void verifyResultsIdealCircumstances(
                OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-               TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-               ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+               ResultSet result = session.execute(SELECT_DATA_QUERY);
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 60; x++) {
                        list.add(x);
                }
 
                for (Row s : result) {
-                       list.remove(Integer.valueOf(s.getInt("counter")));
+                       list.remove(new Integer(s.getInt("counter")));
                }
-               Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+               Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
        }
 
        @Override
        protected void verifyResultsDataPersistenceUponMissedNotify(
                OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-               TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
-               ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+               ResultSet result = session.execute(SELECT_DATA_QUERY);
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 60; x++) {
                        list.add(x);
                }
 
                for (Row s : result) {
-                       list.remove(Integer.valueOf(s.getInt("counter")));
+                       list.remove(new Integer(s.getInt("counter")));
                }
-               Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+               Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
        }
 
        @Override
        protected void verifyResultsDataDiscardingUponRestore(
                OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-               TestCassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
-               ResultSet result = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, sink.tableName));
+               CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+
+               ResultSet result = session.execute(SELECT_DATA_QUERY);
                ArrayList<Integer> list = new ArrayList<>();
                for (int x = 1; x <= 20; x++) {
                        list.add(x);
@@ -237,24 +310,23 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                }
 
                for (Row s : result) {
-                       list.remove(Integer.valueOf(s.getInt("counter")));
+                       list.remove(new Integer(s.getInt("counter")));
                }
-               Assert.assertTrue("The following ID's were not found in the ResultSet: " + list, list.isEmpty());
+               Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
        }
 
        @Test
        public void testCassandraCommitter() throws Exception {
-               String jobID = new JobID().toString();
-               CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-               cc1.setJobId(jobID);
+               CassandraCommitter cc1 = new CassandraCommitter(builder);
+               cc1.setJobId("job");
                cc1.setOperatorId("operator");
 
-               CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-               cc2.setJobId(jobID);
+               CassandraCommitter cc2 = new CassandraCommitter(builder);
+               cc2.setJobId("job");
                cc2.setOperatorId("operator");
 
-               CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-               cc3.setJobId(jobID);
+               CassandraCommitter cc3 = new CassandraCommitter(builder);
+               cc3.setJobId("job");
                cc3.setOperatorId("operator1");
 
                cc1.createResource();
@@ -280,17 +352,17 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                cc2.close();
                cc3.close();
 
-               CassandraCommitter cc4 = new CassandraCommitter(builder, "flink_auxiliary_cc");
-               cc4.setJobId(jobID);
-               cc4.setOperatorId("operator");
+               cc1 = new CassandraCommitter(builder);
+               cc1.setJobId("job");
+               cc1.setOperatorId("operator");
 
-               cc4.open();
+               cc1.open();
 
                //verify that checkpoint data is not destroyed within open/close and not reliant on internally cached data
-               Assert.assertTrue(cc4.isCheckpointCommitted(0, 1));
-               Assert.assertFalse(cc4.isCheckpointCommitted(0, 2));
+               Assert.assertTrue(cc1.isCheckpointCommitted(0, 1));
+               Assert.assertFalse(cc1.isCheckpointCommitted(0, 2));
 
-               cc4.close();
+               cc1.close();
        }
 
        // ------------------------------------------------------------------------
@@ -299,94 +371,70 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
        @Test
        public void testCassandraTupleAtLeastOnceSink() throws Exception {
-               CassandraTupleSink<Tuple3<String, Integer, Integer>> sink = new CassandraTupleSink<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
 
-               sink.open(new Configuration());
+               DataStream<Tuple3<String, Integer, Integer>> source = env.fromCollection(collection);
+               source.addSink(new CassandraTupleSink<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
 
-               for (Tuple3<String, Integer, Integer> value : collection) {
-                       sink.send(value);
-               }
-
-               sink.close();
-
-               synchronized (sink.updatesPending) {
-                       if (sink.updatesPending.get() != 0) {
-                               sink.updatesPending.wait();
-                       }
-               }
+               env.execute();
 
-               ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID));
-               try {
-                       Assert.assertEquals(20, rs.all().size());
-               } catch (Throwable e) {
-                       LOG.error("test failed.", e);
-               }
+               ResultSet rs = session.execute(SELECT_DATA_QUERY);
+               Assert.assertEquals(20, rs.all().size());
        }
 
        @Test
        public void testCassandraPojoAtLeastOnceSink() throws Exception {
-               session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
-
-               CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builder);
-
-               sink.open(new Configuration());
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               DataStreamSource<Pojo> source = env
+                       .addSource(new SourceFunction<Pojo>() {
+
+                               private boolean running = true;
+                               private volatile int cnt = 0;
+
+                               @Override
+                               public void run(SourceContext<Pojo> ctx) throws Exception {
+                                       while (running) {
+                                               ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0));
+                                               cnt++;
+                                               if (cnt == 20) {
+                                                       cancel();
+                                               }
+                                       }
+                               }
 
-               for (int x = 0; x < 20; x++) {
-                       sink.send(new Pojo(UUID.randomUUID().toString(), x, 0));
-               }
+                               @Override
+                               public void cancel() {
+                                       running = false;
+                               }
+                       });
 
-               sink.close();
+               source.addSink(new CassandraPojoSink<>(Pojo.class, builder));
 
-               synchronized (sink.updatesPending) {
-                       while (sink.updatesPending.get() != 0) {
-                               sink.updatesPending.wait();
-                       }
-               }
+               env.execute();
 
-               ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
-               try {
-                       Assert.assertEquals(20, rs.all().size());
-               } catch (Throwable e) {
-                       LOG.error("test failed.", e);
-               }
+               ResultSet rs = session.execute(SELECT_DATA_QUERY);
+               Assert.assertEquals(20, rs.all().size());
        }
 
        @Test
        public void testCassandraBatchFormats() throws Exception {
-               OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
-               sink.configure(new Configuration());
-               sink.open(0, 1);
-
-               for (Tuple3<String, Integer, Integer> value : collection) {
-                       sink.writeRecord(value);
-               }
-
-               sink.close();
-
-               InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + tableID), builder);
-               source.configure(new Configuration());
-               source.open(null);
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
 
-               List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
+               DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
+               dataSet.output(new CassandraOutputFormat<Tuple3<String, Integer, Integer>>(INSERT_DATA_QUERY, builder));
 
-               while (!source.reachedEnd()) {
-                       result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
-               }
+               env.execute("Write data");
 
-               source.close();
-               try {
-                       Assert.assertEquals(20, result.size());
-               } catch (Throwable e) {
-                       LOG.error("test failed.", e);
-               }
-       }
+               DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput(
+                       new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder),
+                       TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){}));
 
-       protected static class TestCassandraTupleWriteAheadSink<IN extends Tuple> extends CassandraTupleWriteAheadSink<IN> {
-               private final String tableName;

-               private TestCassandraTupleWriteAheadSink(String tableName, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
-                       super(INSERT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, tableName), serializer, builder, committer);
-                       this.tableName = tableName;
-               }
+               long count = inputDS.count();
+               Assert.assertEquals(count, 20L);
        }
 }
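
The restored test harness boots Cassandra inside the test JVM: it copies the cassandra.yaml template into a temporary directory while substituting the $PATH placeholder, points the cassandra.config system property at the rewritten file, and then starts a CassandraDaemon, as the EmbeddedCassandraService above does. A condensed sketch of that startup sequence (helper class name and paths are hypothetical):

    import org.apache.cassandra.service.CassandraDaemon;

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Condensed sketch of the embedded startup used by the test: rewrite the
    // yaml template, point cassandra.config at the copy, start the daemon.
    public class EmbeddedCassandraBootstrap {

        public static CassandraDaemon start(Path yamlTemplate, Path workDir) throws IOException {
            // The template carries the closing quote of each directory value
            // itself ($PATH/data'), so only the opening quote is prepended here.
            String yaml = new String(Files.readAllBytes(yamlTemplate))
                    .replace("$PATH", "'" + workDir.toAbsolutePath());
            Path config = workDir.resolve("cassandra.yaml");
            Files.write(config, yaml.getBytes());

            // Cassandra resolves its configuration from this system property.
            System.setProperty("cassandra.config", config.toUri().toString());

            CassandraDaemon daemon = new CassandraDaemon();
            daemon.init(null);
            daemon.start();
            return daemon;
        }
    }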

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
deleted file mode 100644
index 2e649e4..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.TestJvmProcess;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Scanner;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertTrue;
-
-public class CassandraService extends TestJvmProcess {
-       private File tmpDir;
-       private File tmpCassandraYaml;
-
-       public CassandraService() throws Exception {
-               createCassandraYaml();
-               setJVMMemory(512);
-       }
-
-       private void createCassandraYaml() throws IOException {
-               // generate temporary files
-               tmpDir = CommonTestUtils.createTempDirectory();
-               ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
-               File file = new File(classLoader.getResource("cassandra.yaml").getFile());
-               tmpCassandraYaml = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");
-
-               assertTrue(tmpCassandraYaml.createNewFile());
-               BufferedWriter b = new BufferedWriter(new FileWriter(tmpCassandraYaml));
-
-               //copy cassandra.yaml; inject absolute paths into cassandra.yaml
-               Scanner scanner = new Scanner(file);
-               while (scanner.hasNextLine()) {
-                       String line = scanner.nextLine();
-                       line = line.replace("$PATH", "'" + tmpCassandraYaml.getParentFile());
-                       b.write(line + "\n");
-                       b.flush();
-               }
-               scanner.close();
-       }
-
-       @Override
-       public String getName() {
-               return "CassandraService";
-       }
-
-       @Override
-       public String[] getJvmArgs() {
-               return new String[]{
-                       tmpCassandraYaml.toURI().toString(),
-                       // these options were taken directly from the jvm.options file in the cassandra repo
-                       "-XX:+UseThreadPriorities",
-                       "-Xss256k",
-                       "-XX:+AlwaysPreTouch",
-                       "-XX:+UseTLAB",
-                       "-XX:+ResizeTLAB",
-                       "-XX:+UseNUMA",
-                       "-XX:+PerfDisableSharedMem",
-                       "-XX:+UseParNewGC",
-                       "-XX:+UseConcMarkSweepGC",
-                       "-XX:+CMSParallelRemarkEnabled",
-                       "-XX:SurvivorRatio=8",
-                       "-XX:MaxTenuringThreshold=1",
-                       "-XX:CMSInitiatingOccupancyFraction=75",
-                       "-XX:+UseCMSInitiatingOccupancyOnly",
-                       "-XX:CMSWaitDuration=10000",
-                       "-XX:+CMSParallelInitialMarkEnabled",
-                       "-XX:+CMSEdenChunksRecordAlways",
-                       "-XX:+CMSClassUnloadingEnabled",};
-       }
-
-       @Override
-       public String getEntryPointClassName() {
-               return CassandraServiceEntryPoint.class.getName();
-       }
-
-       public static class CassandraServiceEntryPoint {
-               public static void main(String[] args) throws InterruptedException {
-                       final CassandraDaemon cassandraDaemon = new CassandraDaemon();
-
-                       System.setProperty("cassandra.config", args[0]);
-
-                       cassandraDaemon.activate();
-
-                       Runtime.getRuntime().addShutdownHook(new Thread() {
-                               @Override
-                               public void run() {
-                                       cassandraDaemon.deactivate();
-                               }
-                       });
-
-                       // Run forever
-                       new CountDownLatch(1).await();
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02c10d31/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
index 77ee0ac..0594ea3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -15,40 +15,29 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-auto_snapshot: false
-
 cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 10000
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
 commitlog_directory: $PATH/commit'
-commitlog_sync: periodic
-commitlog_sync_period_in_ms: 5000
-
 data_file_directories:
     - $PATH/data'
-disk_access_mode: mmap
-
-endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
-
-listen_address: '127.0.0.1'
-
-memtable_allocation_type: offheap_objects
-
-native_transport_port: 9042
-
-partitioner: org.apache.cassandra.dht.Murmur3Partitioner
-
-read_request_timeout_in_ms: 15000
-request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
-request_scheduler_id: keyspace
-rpc_port: 9170
-
 saved_caches_directory: $PATH/cache'
+listen_address: '127.0.0.1'
 seed_provider:
     - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider'
       parameters:
           - seeds: '127.0.0.1'
+native_transport_port: 9042
+
+concurrent_reads: 8
+concurrent_writes: 8
+
+auto_bootstrap: false
+auto_snapshot: false
+
 start_rpc: false
 start_native_transport: true
-storage_port: 7010
-
-write_request_timeout_in_ms: 15000
+native_transport_max_threads: 8
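
Note the asymmetric quoting in the directory values above (e.g. commitlog_directory: $PATH/commit'): the template carries only the closing quote, because the test prepends the opening quote while substituting $PATH. A tiny illustration of the substitution (temp directory hypothetical):

    // Shows how "$PATH/commit'" becomes a fully quoted yaml value once the
    // test prepends the opening quote during substitution.
    public class PathSubstitutionDemo {
        public static void main(String[] args) {
            String templateLine = "commitlog_directory: $PATH/commit'";
            String tmpDir = "/tmp/flink-cassandra-test"; // hypothetical temp dir
            System.out.println(templateLine.replace("$PATH", "'" + tmpDir));
            // prints: commitlog_directory: '/tmp/flink-cassandra-test/commit'
        }
    }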
