The tests use a ZooKeeper mini cluster and multiple Kafka mini clusters.

These do not appear to be very stable in our test setup. Let's see what
we can do to improve their reliability.

1) As a first step, I would suggest reducing the number of concurrent
tests to one for this project. That will prevent multiple mini clusters
from competing for compute resources at the same time.
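For (1), a hypothetical sketch of what that could look like in the connector
module's pom.xml, assuming the Maven Surefire plugin is used (forkCount and
reuseForks are Surefire settings; the exact plugin configuration in the Flink
build may differ):

```xml
<!-- Sketch only: run test classes in a single fork, one JVM per class,
     so only one ZooKeeper/Kafka mini cluster is alive at a time. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <!-- no parallel forks for this module -->
    <forkCount>1</forkCount>
    <!-- fresh JVM per test class, so mini-cluster state cannot leak -->
    <reuseForks>false</reuseForks>
  </configuration>
</plugin>
```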

2) The method "SimpleConsumerThread.getLastOffset()" should probably
re-retrieve the partition leader before retrying, or we should allow the
program more recovery retries...
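For (2), a minimal sketch of the retry idea, with hypothetical names (the real
fix would live in LegacyFetcher.SimpleConsumerThread and use Kafka's
SimpleConsumer / topic metadata API; here the broker interaction is stubbed
out behind two small interfaces). The point is that the leader is re-resolved
on every attempt, so a NotLeaderForPartitionException from a stale leader
does not immediately fail the job:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    static final int MAX_RETRIES = 3;
    static final long RETRY_BACKOFF_MS = 100;

    /** Hypothetical stand-in for a topic-metadata lookup of the partition leader. */
    interface LeaderResolver { String resolve() throws Exception; }

    /** Hypothetical stand-in for the OffsetRequest against a given leader. */
    interface OffsetFetcher { long fetch(String leader) throws Exception; }

    static long getLastOffsetWithRetry(LeaderResolver resolver, OffsetFetcher fetcher)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            // Re-resolve the partition leader on EVERY attempt, not just once.
            String leader = resolver.resolve();
            try {
                return fetcher.fetch(leader);
            } catch (Exception e) {
                // e.g. NotLeaderForPartitionException: back off, then retry
                // against a freshly resolved leader.
                last = e;
                Thread.sleep(RETRY_BACKOFF_MS);
            }
        }
        throw new Exception("Unable to get last offset after " + MAX_RETRIES + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated brokers: the first attempt hits a stale leader and fails,
        // the re-resolved leader then answers the offset request.
        long offset = getLastOffsetWithRetry(
            () -> calls.get() == 0 ? "stale-broker-0" : "broker-1",
            leader -> {
                if (calls.getAndIncrement() == 0) {
                    throw new RuntimeException("NotLeaderForPartition on " + leader);
                }
                return 42L;
            });
        System.out.println("last offset: " + offset);
    }
}
```

The same shape also covers the "more recovery retries" option: raising
MAX_RETRIES (or making it configurable) widens the recovery window without
changing the loop.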

Greetings,
Stephan


On Wed, Sep 23, 2015 at 4:04 AM, Li, Chengxiang <chengxiang...@intel.com>
wrote:

> Found more KafkaITCase failure at:
> https://travis-ci.org/apache/flink/jobs/81592146
>
> Failed tests:
>
> KafkaITCase.testConcurrentProducerConsumerTopology:50->KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology:334->KafkaTestBase.tryExecute:313
> Test failed: The program execution failed: Job execution failed.
> Tests in error:
>
> KafkaITCase.testCancelingEmptyTopic:57->KafkaConsumerTestBase.runCancelingOnEmptyInputTest:594
> »
>
> KafkaITCase.testCancelingFullTopic:62->KafkaConsumerTestBase.runCancelingOnFullInputTest:529
> »
>
> KafkaITCase.testMultipleSourcesOnePartition:89->KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest:450
> » ProgramInvocation
>
> KafkaITCase.testOffsetInZookeeper:45->KafkaConsumerTestBase.runOffsetInZookeeperValidationTest:205->KafkaConsumerTestBase.writeSequence:938
> » ProgramInvocation
>
> KafkaITCase.testOneToOneSources:79->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:356
> » ProgramInvocation
>
> It happens only on the test mode of JDK: oraclejdk8
> PROFILE="-Dhadoop.version=2.5.0 -Dmaven.javadoc.skip=true".
>
> Thanks
> Chengxiang
>
> -----Original Message-----
> From: Till Rohrmann (JIRA) [mailto:j...@apache.org]
> Sent: Thursday, September 17, 2015 11:02 PM
> To: dev@flink.apache.org
> Subject: [jira] [Created] (FLINK-2695)
> KafkaITCase.testConcurrentProducerConsumerTopology failed on Travis
>
> Till Rohrmann created FLINK-2695:
> ------------------------------------
>
>              Summary: KafkaITCase.testConcurrentProducerConsumerTopology
> failed on Travis
>                  Key: FLINK-2695
>                  URL: https://issues.apache.org/jira/browse/FLINK-2695
>              Project: Flink
>           Issue Type: Bug
>             Reporter: Till Rohrmann
>             Priority: Critical
>
>
> The {{KafkaITCase.testConcurrentProducerConsumerTopology}} failed on
> Travis with
>
> {code}
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> Running org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
> Running org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 14.296 sec
> - in org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
> 09/16/2015 17:19:36     Job execution switched to status RUNNING.
> 09/16/2015 17:19:36     Source: Custom Source -> Sink: Unnamed(1/1)
> switched to SCHEDULED
> 09/16/2015 17:19:36     Source: Custom Source -> Sink: Unnamed(1/1)
> switched to DEPLOYING
> 09/16/2015 17:19:36     Source: Custom Source -> Sink: Unnamed(1/1)
> switched to RUNNING
> 09/16/2015 17:19:36     Source: Custom Source -> Sink: Unnamed(1/1)
> switched to FINISHED
> 09/16/2015 17:19:36     Job execution switched to status FINISHED.
> 09/16/2015 17:19:36     Job execution switched to status RUNNING.
> 09/16/2015 17:19:36     Source: Custom Source -> Map -> Flat Map(1/1)
> switched to SCHEDULED
> 09/16/2015 17:19:36     Source: Custom Source -> Map -> Flat Map(1/1)
> switched to DEPLOYING
> 09/16/2015 17:19:36     Source: Custom Source -> Map -> Flat Map(1/1)
> switched to RUNNING
> 09/16/2015 17:19:36     Source: Custom Source -> Map -> Flat Map(1/1)
> switched to FAILED
> java.lang.Exception: Could not forward element to next operator
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>         at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>         at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:171)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
>         at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:332)
>         at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:316)
>         at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
>         at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
>         at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:92)
>         at
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:88)
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:449)
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
>         at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:332)
>         at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:316)
>         at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
>         at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>         at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:329)
>         ... 6 more
> Caused by:
> org.apache.flink.streaming.connectors.kafka.testutils.SuccessException
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$7.flatMap(KafkaConsumerTestBase.java:896)
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$7.flatMap(KafkaConsumerTestBase.java:876)
>         at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>         at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:329)
>         ... 11 more
>
> 09/16/2015 17:19:36     Job execution switched to status FAILING.
> 09/16/2015 17:19:36     Job execution switched to status FAILED.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
>         at org.apache.flink.client.program.Client.run(Client.java:381)
>         at
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:124)
>         at
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:95)
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.tryExecute(KafkaTestBase.java:283)
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology(KafkaConsumerTestBase.java:334)
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testConcurrentProducerConsumerTopology(KafkaITCase.java:50)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>         at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>         at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>         at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>         at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>         at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>         at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>         at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>         at
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>         at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>         at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>         at
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>         at
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>         at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>         at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:418)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at
> org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManager.scala:285)
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>         at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:40)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Unable to get last offset for topic
> concurrentProducerConsumerTopic and partitions [FetchPartition
> {partition=1, offset=-915623761776}].
> Exception for partition 1: kafka.common.NotLeaderForPartitionException
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>         at java.lang.Class.newInstance(Class.java:379)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521)
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)
>
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>         at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>         at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:171)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Unable to get last offset for topic
> concurrentProducerConsumerTopic and partitions [FetchPartition
> {partition=1, offset=-915623761776}].
> Exception for partition 1: kafka.common.NotLeaderForPartitionException
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>         at java.lang.Class.newInstance(Class.java:379)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521)
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)
>
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:524)
>         at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 93.274
> sec <<< FAILURE! - in
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> testConcurrentProducerConsumerTopology(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
> Time elapsed: 0.731 sec  <<< FAILURE!
> java.lang.AssertionError: Test failed: The program execution failed: Job
> execution failed.
>         at org.junit.Assert.fail(Assert.java:88)
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.tryExecute(KafkaTestBase.java:293)
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology(KafkaConsumerTestBase.java:334)
>         at
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testConcurrentProducerConsumerTopology(KafkaITCase.java:50)
>
>
> Results :
>
> Failed tests:
>
> KafkaITCase.testConcurrentProducerConsumerTopology:50->KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology:334->KafkaTestBase.tryExecute:293
> Test failed: The program execution failed: Job execution failed.
> {code}
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/80642059/log.txt
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>
