[jira] [Created] (FLINK-17458) TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers
Congxian Qiu(klion26) created FLINK-17458:
-
Summary: TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers
Key: FLINK-17458
URL: https://issues.apache.org/jira/browse/FLINK-17458
Project: Flink
Issue Type: Test
Components: Tests
Affects Versions: 1.10.0
Reporter: Congxian Qiu(klion26)

When verifying the RC of release-1.10.1, I found that `TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers` sometimes fails because of a timeout. I ran this test locally in IDEA and saw the following exception (locally it failed only 2 times in 1000 runs):
{code:java}
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:190)
	at akka.event.LoggingBus$class.akka$event$LoggingBus$$addLogger(Logging.scala:182)
	at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:117)
	at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:116)
	at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
	at scala.util.Try$.apply(Try.scala:192)
	at scala.util.Success.map(Try.scala:237)
	at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:116)
	at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:113)
	at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
	at akka.event.LoggingBus$class.startDefaultLoggers(Logging.scala:113)
	at akka.event.EventStream.startDefaultLoggers(EventStream.scala:22)
	at akka.actor.LocalActorRefProvider.init(ActorRefProvider.scala:662)
	at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:874)
	at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:870)
	at akka.actor.ActorSystemImpl._start(ActorSystem.scala:870)
	at akka.actor.ActorSystemImpl.start(ActorSystem.scala:891)
	at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
	at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
	at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
	at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
	at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:113)
	at org.apache.flink.runtime.akka.AkkaUtils$.createLocalActorSystem(AkkaUtils.scala:68)
	at org.apache.flink.runtime.akka.AkkaUtils.createLocalActorSystem(AkkaUtils.scala)
	at org.apache.flink.runtime.rpc.TestingRpcService.<init>(TestingRpcService.java:74)
	at org.apache.flink.runtime.rpc.TestingRpcService.<init>(TestingRpcService.java:67)
	at org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment$Builder.build(TaskSubmissionTestEnvironment.java:349)
	at org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest.testFailingScheduleOrUpdateConsumers(TaskExecutorSubmissionTest.java:544)
	at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)
org.junit.runners.model.TestTimedOutException: test timed out after 1 milliseconds
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at
{code}
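The "2 failures in 1000 runs" experiment described above can be reproduced with a small repeat-runner. This is only an illustrative sketch: the `RepeatRunner` helper and the simulated flaky test body are hypothetical, not Flink test infrastructure.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper that repeats a test body N times and counts failures,
 *  mirroring the local "2 in 1000 runs" flakiness experiment. */
class RepeatRunner {
    static int countFailures(Callable<Void> testBody, int runs) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            try {
                testBody.call();
            } catch (Exception | AssertionError e) {
                failures++; // record the flaky failure instead of aborting the loop
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Deterministic stand-in for a flaky test: fails on every 500th run.
        final int[] counter = {0};
        int failures = countFailures(() -> {
            counter[0]++;
            if (counter[0] % 500 == 0) {
                throw new AssertionError("simulated timeout");
            }
            return null;
        }, 1000);
        System.out.println("failures: " + failures); // 2 of 1000, as in the report
        if (failures != 2) throw new AssertionError("expected 2 failures, got " + failures);
    }
}
```

Running the suspect test in such a loop (or with `mvn -Dtest=... surefire` repeats) is the usual way to estimate a flake rate before attempting a fix.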
[jira] [Commented] (FLINK-16636) TableEnvironmentITCase is crashing on Travis
[ https://issues.apache.org/jira/browse/FLINK-16636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090518#comment-17090518 ]

Congxian Qiu(klion26) commented on FLINK-16636:
---
Another instance: [https://travis-ci.org/github/klion26/flink/jobs/678526505]

> TableEnvironmentITCase is crashing on Travis > > > Key: FLINK-16636 > URL: https://issues.apache.org/jira/browse/FLINK-16636 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Jark Wu >Assignee: Caizhi Weng >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Here is the instance and exception stack: > https://api.travis-ci.org/v3/job/663408376/log.txt > But there is not too much helpful information there, maybe a accidental maven > problem. > {code} > 09:55:07.703 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test > (integration-tests) on project flink-table-planner-blink_2.11: There are test > failures. > 09:55:07.703 [ERROR] > 09:55:07.703 [ERROR] Please refer to > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire-reports > for the individual test results. > 09:55:07.703 [ERROR] Please refer to dump files (if any exist) [date].dump, > [date]-jvmRun[N].dump and [date].dumpstream. > 09:55:07.703 [ERROR] ExecutionException The forked VM terminated without > properly saying goodbye. VM crash or System.exit called? 
> 09:55:07.703 [ERROR] Command was /bin/sh -c cd > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target > && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m > -Dmvn.forkNumber=1 -XX:+UseG1GC -jar > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire/surefirebooter714252487017838305.jar > > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire > 2020-03-17T09-34-41_826-jvmRun1 surefire4625103637332937565tmp > surefire_43192129054983363633tmp > 09:55:07.703 [ERROR] Error occurred in starting fork, check output in log > 09:55:07.703 [ERROR] Process Exit Code: 137 > 09:55:07.703 [ERROR] Crashed tests: > 09:55:07.703 [ERROR] org.apache.flink.table.api.TableEnvironmentITCase > 09:55:07.703 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? > 09:55:07.703 [ERROR] Command was /bin/sh -c cd > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target > && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m > -Dmvn.forkNumber=1 -XX:+UseG1GC -jar > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire/surefirebooter714252487017838305.jar > > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire > 2020-03-17T09-34-41_826-jvmRun1 surefire4625103637332937565tmp > surefire_43192129054983363633tmp > 09:55:07.703 [ERROR] Error occurred in starting fork, check output in log > 09:55:07.703 [ERROR] Process Exit Code: 137 > 09:55:07.703 [ERROR] Crashed tests: > 09:55:07.703 [ERROR] org.apache.flink.table.api.TableEnvironmentITCase > 09:55:07.703 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510) > 09:55:07.704 [ERROR] at > 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkOnceMultiple(ForkStarter.java:382) > 09:55:07.704 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:297) > 09:55:07.704 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246) > 09:55:07.704 [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183) > 09:55:07.704 [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011) > 09:55:07.704 [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857) > 09:55:07.704 [ERROR] at > org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132) > 09:55:07.704 [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) > 09:55:07.704 [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) > 09:55:07.704 [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) > 09:55:07.704 [ERROR] at >
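Exit code 137 in the surefire log above is 128 + 9, i.e. the forked test JVM was terminated by SIGKILL; on CI workers this typically points at the kernel's OOM killer rather than a genuine VM crash. A minimal demonstration of where the number comes from, assuming a Unix-like environment with `sh` available:

```java
/* Demonstrates why surefire reports exit code 137: on Unix-like systems a
 * process killed by SIGKILL (signal 9) exits with status 128 + 9 = 137. */
class ExitCode137Demo {
    public static void main(String[] args) throws Exception {
        // Start a shell that immediately SIGKILLs itself ($$ is the shell's own PID).
        Process p = new ProcessBuilder("sh", "-c", "kill -9 $$").start();
        int code = p.waitFor();
        System.out.println("exit code: " + code);
        if (code != 137) throw new AssertionError("expected 137, got " + code);
    }
}
```

This is why the "VM crash or System.exit called?" message is usually a red herring for code 137: the JVM never got a chance to say goodbye because it was killed from outside.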
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084492#comment-17084492 ]

Congxian Qiu(klion26) commented on FLINK-17170:
---
Hi [~cvasii], from the description it seems the savepoint completed successfully and the "unfinished" tasks were blocked by something. Currently, the lifetime of the task logic is "trigger savepoint" -> "savepoint complete" -> "finish task".

From the previous comments you gave, it seems the stack was waiting for some lock. Could you please check what it is waiting for, or share the whole jstack output for the "unfinished" task?

> Cannot stop streaming job with savepoint which uses kinesis consumer > > > Key: FLINK-17170 > URL: https://issues.apache.org/jira/browse/FLINK-17170 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / Kinesis >Affects Versions: 1.10.0 >Reporter: Vasii Cosmin Radu >Priority: Major > Attachments: Screenshot 2020-04-15 at 18.16.26.png > > > I am encountering a very strange situation where I can't stop with savepoint > a streaming job. > The job reads from kinesis and sinks to S3, very simple job, no mapping > function, no watermarks, just source->sink. > Source is using flink-kinesis-consumer, sink is using StreamingFileSink. > Everything works fine, except stopping the job with savepoints. > The behaviour happens only when multiple task managers are involved, having > sub-tasks off the job spread across multiple task manager instances. When a > single task manager has all the sub-tasks this issue never occurred. > Using latest Flink 1.10.0 version, deployment done in HA mode (2 job > managers), in EC2, savepoints and checkpoints written on S3. > When trying to stop, the savepoint is created correctly and appears on S3, > but not all sub-tasks are stopped. Some of them finished, but some just > remain hanged. Sometimes, on the same task manager part of the sub-tasks are > finished, part aren't. 
> The logs don't show any errors. For the ones that succeed, the standard > messages appear, with "Source: <> switched from RUNNING to FINISHED". > For the sub-tasks hanged the last message is > "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - > Shutting down the shard consumer threads of subtask 0 ..." and that's it. > > I tried using the cli (flink stop ) > Timeout Message: > {code:java} > root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop > cf43cecd9339e8f02a12333e52966a25 > root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop > cf43cecd9339e8f02a12333e52966a25Suspending job > "cf43cecd9339e8f02a12333e52966a25" with a savepoint. > The program > finished with the following exception: org.apache.flink.util.FlinkException: > Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25". at > org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843) > at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454) at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at java.security.AccessController.doPrivileged(Native Method) at > javax.security.auth.Subject.doAs(Subject.java:422) at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)Caused > by: java.util.concurrent.TimeoutException at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at > org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460) > ... 
9 more{code} > > Using the monitoring api, I keep getting infinite message when querying based > on the savepoint id, that the status id is still "IN_PROGRESS". > > When performing a cancel instead of stop, it works. But cancel is deprecated, > so I am a bit concerned that this might fail also, maybe I was just lucky. > > I attached a screenshot with what the UI is showing when this happens > -- This message was sent by Atlassian Jira (v8.3.4#803005)
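The "infinite IN_PROGRESS" behaviour seen through the monitoring API boils down to a polling loop like the following. This is a hedged sketch, not Flink client code: the `Supplier` stands in for an HTTP query of the savepoint trigger status endpoint (`/jobs/:jobid/savepoints/:triggerid` in the Flink monitoring REST API), and all class and method names here are illustrative.

```java
import java.util.function.Supplier;

/* Minimal sketch of polling a savepoint trigger status until it leaves
 * IN_PROGRESS or a bounded number of attempts is exhausted. */
class SavepointStatusPoller {
    static String awaitCompletion(Supplier<String> statusQuery, int maxAttempts)
            throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            String status = statusQuery.get();
            if (!"IN_PROGRESS".equals(status)) {
                return status; // e.g. COMPLETED or FAILED
            }
            Thread.sleep(10);  // back off before the next poll
        }
        return "IN_PROGRESS";  // mirrors the endless IN_PROGRESS described above
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake endpoint: reports completion on the third poll.
        final int[] calls = {0};
        String result = awaitCompletion(
            () -> ++calls[0] < 3 ? "IN_PROGRESS" : "COMPLETED", 10);
        System.out.println(result);
        if (!"COMPLETED".equals(result)) throw new AssertionError(result);
    }
}
```

In the reported bug the real status supplier simply never leaves IN_PROGRESS, because the hung subtasks never acknowledge completion, so any such loop spins until its own timeout.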
[jira] [Created] (FLINK-17043) Putting more information into accounting when failing a job in FailureManager
Congxian Qiu(klion26) created FLINK-17043:
-
Summary: Putting more information into accounting when failing a job in FailureManager
Key: FLINK-17043
URL: https://issues.apache.org/jira/browse/FLINK-17043
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Congxian Qiu(klion26)

Currently, we only fail the job when we receive continuous "CHECKPOINT_DECLINED" messages, but we ignore "timeout"/"task_failure"/"task_checkpoint_failure"/"finalize_checkpoint_failure" and so on. In my opinion, we should also take these checkpoint failure reasons into account when failing a job (not only the "CHECKPOINT_DECLINED" reason).

This issue is inspired by a [user mailing list thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Making-job-fail-on-Checkpoint-Expired-tt34051.html].
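The proposal above could look roughly like the following sketch: count consecutive checkpoint failures across several reasons, not only CHECKPOint_DECLINED, and fail the job once a tolerance is exceeded. All names are illustrative, not Flink's actual CheckpointFailureManager API.

```java
import java.util.EnumSet;
import java.util.Set;

/* Hypothetical failure counter: several failure reasons (not only
 * CHECKPOINT_DECLINED) count toward failing the job. */
class FailureCounterSketch {
    enum Reason { CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED, TASK_FAILURE, FINALIZE_FAILURE }

    // Reasons that should count toward failing the job (illustrative choice).
    static final Set<Reason> COUNTED =
        EnumSet.of(Reason.CHECKPOINT_DECLINED, Reason.CHECKPOINT_EXPIRED, Reason.FINALIZE_FAILURE);

    private final int tolerance;
    private int consecutiveFailures = 0;

    FailureCounterSketch(int tolerance) { this.tolerance = tolerance; }

    /** Returns true once the number of consecutive counted failures exceeds the tolerance. */
    boolean onCheckpointFailure(Reason reason) {
        if (COUNTED.contains(reason)) {
            consecutiveFailures++;
        }
        return consecutiveFailures > tolerance;
    }

    /** A successful checkpoint resets the counter. */
    void onCheckpointSuccess() { consecutiveFailures = 0; }

    public static void main(String[] args) {
        FailureCounterSketch mgr = new FailureCounterSketch(2);
        boolean f1 = mgr.onCheckpointFailure(Reason.CHECKPOINT_DECLINED);
        boolean f2 = mgr.onCheckpointFailure(Reason.CHECKPOINT_EXPIRED); // expiry now counts too
        boolean f3 = mgr.onCheckpointFailure(Reason.FINALIZE_FAILURE);
        System.out.println("fail job: " + f3);
        if (f1 || f2 || !f3) throw new AssertionError("expected failure on third counted reason");
    }
}
```

The key design choice mirrored here is that mixed failure reasons still add up to one consecutive-failure count, which is exactly what the linked mailing-list thread asked for with "Checkpoint Expired".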
[jira] [Created] (FLINK-17007) Add section "How to handle application parameters" in DataStream documentation
Congxian Qiu(klion26) created FLINK-17007:
-
Summary: Add section "How to handle application parameters" in DataStream documentation
Key: FLINK-17007
URL: https://issues.apache.org/jira/browse/FLINK-17007
Project: Flink
Issue Type: Improvement
Components: Documentation
Reporter: Congxian Qiu(klion26)
Fix For: 1.11.0

This issue proposes adding a section “How to handle application parameters” to the DataStream documentation page.
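Such a section would typically cover parsing `--key value` arguments. Flink ships `ParameterTool.fromArgs(...)` for this; the parser below is only an illustrative self-contained stand-in, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/* Minimal sketch of "--key value" argument handling of the kind a
 * "How to handle application parameters" section would describe. */
class SimpleArgs {
    static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
                    params.put(key, args[++i]); // key followed by a value
                } else {
                    params.put(key, "true");    // bare flag, treated as boolean
                }
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p = fromArgs(new String[]{"--input", "in.txt", "--verbose"});
        System.out.println(p.get("input") + " " + p.get("verbose"));
        if (!"in.txt".equals(p.get("input")) || !"true".equals(p.get("verbose")))
            throw new AssertionError(p.toString());
    }
}
```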
[jira] [Commented] (FLINK-16724) ListSerializer cannot serialize list which containers null
[ https://issues.apache.org/jira/browse/FLINK-16724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074991#comment-17074991 ]

Congxian Qiu(klion26) commented on FLINK-16724:
---
[~nppoly] What do you think about my comment? If you agree with it, could you please close this ticket?

PS: FLINK-16916 has been fixed; you can now try {{NullableSerializer}} in the new master codebase.

> ListSerializer cannot serialize list which containers null > -- > > Key: FLINK-16724 > URL: https://issues.apache.org/jira/browse/FLINK-16724 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Chongchen Chen >Priority: Major > Attachments: list_serializer_err.diff > > > MapSerializer handles null value correctly, but ListSerializer doesn't. The > attachment is the modification of unit test that can reproduce the bug.
[jira] [Commented] (FLINK-16908) FlinkKafkaProducerITCase testScaleUpAfterScalingDown Timeout expired while initializing transactional state in 60000ms.
[ https://issues.apache.org/jira/browse/FLINK-16908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074174#comment-17074174 ]

Congxian Qiu(klion26) commented on FLINK-16908:
---
Another instance: [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/6996/logs/90]

> FlinkKafkaProducerITCase testScaleUpAfterScalingDown Timeout expired while > initializing transactional state in 60000ms. > --- > > Key: FLINK-16908 > URL: https://issues.apache.org/jira/browse/FLINK-16908 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.11.0 >Reporter: Piotr Nowojski >Priority: Major > Labels: test-stability > > https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6889=logs=c5f0071e-1851-543e-9a45-9ac140befc32=f66652e3-384e-5b25-be29-abfea69ea8da > {noformat} > [ERROR] > testScaleUpAfterScalingDown(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) > Time elapsed: 64.353 s <<< ERROR! > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > initializing transactional state in 60000ms. > {noformat} > After this initial error many other tests (I think all following unit tests) > failed with errors like: > {noformat} > [ERROR] > testFailAndRecoverSameCheckpointTwice(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) > Time elapsed: 7.895 s <<< FAILURE! > java.lang.AssertionError: Detected producer leak. Thread name: > kafka-producer-network-thread | producer-196 > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.checkProducerLeak(FlinkKafkaProducerITCase.java:675) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testFailAndRecoverSameCheckpointTwice(FlinkKafkaProducerITCase.java:311) > {noformat}
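The "Detected producer leak" assertion in the stack trace above amounts to scanning live threads for Kafka producer network threads that should have been shut down. A self-contained sketch of that idea (the helper and the simulated leaked thread are illustrative, not the test's actual code):

```java
/* Sketch of a producer-leak check: look for live threads whose name marks
 * them as Kafka producer network threads. */
class ProducerLeakCheck {
    static String findLeakedProducerThread() {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().contains("kafka-producer-network-thread")) {
                return t.getName();
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Simulate a leaked producer thread that outlives the test.
        Thread leaked = new Thread(() -> {
            try { Thread.sleep(5_000); } catch (InterruptedException ignored) {}
        }, "kafka-producer-network-thread | producer-196");
        leaked.setDaemon(true);
        leaked.start();

        String name = findLeakedProducerThread();
        System.out.println("Detected producer leak. Thread name: " + name);
        if (name == null) throw new AssertionError("leak not detected");
        leaked.interrupt(); // clean up the simulated leak
    }
}
```

Because the check inspects global JVM state, one leaked producer makes every subsequent test in the same fork fail, which matches the cascade of failures described in the issue.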
[jira] [Updated] (FLINK-16916) The logic of NullableSerializer#copy is wrong
[ https://issues.apache.org/jira/browse/FLINK-16916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Congxian Qiu(klion26) updated FLINK-16916:
--
Fix Version/s: 1.10.2
1.9.3
1.8.4

> The logic of NullableSerializer#copy is wrong > - > > Key: FLINK-16916 > URL: https://issues.apache.org/jira/browse/FLINK-16916 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.8.3, 1.9.2, 1.10.0 >Reporter: Congxian Qiu(klion26) >Priority: Blocker > Fix For: 1.8.4, 1.9.3, 1.11.0, 1.10.2 > > > When debugging the problem reported by FLINK-16724, Found that the logic of > {{NullableSerializer#copy}} is wrong. currently, the logic is such as below: > {code:java} > public void copy(DataInputView source, DataOutputView target) throws > IOException { >boolean isNull = source.readBoolean(); >target.writeBoolean(isNull); >if (isNull) { > target.write(padding); >} >else { > originalSerializer.copy(source, target); >} > } > {code} > we forgot to skip {{paddings.length}} bytes when if the {{padding}}'s length > is not 0. > We can correct the logic such as below > {code:java} > public void copy(DataInputView source, DataOutputView target) throws > IOException { >boolean isNull = deserializeNull(source); // this will skip the padding > values. >target.writeBoolean(isNull); >if (isNull) { > target.write(padding); >} >else { > originalSerializer.copy(source, target); >} > } > {code}
[jira] [Commented] (FLINK-16724) ListSerializer cannot serialize list which containers null
[ https://issues.apache.org/jira/browse/FLINK-16724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072596#comment-17072596 ]

Congxian Qiu(klion26) commented on FLINK-16724:
---
[~nppoly] thanks for reporting this issue. This is not a problem of {{ListSerializer}}; the reason is that {{LongSerializer}} does not support serializing null values. If you want to serialize null values, you can use {{NullableSerializer}} instead of {{LongSerializer}}.

Currently, you will still find that the test fails if you replace {{LongSerializer}} with {{NullableSerializer}}, because the implementation of {{NullableSerializer#copy}} is not correct. I've created FLINK-16916 to track it.

> ListSerializer cannot serialize list which containers null > -- > > Key: FLINK-16724 > URL: https://issues.apache.org/jira/browse/FLINK-16724 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Chongchen Chen >Priority: Major > Attachments: list_serializer_err.diff > > > MapSerializer handles null value correctly, but ListSerializer doesn't. The > attachment is the modification of unit test that can reproduce the bug.
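The null handling the comment contrasts with MapSerializer boils down to writing a boolean marker before each element. A self-contained sketch of that wire format, using plain `DataOutput`/`DataInput` as stand-ins for Flink's `DataOutputView`/`DataInputView` (class and method names here are illustrative):

```java
import java.io.*;
import java.util.*;

/* Sketch of per-element null markers: write a boolean before each element so
 * null entries round-trip, which plain ListSerializer over LongSerializer
 * cannot do. */
class NullTolerantListFormat {
    static void writeLongList(List<Long> list, DataOutput out) throws IOException {
        out.writeInt(list.size());
        for (Long value : list) {
            out.writeBoolean(value == null); // null marker, MapSerializer-style
            if (value != null) {
                out.writeLong(value);
            }
        }
    }

    static List<Long> readLongList(DataInput in) throws IOException {
        int size = in.readInt();
        List<Long> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(in.readBoolean() ? null : in.readLong());
        }
        return list;
    }

    public static void main(String[] args) throws IOException {
        List<Long> original = Arrays.asList(1L, null, 3L);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeLongList(original, new DataOutputStream(bytes));
        List<Long> copy = readLongList(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // [1, null, 3]
        if (!original.equals(copy)) throw new AssertionError(copy.toString());
    }
}
```

Wrapping the element serializer with {{NullableSerializer}} achieves the same effect in Flink without changing the list format itself.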
[jira] [Commented] (FLINK-16916) The logic of NullableSerializer#copy is wrong
[ https://issues.apache.org/jira/browse/FLINK-16916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072593#comment-17072593 ]

Congxian Qiu(klion26) commented on FLINK-16916:
---
[~aljoscha], what do you think about this? Please assign it to me if it sounds reasonable. As this can lead to wrong answers when using this serializer, I marked it as a Blocker; please downgrade if the priority is set wrong.

> The logic of NullableSerializer#copy is wrong > - > > Key: FLINK-16916 > URL: https://issues.apache.org/jira/browse/FLINK-16916 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.8.3, 1.9.2, 1.10.0 >Reporter: Congxian Qiu(klion26) >Priority: Blocker > Fix For: 1.11.0 > > > When debugging the problem reported by FLINK-16724, Found that the logic of > {{NullableSerializer#copy}} is wrong. currently, the logic is such as below: > {code:java} > public void copy(DataInputView source, DataOutputView target) throws > IOException { >boolean isNull = source.readBoolean(); >target.writeBoolean(isNull); >if (isNull) { > target.write(padding); >} >else { > originalSerializer.copy(source, target); >} > } > {code} > we forgot to skip {{paddings.length}} bytes when if the {{padding}}'s length > is not 0. > We can correct the logic such as below > {code:java} > public void copy(DataInputView source, DataOutputView target) throws > IOException { >boolean isNull = deserializeNull(source); // this will skip the padding > values. >target.writeBoolean(isNull); >if (isNull) { > target.write(padding); >} >else { > originalSerializer.copy(source, target); >} > } > {code}
[jira] [Created] (FLINK-16916) The logic of NullableSerializer#copy is wrong
Congxian Qiu(klion26) created FLINK-16916:
-
Summary: The logic of NullableSerializer#copy is wrong
Key: FLINK-16916
URL: https://issues.apache.org/jira/browse/FLINK-16916
Project: Flink
Issue Type: Bug
Components: API / Type Serialization System
Affects Versions: 1.10.0, 1.9.2, 1.8.3
Reporter: Congxian Qiu(klion26)
Fix For: 1.11.0

When debugging the problem reported by FLINK-16724, I found that the logic of {{NullableSerializer#copy}} is wrong. Currently, the logic is as below:
{code:java}
public void copy(DataInputView source, DataOutputView target) throws IOException {
	boolean isNull = source.readBoolean();
	target.writeBoolean(isNull);
	if (isNull) {
		target.write(padding);
	} else {
		originalSerializer.copy(source, target);
	}
}
{code}
We forgot to skip {{padding.length}} bytes from the source if the {{padding}}'s length is not 0. We can correct the logic as below:
{code:java}
public void copy(DataInputView source, DataOutputView target) throws IOException {
	boolean isNull = deserializeNull(source); // this will skip the padding values
	target.writeBoolean(isNull);
	if (isNull) {
		target.write(padding);
	} else {
		originalSerializer.copy(source, target);
	}
}
{code}
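The effect of the bug can be shown with a self-contained sketch, assuming a fixed-length padding: when copying a null record, both the null flag and the padding bytes must be consumed from the source, otherwise the stream position is corrupted for the next record. Plain `DataInputStream`/`DataOutputStream` and a `long` payload stand in for Flink's views and the wrapped serializer; all names below are illustrative.

```java
import java.io.*;
import java.util.Arrays;

/* Sketch of the corrected copy logic: skip the padding on null (the
 * deserializeNull step) before copying the next record. */
class NullableCopySketch {
    static final int PADDING = 8; // illustrative padding length

    /** Reads the null flag and, when set, skips the padding bytes. */
    static boolean readNullFlagAndSkipPadding(DataInputStream source) throws IOException {
        boolean isNull = source.readBoolean();
        if (isNull) {
            source.skipBytes(PADDING); // the step the buggy copy() forgot
        }
        return isNull;
    }

    static void copy(DataInputStream source, DataOutputStream target) throws IOException {
        boolean isNull = readNullFlagAndSkipPadding(source);
        target.writeBoolean(isNull);
        if (isNull) {
            target.write(new byte[PADDING]);       // re-emit the padding
        } else {
            target.writeLong(source.readLong());   // stand-in for originalSerializer.copy
        }
    }

    public static void main(String[] args) throws IOException {
        // Serialized form of [null, 42L]: flag + padding, then flag + value.
        ByteArrayOutputStream in = new ByteArrayOutputStream();
        DataOutputStream w = new DataOutputStream(in);
        w.writeBoolean(true);  w.write(new byte[PADDING]);
        w.writeBoolean(false); w.writeLong(42L);

        DataInputStream source = new DataInputStream(new ByteArrayInputStream(in.toByteArray()));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream target = new DataOutputStream(out);
        copy(source, target); // copies the null record without corrupting the stream
        copy(source, target); // next record is read from the correct offset
        System.out.println(Arrays.equals(in.toByteArray(), out.toByteArray())); // true
        if (!Arrays.equals(in.toByteArray(), out.toByteArray())) throw new AssertionError();
    }
}
```

With the buggy version (no `skipBytes`), the second `copy` call would read padding bytes as the next record's null flag, which is exactly the kind of stream corruption that produced wrong results downstream.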
[jira] [Commented] (FLINK-16576) State inconsistency on restore with memory state backends
[ https://issues.apache.org/jira/browse/FLINK-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17069428#comment-17069428 ]

Congxian Qiu(klion26) commented on FLINK-16576:
---
The restore fails here because the *mapping of stateId and metaInfo is wrong*. The mapping is wrong because we registered some metaInfos that do not belong to the current subtask.
{code:java}
// HeapRestoreOperation#restore
createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById); // will register the metaInfo
readStateHandleStateData(
	fsDataInputStream,
	inView,
	keyGroupsStateHandle.getGroupRangeOffsets(),
	kvStatesById,
	restoredMetaInfos.size(),
	serializationProxy.getReadVersion(),
	serializationProxy.isUsingKeyGroupCompression());

private void createOrCheckStateForMetaInfo(
		List<StateMetaInfoSnapshot> restoredMetaInfo,
		Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
	for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
		final StateSnapshotRestore registeredState;
		..
		if (registeredState == null) {
			kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
			// constructs the mapping between stateId and metaInfo, even if the
			// current state handle does not belong to the current subtask
		}
	}
}
{code}
From the code above we can see that we always register the metaInfo, even if the current state handle does not belong to the current subtask (the KeyGroupStateHandle will contain metaInfo, EMPTY_KEYGROUP, empty offsets and the stateHandle data).

After registering the wrong metaInfo, the *mapping of stateId and metaInfo becomes wrong* (when constructing the mapping, we assume that all the handles belong to the current subtask). RocksDBStateBackend does not construct such a mapping, so it does not encounter this error.

As a solution, I want to filter out such state handles when assigning state to subtasks in {{StateAssignmentOperation}}.

> State inconsistency on restore with memory state backends > - > > Key: FLINK-16576 > URL: https://issues.apache.org/jira/browse/FLINK-16576 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.9.2, 1.10.0 >Reporter: Nico Kruber >Assignee: Congxian Qiu(klion26) >Priority: Blocker > Fix For: 1.9.3, 1.10.1, 1.11.0 > > > I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}} > example in Flink. Restore would fail with either of these causes, but only > for the memory state backends and only with some combinations of parallelism > I took the savepoint with and parallelism I restore the job with: > {code:java} > java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, > endKeyGroup=95} does not contain key group 97 {code} > or > {code:java} > java.lang.NullPointerException > at > org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280) > {code} > or > {code:java} > java.io.IOException: Corrupt stream, found tag: 8 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) > {code} > > I managed to make it reproducible in a test that I quickly hacked together in > [https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java] > (please checkout the whole repository since I had to change some > dependencies). 
> In a bit more detail, this is what I discovered before, also with a manual > savepoint on S3: > Savepoint that was taken with parallelism 2 (p=2) and shows the restore > failure in three different ways (all running in Flink 1.10.0; but I also see > it in Flink 1.9): > * first of all, if I try to restore with p=2, everything is fine > * if I restore with p=4 I get an exception like the one mentioned above: > {code:java} > 2020-03-11 15:53:35,149 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- > Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, > PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) > (2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED. > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) > at >
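The proposed fix direction, filtering state handles by key-group range during state assignment, can be sketched as follows. `KeyGroupRange` here is a simplified stand-in for Flink's class of the same name, and the filtering method is illustrative, not the actual `StateAssignmentOperation` code.

```java
import java.util.*;

/* Sketch: keep only the state handles whose key-group range intersects the
 * subtask's range, so no foreign metaInfo gets registered on restore. */
class KeyGroupFilterSketch {
    static final class KeyGroupRange {
        final int start, end; // inclusive bounds
        KeyGroupRange(int start, int end) { this.start = start; this.end = end; }
        boolean contains(int keyGroup) { return keyGroup >= start && keyGroup <= end; }
        boolean intersects(KeyGroupRange other) { return start <= other.end && other.start <= end; }
        @Override public String toString() {
            return "KeyGroupRange{startKeyGroup=" + start + ", endKeyGroup=" + end + "}";
        }
    }

    /** Returns only the handle ranges relevant for the subtask's range. */
    static List<KeyGroupRange> assignToSubtask(List<KeyGroupRange> handleRanges,
                                               KeyGroupRange subtask) {
        List<KeyGroupRange> assigned = new ArrayList<>();
        for (KeyGroupRange r : handleRanges) {
            if (r.intersects(subtask)) {
                assigned.add(r); // foreign handles are dropped here
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // The subtask range from the exception in the issue description.
        KeyGroupRange subtask = new KeyGroupRange(64, 95);
        List<KeyGroupRange> handles = Arrays.asList(
            new KeyGroupRange(0, 63), new KeyGroupRange(64, 95), new KeyGroupRange(96, 127));
        List<KeyGroupRange> assigned = assignToSubtask(handles, subtask);
        System.out.println(assigned);
        // Key group 97 (from the reported IllegalArgumentException) must not be assigned.
        if (assigned.size() != 1 || assigned.get(0).contains(97)) throw new AssertionError();
    }
}
```

With such filtering in place, the restore path never sees a handle for key group 97 when restoring the range [64, 95], which is the failure mode quoted in the issue.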
[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase
[ https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034562#comment-17034562 ] Congxian Qiu(klion26) commented on FLINK-13632: --- on {{ScalaOptionSerializerSnapshotMigrationTest}}. > Port serializer upgrade tests to TypeSerializerUpgradeTestBase > -- > > Key: FLINK-13632 > URL: https://issues.apache.org/jira/browse/FLINK-13632 > Project: Flink > Issue Type: Test > Components: API / Type Serialization System, Tests >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Assignee: Aljoscha Krettek >Priority: Critical > Labels: pull-request-available > Fix For: 1.10.1, 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) > for writing serializer upgrade tests. Now we need to migrate all existing > tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test > base and update them to restore from Flink 1.8 onward. > It seems these are the subclasses: > * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl) > * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime) > * PrimitiveArraySerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base.array) > * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils) > * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime) > * BufferEntrySerializerMigrationTest > (org.apache.flink.streaming.api.operators.co) > * TimerSerializerSnapshotMigrationTest > (org.apache.flink.streaming.api.operators) > * StreamElementSerializerMigrationTest > (org.apache.flink.streaming.runtime.streamrecord) > * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo) > * BaseTypeSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * NullableSerializerMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state) > * ScalaOptionSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * ScalaTrySerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state) > * LockableTypeSerializerSnapshotMigrationTest > (org.apache.flink.cep.nfa.sharedbuffer) > * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep) > * WindowSerializerSnapshotsMigrationTest > (org.apache.flink.streaming.runtime.operators.windowing) > * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream) > * TwoPhaseCommitSinkStateSerializerMigrationTest > (org.apache.flink.streaming.api.functions.sink) > * KafkaSerializerSnapshotsMigrationTest > (org.apache.flink.streaming.connectors.kafka) > * KafkaSerializerSnapshotsMigrationTest > (org.apache.flink.streaming.connectors.kafka) > * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime) > * ValueArraySerializerSnapshotMigrationTest > (org.apache.flink.graph.types.valuearray) > * MapSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * CompositeTypeSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils) > * ListSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * EnumSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * PojoSerializerSnapshotMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state) > * CopyableSerializerMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * WritableSerializerMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview) > * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview) > * ScalaEitherSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * 
LongValueWithProperHashCodeSerializerSnapshotMigrationTest > (org.apache.flink.graph.drivers.transform) > * ScalaCaseClassSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * TraversableSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * EnumValueSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) -- This message was sent by Atlassian Jira (v8.3.4#803005)
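The porting work tracked above centers on one property: a serializer's new version must still restore snapshots written by an older version. As a Flink-independent illustration of that idea (the class and method names below are hypothetical, not the actual `TypeSerializerUpgradeTestBase` API), a minimal upgrade check round-trips bytes written by a "v1" writer through a "v2" reader:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class UpgradeRoundTripSketch {
    // "v1" format: just the raw int value, no version header.
    static byte[] writeV1(int value) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // "v2" reader: must still understand bytes produced by the v1 writer.
    static int readV2(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] oldSnapshot = writeV1(42);
        // An upgrade test asserts that the new reader restores the old bytes.
        System.out.println(readV2(oldSnapshot)); // prints 42
    }
}
```

The real test base additionally covers schema-compatibility verdicts (compatible as-is, after migration, incompatible); the sketch only shows the restore-old-bytes core.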
[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase
[ https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027476#comment-17027476 ] Congxian Qiu(klion26) commented on FLINK-13632: --- I'm on {{NullableSerializerMigrationTest}} and {{VoidNamespacieSerializerSnapshotMigrationTest}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase
[ https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17026454#comment-17026454 ] Congxian Qiu(klion26) commented on FLINK-13632: --- I'm on {{BaseTypeSerializerSnapshotMigrationTest}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase
[ https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020738#comment-17020738 ] Congxian Qiu(klion26) commented on FLINK-13632: --- I'm on {{StreamElementSerializerMigrationTest}} now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase
[ https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020734#comment-17020734 ] Congxian Qiu(klion26) commented on FLINK-13632: --- I'm on {{TimerSerializerSnapshotMigrationTest}} now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15699) FirstValueAggFunctionWithoutOrderTest failed on Travis
Congxian Qiu(klion26) created FLINK-15699: - Summary: FirstValueAggFunctionWithoutOrderTest failed on Travis Key: FLINK-15699 URL: https://issues.apache.org/jira/browse/FLINK-15699 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.11.0 Reporter: Congxian Qiu(klion26) 09:48:53.473 [ERROR] COMPILATION ERROR : 09:48:53.473 [INFO] - 09:48:53.473 [ERROR] /home/travis/build/flink-ci/flink/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueAggFunctionWithoutOrderTest.java:[78,37] incompatible types: inference variable T has incompatible bounds equality constraints: java.lang.Byte,T,T,T,T,T,java.lang.Boolean,T lower bounds: java.lang.Boolean 09:48:53.473 [INFO] 1 error [https://travis-ci.com/flink-ci/flink/jobs/277466696] -- This message was sent by Atlassian Jira (v8.3.4#803005)
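The compiler error above arises when Java's generic type inference derives contradictory bounds for a type variable: the target type pins `T` to `Byte` while an argument of type `Boolean` feeds the same variable. A small, hypothetical reproduction of this class of error (not the actual test code at line 78):

```java
import java.util.Arrays;
import java.util.List;

public class InferenceBoundsDemo {
    @SafeVarargs
    static <T> List<T> listOf(T... items) {
        return Arrays.asList(items);
    }

    public static void main(String[] args) {
        // Does NOT compile: the target type gives the equality constraint
        // T = Byte, while the argument `true` adds the lower bound Boolean.
        // List<Byte> broken = listOf((byte) 1, true);

        // Compiles once every argument agrees with the inferred T:
        List<Byte> ok = listOf((byte) 1, (byte) 0);
        System.out.println(ok.size()); // prints 2
    }
}
```

The usual fix is to make the offending argument's type match the expected `T` (or to split the call so each type variable is inferred from consistent arguments).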
[jira] [Commented] (FLINK-15699) FirstValueAggFunctionWithoutOrderTest failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019799#comment-17019799 ] Congxian Qiu(klion26) commented on FLINK-15699: --- cc [~godfreyhe] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase
[ https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019532#comment-17019532 ] Congxian Qiu(klion26) commented on FLINK-13632: --- I'm porting {{PrimitiveArraySerializerSnapshotMigrationTest}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers
[ https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019463#comment-17019463 ] Congxian Qiu(klion26) commented on FLINK-15637: --- [~gjy] Yes, I'm working on this, please assign it to me. > For RocksDBStateBackend, make RocksDB the default store for timers > -- > > Key: FLINK-15637 > URL: https://issues.apache.org/jira/browse/FLINK-15637 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stephan Ewen >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by > default. Also ensure that the programmatic default value becomes the same. > We need to update the performance tuning guide to mention this. > > We need to update the release notes to mention this. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
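The change discussed in FLINK-15637 amounts to flipping a configuration default. As a sketch of the explicit setting it corresponds to (the option key is the one named in the issue; placing it in `flink-conf.yaml` is the usual convention):

```yaml
# Store timer state in RocksDB rather than on the JVM heap
# (the value this issue proposes to make the default).
state.backend.rocksdb.timer-service.factory: ROCKSDB
```

With `ROCKSDB`, timers are spilled to the state backend and are no longer bounded by heap size, at the cost of some timer-access throughput compared to the heap-based store.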
[jira] [Updated] (FLINK-15685) org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on traivs
[ https://issues.apache.org/jira/browse/FLINK-15685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-15685: -- Affects Version/s: 1.11.0 > org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on traivs > --- > > Key: FLINK-15685 > URL: https://issues.apache.org/jira/browse/FLINK-15685 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client, Tests >Affects Versions: 1.11.0 >Reporter: Congxian Qiu(klion26) >Priority: Major > > > 08:50:02.717 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time > elapsed: 76.395 s <<< FAILURE! - in > org.apache.flink.tests.util.kafka.SQLClientKafkaITCase > > 1897208:50:02.722 [ERROR] testKafka[0: kafka-version:0.10 > kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) > Time elapsed: 23.806 s <<< ERROR! > > 18973java.io.IOException: > > 18974Process execution failed due error. Error output:Exception in thread > "main" org.apache.flink.table.client.SqlClientException: Unexpected > exception. This is a bug. Please consider filing an issue. > > 18975 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190) > > 18976Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: > Could not create execution context. 
> > 18977 at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759) > > 18978 at > org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) > > 18979 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) > > 18980 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) > > 18981Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader > > 18982 at > org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64) > > 18983 at > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:285) > > 18984 at > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:163) > > 18985 at > org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49) > > 18986 at > org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:371) > > 18987 at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:552) > > 18988 at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > > 18989 at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:550) > > 18990 at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:487) > > 18991 at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159) > > 18992 at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118) > > 18993 at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:748) > > 18994 ... 
3 more > > 18995Caused by: java.lang.ClassNotFoundException: > org.apache.avro.io.DatumReader > > 18996 at java.net.URLClassLoader.findClass(URLClassLoader.java:382) > > 18997 at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > > 18998 at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > > 18999 ... 15 more > > 19000 > > 19001 at > org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178) > > 19002 at > org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151) > > 19003 > > 1900408:50:02.734 [ERROR] testKafka[1: kafka-version:0.11 > kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) > Time elapsed: 25.227 s <<< ERROR! > > 19005java.io.IOException: > > 19006Process execution failed due error. Error output:Exception in thread > "main" org.apache.flink.table.client.SqlClientException: Unexpected > exception. This is a bug. Please consider filing an issue. > > 19007 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190) > > 19008Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: > Could not create execution context. > > 19009 at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759) > > 19010 at >
[jira] [Commented] (FLINK-15685) org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-15685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019365#comment-17019365 ] Congxian Qiu(klion26) commented on FLINK-15685: --- From the exception log, it was thrown in {{SqlClient.java}}, so I attached this issue to the {{Table SQL / Client}} component.
> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis
> ---
>
> Key: FLINK-15685
> URL: https://issues.apache.org/jira/browse/FLINK-15685
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client, Tests
> Reporter: Congxian Qiu(klion26)
> Priority: Major
>
> 08:50:02.717 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 76.395 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.SQLClientKafkaITCase
> 18972 08:50:02.722 [ERROR] testKafka[0: kafka-version:0.10 kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 23.806 s <<< ERROR!
> 18973 java.io.IOException:
> 18974 Process execution failed due error. Error output:Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
> 18975 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> 18976 Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
> 18977 at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
> 18978 at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
> 18979 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
> 18980 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> 18981 Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
> 18982 at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
> 18983 at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:285)
> 18984 at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:163)
> 18985 at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
> 18986 at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:371)
> 18987 at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:552)
> 18988 at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> 18989 at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:550)
> 18990 at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:487)
> 18991 at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
> 18992 at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
> 18993 at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:748)
> 18994 ... 3 more
> 18995 Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumReader
> 18996 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> 18997 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 18998 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 18999 ... 15 more
> 19000
> 19001 at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
> 19002 at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)
> 19003
> 19004 08:50:02.734 [ERROR] testKafka[1: kafka-version:0.11 kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 25.227 s <<< ERROR!
> 19005 java.io.IOException:
> 19006 Process execution failed due error. Error output:Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
> 19007 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> 19008 Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
> 19009 at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
> 19010 at
[jira] [Created] (FLINK-15685) org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis
Congxian Qiu(klion26) created FLINK-15685:
-
Summary: org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis
Key: FLINK-15685
URL: https://issues.apache.org/jira/browse/FLINK-15685
Project: Flink
Issue Type: Bug
Components: Table SQL / Client, Tests
Reporter: Congxian Qiu(klion26)

08:50:02.717 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 76.395 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.SQLClientKafkaITCase
18972 08:50:02.722 [ERROR] testKafka[0: kafka-version:0.10 kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 23.806 s <<< ERROR!
18973 java.io.IOException:
18974 Process execution failed due error. Error output:Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
18975 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
18976 Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
18977 at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
18978 at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
18979 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
18980 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
18981 Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
18982 at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
18983 at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:285)
18984 at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:163)
18985 at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
18986 at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:371)
18987 at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:552)
18988 at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
18989 at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:550)
18990 at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:487)
18991 at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
18992 at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
18993 at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:748)
18994 ... 3 more
18995 Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumReader
18996 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
18997 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
18998 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
18999 ... 15 more
19000
19001 at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
19002 at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)
19003
19004 08:50:02.734 [ERROR] testKafka[1: kafka-version:0.11 kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 25.227 s <<< ERROR!
19005 java.io.IOException:
19006 Process execution failed due error. Error output:Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
19007 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
19008 Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
19009 at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
19010 at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
19011 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
19012 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
19013 Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
19014 at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
19015 at
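Both runs above fail the same way: the Avro classes are missing from the SQL Client's classpath, so the format factory dies with a {{NoClassDefFoundError}} only at the moment the Avro format is first touched. The general mitigation is to probe for an optional dependency up front so the failure is immediate and actionable. A sketch of that pattern as a Python analogue (illustrative only, not Flink code; the function name and message are invented):

```python
import importlib.util

def require_optional(module_name, hint):
    """Fail fast with an actionable message if an optional dependency is
    absent, instead of a late NoClassDefFoundError-style failure when the
    dependency is first used deep inside a factory."""
    if importlib.util.find_spec(module_name) is None:
        raise RuntimeError(
            "missing optional dependency '%s': %s" % (module_name, hint))

# Probe the dependency before building anything that needs it.
require_optional("json", "ships with the standard library")  # present, no error
```

The same idea in Java would be a `Class.forName` probe in the factory's setup path, turning a deep `NoClassDefFoundError` into an upfront error that names the missing jar.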
[jira] [Commented] (FLINK-15247) Closing (Testing)MiniCluster may cause ConcurrentModificationException
[ https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019341#comment-17019341 ] Congxian Qiu(klion26) commented on FLINK-15247: --- Seems another instance [https://travis-ci.org/klion26/flink/jobs/639304855?utm_medium=notification_source=github_status] {code:java} 07:02:05.922 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.328 s <<< FAILURE! - in org.apache.flink.test.streaming.runtime.BackPressureITCase3910 07:02:05.922 [ERROR] operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase) Time elapsed: 7.319 s <<< ERROR!3911org.apache.flink.util.FlinkException: Could not close resource. 3912at org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:166) 3913 Caused by: org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor down. 3914 Caused by: org.apache.flink.util.FlinkException: Could not properly shut down the TaskManager services. 3915 Caused by: java.util.ConcurrentModificationException {code} > Closing (Testing)MiniCluster may cause ConcurrentModificationException > -- > > Key: FLINK-15247 > URL: https://issues.apache.org/jira/browse/FLINK-15247 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.10.0 >Reporter: Gary Yao >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {noformat} > Test > operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase) > failed with: > org.apache.flink.util.FlinkException: Could not close resource. 
> at > org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75) > at > org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158) > at
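The chain above bottoms out in a {{java.util.ConcurrentModificationException}}: during TaskManager shutdown, a collection is mutated while another code path is still iterating it. A minimal sketch of this failure class and the usual fix (iterate over a snapshot), shown in Python where the analogous error is a {{RuntimeError}} (illustrative only, not Flink code):

```python
def close_services_unsafely(services):
    """Unregister entries while iterating the registry itself.

    Mutating a dict during iteration raises RuntimeError in Python,
    the analogue of Java's ConcurrentModificationException when a
    HashMap is modified inside a for-each loop.
    """
    for name in services:   # iteration starts here
        del services[name]  # mutation invalidates the live iterator

def close_services_safely(services):
    """The usual fix: snapshot the keys first, then mutate."""
    for name in list(services):
        del services[name]
```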
[jira] [Comment Edited] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers
[ https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194 ] Congxian Qiu(klion26) edited comment on FLINK-15637 at 1/20/20 4:07 AM:

Ran the benchmark of {{RocksDBStateBackend}} in the [flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks] repo; there is some regression after applying this change: a 4.6% regression in non-incremental mode and a 2.9% regression in incremental mode.

* Result before applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ± 1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ± 5.179  ops/ms
{code}
* Result after applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ± 2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ± 2.151  ops/ms
{code}
Steps for generating the result:
# check out the commit before applying the change and install
# run the benchmark to get the result
# check out the commit after applying the change and install
# run the benchmark to get the result

We may need to add a release note to let users know about this.

was (Author: klion26):
Ran the benchmark of {{RocksDBStateBackend}} in the [flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks] repo; there is some regression after applying this change: a 4.6% regression in non-incremental mode and a 2.9% regression in incremental mode.

* Result before applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ± 1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ± 5.179  ops/ms
{code}
* Result after applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ± 2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ± 2.151  ops/ms
{code}
Steps for generating the result:
# check out the commit before applying the change and install
# run the benchmark to get the result
# check out the commit after applying the change and install
# run the benchmark to get the result

We may need to add a release note to let users know about this.

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Stephan Ewen
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
> We need to update the release notes to mention this.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
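The 4.6% and 2.9% figures above follow directly from the two JMH tables. As a quick sanity check of that arithmetic (a sketch, not part of flink-benchmarks; the quoted figures are these values truncated to one decimal place):

```python
def regression_pct(before_ops_per_ms, after_ops_per_ms):
    """Relative throughput drop between two JMH scores, in percent."""
    return (before_ops_per_ms - after_ops_per_ms) / before_ops_per_ms * 100.0

# Mean scores (ops/ms, 30 iterations) from the tables above.
rocks = regression_pct(203.893, 194.382)      # non-incremental: ~4.66%
rocks_inc = regression_pct(201.896, 195.912)  # incremental: ~2.96%
```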
[jira] [Comment Edited] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers
[ https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194 ] Congxian Qiu(klion26) edited comment on FLINK-15637 at 1/20/20 4:07 AM:

Ran the benchmark of {{RocksDBStateBackend}} in the [flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks] repo; there is some regression after applying this change: a 4.6% regression in non-incremental mode and a 2.9% regression in incremental mode.

* Result before applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ± 1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ± 5.179  ops/ms
{code}
* Result after applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ± 2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ± 2.151  ops/ms
{code}
Steps for generating the result:
# check out the commit before applying the change and install
# run the benchmark to get the result
# check out the commit after applying the change and install
# run the benchmark to get the result

We may need to add a release note to let users know about this.

was (Author: klion26):
Ran the benchmark of {{RocksDBStateBackend}}; there is some regression after applying this change: a 4.6% regression in non-incremental mode and a 2.9% regression in incremental mode.

* Result before applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ± 1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ± 5.179  ops/ms
{code}
* Result after applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ± 2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ± 2.151  ops/ms
{code}
Steps for generating the result:
# check out the commit before applying the change and install
# run the benchmark to get the result
# check out the commit after applying the change and install
# run the benchmark to get the result

We may need to add a release note to let users know about this.

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Stephan Ewen
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
> We need to update the release notes to mention this.
[jira] [Comment Edited] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers
[ https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194 ] Congxian Qiu(klion26) edited comment on FLINK-15637 at 1/20/20 3:00 AM:

Ran the benchmark of {{RocksDBStateBackend}}; there is some regression after applying this change: a 4.6% regression in non-incremental mode and a 2.9% regression in incremental mode.

* Result before applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ± 1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ± 5.179  ops/ms
{code}
* Result after applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ± 2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ± 2.151  ops/ms
{code}
Steps for generating the result:
# check out the commit before applying the change and install
# run the benchmark to get the result
# check out the commit after applying the change and install
# run the benchmark to get the result

We may need to add a release note to let users know about this.

was (Author: klion26):
Ran the benchmark of {{RocksDBStateBackend}}; there is some regression after applying this change: a 4.6% regression in non-incremental mode and a 2.9% regression in incremental mode.

* Result before applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ± 1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ± 5.179  ops/ms
{code}
* Result after applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ± 2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ± 2.151  ops/ms
{code}
Steps for generating the result:
# check out the commit before applying the change and install
# run the benchmark to get the result
# check out the commit after applying the change and install
# run the benchmark to get the result

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Stephan Ewen
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
> We need to update the release notes to mention this.
[jira] [Commented] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers
[ https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194 ] Congxian Qiu(klion26) commented on FLINK-15637: ---

Ran the benchmark of {{RocksDBStateBackend}}; there is some regression after applying this change: a 4.6% regression in non-incremental mode and a 2.9% regression in incremental mode.

* Result before applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ± 1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ± 5.179  ops/ms
{code}
* Result after applying the change
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ± 2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ± 2.151  ops/ms
{code}
Steps for generating the result:
# check out the commit before applying the change and install
# run the benchmark to get the result
# check out the commit after applying the change and install
# run the benchmark to get the result

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Stephan Ewen
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
> We need to update the release notes to mention this.
[jira] [Created] (FLINK-15661) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure failed because of Could not find Flink job
Congxian Qiu(klion26) created FLINK-15661: - Summary: JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure failed because of Could not find Flink job Key: FLINK-15661 URL: https://issues.apache.org/jira/browse/FLINK-15661 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.11.0 Reporter: Congxian Qiu(klion26) 2020-01-19T06:25:02.3856954Z [ERROR] JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:347 The program encountered a ExecutionException : org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (47fe3e8df0e59994938485f683d1410e) 2020-01-19T06:25:02.3857171Z at org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:91) 2020-01-19T06:25:02.3857571Z at org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:82) 2020-01-19T06:25:02.3857866Z at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) 2020-01-19T06:25:02.3857982Z at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) 2020-01-19T06:25:02.3859852Z at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 2020-01-19T06:25:02.3860440Z at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 2020-01-19T06:25:02.3860732Z at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:872) 2020-01-19T06:25:02.3860960Z at akka.dispatch.OnComplete.internal(Future.scala:263) 2020-01-19T06:25:02.3861099Z at akka.dispatch.OnComplete.internal(Future.scala:261) 2020-01-19T06:25:02.3861232Z at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) 2020-01-19T06:25:02.3861391Z at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) 
2020-01-19T06:25:02.3861546Z at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) 2020-01-19T06:25:02.3861712Z at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) 2020-01-19T06:25:02.3861809Z at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) 2020-01-19T06:25:02.3861916Z at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) 2020-01-19T06:25:02.3862221Z at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) 2020-01-19T06:25:02.3862475Z at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) 2020-01-19T06:25:02.3862626Z at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) 2020-01-19T06:25:02.3862736Z at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) 2020-01-19T06:25:02.3862820Z at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) 2020-01-19T06:25:02.3867146Z at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) 2020-01-19T06:25:02.3867318Z at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) 2020-01-19T06:25:02.3867441Z at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) 2020-01-19T06:25:02.3867552Z at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 2020-01-19T06:25:02.3867664Z at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 2020-01-19T06:25:02.3867763Z at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 2020-01-19T06:25:02.3867843Z at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) 2020-01-19T06:25:02.3867936Z at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 2020-01-19T06:25:02.3868036Z at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) 2020-01-19T06:25:02.3868145Z at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 2020-01-19T06:25:02.3868223Z at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 2020-01-19T06:25:02.3868313Z at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 2020-01-19T06:25:02.3868390Z at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2020-01-19T06:25:02.3868520Z Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (47fe3e8df0e59994938485f683d1410e) 2020-01-19T06:25:02.3868625Z at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$17(Dispatcher.java:516)
[jira] [Commented] (FLINK-13632) Update serializer upgrade tests to restore from 1.8 and beyond
[ https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018075#comment-17018075 ] Congxian Qiu(klion26) commented on FLINK-13632: --- I'm currently porting {{ValueSerializerMigrationTest}} > Update serializer upgrade tests to restore from 1.8 and beyond > -- > > Key: FLINK-13632 > URL: https://issues.apache.org/jira/browse/FLINK-13632 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Priority: Blocker > Fix For: 1.10.0 > > > FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) > for writing serializer upgrade tests. Now we need to migrate all existing > tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test > base and update them to restore from Flink 1.8 onward. > It seems these are the subclasses: > * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl) > * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime) > * PrimitiveArraySerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base.array) > * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils) > * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime) > * BufferEntrySerializerMigrationTest > (org.apache.flink.streaming.api.operators.co) > * TimerSerializerSnapshotMigrationTest > (org.apache.flink.streaming.api.operators) > * StreamElementSerializerMigrationTest > (org.apache.flink.streaming.runtime.streamrecord) > * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo) > * BaseTypeSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * NullableSerializerMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * VoidNamespacieSerializerSnapshotMigrationTest > (org.apache.flink.runtime.state) > * ScalaOptionSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * 
ScalaTrySerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state) > * LockableTypeSerializerSnapshotMigrationTest > (org.apache.flink.cep.nfa.sharedbuffer) > * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep) > * WindowSerializerSnapshotsMigrationTest > (org.apache.flink.streaming.runtime.operators.windowing) > * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream) > * TwoPhaseCommitSinkStateSerializerMigrationTest > (org.apache.flink.streaming.api.functions.sink) > * KafkaSerializerSnapshotsMigrationTest > (org.apache.flink.streaming.connectors.kafka) > * KafkaSerializerSnapshotsMigrationTest > (org.apache.flink.streaming.connectors.kafka) > * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime) > * ValueArraySerializerSnapshotMigrationTest > (org.apache.flink.graph.types.valuearray) > * MapSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * CompositeTypeSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils) > * ListSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * EnumSerializerSnapshotMigrationTest > (org.apache.flink.api.common.typeutils.base) > * PojoSerializerSnapshotMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state) > * CopyableSerializerMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * WritableSerializerMigrationTest > (org.apache.flink.api.java.typeutils.runtime) > * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview) > * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview) > * ScalaEitherSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * LongValueWithProperHashCodeSerializerSnapshotMigrationTest > (org.apache.flink.graph.drivers.transform) > * 
ScalaCaseClassSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * TraversableSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) > * EnumValueSerializerSnapshotMigrationTest > (org.apache.flink.api.scala.typeutils) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15619) GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount failed on Azure
Congxian Qiu(klion26) created FLINK-15619: - Summary: GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount failed on Azure Key: FLINK-15619 URL: https://issues.apache.org/jira/browse/FLINK-15619 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.10.0 Reporter: Congxian Qiu(klion26) 01-16T08:32:11.0214825Z [ERROR] testAllProcessingTimeTumblingGroupWindowOverCount[StateBackend=HEAP](org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase) Time elapsed: 2.213 s <<< ERROR! 2020-01-16T08:32:11.0223298Z org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 2020-01-16T08:32:11.0241857Z at org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount(GroupWindowTableAggregateITCase.scala:130) 2020-01-16T08:32:11.0261909Z Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=0) 2020-01-16T08:32:11.0274347Z Caused by: java.lang.Exception: Artificial Failure 2020-01-16T08:32:11.0291664Z [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/4391/logs/16] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15603) Show "barrier lag" in checkpoint statistics
[ https://issues.apache.org/jira/browse/FLINK-15603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016484#comment-17016484 ] Congxian Qiu(klion26) commented on FLINK-15603: --- We could also add some debug logging in {{CheckpointBarrierAligner}} when a barrier is received and when barrier alignment completes, as {{CheckpointBarrierTracker}} does; this would be helpful when debugging checkpoint problems. > Show "barrier lag" in checkpoint statistics > --- > > Key: FLINK-15603 > URL: https://issues.apache.org/jira/browse/FLINK-15603 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Stephan Ewen >Priority: Critical > Labels: usability > Fix For: 1.11.0 > > > One of the most important metrics is missing in the checkpoint stats: > "barrier lag", meaning the time between when the checkpoint was triggered > and when the barriers arrive at a task. > That time is critical to identify if a checkpoint takes too long because of > backpressure or other contention. > You can implicitly calculate this by "end_to_end_time - sync_time - > async_time", but it is much more obvious for users that something is up when > this number is explicitly shown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
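The implicit calculation quoted in the issue ("end_to_end_time - sync_time - async_time") can be sketched as a tiny helper. This is a hypothetical illustration only; the method name and units are assumptions, not Flink's actual checkpoint statistics API.

```java
// Hypothetical helper illustrating the implicit "barrier lag" formula from the
// issue: end_to_end_time - sync_time - async_time. Names and units are
// illustrative assumptions, not Flink's actual API.
public class BarrierLagSketch {
    static long barrierLagMillis(long endToEndMillis, long syncMillis, long asyncMillis) {
        return endToEndMillis - syncMillis - asyncMillis;
    }

    public static void main(String[] args) {
        // A checkpoint with 30s end-to-end time but only 1s sync + 4s async
        // spent ~25s waiting on barriers, hinting at backpressure.
        System.out.println(barrierLagMillis(30_000, 1_000, 4_000));
    }
}
```

Showing this number explicitly saves users from doing the subtraction themselves when diagnosing slow checkpoints.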
[jira] [Commented] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012786#comment-17012786 ] Congxian Qiu(klion26) commented on FLINK-12785: --- Already added the release note; closing this issue. > RocksDB savepoint recovery can use a lot of unmanaged memory > > > Key: FLINK-12785 > URL: https://issues.apache.org/jira/browse/FLINK-12785 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Mike Kaplinskiy >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > I'm running an application that's backfilling data from Kafka. There's > approximately 3 years worth of data, with a lot of watermark skew (i.e. new > partitions were created over time) and I'm using daily windows. This makes a > lot of the windows buffer their contents before the watermark catches up to > "release" them. In turn, this gives me a lot of in-flight windows (200-300) > with very large state keys in rocksdb (on the order of 40-50mb per key). > Running the pipeline tends to be mostly fine - it's not terribly fast when > appends happen but everything works. The problem comes when doing a savepoint > restore - specifically, the taskmanagers eat ram until the kernel kills it > due to being out of memory. The extra memory isn't JVM heap since the memory > usage of the process is ~4x the -Xmx value and there aren't any > {{OutOfMemoryError}} exceptions. > I traced the culprit of the memory growth to > [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] > . 
Specifically, while the keys/values are deserialized on the Java heap, > {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which > buffers in unmanaged memory. That's not in itself an issue, but > {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not > a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will > flush only once it has 500 records, and at 40mb per key, that's at least 20Gb > of unmanaged memory before a flush. > My suggestion would be to add an additional flush criteria to > {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 > records or 5mb buffered). This way large key writes would be immediately > flushed to RocksDB on recovery or even writes. I applied this approach and I > was able to complete a savepoint restore for my job. That said, I'm not > entirely sure what else this change would impact since I'm not very familiar > with Flink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
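The dual flush criterion suggested above (flush on either a record count or a byte-size threshold) can be sketched in isolation. The class below is a self-contained stand-in, not the real {{RocksDBWriteBatchWrapper}}; all names are illustrative.

```java
// Self-contained sketch of the proposed dual flush criterion: flush when either
// a record-count limit or a byte-size limit is reached. Class and field names
// are illustrative assumptions; the real implementation lives in
// RocksDBWriteBatchWrapper and flushes into RocksDB's WriteBatch.
import java.util.ArrayList;
import java.util.List;

public class SizeAwareBatchSketch {
    static class Batch {
        final int maxRecords;
        final long maxBytes;
        final List<byte[]> pending = new ArrayList<>();
        long pendingBytes = 0;
        int flushCount = 0;

        Batch(int maxRecords, long maxBytes) {
            this.maxRecords = maxRecords;
            this.maxBytes = maxBytes;
        }

        void put(byte[] record) {
            pending.add(record);
            pendingBytes += record.length;
            // Flush on EITHER criterion, so a few huge records cannot pile up
            // unbounded unmanaged memory before the record-count limit is hit.
            if (pending.size() >= maxRecords || pendingBytes >= maxBytes) {
                flush();
            }
        }

        void flush() {
            pending.clear();
            pendingBytes = 0;
            flushCount++;
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch(500, 2 * 1024 * 1024); // 500 records or 2 MB
        byte[] bigRecord = new byte[1024 * 1024];      // a 1 MB value
        for (int i = 0; i < 4; i++) {
            batch.put(bigRecord);
        }
        // With only the 500-record rule, all 4 MB would still be buffered;
        // with the size rule, the batch flushed after every 2 records.
        System.out.println(batch.flushCount);
    }
}
```

With 40-50 MB keys as described in the issue, the byte threshold would trigger a flush on nearly every record, bounding the unmanaged memory held by the write batch.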
[jira] [Closed] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) closed FLINK-12785. - Resolution: Fixed > RocksDB savepoint recovery can use a lot of unmanaged memory > > > Key: FLINK-12785 > URL: https://issues.apache.org/jira/browse/FLINK-12785 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Mike Kaplinskiy >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > I'm running an application that's backfilling data from Kafka. There's > approximately 3 years worth of data, with a lot of watermark skew (i.e. new > partitions were created over time) and I'm using daily windows. This makes a > lot of the windows buffer their contents before the watermark catches up to > "release" them. In turn, this gives me a lot of in-flight windows (200-300) > with very large state keys in rocksdb (on the order of 40-50mb per key). > Running the pipeline tends to be mostly fine - it's not terribly fast when > appends happen but everything works. The problem comes when doing a savepoint > restore - specifically, the taskmanagers eat ram until the kernel kills it > due to being out of memory. The extra memory isn't JVM heap since the memory > usage of the process is ~4x the -Xmx value and there aren't any > {{OutOfMemoryError}} exceptions. > I traced the culprit of the memory growth to > [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] > . Specifically, while the keys/values are deserialized on the Java heap, > {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which > buffers in unmanaged memory. 
That's not in itself an issue, but > {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not > a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will > flush only once it has 500 records, and at 40mb per key, that's at least 20Gb > of unmanaged memory before a flush. > My suggestion would be to add an additional flush criteria to > {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 > records or 5mb buffered). This way large key writes would be immediately > flushed to RocksDB on recovery or even writes. I applied this approach and I > was able to complete a savepoint restore for my job. That said, I'm not > entirely sure what else this change would impact since I'm not very familiar > with Flink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-12785: -- Release Note: Before FLINK-12785, user may encounter OOM if there are huge KV pairs when restoring from savepoint of RocksDB state backend. In FLINK-12785 we introduce a size limit in RocksDBWriteBatchWrapper with default value 2MB, and RocksDB's WriteBatch will flush if the consumed memory exceeds it. User could tune the limit through the state.backend.rocksdb.write-batch-size property in flink-conf.yaml if needed. was: Before FLINK-12785, user may encounter OOM if there are huge KV pairs when restoring from savepoint of RocksDB state backend. In FLINK-12785 we introduce a size limit in RocksDBWriteBatchWrapper with default value 2MB, and RocksDB's WriteBatch will flush if the consumed memory exceeds it. User could tune the limit through the state.backend.rocksdb.write-batch-size property in flink-conf.yaml if needed. User can use `state.backend.rocksdb.write-batch-size` to change the size of WriteBatch if you needed. > RocksDB savepoint recovery can use a lot of unmanaged memory > > > Key: FLINK-12785 > URL: https://issues.apache.org/jira/browse/FLINK-12785 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Mike Kaplinskiy >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > I'm running an application that's backfilling data from Kafka. There's > approximately 3 years worth of data, with a lot of watermark skew (i.e. new > partitions were created over time) and I'm using daily windows. This makes a > lot of the windows buffer their contents before the watermark catches up to > "release" them. In turn, this gives me a lot of in-flight windows (200-300) > with very large state keys in rocksdb (on the order of 40-50mb per key). 
> Running the pipeline tends to be mostly fine - it's not terribly fast when > appends happen but everything works. The problem comes when doing a savepoint > restore - specifically, the taskmanagers eat ram until the kernel kills it > due to being out of memory. The extra memory isn't JVM heap since the memory > usage of the process is ~4x the -Xmx value and there aren't any > {{OutOfMemoryError}} exceptions. > I traced the culprit of the memory growth to > [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] > . Specifically, while the keys/values are deserialized on the Java heap, > {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which > buffers in unmanaged memory. That's not in itself an issue, but > {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not > a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will > flush only once it has 500 records, and at 40mb per key, that's at least 20Gb > of unmanaged memory before a flush. > My suggestion would be to add an additional flush criteria to > {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 > records or 5mb buffered). This way large key writes would be immediately > flushed to RocksDB on recovery or even writes. I applied this approach and I > was able to complete a savepoint restore for my job. That said, I'm not > entirely sure what else this change would impact since I'm not very familiar > with Flink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-12785: -- Release Note: Before FLINK-12785, user may encounter OOM if there are huge KV pairs when restoring from savepoint of RocksDB state backend. In FLINK-12785 we introduce a size limit in RocksDBWriteBatchWrapper with default value 2MB, and RocksDB's WriteBatch will flush if the consumed memory exceeds it. User could tune the limit through the state.backend.rocksdb.write-batch-size property in flink-conf.yaml if needed. User can use `state.backend.rocksdb.write-batch-size` to change the size of WriteBatch if you needed. > RocksDB savepoint recovery can use a lot of unmanaged memory > > > Key: FLINK-12785 > URL: https://issues.apache.org/jira/browse/FLINK-12785 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Mike Kaplinskiy >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > I'm running an application that's backfilling data from Kafka. There's > approximately 3 years worth of data, with a lot of watermark skew (i.e. new > partitions were created over time) and I'm using daily windows. This makes a > lot of the windows buffer their contents before the watermark catches up to > "release" them. In turn, this gives me a lot of in-flight windows (200-300) > with very large state keys in rocksdb (on the order of 40-50mb per key). > Running the pipeline tends to be mostly fine - it's not terribly fast when > appends happen but everything works. The problem comes when doing a savepoint > restore - specifically, the taskmanagers eat ram until the kernel kills it > due to being out of memory. The extra memory isn't JVM heap since the memory > usage of the process is ~4x the -Xmx value and there aren't any > {{OutOfMemoryError}} exceptions. 
> I traced the culprit of the memory growth to > [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] > . Specifically, while the keys/values are deserialized on the Java heap, > {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which > buffers in unmanaged memory. That's not in itself an issue, but > {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not > a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will > flush only once it has 500 records, and at 40mb per key, that's at least 20Gb > of unmanaged memory before a flush. > My suggestion would be to add an additional flush criteria to > {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 > records or 5mb buffered). This way large key writes would be immediately > flushed to RocksDB on recovery or even writes. I applied this approach and I > was able to complete a savepoint restore for my job. That said, I'm not > entirely sure what else this change would impact since I'm not very familiar > with Flink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15529) Update upgrade compatibility table (docs/ops/upgrading.md) for 1.10.0
[ https://issues.apache.org/jira/browse/FLINK-15529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012748#comment-17012748 ] Congxian Qiu(klion26) commented on FLINK-15529: --- Currently, I've manually tested the POJO serializer with the following scenarios, and all are OK. * RocksDBStateBackend ** use original pojo class ** add/delete some fields in pojo class ** reorder fields in pojo class ** change field type * HeapStateBackend ** use original pojo class ** add/delete some fields in pojo class ** reorder fields in pojo class ** change field type > Update upgrade compatibility table (docs/ops/upgrading.md) for 1.10.0 > - > > Key: FLINK-15529 > URL: https://issues.apache.org/jira/browse/FLINK-15529 > Project: Flink > Issue Type: Task > Components: Documentation >Affects Versions: 1.10.0 >Reporter: Yu Li >Assignee: Yu Li >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Update upgrade compatibility table (docs/ops/upgrading.md) for 1.10.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15424) Make all AppendingState#add respect the java doc
[ https://issues.apache.org/jira/browse/FLINK-15424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17009446#comment-17009446 ] Congxian Qiu(klion26) commented on FLINK-15424: --- [~aljoscha] thanks for the reply. {{AppendingState}} can be {{ListState}} or {{ValueState}}. I did some research on whether a collection should allow null elements, and found that * {{java.util.List}} allows null if the implementation allows it; {{ArrayList}} and {{LinkedList}} allow adding null elements * {{ListState#add}} does not allow adding a null element and will throw an exception; {{ListState}} can be created by the user through {{RuntimeContext}} * Guava recommends avoiding null in collections[1] In summary, I'd like to propose changing the Javadoc to "do not allow null values", and making all implementations of {{AppendingState}} respect this doc (throw an exception if a null element is added). What do you think? [1][https://github.com/google/guava/wiki/UsingAndAvoidingNullExplained] > Make all AppendingState#add respect the java doc > > > Key: FLINK-15424 > URL: https://issues.apache.org/jira/browse/FLINK-15424 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.8.3, 1.9.1 >Reporter: Congxian Qiu(klion26) >Priority: Major > > Currently, we have a java doc in > {{[AppendingState#add|https://github.com/apache/flink/blob/52fdee1d0c7af24d25c51caa073e29f11b07210b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java#L63]}} > {code:java} > If null is passed in, the state value will remain unchanged.{code} > but currently, the implementation did not respect this, take > {{HeapReducingState}} as an example, we'll clear the state if the passed > parameter is null > {code:java} > @Override > public void add(V value) throws IOException { > if (value == null) { > clear(); > return; > } > try { > stateTable.transform(currentNamespace, value, reduceTransformation); > } catch (Exception e) { > throw 
new IOException("Exception while applying ReduceFunction in > reducing state", e); > } > } > {code} > But in {{RocksDBReducingState}} we would not clear the state, and put the > null value into state if serializer can serialize null. > {code:java} > @Override > public void add(V value) throws Exception { >byte[] key = getKeyBytes(); >V oldValue = getInternal(key); >V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, > value); >updateInternal(key, newValue); > } > {code} > this issue wants to make all {{Appending}}State respect the javadoc of > {{AppendingState}}, and return directly if the passed in parameter is null. -- This message was sent by Atlassian Jira (v8.3.4#803005)
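The proposed uniform contract (reject null in every {{AppendingState#add}} implementation) can be sketched with a minimal stand-in. The class below is hypothetical and is not Flink's actual state API; it only demonstrates the fail-fast behavior the comment argues for.

```java
// Minimal sketch of the proposed contract: AppendingState#add rejects null
// explicitly instead of clearing state (heap backend) or serializing null
// (RocksDB backend). This class is a stand-in, not Flink's ListState.
import java.util.ArrayList;
import java.util.List;

public class NullRejectingStateSketch {
    static class SketchListState<V> {
        private final List<V> elements = new ArrayList<>();

        void add(V value) {
            if (value == null) {
                // Fail fast, uniformly across backends.
                throw new NullPointerException("null elements are not allowed");
            }
            elements.add(value);
        }

        int size() {
            return elements.size();
        }
    }

    public static void main(String[] args) {
        SketchListState<String> state = new SketchListState<>();
        state.add("a");
        try {
            state.add(null);
        } catch (NullPointerException expected) {
            System.out.println("rejected null, size=" + state.size());
        }
    }
}
```

The point of the uniform exception is that user code observes the same behavior whether the job runs on the heap backend or on RocksDB.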
[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007378#comment-17007378 ] Congxian Qiu(klion26) commented on FLINK-15152: --- [~pnowojski] thanks for the confirmation, I can help to fix this, please assign it to me. > Job running without periodic checkpoint for stop failed at the beginning > > > Key: FLINK-15152 > URL: https://issues.apache.org/jira/browse/FLINK-15152 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.1 >Reporter: Feng Jiajie >Priority: Critical > Labels: checkpoint, scheduler > > I have a streaming job configured with periodically checkpoint, but after one > week running, I found there isn't any checkpoint file. > h2. Reproduce the problem: > 1. Job was submitted to YARN: > {code:java} > bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m > flink-example-1.0-SNAPSHOT.jar{code} > 2. Then immediately, before all the task switch to RUNNING (about seconds), > I(actually a job control script) send a "stop with savepoint" command by > flink cli: > {code:java} > bin/flink stop -yid application_1575872737452_0019 > f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir > {code} > log in jobmanager.log: > {code:java} > 2019-12-09 17:56:56,512 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Socket Stream -> Map (1/1) of job > f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > {code} > Then the job task(taskmanager) *continues to run normally without* checkpoint. > h2. The cause of the problem: > 1. "stop with savepoint" command call the code > stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) > and then triggerSynchronousSavepoint: > {code:java} > // we stop the checkpoint coordinator so that we are guaranteed > // to have only the data of the synchronous savepoint committed. 
> // in case of failure, and if the job restarts, the coordinator > // will be restarted by the CheckpointCoordinatorDeActivator. > checkpointCoordinator.stopCheckpointScheduler();{code} > 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint > failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509 > {code:java} > LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} > instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code} > 3. finally, "stop with savepoint" failed, with > "checkpointCoordinator.stopCheckpointScheduler()" but without the termination > of the job. The job is still running without periodically checkpoint. > > sample code for reproduce: > {code:java} > public class StreamingJob { > private static StateBackend makeRocksdbBackend() throws IOException { > RocksDBStateBackend rocksdbBackend = new > RocksDBStateBackend("file:///tmp/aaa"); > rocksdbBackend.enableTtlCompactionFilter(); > > rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); > return rocksdbBackend; > } > public static void main(String[] args) throws Exception { > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 10 sec > env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE); > env.setStateBackend(makeRocksdbBackend()); > env.setRestartStrategy(RestartStrategies.noRestart()); > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > checkpointConfig.enableExternalizedCheckpoints( > > CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > checkpointConfig.setFailOnCheckpointingErrors(true); > DataStream text = env.socketTextStream("127.0.0.1", 8912, "\n"); > text.map(new MapFunction>() { > 
@Override > public Tuple2 map(String s) { > String[] s1 = s.split(" "); > return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1])); > } > }).keyBy(0).flatMap(new CountWindowAverage()).print(); > env.execute("Flink Streaming Java API Skeleton"); > } > public static class CountWindowAverage extends > RichFlatMapFunction, Tuple2> { > private transient ValueState> sum; > @Override > public void flatMap(Tuple2 input, Collector Long>> out) throws Exception { > Tuple2 currentSum = sum.value(); > currentSum.f0 += 1; > currentSum.f1 += input.f1; > sum.update(currentSum); >
[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006650#comment-17006650 ] Congxian Qiu(klion26) commented on FLINK-15152: --- [~pnowojski] thanks for your reply. Yes, this duplicates some logic from {{org.apache.flink.runtime.scheduler.SchedulerBase#triggerSavepoint}}. I think we should {{restart}} the {{CheckpointCoordinator}} in the two cases below: # {{CheckpointCoordinator#triggerSynchronousSavepoint}} failed # stopping the job failed (even if the synchronous savepoint succeeded) Regarding the new issues/extra complexity that might be introduced: IMO, the {{CheckpointCoordinator}} should keep running as long as the job is not stopped, so restarting the {{CheckpointCoordinator}} is needed. That is why I proposed the previous change. > Job running without periodic checkpoint for stop failed at the beginning > > > Key: FLINK-15152 > URL: https://issues.apache.org/jira/browse/FLINK-15152 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.1 >Reporter: Feng Jiajie >Priority: Critical > Labels: checkpoint, scheduler > > I have a streaming job configured with periodically checkpoint, but after one > week running, I found there isn't any checkpoint file. > h2. Reproduce the problem: > 1. Job was submitted to YARN: > {code:java} > bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m > flink-example-1.0-SNAPSHOT.jar{code} > 2. 
Then immediately, before all the task switch to RUNNING (about seconds), > I(actually a job control script) send a "stop with savepoint" command by > flink cli: > {code:java} > bin/flink stop -yid application_1575872737452_0019 > f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir > {code} > log in jobmanager.log: > {code:java} > 2019-12-09 17:56:56,512 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Socket Stream -> Map (1/1) of job > f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > {code} > Then the job task(taskmanager) *continues to run normally without* checkpoint. > h2. The cause of the problem: > 1. "stop with savepoint" command call the code > stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) > and then triggerSynchronousSavepoint: > {code:java} > // we stop the checkpoint coordinator so that we are guaranteed > // to have only the data of the synchronous savepoint committed. > // in case of failure, and if the job restarts, the coordinator > // will be restarted by the CheckpointCoordinatorDeActivator. > checkpointCoordinator.stopCheckpointScheduler();{code} > 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint > failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509 > {code:java} > LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} > instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code} > 3. finally, "stop with savepoint" failed, with > "checkpointCoordinator.stopCheckpointScheduler()" but without the termination > of the job. The job is still running without periodically checkpoint. 
> > sample code for reproduce: > {code:java} > public class StreamingJob { > private static StateBackend makeRocksdbBackend() throws IOException { > RocksDBStateBackend rocksdbBackend = new > RocksDBStateBackend("file:///tmp/aaa"); > rocksdbBackend.enableTtlCompactionFilter(); > > rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); > return rocksdbBackend; > } > public static void main(String[] args) throws Exception { > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 10 sec > env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE); > env.setStateBackend(makeRocksdbBackend()); > env.setRestartStrategy(RestartStrategies.noRestart()); > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > checkpointConfig.enableExternalizedCheckpoints( > > CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > checkpointConfig.setFailOnCheckpointingErrors(true); > DataStream text = env.socketTextStream("127.0.0.1", 8912, "\n"); > text.map(new MapFunction>() { > @Override > public Tuple2 map(String s) { > String[] s1 = s.split(" "); > return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1])); > }
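The restart behavior proposed in the comment (re-enable periodic checkpointing whenever stop-with-savepoint fails) can be sketched with futures. All names below are illustrative assumptions, not Flink's actual scheduler code.

```java
// Sketch of the proposed fix: stop the checkpoint scheduler before triggering
// the synchronous savepoint, but restart it if the savepoint (or the stop)
// fails, so the job does not keep running without periodic checkpoints.
// All names are illustrative, not Flink's real classes.
import java.util.concurrent.CompletableFuture;

public class StopWithSavepointSketch {
    static class CheckpointScheduler {
        volatile boolean running = true;
        void stop()  { running = false; }
        void start() { running = true; }
    }

    static CompletableFuture<String> stopWithSavepoint(
            CheckpointScheduler scheduler, CompletableFuture<String> savepoint) {
        scheduler.stop(); // only the synchronous savepoint's data may be committed
        return savepoint.whenComplete((path, failure) -> {
            if (failure != null) {
                scheduler.start(); // job keeps running, so checkpointing must resume
            }
        });
    }

    public static void main(String[] args) {
        CheckpointScheduler scheduler = new CheckpointScheduler();
        CompletableFuture<String> savepoint = new CompletableFuture<>();
        stopWithSavepoint(scheduler, savepoint);
        // Simulate the failure from the issue: a task is still SCHEDULED,
        // so triggering the synchronous savepoint aborts.
        savepoint.completeExceptionally(new IllegalStateException("task not RUNNING"));
        System.out.println(scheduler.running);
    }
}
```

In the buggy behavior described above, the scheduler would stay stopped after the failure; the sketch restores it in the failure branch, mirroring the two cases listed in the comment.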
[jira] [Commented] (FLINK-15247) Closing (Testing)MiniCluster may cause ConcurrentModificationException
[ https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006575#comment-17006575 ] Congxian Qiu(klion26) commented on FLINK-15247: --- Seems another instance [https://travis-ci.com/flink-ci/flink/jobs/271415113] > Closing (Testing)MiniCluster may cause ConcurrentModificationException > -- > > Key: FLINK-15247 > URL: https://issues.apache.org/jira/browse/FLINK-15247 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.10.0 >Reporter: Gary Yao >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {noformat} > Test > operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase) > failed with: > org.apache.flink.util.FlinkException: Could not close resource. > at > org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > 
at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75) > at > org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) > at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) > Caused by: org.apache.flink.util.FlinkException: Error 
while shutting the > TaskExecutor down. > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:397) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$0(TaskExecutor.java:382) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at >
[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006540#comment-17006540 ] Congxian Qiu(klion26) commented on FLINK-15447: --- FYI, the JAAS files are located in the {{/tmp}} directory; if too many jobs run on the same machine, you may encounter this exception. The JAAS file was moved to {{WORKING_DIR}} in FLINK-14433. > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
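For illustration, relocating {{java.io.tmpdir}} into the container working directory amounts to the following minimal sketch. This is not Flink code; the class and method names are invented for this example, and in a real deployment the property would be passed as {{-Djava.io.tmpdir=$PWD/tmp}} on the JVM command line rather than set at runtime.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical sketch: redirect the JVM's default temp directory into a
// container-local "tmp" folder, as proposed for JM/TM processes on YARN.
public class TmpDirRelocation {

    /** Points java.io.tmpdir at workingDir/tmp and returns the new path. */
    public static String relocateTmpDir(File workingDir) throws IOException {
        File tmp = new File(workingDir, "tmp");
        if (!tmp.isDirectory() && !tmp.mkdirs()) {
            throw new IOException("could not create " + tmp);
        }
        // Equivalent in effect to -Djava.io.tmpdir=$PWD/tmp: anything that
        // uses the default temp location now lands under the working dir,
        // which YARN deletes when the container finishes.
        System.setProperty("java.io.tmpdir", tmp.getAbsolutePath());
        return tmp.getAbsolutePath();
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the YARN container working directory ($PWD).
        File workingDir = Files.createTempDirectory("container").toFile();
        String newTmp = relocateTmpDir(workingDir);
        System.out.println(newTmp.equals(System.getProperty("java.io.tmpdir")));
    }
}
```

The point of the proposal is exactly this redirection: temp files follow the property, so a per-container "tmp" directory is cleaned up automatically with the container.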
[jira] [Updated] (FLINK-15451) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on azure
[ https://issues.apache.org/jira/browse/FLINK-15451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-15451: -- Summary: TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on azure (was: TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure) > TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure > failed on azure > -- > > Key: FLINK-15451 > URL: https://issues.apache.org/jira/browse/FLINK-15451 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.9.1 >Reporter: Congxian Qiu(klion26) >Priority: Major > > 2019-12-31T02:43:39.4766254Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 42.801 s <<< FAILURE! - in > org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase > 2019-12-31T02:43:39.4768373Z [ERROR] > testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase) > Time elapsed: 2.699 s <<< ERROR! 2019-12-31T02:43:39.4768834Z > java.net.BindException: Address already in use 2019-12-31T02:43:39.4769096Z > > > [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/3995/logs/15] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15451) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure
Congxian Qiu(klion26) created FLINK-15451: - Summary: TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure Key: FLINK-15451 URL: https://issues.apache.org/jira/browse/FLINK-15451 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.9.1 Reporter: Congxian Qiu(klion26) 2019-12-31T02:43:39.4766254Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 42.801 s <<< FAILURE! - in org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase 2019-12-31T02:43:39.4768373Z [ERROR] testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase) Time elapsed: 2.699 s <<< ERROR! 2019-12-31T02:43:39.4768834Z java.net.BindException: Address already in use 2019-12-31T02:43:39.4769096Z [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/3995/logs/15] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15247) Closing (Testing)MiniCluster may cause ConcurrentModificationException
[ https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005943#comment-17005943 ] Congxian Qiu(klion26) commented on FLINK-15247: --- another instance [https://travis-ci.com/flink-ci/flink/jobs/271335452] > Closing (Testing)MiniCluster may cause ConcurrentModificationException > -- > > Key: FLINK-15247 > URL: https://issues.apache.org/jira/browse/FLINK-15247 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.10.0 >Reporter: Gary Yao >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {noformat} > Test > operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase) > failed with: > org.apache.flink.util.FlinkException: Could not close resource. > at > org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75) > at > org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) > at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) > Caused by: org.apache.flink.util.FlinkException: Error 
while shutting the > TaskExecutor down. > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:397) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$0(TaskExecutor.java:382) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at >
[jira] [Commented] (FLINK-15427) State TTL RocksDb backend end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004461#comment-17004461 ] Congxian Qiu(klion26) commented on FLINK-15427: --- The test itself passed, but failed during the {{exception check}} {code} Checking for errors...^M Found error in log files:^M {code} After executing the command to find the exception, we get {code:java} 2019-12-27 05:18:34,743 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Received CancelTaskException while we are not canceled. This is a bug and should be reported^M org.apache.flink.runtime.execution.CancelTaskException: Consumed partition PipelinedSubpartitionView(index: 2) of ResultPartition 745fd76b3c0327b1b0732bb14045de1c@2e06db5ab07dfc5dabc32576a9a40a0f has been released.^M at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:190)^M at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:509)^M at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:487)^M at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:475)^M at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:75)^M at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:125)^M at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)^M at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)^M at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)^M at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)^M at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)^M at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)^M at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)^M {code} This {{CancelTaskException}} was introduced by [~AHeise] in FLINK-15317, and I think FLINK-15403 is already tracking it. This test verifies state TTL: if the verification fails, it prints something to stdout, and we relied on the {{exception check (which checks whether the `.out` file is empty or not)}} to detect that. So I propose doing the check inside {{test_stream_state_ttl.sh}} rather than {{delegating to the exception check}}; the reasoning is similar to FLINK-15105. We would add checking logic such as the following at the end of {{test_stream_state_ttl.sh}} and skip the exception check for this test. [~azagrebin] {code:bash} if grep "TtlVerificationContext{" $FLINK_DIR/log/*.out > /dev/null; then exit 1; # contains the output fi {code} > State TTL RocksDb backend end-to-end test stalls on travis > -- > > Key: FLINK-15427 > URL: https://issues.apache.org/jira/browse/FLINK-15427 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.10.0 >Reporter: Yu Li >Assignee: Congxian Qiu(klion26) >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > The 'State TTL RocksDb backend end-to-end test' case stalls and finally > timedout with error message: > {noformat} > The job exceeded the maximum log length, and has been terminated. > {noformat} > https://api.travis-ci.org/v3/job/629699416/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
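For illustration only, the same end-of-test verification can be sketched in Java (the actual proposal above is the shell snippet; the class and method names here are invented): scan the {{*.out}} files and report failure when the TTL verification marker appears.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;

// Hypothetical sketch of the proposed check: fail the test when any .out
// file contains TTL verification output (which only appears on failure).
public class TtlOutputCheck {

    /** Returns true when no .out file under logDir contains the marker. */
    public static boolean outFilesClean(Path logDir, String marker) throws IOException {
        try (Stream<Path> files = Files.list(logDir)) {
            return files
                .filter(p -> p.toString().endsWith(".out"))
                .noneMatch(p -> containsMarker(p, marker));
        }
    }

    private static boolean containsMarker(Path file, String marker) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.anyMatch(line -> line.contains(marker));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("flink-logs");
        Files.write(dir.resolve("taskmanager.out"), Arrays.asList("startup ok"));
        // prints true: no TTL verification (failure) output is present
        System.out.println(outFilesClean(dir, "TtlVerificationContext{"));
    }
}
```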
[jira] [Commented] (FLINK-15427) State TTL RocksDb backend end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004128#comment-17004128 ] Congxian Qiu(klion26) commented on FLINK-15427: --- I'll take a look at this issue > State TTL RocksDb backend end-to-end test stalls on travis > -- > > Key: FLINK-15427 > URL: https://issues.apache.org/jira/browse/FLINK-15427 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.10.0 >Reporter: Yu Li >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > The 'State TTL RocksDb backend end-to-end test' case stalls and finally > timedout with error message: > {noformat} > The job exceeded the maximum log length, and has been terminated. > {noformat} > https://api.travis-ci.org/v3/job/629699416/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15424) Make all AppendingState#add respect the java doc
[ https://issues.apache.org/jira/browse/FLINK-15424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-15424: -- Component/s: Runtime / State Backends > Make all AppendingState#add respect the java doc > > > Key: FLINK-15424 > URL: https://issues.apache.org/jira/browse/FLINK-15424 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.8.3, 1.9.1 >Reporter: Congxian Qiu(klion26) >Priority: Major > > Currently, We have a java doc in > {{[AppendingState#add|https://github.com/apache/flink/blob/52fdee1d0c7af24d25c51caa073e29f11b07210b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java#L63]}} > {code:java} > If null is passed in, the state value will remain unchanged.{code} > but currently, the implementation did not respect this, take > {{HeapReducingState}} as an example, we'll clear the state if the passed > parameter is null > {code:java} > @Override > public void add(V value) throws IOException { > if (value == null) { > clear(); > return; > } > try { > stateTable.transform(currentNamespace, value, reduceTransformation); > } catch (Exception e) { > throw new IOException("Exception while applying ReduceFunction in > reducing state", e); > } > } > {code} > But in {{RocksDBReducingState}} we would not clear the state, and put the > null value into state if serializer can serialize null. > {code:java} > @Override > public void add(V value) throws Exception { >byte[] key = getKeyBytes(); >V oldValue = getInternal(key); >V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, > value); >updateInternal(key, newValue); > } > {code} > this issue wants to make all {{Appending}}State respect the javadoc of > {{AppendingState}}, and return directly if the passed in parameter is null. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15424) Make all AppendingState#add respect the java doc
Congxian Qiu(klion26) created FLINK-15424: - Summary: Make all AppendingState#add respect the java doc Key: FLINK-15424 URL: https://issues.apache.org/jira/browse/FLINK-15424 Project: Flink Issue Type: Bug Affects Versions: 1.9.1, 1.8.3 Reporter: Congxian Qiu(klion26) Currently, we have a javadoc note in {{[AppendingState#add|https://github.com/apache/flink/blob/52fdee1d0c7af24d25c51caa073e29f11b07210b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java#L63]}} {code:java} If null is passed in, the state value will remain unchanged.{code} but currently the implementations do not respect this. Take {{HeapReducingState}} as an example: we clear the state if the passed parameter is null {code:java} @Override public void add(V value) throws IOException { if (value == null) { clear(); return; } try { stateTable.transform(currentNamespace, value, reduceTransformation); } catch (Exception e) { throw new IOException("Exception while applying ReduceFunction in reducing state", e); } } {code} But in {{RocksDBReducingState}} we do not clear the state, and we put the null value into the state if the serializer can serialize null. {code:java} @Override public void add(V value) throws Exception { byte[] key = getKeyBytes(); V oldValue = getInternal(key); V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value); updateInternal(key, newValue); } {code} This issue aims to make all {{AppendingState}} implementations respect the javadoc of {{AppendingState}} and return directly if the passed-in parameter is null. -- This message was sent by Atlassian Jira (v8.3.4#803005)
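To make the proposed contract concrete, here is a minimal stand-in (not Flink's actual state classes; the class name is invented for this sketch) showing an {{add}} that respects the javadoc: passing null leaves the state unchanged instead of clearing it or storing null.

```java
import java.util.function.BinaryOperator;

// Minimal stand-in for a reducing state, illustrating the proposed contract:
// "If null is passed in, the state value will remain unchanged."
public class NullRespectingReducingState<V> {

    private final BinaryOperator<V> reduceFunction;
    private V current; // the single reduced value held by this state

    public NullRespectingReducingState(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    public void add(V value) {
        if (value == null) {
            return; // leave the state unchanged, per the AppendingState javadoc
        }
        current = (current == null) ? value : reduceFunction.apply(current, value);
    }

    public V get() {
        return current;
    }

    public static void main(String[] args) {
        NullRespectingReducingState<Integer> sum =
                new NullRespectingReducingState<>(Integer::sum);
        sum.add(1);
        sum.add(2);
        sum.add(null); // a no-op, rather than clear() or storing null
        System.out.println(sum.get()); // prints 3
    }
}
```

Under this contract both the heap and RocksDB variants would behave identically on null input, which is the consistency the issue asks for.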
[jira] [Commented] (FLINK-15403) 'State Migration end-to-end test from 1.6' is unstable on travis.
[ https://issues.apache.org/jira/browse/FLINK-15403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003840#comment-17003840 ] Congxian Qiu(klion26) commented on FLINK-15403: --- The exception in the description is an error log; I'm not sure whether the test failed because of it (maybe because of the exception check?). This log was introduced by [~AHeise] in FLINK-15317; maybe he can share more about it. > 'State Migration end-to-end test from 1.6' is unstable on travis. > - > > Key: FLINK-15403 > URL: https://issues.apache.org/jira/browse/FLINK-15403 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.10.0 >Reporter: Xintong Song >Priority: Critical > Labels: testability > Fix For: 1.10.0 > > > https://api.travis-ci.org/v3/job/629576631/log.txt > The test case fails because the log contains the following error message. > {code} > 2019-12-26 09:19:35,537 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - Received > CancelTaskException while we are not canceled. This is a bug and should be > reported > org.apache.flink.runtime.execution.CancelTaskException: Consumed partition > PipelinedSubpartitionView(index: 0) of ResultPartition > 3886657fb8cc980139fac67e32d6e380@8cfcbe851fe3bb3fa00e9afc370bd963 has been > released. 
> at > org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:190) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:509) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:487) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:475) > at > org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:75) > at > org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:125) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap
[ https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003837#comment-17003837 ] Congxian Qiu(klion26) commented on FLINK-15406: --- Hi [~lintingbin], from the description it seems the exception is thrown when snapshotting rather than restoring, and the root cause is that the {{keySerializer}} used to construct {{InternalTimersSnapshot}} is null; maybe this operator did not call {{InternalTimerServiceImpl#startTimerService}}? Could you please share how to reproduce this problem? (A minimal demo that reproduces it would be even better.) Thanks. > The savepoint is writted by "State Processor API" can't be restore by map or > flatmap > > > Key: FLINK-15406 > URL: https://issues.apache.org/jira/browse/FLINK-15406 > Project: Flink > Issue Type: Bug > Components: API / State Processor >Affects Versions: 1.9.1 >Reporter: Darcy Lin >Priority: Major > > The savepoint is writted by "State Processor API" can't be restore by map or > flatmap. But it can be retored by KeyedProcessFunction. > Following is the error message: > {code:java} > java.lang.Exception: Could not write timer service of Flat Map -> Map -> > Sink: device_first_user_create (1/8) to checkpoint state > stream.java.lang.Exception: Could not write timer service of Flat Map -> Map > -> Sink: device_first_user_create (1/8) to checkpoint state stream. 
at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708) > at > org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88) > at > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177) > at > org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at > 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at > java.lang.Thread.run(Thread.java:748)Caused by: > java.lang.NullPointerException at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at > org.apache.flink.streaming.api.operators.InternalTimersSnapshot.(InternalTimersSnapshot.java:52) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462) > ... 19 more{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003427#comment-17003427 ] Congxian Qiu(klion26) edited comment on FLINK-15152 at 12/26/19 2:27 AM: - After having a look at the code, I think we should re-start {{checkpointCoordinator}} if {{triggerSynchronousSavepoint}} or {{terminateJob}} fail. We can add a handler to re-start {{checkpointCoordinator}} such as the following; what do you think? [~pnowojski] [~kkloudas] CC [~zhuzh] {code:java} // end of SchedulerBase#stopWithSavepoint savepointFuture.thenCompose((path) -> terminationFuture.thenApply(jobStatus -> path)) .handle( (path, throwable) -> { if (throwable != null) { //re-start the checkpoint coordinator when triggerSynchronousSavepoint or terminateJob failed. checkpointCoordinator.startCheckpointScheduler(); throw new CompletionException(ExceptionUtils.stripException(throwable, CompletionException.class)); } return path; });{code} [1] [SchedulerBase#stopWithSavepoint|https://github.com/apache/flink/blob/0ba4a2b4cc2b48886d5a7948d631ea7da0068a0e/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L859] was (Author: klion26): After having a look at the code, I think we should re-start checkpointCoordinator if {{triggerSynchronousSavepoint}} or {{terminateJob}} fail. We can add a handler to re-start {{checkpointCoordinator}} such as the following; what do you think? 
[~pnowojski] [~kkloudas] CC [~zhuzh] {code:java} // end of SchedulerBase#stopWithSavepoint savepointFuture.thenCompose((path) -> terminationFuture.thenApply(jobStatus -> path)) .handle( (path, throwable) -> { if (throwable != null) { throw new CompletionException(ExceptionUtils.stripException(throwable, CompletionException.class)); } return path; });{code} [1] [SchedulerBase#stopWithSavepoint|https://github.com/apache/flink/blob/0ba4a2b4cc2b48886d5a7948d631ea7da0068a0e/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L859] > Job running without periodic checkpoint for stop failed at the beginning > > > Key: FLINK-15152 > URL: https://issues.apache.org/jira/browse/FLINK-15152 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.1 >Reporter: Feng Jiajie >Priority: Critical > Labels: checkpoint, scheduler > > I have a streaming job configured with periodically checkpoint, but after one > week running, I found there isn't any checkpoint file. > h2. Reproduce the problem: > 1. Job was submitted to YARN: > {code:java} > bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m > flink-example-1.0-SNAPSHOT.jar{code} > 2. Then immediately, before all the task switch to RUNNING (about seconds), > I(actually a job control script) send a "stop with savepoint" command by > flink cli: > {code:java} > bin/flink stop -yid application_1575872737452_0019 > f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir > {code} > log in jobmanager.log: > {code:java} > 2019-12-09 17:56:56,512 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Socket Stream -> Map (1/1) of job > f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > {code} > Then the job task(taskmanager) *continues to run normally without* checkpoint. > h2. The cause of the problem: > 1. 
"stop with savepoint" command call the code > stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) > and then triggerSynchronousSavepoint: > {code:java} > // we stop the checkpoint coordinator so that we are guaranteed > // to have only the data of the synchronous savepoint committed. > // in case of failure, and if the job restarts, the coordinator > // will be restarted by the CheckpointCoordinatorDeActivator. > checkpointCoordinator.stopCheckpointScheduler();{code} > 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint > failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509 > {code:java} > LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} > instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code} > 3. finally, "stop with savepoint" failed, with > "checkpointCoordinator.stopCheckpointScheduler()" but without the termination > of the job. The job is still running without periodically checkpoint. > > sample code for reproduce: >
[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003427#comment-17003427 ] Congxian Qiu(klion26) commented on FLINK-15152: --- After having a look at the code, I think we should re-start checkpointCoordinator if {{triggerSynchronousSavepoint}} or {{terminateJob}} fail. We can add a handler to re-start {{checkpointCoordinator}} such as the following; what do you think? [~pnowojski] [~kkloudas] CC [~zhuzh] {code:java} // end of SchedulerBase#stopWithSavepoint savepointFuture.thenCompose((path) -> terminationFuture.thenApply(jobStatus -> path)) .handle( (path, throwable) -> { if (throwable != null) { throw new CompletionException(ExceptionUtils.stripException(throwable, CompletionException.class)); } return path; });{code} [1] [SchedulerBase#stopWithSavepoint|https://github.com/apache/flink/blob/0ba4a2b4cc2b48886d5a7948d631ea7da0068a0e/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L859] > Job running without periodic checkpoint for stop failed at the beginning > > > Key: FLINK-15152 > URL: https://issues.apache.org/jira/browse/FLINK-15152 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.1 >Reporter: Feng Jiajie >Priority: Critical > Labels: checkpoint, scheduler > > I have a streaming job configured with periodically checkpoint, but after one > week running, I found there isn't any checkpoint file. > h2. Reproduce the problem: > 1. Job was submitted to YARN: > {code:java} > bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m > flink-example-1.0-SNAPSHOT.jar{code} > 2. 
Then immediately, before all the task switch to RUNNING (about seconds), > I(actually a job control script) send a "stop with savepoint" command by > flink cli: > {code:java} > bin/flink stop -yid application_1575872737452_0019 > f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir > {code} > log in jobmanager.log: > {code:java} > 2019-12-09 17:56:56,512 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Socket Stream -> Map (1/1) of job > f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > {code} > Then the job task(taskmanager) *continues to run normally without* checkpoint. > h2. The cause of the problem: > 1. "stop with savepoint" command call the code > stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) > and then triggerSynchronousSavepoint: > {code:java} > // we stop the checkpoint coordinator so that we are guaranteed > // to have only the data of the synchronous savepoint committed. > // in case of failure, and if the job restarts, the coordinator > // will be restarted by the CheckpointCoordinatorDeActivator. > checkpointCoordinator.stopCheckpointScheduler();{code} > 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint > failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509 > {code:java} > LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} > instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code} > 3. finally, "stop with savepoint" failed, with > "checkpointCoordinator.stopCheckpointScheduler()" but without the termination > of the job. The job is still running without periodically checkpoint. 
> > sample code for reproduce: > {code:java} > public class StreamingJob { > private static StateBackend makeRocksdbBackend() throws IOException { > RocksDBStateBackend rocksdbBackend = new > RocksDBStateBackend("file:///tmp/aaa"); > rocksdbBackend.enableTtlCompactionFilter(); > > rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); > return rocksdbBackend; > } > public static void main(String[] args) throws Exception { > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 10 sec > env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE); > env.setStateBackend(makeRocksdbBackend()); > env.setRestartStrategy(RestartStrategies.noRestart()); > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > checkpointConfig.enableExternalizedCheckpoints( > > CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > checkpointConfig.setFailOnCheckpointingErrors(true); > DataStream text =
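The restart-on-failure idea proposed in the comment above can be sketched with plain {{CompletableFuture}}s. {{StopWithSavepointSketch}} and {{ToyCoordinator}} are illustrative stand-ins, not Flink's actual classes; the point is only the {{handle()}} callback that re-enables periodic scheduling when the savepoint future fails:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopWithSavepointSketch {

    /** Illustrative stand-in for the CheckpointCoordinator's scheduler switch. */
    public static class ToyCoordinator {
        public final AtomicBoolean periodicScheduling = new AtomicBoolean(true);
        public void stopCheckpointScheduler() { periodicScheduling.set(false); }
        public void startCheckpointScheduler() { periodicScheduling.set(true); }
    }

    /**
     * Stops periodic checkpointing, then restarts it if the savepoint
     * future fails, mirroring the handle() proposed in the comment above.
     */
    public static CompletableFuture<String> stopWithSavepoint(
            ToyCoordinator coordinator, CompletableFuture<String> savepointFuture) {
        coordinator.stopCheckpointScheduler();
        return savepointFuture.handle((path, throwable) -> {
            if (throwable != null) {
                // Safety net: the job keeps running after a failed savepoint,
                // so periodic checkpoints must be re-enabled before rethrowing.
                coordinator.startCheckpointScheduler();
                throw new CompletionException(throwable);
            }
            return path;
        });
    }
}
```

With this shape, a savepoint trigger that fails before all tasks are RUNNING still leaves the scheduler active, instead of leaving the job checkpoint-less as described in the issue.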
[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003193#comment-17003193 ] Congxian Qiu(klion26) commented on FLINK-15152: --- Thanks for reporting the issue, from the description, seems it is a bug, I'll take a look at it. > Job running without periodic checkpoint for stop failed at the beginning > > > Key: FLINK-15152 > URL: https://issues.apache.org/jira/browse/FLINK-15152 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.1 >Reporter: Feng Jiajie >Priority: Critical > Labels: checkpoint, scheduler > > I have a streaming job configured with periodically checkpoint, but after one > week running, I found there isn't any checkpoint file. > h2. Reproduce the problem: > 1. Job was submitted to YARN: > {code:java} > bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m > flink-example-1.0-SNAPSHOT.jar{code} > 2. Then immediately, before all the task switch to RUNNING (about seconds), > I(actually a job control script) send a "stop with savepoint" command by > flink cli: > {code:java} > bin/flink stop -yid application_1575872737452_0019 > f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir > {code} > log in jobmanager.log: > {code:java} > 2019-12-09 17:56:56,512 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Socket Stream -> Map (1/1) of job > f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > {code} > Then the job task(taskmanager) *continues to run normally without* checkpoint. > h2. The cause of the problem: > 1. "stop with savepoint" command call the code > stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) > and then triggerSynchronousSavepoint: > {code:java} > // we stop the checkpoint coordinator so that we are guaranteed > // to have only the data of the synchronous savepoint committed. 
> // in case of failure, and if the job restarts, the coordinator > // will be restarted by the CheckpointCoordinatorDeActivator. > checkpointCoordinator.stopCheckpointScheduler();{code} > 2. But "before all the tasks switch to RUNNING", triggerSynchronousSavepoint > failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509: > {code:java} > LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} > instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code} > 3. Finally, "stop with savepoint" failed: > "checkpointCoordinator.stopCheckpointScheduler()" had run, but the job was not > terminated. The job is still running without periodic checkpoints. > > sample code to reproduce: > {code:java} > public class StreamingJob { > private static StateBackend makeRocksdbBackend() throws IOException { > RocksDBStateBackend rocksdbBackend = new > RocksDBStateBackend("file:///tmp/aaa"); > rocksdbBackend.enableTtlCompactionFilter(); > > rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); > return rocksdbBackend; > } > public static void main(String[] args) throws Exception { > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 10 sec > env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE); > env.setStateBackend(makeRocksdbBackend()); > env.setRestartStrategy(RestartStrategies.noRestart()); > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > checkpointConfig.enableExternalizedCheckpoints( > > CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > checkpointConfig.setFailOnCheckpointingErrors(true); > DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n"); > text.map(new MapFunction<String, Tuple2<Long, Long>>() { > 
@Override > public Tuple2<Long, Long> map(String s) { > String[] s1 = s.split(" "); > return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1])); > } > }).keyBy(0).flatMap(new CountWindowAverage()).print(); > env.execute("Flink Streaming Java API Skeleton"); > } > public static class CountWindowAverage extends > RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { > private transient ValueState<Tuple2<Long, Long>> sum; > @Override > public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { > Tuple2<Long, Long> currentSum = sum.value(); > currentSum.f0 += 1; > currentSum.f1 += input.f1; > sum.update(currentSum); >
[jira] [Commented] (FLINK-13553) KvStateServerHandlerTest.readInboundBlocking unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999881#comment-16999881 ] Congxian Qiu(klion26) commented on FLINK-13553: --- Currently I can't reproduce it, and it hasn't appeared in the last month, so I'm closing it now. Please reopen it if it appears in the future. > KvStateServerHandlerTest.readInboundBlocking unstable on Travis > --- > > Key: FLINK-13553 > URL: https://issues.apache.org/jira/browse/FLINK-13553 > Project: Flink > Issue Type: Bug > Components: Runtime / Queryable State >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Assignee: Congxian Qiu(klion26) >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > The {{KvStateServerHandlerTest.readInboundBlocking}} and > {{KvStateServerHandlerTest.testQueryExecutorShutDown}} fail on Travis with a > {{TimeoutException}}. > https://api.travis-ci.org/v3/job/566420641/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-13553) KvStateServerHandlerTest.readInboundBlocking unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) closed FLINK-13553. - Resolution: Cannot Reproduce > KvStateServerHandlerTest.readInboundBlocking unstable on Travis > --- > > Key: FLINK-13553 > URL: https://issues.apache.org/jira/browse/FLINK-13553 > Project: Flink > Issue Type: Bug > Components: Runtime / Queryable State >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Assignee: Congxian Qiu(klion26) >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > The {{KvStateServerHandlerTest.readInboundBlocking}} and > {{KvStateServerHandlerTest.testQueryExecutorShutDown}} fail on Travis with a > {{TimeoutException}}. > https://api.travis-ci.org/v3/job/566420641/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15280) Checkpoint end-to-end test failed
[ https://issues.apache.org/jira/browse/FLINK-15280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997279#comment-16997279 ] Congxian Qiu(klion26) commented on FLINK-15280: --- FYI FLINK-15105 wants to disable the exception check logic. If the problem here is the exception check, then I think it can benefit after FLINK-15105 resolved > Checkpoint end-to-end test failed > - > > Key: FLINK-15280 > URL: https://issues.apache.org/jira/browse/FLINK-15280 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: wangxiyuan >Priority: Major > > The Checkpoint e2e test failed. The error log is very long. Please check the > link to find the error message. > Some logs in these two days: > [https://api.travis-ci.org/v3/job/624983426/log.txt] > Running 'Resuming Externalized Checkpoint (rocks, incremental, no parallelism > change) end-to-end test' > > [https://api.travis-ci.org/v3/job/625281873/log.txt] > Running 'Resuming Externalized Checkpoint (file, sync, scale down) end-to-end > test' > > And arm as well: > [https://logs.openlabtesting.org/logs/periodic-20-flink-mail/github.com/apache/flink/master/flink-end-to-end-test-arm64-checkpoints-and-sticky/3da575a/job-output.txt.gz] > Running 'Resuming Externalized Checkpoint (rocks, incremental, no parallelism > change) end-to-end test' > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13808) Checkpoints expired by timeout may leak RocksDB files
[ https://issues.apache.org/jira/browse/FLINK-13808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995375#comment-16995375 ] Congxian Qiu(klion26) commented on FLINK-13808: --- [~sewen] FYI, create an issue to track this FLINK-15236 > Checkpoints expired by timeout may leak RocksDB files > - > > Key: FLINK-13808 > URL: https://issues.apache.org/jira/browse/FLINK-13808 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.8.0, 1.8.1 > Environment: So far only reliably reproducible on a 4-node cluster > with parallelism ≥ 100. But do try > https://github.com/jcaesar/flink-rocksdb-file-leak >Reporter: Julius Michaelis >Priority: Minor > > A RocksDB state backend with HDFS checkpoints, with or without local > recovery, may leak files in {{io.tmp.dirs}} on checkpoint expiry by timeout. > If the size of a checkpoint crosses what can be transferred during one > checkpoint timeout, checkpoints will continue to fail forever. If this is > combined with a quick rollover of SST files (e.g. due to a high density of > writes), this may quickly exhaust available disk space (or memory, as /tmp is > the default location). > As a workaround, the jobmanager's REST API can be frequently queried for > failed checkpoints, and associated files deleted accordingly. 
> I've tried investing the cause a little bit, but I'm stuck: > * {{Checkpoint 19 of job ac7efce3457d9d73b0a4f775a6ef46f8 expired before > completing.}} and similar gets printed, so > * [{{abortExpired}} is > invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L547-L549], > so > * [{{dispose}} is > invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L416], > so > * [{{cancelCaller}} is > invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L488], > so > * [the canceler is > invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L497] > ([through one more > layer|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129]), > so > * [{{cleanup}} is > invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L95], > (possibly [not from > {{cancel}}|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L84]), > so > * [{{cleanupProvidedResources}} is > invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L162] > (this is the indirection that made me give up), so > * [this trace > log|https://github.com/apache/flink/blob/release-1.8.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L372] > should be printed, but it isn't. 
> I have some time to further investigate, but I'd appreciate help on finding > out where in this chain things go wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15236) Add a safety net to limit the number of concurrent checkpoints on TM side
[ https://issues.apache.org/jira/browse/FLINK-15236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-15236: -- Summary: Add a safety net to limit the number of concurrent checkpoints on TM side (was: Add a safety net for concurrent checkpoints on TM side) > Add a safety net to limit the number of concurrent checkpoints on TM side > - > > Key: FLINK-15236 > URL: https://issues.apache.org/jira/browse/FLINK-15236 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: Congxian Qiu(klion26) >Priority: Major > > As discussed in FLINK-13808, we can add an additional config > {{taskmanager.checkpoints.max-concurrent}} that limits the number of > concurrent checkpoints on the TM as a safety net. > The default value of {{taskmanager.checkpoints.max-concurrent}} depends on the > job's maxConcurrentCheckpoints setting: > * If maxConcurrentCheckpoints = 1, the default {{taskmanager.checkpoints.max-concurrent}} is 1. > * If maxConcurrentCheckpoints > 1, the default {{taskmanager.checkpoints.max-concurrent}} is unlimited. > The limit should not take manually triggered checkpoints/savepoints into account.
[jira] [Created] (FLINK-15236) Add a safety net for concurrent checkpoints on TM side
Congxian Qiu(klion26) created FLINK-15236: - Summary: Add a safety net for concurrent checkpoints on TM side Key: FLINK-15236 URL: https://issues.apache.org/jira/browse/FLINK-15236 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26) As discussed in FLINK-13808, we can add an additional config {{taskmanager.checkpoints.max-concurrent}} that limits the number of concurrent checkpoints on the TM as a safety net. The default value of {{taskmanager.checkpoints.max-concurrent}} depends on the job's maxConcurrentCheckpoints setting: * If maxConcurrentCheckpoints = 1, the default {{taskmanager.checkpoints.max-concurrent}} is 1. * If maxConcurrentCheckpoints > 1, the default {{taskmanager.checkpoints.max-concurrent}} is unlimited. The limit should not take manually triggered checkpoints/savepoints into account.
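A minimal sketch of such a TM-side safety net, assuming a plain atomic counter guard ({{ConcurrentCheckpointGuard}} and its method names are illustrative, not Flink's actual implementation): a new periodic checkpoint is rejected when the configured maximum is already in flight, while manually triggered checkpoints/savepoints bypass the limit, as the issue description requires:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentCheckpointGuard {
    // Illustrative stand-in for taskmanager.checkpoints.max-concurrent.
    private final int maxConcurrent;
    private final AtomicInteger inFlight = new AtomicInteger();

    public ConcurrentCheckpointGuard(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    /** Returns true if the checkpoint may start; manual triggers bypass the limit. */
    public boolean tryStart(boolean manuallyTriggered) {
        if (manuallyTriggered) {
            return true; // manually triggered checkpoints/savepoints are not counted
        }
        while (true) {
            int current = inFlight.get();
            if (current >= maxConcurrent) {
                return false; // safety net: too many checkpoints already in flight
            }
            if (inFlight.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Called when a periodic checkpoint completes or is aborted. */
    public void finish(boolean manuallyTriggered) {
        if (!manuallyTriggered) {
            inFlight.decrementAndGet();
        }
    }
}
```

The compare-and-set loop keeps the guard correct when several checkpoint threads race to start at once, without taking a lock.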
[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16994704#comment-16994704 ] Congxian Qiu(klion26) commented on FLINK-15105: --- Agree that disabling the exception check here is the easiest way to fix this issue, and we don't need to touch any existing code. Then we'll disable the exception check for all tests running {{test_resume_externalized_checkpoints.sh}}. I can help to fix this; could someone assign this ticket to me? > Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test stalls on travis > - > > Key: FLINK-15105 > URL: https://issues.apache.org/jira/browse/FLINK-15105 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Yu Li >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test fails on release-1.9 nightly build stalls with "The job > exceeded the maximum log length, and has been terminated". > https://api.travis-ci.org/v3/job/621090394/log.txt
[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993554#comment-16993554 ] Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/11/19 1:52 PM: - First, answer the last question: we can't just remove "error" message in {{RuntimeException}}, we'll fail in {{common.sh#}}{{check_logs_for_exceptions()}} because of the {{RuntimeException}}. Then I'll try to describe more about the things about {{FailureMapper}}. # {{FailureMapper is only used in {{DataStreamAllroundTestProgram. # we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram only if we [enabled TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]}} in {{DataStreamAllroundTestProgram}} # {{We'll throw Exception in {{FailureMapper#map and {{FailureMapper#notifyCheckpointComplete}} # {{we'll enable {{TEST_SIMULATE_FAILURE}} }} in {{test_ha_datastream.sh}}, {{test_ha_per_job_cluster_datastream.sh}} and {{test_resume_externalized_checkpoints.sh}} IIUC, all the above tests are wanna test whether the job can restore from(restore with checkpoint) the last failed job successfully(but we do not care where the exception come from, then Exception thrown from FailureMapper#mapper or FailureMapper#notifyCheckpointComplete have the same effect). If we want to verify that `failure of notifyCheckpointComplete can fail task`, maybe we can add a ut for it. was (Author: klion26): First, answer the last question: we can't just remove "error" message in {{RuntimeException}}, we'll fail in {{common.sh#}}{{check_logs_for_exceptions()}} because of the {{RuntimeException}}. Then I'll try to describe more about the things about {{FailureMapper}}. # {{FailureMapper is only used in {{DataStreamAllroundTestProgram. 
# we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram only if we [enabled TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]}} in {{DataStreamAllroundTestProgram}} # {{We'll throw Exception in {{FailureMapper#map and {{FailureMapper#notifyCheckpointComplete}} # {{we'll enable {{TEST_SIMULATE_FAILURE}} }} in {{test_ha_datastream.sh}}, {{test_ha_per_job_cluster_datastream.sh}} and {{test_resume_externalized_checkpoints.sh}} IIUC, all the above tests are wanna test whether the job can restore from(restore with checkpoint) the last failed job successfully(but we do not care where the exception come from, then Exception thrown from FailureMapper#mapper or FailureMapper#notifyCheckpointComplete have the same effect, please correct me if I miss anything here). If we want to verify that `failure of notifyCheckpointComplete can fail task`, maybe we can add a ut for it. > Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test stalls on travis > - > > Key: FLINK-15105 > URL: https://issues.apache.org/jira/browse/FLINK-15105 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Yu Li >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test fails on release-1.9 nightly build stalls with "The job > exceeded the maximum log length, and has been terminated". > https://api.travis-ci.org/v3/job/621090394/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993554#comment-16993554 ] Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/11/19 1:46 PM: - First, answer the last question: we can't just remove "error" message in {{RuntimeException}}, we'll fail in {{common.sh#}}{{check_logs_for_exceptions()}} because of the {{RuntimeException}}. Then I'll try to describe more about the things about {{FailureMapper}}. # {{FailureMapper is only used in {{DataStreamAllroundTestProgram. # we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram only if we [enabled TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]}} in {{DataStreamAllroundTestProgram}} # {{We'll throw Exception in {{FailureMapper#map and {{FailureMapper#notifyCheckpointComplete}} # {{we'll enable {{TEST_SIMULATE_FAILURE}} }} in {{test_ha_datastream.sh}}, {{test_ha_per_job_cluster_datastream.sh}} and {{test_resume_externalized_checkpoints.sh}} IIUC, all the above tests are wanna test whether the job can restore from(restore with checkpoint) the last failed job successfully(but we do not care where the exception come from, then Exception thrown from FailureMapper#mapper or FailureMapper#notifyCheckpointComplete have the same effect, please correct me if I miss anything here). If we want to verify that `failure of notifyCheckpointComplete can fail task`, maybe we can add a ut for it. was (Author: klion26): First, answer the last question: we can't just remove "error" message in {{RuntimeException}}, we'll fail in {{common.sh#}}{{check_logs_for_exceptions()}} because of the {{RuntimeException}}. Then I'll try to describe more about the things about {{FailureMapper}}. # {{FailureMapper is only used in {{DataStreamAllroundTestProgram. 
# we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram only }}if we [enabled TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]{{}} in {{DataStreamAllroundTestProgram}} # {{We'll throw Exception in }}{{FailureMapper#map}} and {{FailureMapper#notifyCheckpointComplete}} # {{we'll enable }}{{TEST_SIMULATE_FAILURE}} in {{test_ha_datastream.sh}}, {{test_ha_per_job_cluster_datastream.sh}} and {{test_resume_externalized_checkpoints.sh}} IIUC, all the above tests are wanna test whether the job can restore from(restore with checkpoint) the last failed job successfully(but we do not care where the exception come from, then Exception thrown from FailureMapper#mapper or FailureMapper#notifyCheckpointComplete have the same effect, please correct me if I miss anything here). If we want to verify that `failure of notifyCheckpointComplete can fail task`, maybe we can add a ut for it. > Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test stalls on travis > - > > Key: FLINK-15105 > URL: https://issues.apache.org/jira/browse/FLINK-15105 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Yu Li >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test fails on release-1.9 nightly build stalls with "The job > exceeded the maximum log length, and has been terminated". > https://api.travis-ci.org/v3/job/621090394/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993554#comment-16993554 ] Congxian Qiu(klion26) commented on FLINK-15105: --- First, to answer the last question: we can't just remove the "error" message from the {{RuntimeException}}; we'd still fail in {{common.sh#check_logs_for_exceptions()}} because of the {{RuntimeException}} itself. Then I'll try to describe more about {{FailureMapper}}:
# {{FailureMapper}} is only used in {{DataStreamAllroundTestProgram}}.
# We'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram}} only if we [enabled TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173] in {{DataStreamAllroundTestProgram}}.
# We'll throw an Exception in {{FailureMapper#map}} and {{FailureMapper#notifyCheckpointComplete}}.
# We'll enable {{TEST_SIMULATE_FAILURE}} in {{test_ha_datastream.sh}}, {{test_ha_per_job_cluster_datastream.sh}} and {{test_resume_externalized_checkpoints.sh}}.
IIUC, all the above tests want to verify that the job can restore from (restore with checkpoint) the last failed job successfully (but we do not care where the exception comes from, so an Exception thrown from FailureMapper#map or FailureMapper#notifyCheckpointComplete has the same effect; please correct me if I miss anything here). If we want to verify that `failure of notifyCheckpointComplete can fail the task`, maybe we can add a unit test for it. 
> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test stalls on travis > - > > Key: FLINK-15105 > URL: https://issues.apache.org/jira/browse/FLINK-15105 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Yu Li >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > Resuming Externalized Checkpoint after terminal failure (rocks, incremental) > end-to-end test fails on release-1.9 nightly build stalls with "The job > exceeded the maximum log length, and has been terminated". > https://api.travis-ci.org/v3/job/621090394/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
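The failure-injection pattern discussed in the comments above can be sketched as follows. {{FailureInjector}} and its threshold field are illustrative, not the actual {{FailureMapper}} from the Flink end-to-end tests; it only shows the idea of forwarding records until a threshold is reached and then throwing an artificial failure once:

```java
/**
 * Illustrative failure injector in the spirit of the FailureMapper
 * discussed above: it forwards records until a configured threshold,
 * then throws an artificial failure exactly once.
 */
public class FailureInjector<T> {
    private final long failAfterRecords;
    private long numProcessedRecords;
    private boolean hasFailed;

    public FailureInjector(long failAfterRecords) {
        this.failAfterRecords = failAfterRecords;
    }

    public T map(T value) throws Exception {
        numProcessedRecords++;
        if (!hasFailed && numProcessedRecords >= failAfterRecords) {
            // Fail only once, so the job restored from the externalized
            // checkpoint can make progress past the injected failure.
            hasFailed = true;
            throw new Exception("Artificial failure.");
        }
        return value;
    }
}
```

In the real end-to-end tests the injected exception triggers a job failure and restart; the test then checks that the restored job resumes from the externalized checkpoint.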
[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992673#comment-16992673 ] Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/10/19 4:09 PM: - [~trohrmann] In the previous comment, I didn't want to remove the whole {{FailureMapper}}, but just want to remove the {{Artificial failure}} throwing statement in {{FailureMapper}}#{{notifyCheckpointComplete just as the comment in the below code block.}} {code:java} public T map(T value) throws Exception { numProcessedRecords++; if (isReachedFailureThreshold()) { throw new Exception("Artificial failure."); } return value; } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { numCompleteCheckpoints++; if (isReachedFailureThreshold()) { // === just want to remove this === throw new Exception("Artificial failure."); } } {code} I think the problem here is that we throw Artifical failure when completing checkpoint After throwing {{Artificial failure}} in {{FailureMapper#notifyCheckpointComplete}} ---> we got the following exception(attached below) ---> test failed when {{check_logs_for_errors}} using the commands in {{common.sh}}. {code:java} java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Artificial failure. 
at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200) ... 5 more {code} Remove the {{Artificial failure throwing}} in {{FailureMapper#notifyCheckpointComplete, we can still throw }}{{Aritifical failure}} in {{FailureMapper#notifyCheckpointComplete}}, IMHO, the {{Artificial failure throwing}} is just needed when the source is finite, but in the test job, we use [SequenceGeneratorSource|https://github.com/apache/flink/blob/171020749f7fccfa7781563569e2c88ea5e8b6a1/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java#L102], it is an infinite source. was (Author: klion26): [~trohrmann] In the previous comment, I'm not want to remove the whole {{FailureMapper}}, but just want to remove the {{Artificial failure}} throwing statement in {{FailureMapper}}#{{notifyCheckpointComplete just as the comment in the below code block.}} {code:java} public T map(T value) throws Exception { numProcessedRecords++; if (isReachedFailureThreshold()) { throw new Exception("Artificial failure."); } return value; } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { numCompleteCheckpoints++; if (isReachedFailureThreshold()) { // === just want to remove this === throw new Exception("Artificial failure."); } } {code} I think the problem here is that we throw Artifical failure when completing checkpoint After throwing {{Artificial failure}} in {{FailureMapper#notifyCheckpointComplete}} ---> we got the following exception(attached below) ---> test failed when {{check_logs_for_errors}} using the commands in {{common.sh}}. 
{code:java} java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Artificial failure. at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822) at
[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992673#comment-16992673 ] Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/10/19 3:49 PM: - [~trohrmann] In the previous comment, I'm not want to remove the whole {{FailureMapper}}, but just want to remove the {{Artificial failure}} throwing statement in {{FailureMapper}}#{{notifyCheckpointComplete just as the comment in the below code block.}} {code:java} public T map(T value) throws Exception { numProcessedRecords++; if (isReachedFailureThreshold()) { throw new Exception("Artificial failure."); } return value; } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { numCompleteCheckpoints++; if (isReachedFailureThreshold()) { // === just want to remove this === throw new Exception("Artificial failure."); } } {code} I think the problem here is that we throw Artifical failure when completing checkpoint After throwing {{Artificial failure}} in {{FailureMapper#notifyCheckpointComplete}} ---> we got the following exception(attached below) ---> test failed when {{check_logs_for_errors}} using the commands in {{common.sh}}. {code:java} java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Artificial failure. 
    at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more
{code}
If we remove the {{Artificial failure}} throw from {{FailureMapper#notifyCheckpointComplete}}, we can still throw the {{Artificial failure}} in {{FailureMapper#map}}. IMHO, the throw in {{notifyCheckpointComplete}} is only needed when the source is finite, but in the test job we use [SequenceGeneratorSource|https://github.com/apache/flink/blob/171020749f7fccfa7781563569e2c88ea5e8b6a1/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java#L102], which is an infinite source.
[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992673#comment-16992673 ] Congxian Qiu(klion26) commented on FLINK-15105: --- [~trohrmann] In the previous comment, I did not want to remove the whole {{FailureMapper}}, only the {{Artificial failure}} throw in {{FailureMapper#notifyCheckpointComplete}}, as marked by the comment in the code block below. I think the problem here is that we throw the {{Artificial failure}} while completing a checkpoint (we throw the {{Artificial failure}} in two places in {{FailureMapper}}).
{code:java}
public T map(T value) throws Exception {
    numProcessedRecords++;
    if (isReachedFailureThreshold()) {
        throw new Exception("Artificial failure.");
    }
    return value;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    numCompleteCheckpoints++;
    if (isReachedFailureThreshold()) {
        // === just want to remove this ===
        throw new Exception("Artificial failure.");
    }
}
{code}
After the throw in {{FailureMapper#notifyCheckpointComplete}} ---> we get the following exception (attached below) ---> the test fails when {{check_logs_for_errors}} runs the commands in {{common.sh}}.
{code:java}
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
    at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more
{code}
If we remove the {{Artificial failure}} throw from {{FailureMapper#notifyCheckpointComplete}}, we can still throw the {{Artificial failure}} in {{FailureMapper#map}}. IMHO, the throw in {{notifyCheckpointComplete}} is only needed when the source is finite, but in the test job we use [SequenceGeneratorSource|https://github.com/apache/flink/blob/171020749f7fccfa7781563569e2c88ea5e8b6a1/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java#L102], which is an infinite source.
> Resuming Externalized Checkpoint after terminal failure (rocks, incremental)
> end-to-end test stalls on travis
> -
>
> Key: FLINK-15105
> URL: https://issues.apache.org/jira/browse/FLINK-15105
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.10.0
> Reporter: Yu Li
> Priority: Critical
> Labels: test-stability
> Fix For: 1.10.0
>
>
> The "Resuming Externalized Checkpoint after terminal failure (rocks, incremental)"
> end-to-end test on the release-1.9 nightly build stalls with "The job
> exceeded the maximum log length, and has been terminated".
> https://api.travis-ci.org/v3/job/621090394/log.txt
-- This message was sent by Atlassian Jira (v8.3.4#803005)
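To make the proposal concrete, below is a minimal, self-contained sketch of what {{FailureMapper}} would look like with the throw removed from {{notifyCheckpointComplete}}. This is a hypothetical stand-in, not the real class: Flink's {{MapFunction}} and {{CheckpointListener}} interfaces are omitted, and the single {{failureInterval}} threshold simplifies the real mapper's bookkeeping.

```java
// Hypothetical, simplified stand-in for the real FailureMapper; the Flink
// interfaces and full threshold logic are omitted so the sketch is self-contained.
class ProposedFailureMapper {
    private final long failureInterval; // simplified stand-in for the real thresholds
    private long numProcessedRecords;
    private long numCompleteCheckpoints;

    ProposedFailureMapper(long failureInterval) {
        this.failureInterval = failureInterval;
    }

    private boolean isReachedFailureThreshold() {
        return numProcessedRecords >= failureInterval;
    }

    // The throw on the data path stays: it still exercises failure and recovery.
    <T> T map(T value) throws Exception {
        numProcessedRecords++;
        if (isReachedFailureThreshold()) {
            throw new Exception("Artificial failure.");
        }
        return value;
    }

    // Proposed change: only count completed checkpoints here and never throw,
    // so no "Error while confirming checkpoint" can end up in the logs.
    void notifyCheckpointComplete(long checkpointId) {
        numCompleteCheckpoints++;
    }

    long completedCheckpoints() {
        return numCompleteCheckpoints;
    }
}
```

With this shape, {{check_logs_for_errors}} would no longer see the "Error while confirming checkpoint" stack trace, while the data-path failure injection in {{map}} is untouched.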
[jira] [Created] (FLINK-15150) ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange failed on Travis
Congxian Qiu(klion26) created FLINK-15150: - Summary: ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange failed on Travis Key: FLINK-15150 URL: https://issues.apache.org/jira/browse/FLINK-15150 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.10.0 Reporter: Congxian Qiu(klion26)
06:37:18.423 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 14.014 s <<< FAILURE! - in org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase
06:37:18.423 [ERROR] testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase) Time elapsed: 14.001 s <<< ERROR!
java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkException: JobMaster has been shut down.
    at org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.lambda$testJobExecutionOnClusterWithLeaderChange$1(ZooKeeperLeaderElectionITCase.java:131)
    at org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:131)
Caused by: org.apache.flink.util.FlinkException: JobMaster has been shut down.
[https://travis-ci.com/flink-ci/flink/jobs/264972218]
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15097) flink can not use user specified hdfs conf when submitting app in client node
[ https://issues.apache.org/jira/browse/FLINK-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991192#comment-16991192 ] Congxian Qiu(klion26) commented on FLINK-15097: --- If these two are truly the same, could we close this issue as a duplicate and link it to FLINK-11135?
> flink can not use user specified hdfs conf when submitting app in client node
> -
>
> Key: FLINK-15097
> URL: https://issues.apache.org/jira/browse/FLINK-15097
> Project: Flink
> Issue Type: Bug
> Components: Client / Job Submission
> Affects Versions: 1.9.1
> Reporter: qian wang
> Priority: Major
> Attachments: 0001-adjust-read-hdfs-conf-order.patch
>
>
> Currently, if a cluster node has the env variable HADOOP_CONF_DIR set, Flink forces the use of
> the hdfs-site.xml in the corresponding dir. A user who submits an app from the
> client node therefore cannot supply a custom hdfs-site.xml/hdfs-default.xml through
> fs.hdfs.hdfssite or fs.hdfs.hdfsdefault in order to set a custom blocksize
> or replication number. For example, using yarnship to upload my hdfs conf dir and
> setting fs.hdfs.hdfssite to point at \{conf dir}/hdfs-site.xml is useless.
> Digging into the code, this is due to the order in which confs are chosen in
> HadoopUtils.java: the conf in HADOOP_CONF_DIR overrides the user's uploaded conf.
> I think this order is not sensible, so I reversed the order in which Flink reads
> the hdfs conf so that the user's uploaded custom conf overrides HADOOP_CONF_DIR.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
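The ordering question at the heart of this ticket can be illustrated with a tiny self-contained sketch. The class and method names below are hypothetical (the real logic lives in Flink's {{HadoopUtils}} and operates on Hadoop {{Configuration}} objects); only the "last writer wins" merge-order idea comes from the proposed patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for the conf lookup order: properties merged later
// override properties merged earlier. Names are hypothetical.
class HdfsConfResolution {
    /** Current behavior: HADOOP_CONF_DIR is applied last, so it wins. */
    static Map<String, String> currentOrder(Map<String, String> userConf,
                                            Map<String, String> envConf) {
        Map<String, String> merged = new LinkedHashMap<>(userConf);
        merged.putAll(envConf); // env conf overrides user settings
        return merged;
    }

    /** Proposed behavior: the user's conf is applied last and wins. */
    static Map<String, String> proposedOrder(Map<String, String> userConf,
                                             Map<String, String> envConf) {
        Map<String, String> merged = new LinkedHashMap<>(envConf);
        merged.putAll(userConf); // user settings override env conf
        return merged;
    }
}
```

So with the attached patch a user-specified `dfs.blocksize` would survive even when HADOOP_CONF_DIR also defines one.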
[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991179#comment-16991179 ] Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/9/19 6:38 AM: The test completed a checkpoint successfully in the first job, resumed from that checkpoint successfully in the second job, and could also complete checkpoints in the second job successfully.
{code:java}
// log for first job
2019-12-05 20:12:17,410 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - SlidingWindowOperator (1/2) (5a5c73dd041a0145bc02dc017e46bf1f) switched from DEPLOYING to RUNNING.
2019-12-05 20:12:17,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1575576737956 for job 39a292088648857cac5f7e110547c180.
2019-12-05 20:12:21,095 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 39a292088648857cac5f7e110547c180 (261564 bytes in 3114 ms).
2019-12-05 20:12:21,113 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1575576741094 for job 39a292088648857cac5f7e110547c180.
2019-12-05 20:12:22,002 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FailureMapper (1/1) (4d273da136346ef3ff6e1a54d197f00b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
    at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more
2019-12-05 20:12:22,014 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 2 of job 39a292088648857cac5f7e110547c180.
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
    at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more

// log for second job
2019-12-05 20:12:27,190 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 7c862506012fb04c0d565bfda7cc9595 from savepoint file:///home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-02075534631/externalized-chckpt-e2e-backend-dir/39a292088648857cac5f7e110547c180/chk-1 ()
2019-12-05 20:12:27,213 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 7c862506012fb04c0d565bfda7cc9595 to 2.
2019-12-05 20:12:27,220 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 7c862506012fb04c0d565bfda7cc9595 from latest valid checkpoint: Checkpoint 1 @ 0 for 7c862506012fb04c0d565bfda7cc9595.
2019-12-05 20:12:27,231 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
2019-12-05 20:12:27,232 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job General purpose test job (7c862506012fb04c0d565bfda7cc9595) was granted leadership with session id ---- at akka.tcp://flink@localhost:6123/user/jobmanager_1.
2019-12-05 20:12:27,233 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job General purpose
[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis
[ https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991179#comment-16991179 ] Congxian Qiu(klion26) commented on FLINK-15105: --- The test completed a checkpoint successfully in the first job, resumed from that checkpoint successfully in the second job, and could also complete checkpoints in the second job successfully.
{code:java}
// log for first job
2019-12-05 20:12:17,410 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - SlidingWindowOperator (1/2) (5a5c73dd041a0145bc02dc017e46bf1f) switched from DEPLOYING to RUNNING.
2019-12-05 20:12:17,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1575576737956 for job 39a292088648857cac5f7e110547c180.
2019-12-05 20:12:21,095 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 39a292088648857cac5f7e110547c180 (261564 bytes in 3114 ms).
2019-12-05 20:12:21,113 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1575576741094 for job 39a292088648857cac5f7e110547c180.
2019-12-05 20:12:22,002 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - FailureMapper (1/1) (4d273da136346ef3ff6e1a54d197f00b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
    at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more
2019-12-05 20:12:22,014 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 2 of job 39a292088648857cac5f7e110547c180.
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
    at org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
    ... 5 more

// log for second job
2019-12-05 20:12:27,190 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 7c862506012fb04c0d565bfda7cc9595 from savepoint file:///home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-02075534631/externalized-chckpt-e2e-backend-dir/39a292088648857cac5f7e110547c180/chk-1 ()
2019-12-05 20:12:27,213 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 7c862506012fb04c0d565bfda7cc9595 to 2.
2019-12-05 20:12:27,220 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 7c862506012fb04c0d565bfda7cc9595 from latest valid checkpoint: Checkpoint 1 @ 0 for 7c862506012fb04c0d565bfda7cc9595.
2019-12-05 20:12:27,231 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
2019-12-05 20:12:27,232 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job General purpose test job (7c862506012fb04c0d565bfda7cc9595) was granted leadership with session id ---- at akka.tcp://flink@localhost:6123/user/jobmanager_1.
2019-12-05 20:12:27,233 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job General purpose test job (7c862506012fb04c0d565bfda7cc9595)
[jira] [Commented] (FLINK-14898) Enable background cleanup of state with TTL by default
[ https://issues.apache.org/jira/browse/FLINK-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991103#comment-16991103 ] Congxian Qiu(klion26) commented on FLINK-14898: --- [~azagrebin] have created FLINK-15136 for this > Enable background cleanup of state with TTL by default > -- > > Key: FLINK-14898 > URL: https://issues.apache.org/jira/browse/FLINK-14898 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > So far, we were conservative about enabling background cleanup strategies for > state with TTL. In general, if state is configured to have TTL, most users > would expect the background cleanup to kick in. As there were no reported > issues so far since the release of backend specific cleanups and that should > not affect any state without TTL, this issue suggests to enable default > background cleanup for backends. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15136) Update the Chinese version of "Working with state"
[ https://issues.apache.org/jira/browse/FLINK-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991070#comment-16991070 ] Congxian Qiu(klion26) commented on FLINK-15136: --- As I translated the original "Working with State" page, I'm glad to fix this. [~jark] could you please assign this ticket to me cc [~azagrebin] > Update the Chinese version of "Working with state" > -- > > Key: FLINK-15136 > URL: https://issues.apache.org/jira/browse/FLINK-15136 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Priority: Major > > Currently, we enabled background cleanup of state with TTL by default in > FLINK-14898, and we should update the Chinese version to respect it. > > documentation location : docs/dev/stream/state/state.zh.md -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15136) Update the Chinese version of "Working with state"
[ https://issues.apache.org/jira/browse/FLINK-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-15136: -- Affects Version/s: 1.10.0 > Update the Chinese version of "Working with state" > -- > > Key: FLINK-15136 > URL: https://issues.apache.org/jira/browse/FLINK-15136 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Affects Versions: 1.10.0 >Reporter: Congxian Qiu(klion26) >Priority: Major > > Currently, we enabled background cleanup of state with TTL by default in > FLINK-14898, and we should update the Chinese version to respect it. > > documentation location : docs/dev/stream/state/state.zh.md -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15136) Update the Chinese version of "Working with state"
Congxian Qiu(klion26) created FLINK-15136: - Summary: Update the Chinese version of "Working with state" Key: FLINK-15136 URL: https://issues.apache.org/jira/browse/FLINK-15136 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Congxian Qiu(klion26) Currently, we enabled background cleanup of state with TTL by default in FLINK-14898, and we should update the Chinese version to respect it. documentation location : docs/dev/stream/state/state.zh.md -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-11937) Resolve small file problem in RocksDB incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-11937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986861#comment-16986861 ] Congxian Qiu(klion26) commented on FLINK-11937: --- Retargeted to 1.11.0 to give more time for review and testing. There is an early version [here|https://github.com/klion26/flink/tree/small_files_with_pluggable_sharedregistry]; please refer to it if anyone meets the same problem and would like a quick fix. Thanks. > Resolve small file problem in RocksDB incremental checkpoint > > > Key: FLINK-11937 > URL: https://issues.apache.org/jira/browse/FLINK-11937 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently, when incremental checkpointing is enabled in RocksDBStateBackend, a > separate file is generated on DFS for each sst file. This may cause a > “file flood” when running intensive workloads (many jobs with high > parallelism) in a big cluster. According to our observation in Alibaba > production, such a file flood introduces at least two drawbacks when using HDFS > as the checkpoint storage FileSystem: 1) a huge number of RPC requests issued to > the NN, which may burst its response queue; 2) a huge number of files, which causes big > pressure on the NN’s on-heap memory. > In Flink we previously noticed a similar small-file flood problem and tried to > resolve it by introducing ByteStreamStateHandle (FLINK-2808), but this > solution has its limitation: if we configure the threshold too low there > will still be too many small files, while if too high the JM will eventually > OOM; thus it could hardly resolve the issue when using RocksDBStateBackend > with the incremental snapshot strategy. > We propose a new OutputStream called > FileSegmentCheckpointStateOutputStream (FSCSOS) to fix the problem. 
FSCSOS > will reuse the same underlying distributed file until its size exceeds a > preset threshold. We > plan to complete the work in 3 steps: firstly introduce FSCSOS, secondly > resolve the specific storage amplification issue on FSCSOS, and lastly add an > option to reuse FSCSOS across multiple checkpoints to further reduce the DFS > file number. > For more details, please refer to the attached design doc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
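The segment idea described above can be sketched in a few lines of self-contained Java. The class and method names below are hypothetical and in-memory {{ByteArrayOutputStream}}s stand in for DFS files (the real FSCSOS targets distributed files and integrates with the checkpoint state handles); the sketch only shows the core mechanism: many logical state writes share one underlying file, each write becoming an (offset, length) segment, and a new file is opened only once the preset size threshold is exceeded.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the FSCSOS segment idea; ByteArrayOutputStream
// replaces a DFS file so the example is runnable without HDFS.
class SegmentedStream {
    static final class Segment {
        final int fileIndex, offset, length; // which file, and where inside it
        Segment(int fileIndex, int offset, int length) {
            this.fileIndex = fileIndex; this.offset = offset; this.length = length;
        }
    }

    private final long threshold;
    private final List<ByteArrayOutputStream> files = new ArrayList<>();

    SegmentedStream(long threshold) {
        this.threshold = threshold;
        files.add(new ByteArrayOutputStream());
    }

    /** Write one logical state object; reuse the current file until it is full. */
    Segment writeState(byte[] data) {
        ByteArrayOutputStream current = files.get(files.size() - 1);
        if (current.size() >= threshold) { // roll to a fresh underlying file
            current = new ByteArrayOutputStream();
            files.add(current);
        }
        int offset = current.size();
        current.write(data, 0, data.length);
        return new Segment(files.size() - 1, offset, data.length);
    }

    int fileCount() {
        return files.size();
    }
}
```

Instead of one DFS file per sst file, the NN then only sees one file per threshold's worth of state, which is exactly what reduces the RPC and on-heap pressure described in the ticket.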
[jira] [Commented] (FLINK-13553) KvStateServerHandlerTest.readInboundBlocking unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986855#comment-16986855 ] Congxian Qiu(klion26) commented on FLINK-13553: --- I'll take a look at this issue; I currently can't reproduce it locally, so I will try running it on Travis. > KvStateServerHandlerTest.readInboundBlocking unstable on Travis > --- > > Key: FLINK-13553 > URL: https://issues.apache.org/jira/browse/FLINK-13553 > Project: Flink > Issue Type: Bug > Components: Runtime / Queryable State >Affects Versions: 1.10.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.10.0 > > > The {{KvStateServerHandlerTest.readInboundBlocking}} and > {{KvStateServerHandlerTest.testQueryExecutorShutDown}} tests fail on Travis with a > {{TimeoutException}}. > https://api.travis-ci.org/v3/job/566420641/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-13861) No new checkpoint will be trigged when canceling an expired checkpoint failed
[ https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986763#comment-16986763 ] Congxian Qiu(klion26) commented on FLINK-13861: --- [~pnowojski] [~SleePy] thanks for your replies, and sorry for my late one (I thought I had already replied after the offline discussion with [~SleePy]). Yes, I'm interested in the root cause as well, but we have not encountered it again. After the offline discussion with [~SleePy], we reached an agreement that ignoring the exception directly may not be a good solution, since it would require paying attention to more and more corner cases; failing the whole job when encountering such a case may be more reasonable. As 1.10 will reach code freeze soon and I have some other work on hand now, I have just removed the fix version temporarily and will come back to this soon :)
> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -
>
> Key: FLINK-13861
> URL: https://issues.apache.org/jira/browse/FLINK-13861
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Congxian Qiu(klion26)
> Assignee: Congxian Qiu(klion26)
> Priority: Major
> Fix For: 1.10.0
>
>
> I encountered this problem in our private fork of Flink; after taking a look
> at the current master branch of Apache Flink, I think the problem exists here
> also.
> Problem detail:
> 1. 
A checkpoint is canceled because of expiration, so the canceller below is called:
> {code:java}
> final Runnable canceller = () -> {
>    synchronized (lock) {
>       // only do the work if the checkpoint is not discarded anyways
>       // note that checkpoint completion discards the pending checkpoint object
>       if (!checkpoint.isDiscarded()) {
>          LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
>          failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
>          pendingCheckpoints.remove(checkpointID);
>          rememberRecentCheckpointId(checkpointID);
>          triggerQueuedRequests();
>       }
>    }
> };{code}
>
> But failPendingCheckpoint may throw an exception, because it calls
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> -> {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initializes a FailedCheckpointStates, which may throw an exception via
> {{checkArgument}}.
> I did not find out why the {{checkArgument}} ever failed (this problem did not
> reproduce frequently); I will create an issue for that if I have more findings.
>
> 2. When triggering the next checkpoint, we first check whether there are already too
> many checkpoints, as below:
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>       triggerRequestQueued = true;
>       if (currentPeriodicTrigger != null) {
>          currentPeriodicTrigger.cancel(false);
>          currentPeriodicTrigger = null;
>       }
>       throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>    }
> }
> {code}
> Here {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} will
> always be true.
> 3. From then on, no checkpoint will ever be triggered again.
> Because {{failPendingCheckpoint}} may throw an exception, we should place
> the pending-checkpoint removal logic in a finally block.
> I'd like to file a PR for this if it really needs to be fixed. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
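The proposed finally-block fix can be demonstrated with a small self-contained sketch. This is not the real {{CheckpointCoordinator}} (the names mirror the snippet in the description, but locking, logging, and the other bookkeeping calls are omitted); it only shows why moving the cleanup into a finally block keeps {{pendingCheckpoints}} consistent even when {{failPendingCheckpoint}} throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the canceller fix: the bookkeeping that unblocks
// future checkpoint triggers runs in a finally block, so a throwing
// failPendingCheckpoint can no longer leave the expired checkpoint stuck
// in pendingCheckpoints.
class CancellerSketch {
    final Map<Long, Object> pendingCheckpoints = new HashMap<>();
    private final boolean failThrows;

    CancellerSketch(boolean failThrows) {
        this.failThrows = failThrows;
    }

    // Stand-in for CheckpointCoordinator#failPendingCheckpoint, which may
    // throw from a checkArgument deep inside PendingCheckpoint#abort.
    private void failPendingCheckpoint(long checkpointId) {
        if (failThrows) {
            throw new IllegalArgumentException("checkArgument failed");
        }
    }

    /** Proposed canceller body: cleanup moved into a finally block. */
    void cancel(long checkpointId) {
        try {
            failPendingCheckpoint(checkpointId);
        } finally {
            // runs even when failPendingCheckpoint throws, so the next
            // checkConcurrentCheckpoints() no longer sees a stale entry
            pendingCheckpoints.remove(checkpointId);
        }
    }
}
```

With the original code (cleanup after the call, not in finally), a throwing {{failPendingCheckpoint}} would skip the {{remove}}, and {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} would stay true forever, which is exactly the reported symptom.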
[jira] [Updated] (FLINK-13861) No new checkpoint will be trigged when canceling an expired checkpoint failed
[ https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-13861: -- Fix Version/s: (was: 1.10.0)
> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -
>
> Key: FLINK-13861
> URL: https://issues.apache.org/jira/browse/FLINK-13861
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Congxian Qiu(klion26)
> Assignee: Congxian Qiu(klion26)
> Priority: Major
>
> I encountered this problem in our private fork of Flink; after taking a look
> at the current master branch of Apache Flink, I think the problem exists here
> also.
> Problem detail:
> 1. A checkpoint is canceled because of expiration, so the canceller below is called:
> {code:java}
> final Runnable canceller = () -> {
>    synchronized (lock) {
>       // only do the work if the checkpoint is not discarded anyways
>       // note that checkpoint completion discards the pending checkpoint object
>       if (!checkpoint.isDiscarded()) {
>          LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
>          failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
>          pendingCheckpoints.remove(checkpointID);
>          rememberRecentCheckpointId(checkpointID);
>          triggerQueuedRequests();
>       }
>    }
> };{code}
>
> But failPendingCheckpoint may throw an exception, because it calls
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> -> {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initializes a FailedCheckpointStates, which may throw an exception via
> {{checkArgument}}.
> I did not find out why the {{checkArgument}} ever failed (this problem did not
> reproduce frequently); I will create an issue for that if I have more findings.
>
> 2. 
When triggering the next checkpoint, we first check whether there are already too
> many checkpoints, as below:
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>       triggerRequestQueued = true;
>       if (currentPeriodicTrigger != null) {
>          currentPeriodicTrigger.cancel(false);
>          currentPeriodicTrigger = null;
>       }
>       throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>    }
> }
> {code}
> Here {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} will
> always be true.
> 3. From then on, no checkpoint will ever be triggered again.
> Because {{failPendingCheckpoint}} may throw an exception, we should place
> the pending-checkpoint removal logic in a finally block.
> I'd like to file a PR for this if it really needs to be fixed.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-11937) Resolve small file problem in RocksDB incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-11937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-11937: -- Fix Version/s: (was: 1.10.0) 1.11.0 > Resolve small file problem in RocksDB incremental checkpoint > > > Key: FLINK-11937 > URL: https://issues.apache.org/jira/browse/FLINK-11937 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently, when incremental checkpoint is enabled in RocksDBStateBackend, a > separate file will be generated on DFS for each sst file. This may cause a > “file flood” when running intensive workloads (many jobs with high > parallelism) in a big cluster. According to our observation in Alibaba > production, such a file flood introduces at least two drawbacks when using HDFS > as the checkpoint storage FileSystem: 1) a huge number of RPC requests issued to > the NN, which may burst its response queue; 2) a huge number of files causes big > pressure on the NN’s on-heap memory. > We noticed a similar small-file flood problem in Flink before and tried to > resolve it by introducing ByteStreamStateHandle (FLINK-2808), but this > solution has a limitation: if we configure the threshold too low there > will still be too many small files, while if it is too high the JM will eventually > OOM. Thus it can hardly resolve the issue in the case of using RocksDBStateBackend > with the incremental snapshot strategy. > We propose a new OutputStream called > FileSegmentCheckpointStateOutputStream (FSCSOS) to fix the problem. FSCSOS > will reuse the same underlying distributed file until its size exceeds a > preset threshold. 
We > plan to complete the work in 3 steps: first introduce FSCSOS, second > resolve the specific storage amplification issue of FSCSOS, and lastly add an > option to reuse FSCSOS across multiple checkpoints to further reduce the DFS > file count. > For more details, please refer to the attached design doc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
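The core idea of FSCSOS can be sketched as follows: many logical state files become (fileIndex, offset, length) segments of one shared underlying stream, which only rolls over once a size threshold is exceeded. This is a minimal illustration with invented names under that assumption, not Flink's actual API; the real design is in the attached doc.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the FSCSOS idea (names are illustrative, not Flink's API):
// instead of one DFS file per sst file, each logical state file is written as
// a segment (fileIndex, offset, length) of a shared underlying file, and a new
// underlying file is opened only after the shared one exceeds a threshold.
class SegmentingStream {
    private final int threshold;
    // ByteArrayOutputStream stands in for a DFS output stream.
    private ByteArrayOutputStream current = new ByteArrayOutputStream();
    final List<ByteArrayOutputStream> underlyingFiles = new ArrayList<>();

    SegmentingStream(int threshold) {
        this.threshold = threshold;
        underlyingFiles.add(current);
    }

    /** Writes one logical state file; returns {fileIndex, offset, length}. */
    int[] writeSegment(byte[] data) {
        if (current.size() >= threshold) {
            // roll over: open a fresh underlying file only once the shared one is full
            current = new ByteArrayOutputStream();
            underlyingFiles.add(current);
        }
        int offset = current.size();
        current.write(data, 0, data.length);
        return new int[] {underlyingFiles.size() - 1, offset, data.length};
    }
}
```

With a realistic threshold (tens of megabytes), hundreds of small sst uploads collapse into a handful of DFS files, cutting NN RPC volume and metadata pressure accordingly.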
[jira] [Comment Edited] (FLINK-14966) one error in document
[ https://issues.apache.org/jira/browse/FLINK-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983411#comment-16983411 ] Congxian Qiu(klion26) edited comment on FLINK-14966 at 11/27/19 11:12 AM: -- I think this is a typo as you described. could you please update the issue title so that others can know the problem from the title. (where the problem is, what the problem is) was (Author: klion26): I think this is a typo as you described. could you please update the issue title so that others can know the problem from the title. > one error in document > -- > > Key: FLINK-14966 > URL: https://issues.apache.org/jira/browse/FLINK-14966 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.9.0 >Reporter: yuzhemin >Priority: Trivial > > Document url is > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html] > error location: _Application Development > Basic API Concepts>Supported Data > Types>Values_ > I guess the location _org.apache.flinktypes.Value and > org.apache.flinktypes.CopyableValue_ _referred in the paragraph means > org.apache.flink.types.XXX_ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14966) one error in document
[ https://issues.apache.org/jira/browse/FLINK-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983411#comment-16983411 ] Congxian Qiu(klion26) commented on FLINK-14966: --- I think this is a typo as you described. could you please update the issue title so that others can know the problem from the title. > one error in document > -- > > Key: FLINK-14966 > URL: https://issues.apache.org/jira/browse/FLINK-14966 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.9.0 >Reporter: yuzhemin >Priority: Trivial > > Document url is > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html] > error location: _Application Development > Basic API Concepts>Supported Data > Types>Values_ > I guess the location _org.apache.flinktypes.Value and > org.apache.flinktypes.CopyableValue_ _referred in the paragraph means > org.apache.flink.types.XXX_ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14110) Deleting state.backend.rocksdb.localdir causes silent failure
[ https://issues.apache.org/jira/browse/FLINK-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980708#comment-16980708 ] Congxian Qiu(klion26) commented on FLINK-14110: --- [~liyu] thanks for the reminder, will take a look at this issue. > Deleting state.backend.rocksdb.localdir causes silent failure > - > > Key: FLINK-14110 > URL: https://issues.apache.org/jira/browse/FLINK-14110 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.8.1, 1.9.0 > Environment: Flink {{1.8.1}} and {{1.9.0}}. > JVM 8 >Reporter: Aaron Levin >Priority: Minor > > Suppose {{state.backend.rocksdb.localdir}} is configured as: > {noformat} > state.backend.rocksdb.localdir: /flink/tmp > {noformat} > If I then run \{{rm -rf /flink/tmp/job_*}} on a host while a Flink > application is running, I will observe the following: > * throughput of my operators running on that host will drop to zero > * the application will not fail or restart > * the task manager will not fail or restart > * in most cases there is nothing in the logs to indicate a failure (I've run > this several times and only once seen an exception - I believe I was lucky > and deleted those directories during a checkpoint or something) > The desired behaviour here would be to throw an exception and crash, instead > of silently dropping throughput to zero. Restarting the Task Manager will > resolve the issues. > I only tried this on Flink {{1.8.1}} and {{1.9.0}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14892) Add documentation for checkpoint directory layout
[ https://issues.apache.org/jira/browse/FLINK-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980652#comment-16980652 ] Congxian Qiu(klion26) commented on FLINK-14892: --- [~pnowojski] thanks for the reply and assigning; as the checkpoint directory (in contrast to savepoint) is a Flink-internal concept, I will add this information when filing a PR. > Add documentation for checkpoint directory layout > - > > Key: FLINK-14892 > URL: https://issues.apache.org/jira/browse/FLINK-14892 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / Checkpointing >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > In FLINK-8531, we changed the checkpoint directory layout to > {code:java} > /user-defined-checkpoint-dir > | > + --shared/ > + --taskowned/ > + --chk-1/ > + --chk-2/ > + --chk-3/ > ... > {code} > But the directory layout is currently not described in the doc, and I found > some users confused about this, such as [1][2], so I propose to add a > description for the checkpoint directory layout in the documentation, maybe > on the page {{checkpoints#DirectoryStructure}}[3] > [1] > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751] > [2] > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html] > [3] > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14928) Documentation links check nightly run failed on travis
[ https://issues.apache.org/jira/browse/FLINK-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980151#comment-16980151 ] Congxian Qiu(klion26) commented on FLINK-14928: --- FLINK-14866 fixed much of them, but there still a few broken links in page systemFunctions.html > Documentation links check nightly run failed on travis > -- > > Key: FLINK-14928 > URL: https://issues.apache.org/jira/browse/FLINK-14928 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.10.0 >Reporter: Yu Li >Assignee: Congxian Qiu(klion26) >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > This test stage fails stably, with below error: > {noformat} > [2019-11-22 03:10:45] ERROR `/dev/table/udfs.html' not found. > [2019-11-22 03:10:46] ERROR `/dev/table/functions.html' not found. > [2019-11-22 03:10:49] ERROR > `/zh/getting-started/tutorials/datastream_api.html' not found. > [2019-11-22 03:10:49] ERROR > `/dev/table/functions/streaming/query_configuration.html' not found. > [2019-11-22 03:10:49] ERROR `/dev/table/functions/sql.html' not found. > [2019-11-22 03:10:49] ERROR `/dev/table/functions/tableApi.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/udfs.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions.html' not found. > [2019-11-22 03:10:49] ERROR > `/zh/dev/table/functions/streaming/query_configuration.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/sql.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/tableApi.html' not found. > http://localhost:4000/dev/table/udfs.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/dev/table/functions.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/getting-started/tutorials/datastream_api.html: > Remote file does not exist -- broken link!!! 
> -- > http://localhost:4000/dev/table/functions/streaming/query_configuration.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/dev/table/functions/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/dev/table/functions/tableApi.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/udfs.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/functions.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/functions/streaming/query_configuration.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/zh/dev/table/functions/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/zh/dev/table/functions/tableApi.html: > Remote file does not exist -- broken link!!! > --- > Found 11 broken links. > {noformat} > And here is the latest instance: > https://api.travis-ci.org/v3/job/615032410/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14928) Documentation links check nightly run failed on travis
[ https://issues.apache.org/jira/browse/FLINK-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979973#comment-16979973 ] Congxian Qiu(klion26) commented on FLINK-14928: --- the reason here is FLINK-14638 moved some doc to another place, I'll file a pr for this. > Documentation links check nightly run failed on travis > -- > > Key: FLINK-14928 > URL: https://issues.apache.org/jira/browse/FLINK-14928 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.10.0 >Reporter: Yu Li >Assignee: Congxian Qiu(klion26) >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > This test stage fails stably, with below error: > {noformat} > [2019-11-22 03:10:45] ERROR `/dev/table/udfs.html' not found. > [2019-11-22 03:10:46] ERROR `/dev/table/functions.html' not found. > [2019-11-22 03:10:49] ERROR > `/zh/getting-started/tutorials/datastream_api.html' not found. > [2019-11-22 03:10:49] ERROR > `/dev/table/functions/streaming/query_configuration.html' not found. > [2019-11-22 03:10:49] ERROR `/dev/table/functions/sql.html' not found. > [2019-11-22 03:10:49] ERROR `/dev/table/functions/tableApi.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/udfs.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions.html' not found. > [2019-11-22 03:10:49] ERROR > `/zh/dev/table/functions/streaming/query_configuration.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/sql.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/tableApi.html' not found. > http://localhost:4000/dev/table/udfs.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/dev/table/functions.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/getting-started/tutorials/datastream_api.html: > Remote file does not exist -- broken link!!! 
> -- > http://localhost:4000/dev/table/functions/streaming/query_configuration.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/dev/table/functions/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/dev/table/functions/tableApi.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/udfs.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/functions.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/functions/streaming/query_configuration.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/zh/dev/table/functions/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/zh/dev/table/functions/tableApi.html: > Remote file does not exist -- broken link!!! > --- > Found 11 broken links. > {noformat} > And here is the latest instance: > https://api.travis-ci.org/v3/job/615032410/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14928) Documentation links check nightly run failed on travis
[ https://issues.apache.org/jira/browse/FLINK-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979967#comment-16979967 ] Congxian Qiu(klion26) commented on FLINK-14928: --- I'll take a look at this issue. > Documentation links check nightly run failed on travis > -- > > Key: FLINK-14928 > URL: https://issues.apache.org/jira/browse/FLINK-14928 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.10.0 >Reporter: Yu Li >Assignee: Yu Li >Priority: Blocker > Labels: test-stability > Fix For: 1.10.0 > > > This test stage fails stably, with below error: > {noformat} > [2019-11-22 03:10:45] ERROR `/dev/table/udfs.html' not found. > [2019-11-22 03:10:46] ERROR `/dev/table/functions.html' not found. > [2019-11-22 03:10:49] ERROR > `/zh/getting-started/tutorials/datastream_api.html' not found. > [2019-11-22 03:10:49] ERROR > `/dev/table/functions/streaming/query_configuration.html' not found. > [2019-11-22 03:10:49] ERROR `/dev/table/functions/sql.html' not found. > [2019-11-22 03:10:49] ERROR `/dev/table/functions/tableApi.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/udfs.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions.html' not found. > [2019-11-22 03:10:49] ERROR > `/zh/dev/table/functions/streaming/query_configuration.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/sql.html' not found. > [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/tableApi.html' not found. > http://localhost:4000/dev/table/udfs.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/dev/table/functions.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/getting-started/tutorials/datastream_api.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/dev/table/functions/streaming/query_configuration.html: > Remote file does not exist -- broken link!!! 
> http://localhost:4000/dev/table/functions/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/dev/table/functions/tableApi.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/udfs.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/functions.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/zh/dev/table/functions/streaming/query_configuration.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/zh/dev/table/functions/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/zh/dev/table/functions/tableApi.html: > Remote file does not exist -- broken link!!! > --- > Found 11 broken links. > {noformat} > And here is the latest instance: > https://api.travis-ci.org/v3/job/615032410/log.txt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14892) Add documentation for checkpoint directory layout
[ https://issues.apache.org/jira/browse/FLINK-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-14892: -- Summary: Add documentation for checkpoint directory layout (was: Add description for checkpoint directory layout) > Add documentation for checkpoint directory layout > - > > Key: FLINK-14892 > URL: https://issues.apache.org/jira/browse/FLINK-14892 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / Checkpointing >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > In FLINK-8531, we change the checkpoint directory layout to > {code:java} > /user-defined-checkpoint-dir > | > + --shared/ > + --taskowned/ > + --chk-1/ > + --chk-2/ > + --chk-3/ > ... > {code} > But the directory layout did not describe in the doc currently, and I found > some users confused about this, such as[1][2], so I propose to add a > description for the checkpoint directory layout in the documentation, maybe > in the page {{checkpoints#DirectoryStructure}}[3] > [1] > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751] > [2] > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html] > [3] > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14892) Add description for checkpoint directory layout
[ https://issues.apache.org/jira/browse/FLINK-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979239#comment-16979239 ] Congxian Qiu(klion26) commented on FLINK-14892: --- [~sewen] as you're the owner of FLINK-8531, what do you think about this? Please assign it to me if it is valid, thanks. > Add description for checkpoint directory layout > --- > > Key: FLINK-14892 > URL: https://issues.apache.org/jira/browse/FLINK-14892 > Project: Flink > Issue Type: Improvement > Components: Documentation, Runtime / Checkpointing >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > In FLINK-8531, we changed the checkpoint directory layout to > {code:java} > /user-defined-checkpoint-dir > | > + --shared/ > + --taskowned/ > + --chk-1/ > + --chk-2/ > + --chk-3/ > ... > {code} > But the directory layout is currently not described in the doc, and I found > some users confused about this, such as [1][2], so I propose to add a > description for the checkpoint directory layout in the documentation, maybe > on the page {{checkpoints#DirectoryStructure}}[3] > [1] > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751] > [2] > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html] > [3] > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14892) Add description for checkpoint directory layout
Congxian Qiu(klion26) created FLINK-14892: - Summary: Add description for checkpoint directory layout Key: FLINK-14892 URL: https://issues.apache.org/jira/browse/FLINK-14892 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Checkpointing Reporter: Congxian Qiu(klion26) Fix For: 1.10.0 In FLINK-8531, we changed the checkpoint directory layout to {code:java} /user-defined-checkpoint-dir | + --shared/ + --taskowned/ + --chk-1/ + --chk-2/ + --chk-3/ ... {code} But the directory layout is currently not described in the doc, and I found some users confused about this, such as [1][2], so I propose to add a description for the checkpoint directory layout in the documentation, maybe on the page {{checkpoints#DirectoryStructure}}[3] [1] [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751] [2] [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html] [3] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI
[ https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977991#comment-16977991 ] Congxian Qiu(klion26) commented on FLINK-14264: --- kindly ping [~aljoscha] > Expose CheckpointBackend in checkpoint config RestAPI > - > > Key: FLINK-14264 > URL: https://issues.apache.org/jira/browse/FLINK-14264 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, we can get the checkpoint config from the REST API[1]; the response > contains the information below > * timeout > * min_pause > * max_concurrent > * externalization > But it does not contain the type of the CheckpointBackend. In some scenarios we > want to get the CheckpointBackend type from REST, so this issue wants to add the > simple name of the CheckpointBackend to the {{checkpoints/config}} REST response with > key {{checkpoint_backend}}, so the response will contain the information > below > * {{timeout}} > * {{min_pause}} > * {{max_concurrent}} > * checkpoint_backend > * externalization > > [1] > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config] -- This message was sent by Atlassian Jira (v8.3.4#803005)
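For illustration, a hypothetical {{jobs/:jobid/checkpoints/config}} response with the proposed field added. The key name follows the issue description ({{checkpoint_backend}}) and all values are invented for the example; this is not a finalized API shape:

```json
{
  "timeout": 600000,
  "min_pause": 0,
  "max_concurrent": 1,
  "checkpoint_backend": "FsStateBackend",
  "externalization": {
    "enabled": false,
    "delete_on_cancellation": true
  }
}
```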
[jira] [Commented] (FLINK-14830) Correct the link for chinese version stream_checkpointing page
[ https://issues.apache.org/jira/browse/FLINK-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975905#comment-16975905 ] Congxian Qiu(klion26) commented on FLINK-14830: --- [~jark], I agree that changing all the links in {{.zh.md}} pages is a lot of work. I think we can apply this rule to newly added {{.zh.md}} documents: when adding a new {{.zh.md}} document we need to change the links to the Chinese version. For existing {{.zh.md}} pages, we can update wrong links as we find them, for a better user experience. > Correct the link for chinese version stream_checkpointing page > --- > > Key: FLINK-14830 > URL: https://issues.apache.org/jira/browse/FLINK-14830 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.9.1 >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, in the Chinese version of the stream_checkpointing page, some > links are not correctly set to the Chinese version, but set to the English version. > Such as > {{[site.baseurl }}/dev/stream/state/index.html > |https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md] > _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_. > [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html) > [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) > > This issue wants to fix the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14830) Correct the link for chinese version stream_checkpointing page
[ https://issues.apache.org/jira/browse/FLINK-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975901#comment-16975901 ] Congxian Qiu(klion26) commented on FLINK-14830: --- [~jark] I think this is a little tricky: the target page has already been translated, but the link does not respect that yet. The page[1] has not been translated yet; when we click the link of {{[Restart Strategies|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/restart_strategies.html] }} we are currently redirected to the English version, even though the target page has already been translated[2]. I think this needs to be improved: we need to update the links in all the Chinese-version pages, even those not translated yet, to prevent this problem. What do you think? [1] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/internals/stream_checkpointing.html#recovery] [2] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/task_failure_recovery.html] > Correct the link for chinese version stream_checkpointing page > --- > > Key: FLINK-14830 > URL: https://issues.apache.org/jira/browse/FLINK-14830 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.9.1 >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, in the Chinese version of the stream_checkpointing page, some > links are not correctly set to the Chinese version, but set to the English version. > Such as > {{[site.baseurl }}/dev/stream/state/index.html > |https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md] > _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_. > [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html) > [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) > > This issue wants to fix the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14830) Correct the link for chinese version stream_checkpointing page
[ https://issues.apache.org/jira/browse/FLINK-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975685#comment-16975685 ] Congxian Qiu(klion26) commented on FLINK-14830: --- [~jark] what do you think about this, please assign to me if this is valid. thanks. > Correct the link for chinese version stream_checkpointing page > --- > > Key: FLINK-14830 > URL: https://issues.apache.org/jira/browse/FLINK-14830 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.9.1 >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, in Chinese version of stream_checkpointing page, there are some > links not correct set to the Chinese version, but set to the English version. > Such as > {{[site.baseurl }}/dev/stream/state/index.html > |https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md] > _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_. > [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html) > [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) > > This issue wants to fix the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14830) Correct the link for chinese version stream_checkpointing page
Congxian Qiu(klion26) created FLINK-14830: - Summary: Correct the link for chinese version stream_checkpointing page Key: FLINK-14830 URL: https://issues.apache.org/jira/browse/FLINK-14830 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.9.1 Reporter: Congxian Qiu(klion26) Fix For: 1.10.0 Currently, in Chinese version of stream_checkpointing page, there are some links not correct set to the Chinese version, but set to the English version. Such as {{[site.baseurl }}/dev/stream/state/index.html |https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md] _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_. [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html) [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) This issue wants to fix the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14776) Migrate duration and memory size ConfigOptions in RocksDBConfigurableOptions
[ https://issues.apache.org/jira/browse/FLINK-14776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16974892#comment-16974892 ] Congxian Qiu(klion26) commented on FLINK-14776: --- [~dwysakowicz] Can I help to do this ticket? > Migrate duration and memory size ConfigOptions in RocksDBConfigurableOptions > > > Key: FLINK-14776 > URL: https://issues.apache.org/jira/browse/FLINK-14776 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI
[ https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973360#comment-16973360 ] Congxian Qiu(klion26) commented on FLINK-14264: --- [~aljoscha] thanks for your reply; at first I wrote it as {{checkpoint_backend}}. After your question, I think maybe {{state_backend}} is better because we describe them as {{state_backend}} in the doc[1] [1] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html] > Expose CheckpointBackend in checkpoint config RestAPI > - > > Key: FLINK-14264 > URL: https://issues.apache.org/jira/browse/FLINK-14264 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, we can get the checkpoint config from the REST API[1]; the response > contains the information below > * timeout > * min_pause > * max_concurrent > * externalization > But it does not contain the type of the CheckpointBackend. In some scenarios we > want to get the CheckpointBackend type from REST, so this issue wants to add the > simple name of the CheckpointBackend to the {{checkpoints/config}} REST response with > key {{checkpoint_backend}}, so the response will contain the information > below > * {{timeout}} > * {{min_pause}} > * {{max_concurrent}} > * checkpoint_backend > * externalization > > [1] > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI
[ https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973360#comment-16973360 ] Congxian Qiu(klion26) edited comment on FLINK-14264 at 11/13/19 1:57 PM: - [~aljoscha] thanks for your reply; at first I wrote it as {{checkpoint_backend}}. After your question, I think maybe {{state_backend}} is better because we describe them as {{state_backend}} in the doc[1]. What do you think? Thanks [1] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html] was (Author: klion26): [~aljoscha] thanks for your reply; at first I wrote it as {{checkpoint_backend}}. After your question, I think maybe {{state_backend}} is better because we describe them as {{state_backend}} in the doc[1] [1] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html] > Expose CheckpointBackend in checkpoint config RestAPI > - > > Key: FLINK-14264 > URL: https://issues.apache.org/jira/browse/FLINK-14264 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, we can get the checkpoint config from the REST API[1]; the response > contains the information below > * timeout > * min_pause > * max_concurrent > * externalization > But it does not contain the type of the CheckpointBackend. In some scenarios we > want to get the CheckpointBackend type from REST, so this issue wants to add the > simple name of the CheckpointBackend to the {{checkpoints/config}} REST response with > key {{checkpoint_backend}}, so the response will contain the information > below > * {{timeout}} > * {{min_pause}} > * {{max_concurrent}} > * checkpoint_backend > * externalization > > [1] > [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14433) Move generated Jaas conf file from /tmp directory to Job specific directory
[ https://issues.apache.org/jira/browse/FLINK-14433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973211#comment-16973211 ] Congxian Qiu(klion26) commented on FLINK-14433: --- [~aljoscha] thanks for your reply, will prepare a pr for this. > Move generated Jaas conf file from /tmp directory to Job specific directory > --- > > Key: FLINK-14433 > URL: https://issues.apache.org/jira/browse/FLINK-14433 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Major > Fix For: 1.10.0 > > > Currently, we generate a jaas file under the tmp directory[1]; the files generated > by the jobs running on the same machine will all end up in the same directory > /tmp, which may be problematic for the following reasons: > * Running out of inodes on the disk that hosts the /tmp directory > * The performance of the /tmp directory affects the read/write performance > of the jaas file. > > So we propose to place the jaas file under the > {{CoreOptions.TMP_DIRS}} directory instead of the /tmp directory. > > [1] > https://github.com/apache/flink/blob/dbe1bfa31db4a561b6faa9c1235f02dc130825ca/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java#L143 -- This message was sent by Atlassian Jira (v8.3.4#803005)
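A minimal sketch of the proposed change, assuming the configured directory comes from {{CoreOptions.TMP_DIRS}}. All names below are illustrative, not the actual JaasModule code:

```java
import java.io.File;
import java.io.IOException;

// Illustrative sketch: write the generated JAAS file under a configured
// temp directory instead of the hard-coded system /tmp, so jobs on the same
// machine do not pile files into one directory (or exhaust one disk's inodes).
class JaasFileSketch {
    static File createJaasFile(String configuredTmpDir) throws IOException {
        File dir = new File(configuredTmpDir);
        if (!dir.isDirectory() && !dir.mkdirs()) {
            throw new IOException("cannot create directory " + dir);
        }
        // a unique per-process file name avoids collisions between jobs
        return File.createTempFile("jaas-", ".conf", dir);
    }
}
```

The caller would pass the first entry of the configured `io.tmp.dirs` value rather than relying on `java.io.tmpdir`, keeping the JAAS file on whatever disk the operator provisioned for Flink's temporary data.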