[FLINK-4290] [Cassandra Connector] Skip CassandraConnectorTest on Java 7 builds
Cassandra needs Java 8 to run reliably. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7768bb2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7768bb2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7768bb2d Branch: refs/heads/release-1.1 Commit: 7768bb2defe48afb6e77c072e97526856538f6e9 Parents: 34eefa3 Author: Stephan Ewen <se...@apache.org> Authored: Mon Aug 1 16:55:02 2016 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Tue Aug 2 20:24:37 2016 +0200 ---------------------------------------------------------------------- .../runtime/testutils/CommonTestUtils.java | 9 +- .../cassandra/CassandraConnectorTest.java | 92 ++++++++++++++------ 2 files changed, 67 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7768bb2d/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java index 6bd8b34..59c37b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java @@ -51,7 +51,7 @@ public class CommonTestUtils { try { Thread.sleep(remaining); } - catch (InterruptedException e) {} + catch (InterruptedException ignored) {} now = System.currentTimeMillis(); } @@ -137,8 +137,7 @@ public class CommonTestUtils { } public static void printLog4jDebugConfig(File file) throws IOException { - FileWriter fw = new FileWriter(file); - try { + try (FileWriter fw = new FileWriter(file)) { PrintWriter writer = new PrintWriter(fw); writer.println("log4j.rootLogger=DEBUG, console"); @@ -152,9 +151,6 @@ public class CommonTestUtils { writer.flush(); writer.close(); } - finally { - fw.close(); - } } public static File createTempDirectory() throws IOException { @@ -165,7 +161,6 @@ public class CommonTestUtils { if (!dir.exists() && dir.mkdirs()) { return dir; } - System.err.println("Could not use temporary directory " + dir.getAbsolutePath()); } throw new IOException("Could not create temporary file directory"); http://git-wip-us.apache.org/repos/asf/flink/blob/7768bb2d/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java index 8d0c02e..2018255 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorTest.java @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.connectors.cassandra; import com.datastax.driver.core.Cluster; @@ -23,9 +24,12 @@ import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; + import org.apache.cassandra.service.CassandraDaemon; + import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple3; @@ -42,6 +46,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; + import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -49,10 +54,12 @@ import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.internal.AssumptionViolatedException; import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; + import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,16 +71,21 @@ import java.util.ArrayList; import java.util.Scanner; import java.util.UUID; +import static org.junit.Assert.*; + +@SuppressWarnings("serial") @RunWith(PowerMockRunner.class) @PrepareForTest(ResultPartitionWriter.class) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class); private static File tmpDir; private static final boolean EMBEDDED = true; + private static EmbeddedCassandraService cassandra; - private transient static ClusterBuilder builder = new ClusterBuilder() { + + private static ClusterBuilder builder = new ClusterBuilder() { @Override protected Cluster buildCluster(Cluster.Builder builder) { return builder @@ -83,6 +95,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String .withoutMetrics().build(); } }; + private static Cluster cluster; private static Session session; @@ -97,7 +110,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String static { for (int i = 0; i < 20; i++) { - collection.add(new Tuple3<>("" + UUID.randomUUID(), i, 0)); + collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); } } @@ -115,15 +128,36 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String } } - //=====Setup======================================================================================================== + // ------------------------------------------------------------------------ + // Cassandra Cluster Setup + // ------------------------------------------------------------------------ + @BeforeClass public static void startCassandra() throws IOException { - //generate temporary files + + // check if we should run this test, current Cassandra version requires Java >= 1.8 + try { + String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3); + float javaVersion = Float.parseFloat(javaVersionString); + Assume.assumeTrue(javaVersion >= 1.8f); + } + catch (AssumptionViolatedException e) { + System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+"); + throw e; + } + catch (Exception e) { + LOG.error("Cannot determine Java version", e); + e.printStackTrace(); + fail("Cannot determine Java version"); + } + + // generate temporary files tmpDir = CommonTestUtils.createTempDirectory(); - ClassLoader classLoader = CassandraTupleWriteAheadSink.class.getClassLoader(); + ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader(); File file = new File(classLoader.getResource("cassandra.yaml").getFile()); File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml"); - tmp.createNewFile(); + + assertTrue(tmp.createNewFile()); BufferedWriter b = new BufferedWriter(new FileWriter(tmp)); //copy cassandra.yaml; inject absolute paths into cassandra.yaml @@ -139,7 +173,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String // Tell cassandra where the configuration files are. // Use the test configuration file. - System.setProperty("cassandra.config", "file:" + File.separator + File.separator + File.separator + tmp.getAbsolutePath()); + System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString()); if (EMBEDDED) { cassandra = new EmbeddedCassandraService(); @@ -160,13 +194,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String @Before public void checkIfIgnore() { - String runtime = System.getProperty("java.runtime.name"); - String version = System.getProperty("java.runtime.version"); - LOG.info("Running tests on runtime: '{}', version: '{}'", runtime, version); - // The tests are failing on Oracle JDK 7 on Travis due to garbage collection issues. - // Oracle JDK identifies itself as "Java(TM) SE Runtime Environment" - // OpenJDK is "OpenJDK Runtime Environment" - Assume.assumeFalse(runtime.startsWith("Java") && version.startsWith("1.7")); + } @After @@ -176,13 +204,23 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String @AfterClass public static void closeCassandra() { - session.executeAsync(DROP_KEYSPACE_QUERY); - session.close(); - cluster.close(); - if (EMBEDDED) { + if (session != null) { + session.executeAsync(DROP_KEYSPACE_QUERY); + session.close(); + } + + if (cluster != null) { + cluster.close(); + } + + if (cassandra != null) { cassandra.stop(); } - tmpDir.delete(); + + if (tmpDir != null) { + //noinspection ResultOfMethodCallIgnored + tmpDir.delete(); + } } //=====Exactly-Once================================================================================================= @@ -202,7 +240,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String @Override protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) { - return new Tuple3<>("" + UUID.randomUUID(), counter, checkpointID); + return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID); } @Override @@ -379,7 +417,7 @@ public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String DataSet<Tuple3<String, Integer, Integer>> inputDS = env.createInput( new CassandraInputFormat<Tuple3<String, Integer, Integer>>(SELECT_DATA_QUERY, builder), - new TupleTypeInfo(Tuple3.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); + TypeInformation.of(new TypeHint<Tuple3<String, Integer, Integer>>(){})); long count = inputDS.count();