[
https://issues.apache.org/jira/browse/FLINK-4177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516291#comment-15516291
]
ASF GitHub Bot commented on FLINK-4177:
---------------------------------------
GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2484#discussion_r80233334
--- Diff:
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
---
@@ -178,89 +166,122 @@ public static void startCassandra() throws
IOException {
}
scanner.close();
-
- // Tell cassandra where the configuration files are.
- // Use the test configuration file.
- System.setProperty("cassandra.config",
tmp.getAbsoluteFile().toURI().toString());
-
if (EMBEDDED) {
- cassandra = new EmbeddedCassandraService();
- cassandra.start();
+ String javaCommand = getJavaCommandPath();
+
+ // create a logging file for the process
+ File tempLogFile = File.createTempFile("testlogconfig",
"properties");
+ CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+ // start the task manager process
+ String[] command = new String[]{
+ javaCommand,
+ "-Dlog.level=DEBUG",
+ "-Dlog4j.configuration=file:" +
tempLogFile.getAbsolutePath(),
+ "-Dcassandra.config=" + tmp.toURI(),
+ // these options were taken directly from the
jvm.options file in the cassandra repo
+ "-XX:+UseThreadPriorities",
+ "-Xss256k",
+ "-XX:+AlwaysPreTouch",
+ "-XX:+UseTLAB",
+ "-XX:+ResizeTLAB",
+ "-XX:+UseNUMA",
+ "-XX:+PerfDisableSharedMem",
+ "-XX:+UseParNewGC",
+ "-XX:+UseConcMarkSweepGC",
+ "-XX:+CMSParallelRemarkEnabled",
+ "-XX:SurvivorRatio=8",
+ "-XX:MaxTenuringThreshold=1",
+ "-XX:CMSInitiatingOccupancyFraction=75",
+ "-XX:+UseCMSInitiatingOccupancyOnly",
+ "-XX:CMSWaitDuration=10000",
+ "-XX:+CMSParallelInitialMarkEnabled",
+ "-XX:+CMSEdenChunksRecordAlways",
+ "-XX:+CMSClassUnloadingEnabled",
+
+ "-classpath", getCurrentClasspath(),
+ EmbeddedCassandraService.class.getName()
+ };
+
+ ProcessBuilder bld = new ProcessBuilder(command);
+ cassandra = bld.start();
+ sw = new StringWriter();
+ new PipeForwarder(cassandra.getErrorStream(), sw);
}
- try {
- Thread.sleep(1000 * 10);
- } catch (InterruptedException e) { //give cassandra a few
seconds to start up
+ int attempt = 0;
+ while (true) {
+ try {
+ attempt++;
+ cluster = builder.getCluster();
+ session = cluster.connect();
+ break;
+ } catch (Exception e) {
+ if (attempt > 30) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ }
+ }
}
-
- cluster = builder.getCluster();
- session = cluster.connect();
+ LOG.debug("Connection established after " + attempt + "
attempts.");
session.execute(CREATE_KEYSPACE_QUERY);
- session.execute(CREATE_TABLE_QUERY);
- }
-
- @BeforeClass
- public static void startFlink() throws Exception {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
4);
+ session.execute(CREATE_TABLE_QUERY.replace("$TABLE",
"flink_initial"));
- flinkCluster = new ForkableFlinkMiniCluster(config);
- flinkCluster.start();
+ try {
+ Thread.sleep(5000);
--- End diff --
The execute() call is synchronous, so the sleep is not there to make sure the
statement has completed. I simply found that giving Cassandra a bit of time after
creating the keyspace and the first table resulted in higher stability.
> CassandraConnectorTest.testCassandraCommitter causing unstable builds
> ---------------------------------------------------------------------
>
> Key: FLINK-4177
> URL: https://issues.apache.org/jira/browse/FLINK-4177
> Project: Flink
> Issue Type: Bug
> Components: Cassandra Connector, Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Robert Metzger
> Labels: test-stability
>
> This build: https://api.travis-ci.org/jobs/143272982/log.txt?deansi=true
> failed with
> {code}
> 07/08/2016 09:59:12 Job execution switched to status FINISHED.
> Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 146.646 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest
> testCassandraCommitter(org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest)  Time elapsed: 9.057 sec  <<< ERROR!
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica were required but only 0 acknowledged the write)
> at
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:73)
> at
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:26)
> at
> com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> at
> com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> at
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> at
> com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraCommitter.open(CassandraCommitter.java:103)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraConnectorTest.testCassandraCommitter(CassandraConnectorTest.java:284)
> Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException:
> Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica
> were required but only 0 acknowledged the write)
> at
> com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100)
> at
> com.datastax.driver.core.Responses$Error.asException(Responses.java:122)
> at
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
> at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
> at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
> at
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
> at
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
> at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException:
> Cassandra timeout during write query at consistency LOCAL_SERIAL (1 replica
> were required but only 0 acknowledged the write)
> at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:59)
> at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
> at
> com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
> at
> com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
> at
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:318)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:304)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
> at
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
> at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)