[
https://issues.apache.org/jira/browse/FLINK-2695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14904642#comment-14904642
]
Stephan Ewen commented on FLINK-2695:
-------------------------------------
This particular error occurred because the leader broker for the topic
partition changed mid-execution.
Usually, Flink's recovery catches that, but in this case we are testing the
Kafka producer (which has no exactly-once guarantees) and hence deactivate
the recovery.
Fixed by doing a full restart of the program when an exception indicates
that the leader changed.
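
For illustration only, the restart idea could be sketched roughly as follows. This is not the actual change committed for this issue: the class RestartOnLeaderChange, its methods, and the maxRestarts parameter are hypothetical names. The sketch assumes a leader change can be recognized by finding "NotLeaderForPartitionException" (the name seen in the trace below) in a class name or message somewhere in the cause chain.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper, not part of Flink: reruns a program when a failure looks like a Kafka leader change. */
final class RestartOnLeaderChange {

    // Marker taken from the stack trace in this issue (assumption: matching by name is sufficient).
    static final String LEADER_CHANGE_MARKER = "NotLeaderForPartitionException";

    /** Walks the cause chain and checks class names and messages for the marker. */
    static boolean indicatesLeaderChange(Throwable t) {
        int depth = 0;
        while (t != null && depth++ < 64) { // bound the walk in case of a cyclic cause chain
            if (t.getClass().getName().contains(LEADER_CHANGE_MARKER)) {
                return true;
            }
            String msg = t.getMessage();
            if (msg != null && msg.contains(LEADER_CHANGE_MARKER)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    /** Runs the program and reruns it from scratch, at most maxRestarts times, on a leader-change failure. */
    static <T> T runWithFullRestart(Callable<T> program, int maxRestarts) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return program.call();
            } catch (Exception e) {
                if (attempt >= maxRestarts || !indicatesLeaderChange(e)) {
                    throw e; // unrelated failure, or restart budget exhausted
                }
                // Otherwise fall through and run the whole program again.
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = runWithFullRestart(() -> {
            if (calls[0]++ == 0) {
                // Simulated first-attempt failure, shaped like the nested exception in the log.
                throw new RuntimeException("Job execution failed.",
                        new Exception("Exception for partition 1: kafka.common.NotLeaderForPartitionException"));
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Matching on the name rather than on the exception type keeps the sketch free of a Kafka dependency, and it also covers the case in the log below where the Kafka exception only appears inside the message of a wrapping java.lang.Exception.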
> KafkaITCase.testConcurrentProducerConsumerTopology failed on Travis
> -------------------------------------------------------------------
>
> Key: FLINK-2695
> URL: https://issues.apache.org/jira/browse/FLINK-2695
> Project: Flink
> Issue Type: Bug
> Reporter: Till Rohrmann
> Priority: Critical
> Labels: test-stability
>
> The {{KafkaITCase.testConcurrentProducerConsumerTopology}} failed on Travis
> with
> {code}
> -------------------------------------------------------
> T E S T S
> -------------------------------------------------------
> Running org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
> Running org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 14.296 sec -
> in org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase
> 09/16/2015 17:19:36 Job execution switched to status RUNNING.
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1) switched to
> SCHEDULED
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1) switched to
> DEPLOYING
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1) switched to
> RUNNING
> 09/16/2015 17:19:36 Source: Custom Source -> Sink: Unnamed(1/1) switched to
> FINISHED
> 09/16/2015 17:19:36 Job execution switched to status FINISHED.
> 09/16/2015 17:19:36 Job execution switched to status RUNNING.
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1) switched
> to SCHEDULED
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1) switched
> to DEPLOYING
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1) switched
> to RUNNING
> 09/16/2015 17:19:36 Source: Custom Source -> Map -> Flat Map(1/1) switched
> to FAILED
> java.lang.Exception: Could not forward element to next operator
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:171)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:332)
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:316)
> at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
> at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:92)
> at
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:88)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:449)
> Caused by: java.lang.RuntimeException: Could not forward element to next
> operator
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:332)
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:316)
> at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
> at
> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:329)
> ... 6 more
> Caused by:
> org.apache.flink.streaming.connectors.kafka.testutils.SuccessException
> at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$7.flatMap(KafkaConsumerTestBase.java:896)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$7.flatMap(KafkaConsumerTestBase.java:876)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingChainingOutput.collect(OutputHandler.java:329)
> ... 11 more
> 09/16/2015 17:19:36 Job execution switched to status FAILING.
> 09/16/2015 17:19:36 Job execution switched to status FAILED.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at org.apache.flink.client.program.Client.run(Client.java:381)
> at
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:124)
> at
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:95)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.tryExecute(KafkaTestBase.java:283)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology(KafkaConsumerTestBase.java:334)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testConcurrentProducerConsumerTopology(KafkaITCase.java:50)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
> at
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
> at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
> at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:418)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManager.scala:285)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:40)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Unable to get last offset for topic
> concurrentProducerConsumerTopic and partitions [FetchPartition {partition=1,
> offset=-915623761776}].
> Exception for partition 1: kafka.common.NotLeaderForPartitionException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:379)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:171)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Unable to get last offset for topic
> concurrentProducerConsumerTopic and partitions [FetchPartition {partition=1,
> offset=-915623761776}].
> Exception for partition 1: kafka.common.NotLeaderForPartitionException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:379)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:524)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370)
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 93.274 sec
> <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.KafkaITCase
> testConcurrentProducerConsumerTopology(org.apache.flink.streaming.connectors.kafka.KafkaITCase)
> Time elapsed: 0.731 sec <<< FAILURE!
> java.lang.AssertionError: Test failed: The program execution failed: Job
> execution failed.
> at org.junit.Assert.fail(Assert.java:88)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.tryExecute(KafkaTestBase.java:293)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology(KafkaConsumerTestBase.java:334)
> at
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testConcurrentProducerConsumerTopology(KafkaITCase.java:50)
> Results :
> Failed tests:
>
> KafkaITCase.testConcurrentProducerConsumerTopology:50->KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology:334->KafkaTestBase.tryExecute:293
> Test failed: The program execution failed: Job execution failed.
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/80642059/log.txt