[FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds

Cassandra needs Java 8 to run reliably.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7768bb2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7768bb2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7768bb2d

Branch: refs/heads/release-1.1
Commit: 7768bb2defe48afb6e77c072e97526856538f6e9
Parents: 34eefa3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 16:55:02 2016 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Aug 2 20:24:37 2016 +0200

----------------------------------------------------------------------
 .../runtime/testutils/CommonTestUtils.java      |  9 +-
 .../cassandra/CassandraConnectorTest.java       | 92 ++++++++++++++------
 2 files changed, 67 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7768bb2d/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 6bd8b34..59c37b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -51,7 +51,7 @@ public class CommonTestUtils {
                        try {
                                Thread.sleep(remaining);
                        }
-                       catch (InterruptedException e) {}
+                       catch (InterruptedException ignored) {}
                        
                        now = System.currentTimeMillis();
                }
@@ -137,8 +137,7 @@ public class CommonTestUtils {
        }
 
        public static void printLog4jDebugConfig(File file) throws IOException {
-               FileWriter fw = new FileWriter(file);
-               try {
+               try (FileWriter fw = new FileWriter(file)) {
                        PrintWriter writer = new PrintWriter(fw);
 
                        writer.println("log4j.rootLogger=DEBUG, console");
@@ -152,9 +151,6 @@ public class CommonTestUtils {
                        writer.flush();
                        writer.close();
                }
-               finally {
-                       fw.close();
-               }
        }
 
        public static File createTempDirectory() throws IOException {
@@ -165,7 +161,6 @@ public class CommonTestUtils {
                        if (!dir.exists() && dir.mkdirs()) {
                                return dir;
                        }
-                       System.err.println("Could not use temporary directory " 
+ dir.getAbsolutePath());
                }
 
                throw new IOException("Could not create temporary file 
directory");

http://git-wip-us.apache.org/repos/asf/flink/blob/7768bb2d/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
index 8d0c02e..2018255 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.cassandra;
 
 import com.datastax.driver.core.Cluster;
@@ -23,9 +24,12 @@ import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+
 import org.apache.cassandra.service.CassandraDaemon;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -42,6 +46,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -49,10 +54,12 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,16 +71,21 @@ import java.util.ArrayList;
 import java.util.Scanner;
 import java.util.UUID;
 
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, 
CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+
        private static final Logger LOG = 
LoggerFactory.getLogger(CassandraConnectorTest.class);
        private static File tmpDir;
 
        private static final boolean EMBEDDED = true;
+
        private static EmbeddedCassandraService cassandra;
-       private transient static ClusterBuilder builder = new ClusterBuilder() {
+
+       private static ClusterBuilder builder = new ClusterBuilder() {
                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder
@@ -83,6 +95,7 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
                                .withoutMetrics().build();
                }
        };
+
        private static Cluster cluster;
        private static Session session;
 
@@ -97,7 +110,7 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
 
        static {
                for (int i = 0; i < 20; i++) {
-                       collection.add(new Tuple3<>("" + UUID.randomUUID(), i, 
0));
+                       collection.add(new 
Tuple3<>(UUID.randomUUID().toString(), i, 0));
                }
        }
 
@@ -115,15 +128,36 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
                }
        }
 
-       
//=====Setup========================================================================================================
+       // 
------------------------------------------------------------------------
+       //  Cassandra Cluster Setup
+       // 
------------------------------------------------------------------------
+
        @BeforeClass
        public static void startCassandra() throws IOException {
-               //generate temporary files
+
+               // check if we should run this test, current Cassandra version 
requires Java >= 1.8
+               try {
+                       String javaVersionString = 
System.getProperty("java.runtime.version").substring(0, 3);
+                       float javaVersion = Float.parseFloat(javaVersionString);
+                       Assume.assumeTrue(javaVersion >= 1.8f);
+               }
+               catch (AssumptionViolatedException e) {
+                       System.out.println("Skipping CassandraConnectorTest, 
because the JDK is < Java 8+");
+                       throw e;
+               }
+               catch (Exception e) {
+                       LOG.error("Cannot determine Java version", e);
+                       e.printStackTrace();
+                       fail("Cannot determine Java version");
+               }
+
+               // generate temporary files
                tmpDir = CommonTestUtils.createTempDirectory();
-               ClassLoader classLoader = 
CassandraTupleWriteAheadSink.class.getClassLoader();
+               ClassLoader classLoader = 
CassandraConnectorTest.class.getClassLoader();
                File file = new 
File(classLoader.getResource("cassandra.yaml").getFile());
                File tmp = new File(tmpDir.getAbsolutePath() + File.separator + 
"cassandra.yaml");
-               tmp.createNewFile();
+               
+               assertTrue(tmp.createNewFile());
                BufferedWriter b = new BufferedWriter(new FileWriter(tmp));
 
                //copy cassandra.yaml; inject absolute paths into cassandra.yaml
@@ -139,7 +173,7 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
 
                // Tell cassandra where the configuration files are.
                // Use the test configuration file.
-               System.setProperty("cassandra.config", "file:" + File.separator 
+ File.separator + File.separator + tmp.getAbsolutePath());
+               System.setProperty("cassandra.config", 
tmp.getAbsoluteFile().toURI().toString());
 
                if (EMBEDDED) {
                        cassandra = new EmbeddedCassandraService();
@@ -160,13 +194,7 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
 
        @Before
        public void checkIfIgnore() {
-               String runtime = System.getProperty("java.runtime.name");
-                               String version = 
System.getProperty("java.runtime.version");
-               LOG.info("Running tests on runtime: '{}', version: '{}'", 
runtime, version);
-               // The tests are failing on Oracle JDK 7 on Travis due to 
garbage collection issues.
-               // Oracle JDK identifies itself as "Java(TM) SE Runtime 
Environment"
-               // OpenJDK is "OpenJDK Runtime Environment"
-               Assume.assumeFalse(runtime.startsWith("Java") && 
version.startsWith("1.7"));
+               
        }
 
        @After
@@ -176,13 +204,23 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
 
        @AfterClass
        public static void closeCassandra() {
-               session.executeAsync(DROP_KEYSPACE_QUERY);
-               session.close();
-               cluster.close();
-               if (EMBEDDED) {
+               if (session != null) {
+                       session.executeAsync(DROP_KEYSPACE_QUERY);
+                       session.close();
+               }
+
+               if (cluster != null) {
+                       cluster.close();
+               }
+
+               if (cassandra != null) {
                        cassandra.stop();
                }
-               tmpDir.delete();
+
+               if (tmpDir != null) {
+                       //noinspection ResultOfMethodCallIgnored
+                       tmpDir.delete();
+               }
        }
 
        
//=====Exactly-Once=================================================================================================
@@ -202,7 +240,7 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
 
        @Override
        protected Tuple3<String, Integer, Integer> generateValue(int counter, 
int checkpointID) {
-               return new Tuple3<>("" + UUID.randomUUID(), counter, 
checkpointID);
+               return new Tuple3<>(UUID.randomUUID().toString(), counter, 
checkpointID);
        }
 
        @Override
@@ -379,7 +417,7 @@ public class CassandraConnectorTest extends 
WriteAheadSinkTestBase<Tuple3<String
 
                DataSet<Tuple3<String, Integer, Integer>> inputDS = 
env.createInput(
                        new CassandraInputFormat<Tuple3<String, Integer, 
Integer>>(SELECT_DATA_QUERY, builder),
-                       new TupleTypeInfo(Tuple3.class, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO));
+                       TypeInformation.of(new TypeHint<Tuple3<String, Integer, 
Integer>>(){}));
 
 
                long count = inputDS.count();

Reply via email to