[jira] [Created] (FLINK-17458) TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers

2020-04-29 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-17458:
-

 Summary: 
TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers
 Key: FLINK-17458
 URL: https://issues.apache.org/jira/browse/FLINK-17458
 Project: Flink
  Issue Type: Test
  Components: Tests
Affects Versions: 1.10.0
Reporter: Congxian Qiu(klion26)


When verifying the RC of release-1.10.1, I found that 
`TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers` sometimes 
fails because of a timeout.

I ran this test locally in IDEA and found the following exception (locally I 
only encountered it 2 times in 1000 runs):
{code:java}
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at akka.event.LoggingBus$class.akka$event$LoggingBus$$addLogger(Logging.scala:182)
    at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:117)
    at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:116)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:116)
    at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:113)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
    at akka.event.LoggingBus$class.startDefaultLoggers(Logging.scala:113)
    at akka.event.EventStream.startDefaultLoggers(EventStream.scala:22)
    at akka.actor.LocalActorRefProvider.init(ActorRefProvider.scala:662)
    at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:874)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:870)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:870)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:891)
    at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
    at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
    at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
    at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
    at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:113)
    at org.apache.flink.runtime.akka.AkkaUtils$.createLocalActorSystem(AkkaUtils.scala:68)
    at org.apache.flink.runtime.akka.AkkaUtils.createLocalActorSystem(AkkaUtils.scala)
    at org.apache.flink.runtime.rpc.TestingRpcService.<init>(TestingRpcService.java:74)
    at org.apache.flink.runtime.rpc.TestingRpcService.<init>(TestingRpcService.java:67)
    at org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment$Builder.build(TaskSubmissionTestEnvironment.java:349)
    at org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest.testFailingScheduleOrUpdateConsumers(TaskExecutorSubmissionTest.java:544)
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
org.junit.runners.model.TestTimedOutException: test timed out after 1 milliseconds
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at 
{code}

[jira] [Commented] (FLINK-16636) TableEnvironmentITCase is crashing on Travis

2020-04-23 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17090518#comment-17090518
 ] 

Congxian Qiu(klion26) commented on FLINK-16636:
---

another instance: [https://travis-ci.org/github/klion26/flink/jobs/678526505]

> TableEnvironmentITCase is crashing on Travis
> 
>
> Key: FLINK-16636
> URL: https://issues.apache.org/jira/browse/FLINK-16636
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Jark Wu
>Assignee: Caizhi Weng
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Here is the instance and exception stack: 
> https://api.travis-ci.org/v3/job/663408376/log.txt
> But there is not too much helpful information there; maybe it is an 
> accidental Maven problem.
> {code}
> 09:55:07.703 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test 
> (integration-tests) on project flink-table-planner-blink_2.11: There are test 
> failures.
> 09:55:07.703 [ERROR] 
> 09:55:07.703 [ERROR] Please refer to 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire-reports
>  for the individual test results.
> 09:55:07.703 [ERROR] Please refer to dump files (if any exist) [date].dump, 
> [date]-jvmRun[N].dump and [date].dumpstream.
> 09:55:07.703 [ERROR] ExecutionException The forked VM terminated without 
> properly saying goodbye. VM crash or System.exit called?
> 09:55:07.703 [ERROR] Command was /bin/sh -c cd 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target 
> && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=1 -XX:+UseG1GC -jar 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire/surefirebooter714252487017838305.jar
>  
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire
>  2020-03-17T09-34-41_826-jvmRun1 surefire4625103637332937565tmp 
> surefire_43192129054983363633tmp
> 09:55:07.703 [ERROR] Error occurred in starting fork, check output in log
> 09:55:07.703 [ERROR] Process Exit Code: 137
> 09:55:07.703 [ERROR] Crashed tests:
> 09:55:07.703 [ERROR] org.apache.flink.table.api.TableEnvironmentITCase
> 09:55:07.703 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 09:55:07.703 [ERROR] Command was /bin/sh -c cd 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target 
> && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m 
> -Dmvn.forkNumber=1 -XX:+UseG1GC -jar 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire/surefirebooter714252487017838305.jar
>  
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/target/surefire
>  2020-03-17T09-34-41_826-jvmRun1 surefire4625103637332937565tmp 
> surefire_43192129054983363633tmp
> 09:55:07.703 [ERROR] Error occurred in starting fork, check output in log
> 09:55:07.703 [ERROR] Process Exit Code: 137
> 09:55:07.703 [ERROR] Crashed tests:
> 09:55:07.703 [ERROR] org.apache.flink.table.api.TableEnvironmentITCase
> 09:55:07.703 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkOnceMultiple(ForkStarter.java:382)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:297)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 09:55:07.704 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> 09:55:07.704 [ERROR] at 
> 

[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer

2020-04-15 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084492#comment-17084492
 ] 

Congxian Qiu(klion26) commented on FLINK-17170:
---

Hi [~cvasii], from the description, it seems the savepoint completed 
successfully, and the "unfinished" tasks were blocked by something.

Currently, the lifetime of the task logic is "trigger savepoint" -> "savepoint 
complete" -> "notify savepoint complete" -> "finish task".

From the previous comments you gave, it seems the stack was waiting for some 
lock; could you please check what it is waiting for?

Or could you please share the whole jstack output for the "unfinished" task?

> Cannot stop streaming job with savepoint which uses kinesis consumer
> 
>
> Key: FLINK-17170
> URL: https://issues.apache.org/jira/browse/FLINK-17170
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kinesis
>Affects Versions: 1.10.0
>Reporter: Vasii Cosmin Radu
>Priority: Major
> Attachments: Screenshot 2020-04-15 at 18.16.26.png
>
>
> I am encountering a very strange situation where I can't stop a streaming 
> job with a savepoint.
> The job reads from kinesis and sinks to S3, very simple job, no mapping 
> function, no watermarks, just source->sink. 
> Source is using flink-kinesis-consumer, sink is using StreamingFileSink. 
> Everything works fine, except stopping the job with savepoints.
> The behaviour happens only when multiple task managers are involved, having 
> sub-tasks of the job spread across multiple task manager instances. When a 
> single task manager has all the sub-tasks this issue never occurred.
> Using latest Flink 1.10.0 version, deployment done in HA mode (2 job 
> managers), in EC2, savepoints and checkpoints written on S3.
> When trying to stop, the savepoint is created correctly and appears on S3, 
> but not all sub-tasks are stopped. Some of them finished, but some just 
> remain hung. Sometimes, on the same task manager part of the sub-tasks are 
> finished, part aren't.
> The logs don't show any errors. For the ones that succeed, the standard 
> messages appear, with "Source: <> switched from RUNNING to FINISHED".
> For the sub-tasks hanged the last message is 
> "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - 
> Shutting down the shard consumer threads of subtask 0 ..." and that's it.
>  
> I tried using the cli (flink stop )
> Timeout Message:
> {code:java}
> root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
> Suspending job "cf43cecd9339e8f02a12333e52966a25" with a savepoint.
>
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25".
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>     at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>     ... 9 more
> {code}
>  
> Using the monitoring API, I keep getting the same message when querying based 
> on the savepoint id: the status is still "IN_PROGRESS".
>  
> When performing a cancel instead of stop, it works. But cancel is deprecated, 
> so I am a bit concerned that this might fail also, maybe I was just lucky.
>  
> I attached a screenshot with what the UI is showing when this happens
>  





[jira] [Created] (FLINK-17043) Putting more information into accounting when failing a job in FailureManager

2020-04-07 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-17043:
-

 Summary: Putting more information into accounting when failing a 
job in FailureManager
 Key: FLINK-17043
 URL: https://issues.apache.org/jira/browse/FLINK-17043
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Congxian Qiu(klion26)


Currently, we only fail the job when we receive continuous 
"CHECKPOINT_DECLINED" messages, but we ignore 
"timeout"/"task_failure"/"task_checkpoint_failure"/"finalize_checkpoint_failure"
 and so on.

In my opinion, we should take the checkpoint failure reasons above into account 
when failing a job (not only the "CHECKPOINT_DECLINED" reason).
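
A minimal sketch of the intended accounting (hypothetical names throughout; 
this is not the actual {{CheckpointFailureManager}} code):
{code:java}
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch only -- illustrates counting more failure reasons
// toward the tolerable-failure limit, not Flink's real implementation.
public class FailureAccountingSketch {

    enum FailureReason {
        CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED, TASK_FAILURE,
        TASK_CHECKPOINT_FAILURE, FINALIZE_CHECKPOINT_FAILURE
    }

    // Today only CHECKPOINT_DECLINED counts; the proposal is to widen this set.
    private static final Set<FailureReason> COUNTED_REASONS =
        EnumSet.allOf(FailureReason.class);

    private final AtomicInteger continuousFailures = new AtomicInteger();
    private final int tolerableFailures = 3; // example threshold

    void onCheckpointFailure(FailureReason reason) {
        if (COUNTED_REASONS.contains(reason)
                && continuousFailures.incrementAndGet() > tolerableFailures) {
            failJob();
        }
    }

    void onCheckpointSuccess() {
        continuousFailures.set(0); // a successful checkpoint resets the counter
    }

    private void failJob() {
        throw new RuntimeException("Exceeded tolerable checkpoint failures");
    }
}
{code}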


This issue is inspired by a [user mailing list 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Making-job-fail-on-Checkpoint-Expired-tt34051.html].

 





[jira] [Created] (FLINK-17007) Add section "How to handle application parameters" in DataStream documentation

2020-04-06 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-17007:
-

 Summary: Add section "How to handle application parameters" in 
DataStream documentation
 Key: FLINK-17007
 URL: https://issues.apache.org/jira/browse/FLINK-17007
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Congxian Qiu(klion26)
 Fix For: 1.11.0


This issue proposes adding a section “How to handle application parameters” to 
the DataStream documentation page.
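
For context, a small illustration of the kind of content such a section could 
cover, e.g. parsing parameters with Flink's {{ParameterTool}} (a sketch only, 
not proposed documentation text):
{code:java}
import org.apache.flink.api.java.utils.ParameterTool;

// Sketch: the kind of parameter handling the proposed section could document.
public class ParameterExample {
    public static void main(String[] args) {
        // e.g. invoked with: --input /tmp/words.txt --limit 10
        ParameterTool params = ParameterTool.fromArgs(args);
        String input = params.getRequired("input"); // fails fast if missing
        int limit = params.getInt("limit", 10);     // falls back to a default
        System.out.println("input=" + input + ", limit=" + limit);
    }
}
{code}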





[jira] [Commented] (FLINK-16724) ListSerializer cannot serialize list which containers null

2020-04-03 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074991#comment-17074991
 ] 

Congxian Qiu(klion26) commented on FLINK-16724:
---

[~nppoly] What do you think about my comment? If you agree with it, could you 
please close this ticket?

PS: FLINK-16916 has been fixed, so you can now try {{NullableSerializer}} on 
the latest master codebase.

> ListSerializer cannot serialize list which containers null
> --
>
> Key: FLINK-16724
> URL: https://issues.apache.org/jira/browse/FLINK-16724
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Chongchen Chen
>Priority: Major
> Attachments: list_serializer_err.diff
>
>
> MapSerializer handles null value correctly, but ListSerializer doesn't. The 
> attachment is the modification of unit test that can reproduce the bug.





[jira] [Commented] (FLINK-16908) FlinkKafkaProducerITCase testScaleUpAfterScalingDown Timeout expired while initializing transactional state in 60000ms.

2020-04-02 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17074174#comment-17074174
 ] 

Congxian Qiu(klion26) commented on FLINK-16908:
---

another instance 
[https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/6996/logs/90]

> FlinkKafkaProducerITCase testScaleUpAfterScalingDown Timeout expired while 
> initializing transactional state in 60000ms.
> ---
>
> Key: FLINK-16908
> URL: https://issues.apache.org/jira/browse/FLINK-16908
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.11.0
>Reporter: Piotr Nowojski
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6889=logs=c5f0071e-1851-543e-9a45-9ac140befc32=f66652e3-384e-5b25-be29-abfea69ea8da
> {noformat}
> [ERROR] 
> testScaleUpAfterScalingDown(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>   Time elapsed: 64.353 s  <<< ERROR!
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> initializing transactional state in 60000ms.
> {noformat}
> After this initial error many other tests (I think all following unit tests) 
> failed with errors like:
> {noformat}
> [ERROR] 
> testFailAndRecoverSameCheckpointTwice(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase)
>   Time elapsed: 7.895 s  <<< FAILURE!
> java.lang.AssertionError: Detected producer leak. Thread name: 
> kafka-producer-network-thread | producer-196
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.checkProducerLeak(FlinkKafkaProducerITCase.java:675)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testFailAndRecoverSameCheckpointTwice(FlinkKafkaProducerITCase.java:311)
> {noformat}





[jira] [Updated] (FLINK-16916) The logic of NullableSerializer#copy is wrong

2020-04-01 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-16916:
--
Fix Version/s: 1.10.2
   1.9.3
   1.8.4

> The logic of NullableSerializer#copy is wrong
> -
>
> Key: FLINK-16916
> URL: https://issues.apache.org/jira/browse/FLINK-16916
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
>Reporter: Congxian Qiu(klion26)
>Priority: Blocker
> Fix For: 1.8.4, 1.9.3, 1.11.0, 1.10.2
>
>
> When debugging the problem reported by FLINK-16724, I found that the logic of 
> {{NullableSerializer#copy}} is wrong. Currently, the logic is as below:
> {code:java}
> public void copy(DataInputView source, DataOutputView target) throws IOException {
>    boolean isNull = source.readBoolean();
>    target.writeBoolean(isNull);
>    if (isNull) {
>       target.write(padding);
>    }
>    else {
>       originalSerializer.copy(source, target);
>    }
> }
> {code}
> we forgot to skip {{padding.length}} bytes when the {{padding}}'s length 
> is not 0.
> We can correct the logic as below:
> {code:java}
> public void copy(DataInputView source, DataOutputView target) throws IOException {
>    boolean isNull = deserializeNull(source); // this will skip the padding values
>    target.writeBoolean(isNull);
>    if (isNull) {
>       target.write(padding);
>    }
>    else {
>       originalSerializer.copy(source, target);
>    }
> }
> {code}





[jira] [Commented] (FLINK-16724) ListSerializer cannot serialize list which containers null

2020-04-01 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072596#comment-17072596
 ] 

Congxian Qiu(klion26) commented on FLINK-16724:
---

[~nppoly] Thanks for reporting this issue. This is not a problem of 
{{ListSerializer}}; the reason is that {{LongSerializer}} does not support 
serializing null values. If you want to serialize null values, you can use 
{{NullableSerializer}} instead of {{LongSerializer}}.

Currently, you will still find that the test fails if you replace 
{{LongSerializer}} with {{NullableSerializer}}, because the implementation of 
{{NullableSerializer#copy}} is not correct; I've created FLINK-16916 to track 
it.
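
A small sketch of this suggestion (assuming the {{NullableSerializer.wrap}} 
factory method; please verify the exact signature against your Flink version):
{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;

public class NullableListExample {
    public static void main(String[] args) {
        // Wrap the element serializer so null elements become serializable;
        // the boolean asks for padding so the wrapped type stays fixed-length.
        TypeSerializer<Long> nullableLong =
            NullableSerializer.wrap(LongSerializer.INSTANCE, true);
        ListSerializer<Long> listSerializer = new ListSerializer<>(nullableLong);
        System.out.println(listSerializer);
    }
}
{code}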

> ListSerializer cannot serialize list which containers null
> --
>
> Key: FLINK-16724
> URL: https://issues.apache.org/jira/browse/FLINK-16724
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Chongchen Chen
>Priority: Major
> Attachments: list_serializer_err.diff
>
>
> MapSerializer handles null value correctly, but ListSerializer doesn't. The 
> attachment is the modification of unit test that can reproduce the bug.





[jira] [Commented] (FLINK-16916) The logic of NullableSerializer#copy is wrong

2020-04-01 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072593#comment-17072593
 ] 

Congxian Qiu(klion26) commented on FLINK-16916:
---

[~aljoscha], what do you think about this? Please assign it to me if it seems 
reasonable.
As this can lead to wrong answers when using this serializer, I marked it as 
BLOCKER; please downgrade it if the priority is set wrong.

> The logic of NullableSerializer#copy is wrong
> -
>
> Key: FLINK-16916
> URL: https://issues.apache.org/jira/browse/FLINK-16916
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.8.3, 1.9.2, 1.10.0
>Reporter: Congxian Qiu(klion26)
>Priority: Blocker
> Fix For: 1.11.0
>
>
> When debugging the problem reported by FLINK-16724, I found that the logic of 
> {{NullableSerializer#copy}} is wrong. Currently, the logic is as below:
> {code:java}
> public void copy(DataInputView source, DataOutputView target) throws IOException {
>    boolean isNull = source.readBoolean();
>    target.writeBoolean(isNull);
>    if (isNull) {
>       target.write(padding);
>    }
>    else {
>       originalSerializer.copy(source, target);
>    }
> }
> {code}
> we forgot to skip {{padding.length}} bytes when the {{padding}}'s length 
> is not 0.
> We can correct the logic as below:
> {code:java}
> public void copy(DataInputView source, DataOutputView target) throws IOException {
>    boolean isNull = deserializeNull(source); // this will skip the padding values
>    target.writeBoolean(isNull);
>    if (isNull) {
>       target.write(padding);
>    }
>    else {
>       originalSerializer.copy(source, target);
>    }
> }
> {code}





[jira] [Created] (FLINK-16916) The logic of NullableSerializer#copy is wrong

2020-04-01 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-16916:
-

 Summary: The logic of NullableSerializer#copy is wrong
 Key: FLINK-16916
 URL: https://issues.apache.org/jira/browse/FLINK-16916
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.10.0, 1.9.2, 1.8.3
Reporter: Congxian Qiu(klion26)
 Fix For: 1.11.0


When debugging the problem reported by FLINK-16724, I found that the logic of 
{{NullableSerializer#copy}} is wrong. Currently, the logic is as below:
{code:java}
public void copy(DataInputView source, DataOutputView target) throws IOException {
   boolean isNull = source.readBoolean();
   target.writeBoolean(isNull);
   if (isNull) {
      target.write(padding);
      // BUG: the padding bytes in `source` are never skipped here, so the
      // input stream is left misaligned for whatever is read next.
   }
   else {
      originalSerializer.copy(source, target);
   }
}
{code}
we forgot to skip {{padding.length}} bytes when the {{padding}}'s length is 
not 0.

We can correct the logic as below:
{code:java}
public void copy(DataInputView source, DataOutputView target) throws IOException {
   boolean isNull = deserializeNull(source); // this will skip the padding values
   target.writeBoolean(isNull);
   if (isNull) {
      target.write(padding);
   }
   else {
      originalSerializer.copy(source, target);
   }
}
{code}





[jira] [Commented] (FLINK-16576) State inconsistency on restore with memory state backends

2020-03-28 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17069428#comment-17069428
 ] 

Congxian Qiu(klion26) commented on FLINK-16576:
---

The reason the restore failed here is that the *mapping of stateId and 
metaInfo is wrong*.

The mapping is wrong because we registered some metaInfos that do not belong 
to the current subtask.
{code:java}
// HeapRestoreOperation#restore
createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById); // will register the metaInfo

readStateHandleStateData(
   fsDataInputStream,
   inView,
   keyGroupsStateHandle.getGroupRangeOffsets(),
   kvStatesById, restoredMetaInfos.size(),
   serializationProxy.getReadVersion(),
   serializationProxy.isUsingKeyGroupCompression());


private void createOrCheckStateForMetaInfo(
   List<StateMetaInfoSnapshot> restoredMetaInfo,
   Map<Integer, StateMetaInfoSnapshot> kvStatesById) {

   for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
      final StateSnapshotRestore registeredState;

      ..

      if (registeredState == null) {
         // constructing the mapping between stateId and metaInfo, even if the
         // current state handle does not belong to the current subtask
         kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
      }
   }
}
{code}
From the code above we can see that we'll always register the metaInfo even if 
the current state handle does not belong to ourselves (the KeyGroupsStateHandle 
will contain the metaInfo, EMPTY_KEYGROUP, empty offsets and the stateHandle 
data). After registering the wrong metaInfo, the *mapping of stateId and 
metaInfo becomes wrong* (when constructing the mapping, we assume that all the 
handles belong to the current subtask). (RocksDBStateBackend does not construct 
such a mapping, so it would not encounter this error.)

For the solution here, I want to filter out the state handles that do not 
belong to the subtask when assigning state in {{StateAssignmentOperation}}.
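
A rough sketch of that filtering idea (a hypothetical helper, not the actual 
{{StateAssignmentOperation}} change):
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;

// Hypothetical helper: keep only the handles that actually overlap the
// subtask's key-group range, so no foreign metaInfo gets registered on restore.
public class HandleFilterSketch {
    static List<KeyedStateHandle> filterForSubtask(
            List<KeyedStateHandle> allHandles, KeyGroupRange subtaskRange) {
        List<KeyedStateHandle> result = new ArrayList<>();
        for (KeyedStateHandle handle : allHandles) {
            KeyedStateHandle intersection = handle.getIntersection(subtaskRange);
            if (intersection != null) {
                result.add(intersection);
            }
        }
        return result;
    }
}
{code}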

> State inconsistency on restore with memory state backends
> -
>
> Key: FLINK-16576
> URL: https://issues.apache.org/jira/browse/FLINK-16576
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Nico Kruber
>Assignee: Congxian Qiu(klion26)
>Priority: Blocker
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
>
> I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}} 
> example in Flink. Restore would fail with either of these causes, but only 
> for the memory state backends and only with some combinations of parallelism 
> I took the savepoint with and parallelism I restore the job with:
> {code:java}
> java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, 
> endKeyGroup=95} does not contain key group 97 {code}
> or
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>  {code}
> or
> {code:java}
> java.io.IOException: Corrupt stream, found tag: 8
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
>  {code}
>  
> I managed to make it reproducible in a test that I quickly hacked together in 
> [https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java]
>  (please checkout the whole repository since I had to change some 
> dependencies).
> In a bit more detail, this is what I discovered before, also with a manual 
> savepoint on S3:
> Savepoint that was taken with parallelism 2 (p=2) and shows the restore 
> failure in three different ways (all running in Flink 1.10.0; but I also see 
> it in Flink 1.9):
>  * first of all, if I try to restore with p=2, everything is fine
>  * if I restore with p=4 I get an exception like the one mentioned above:
> {code:java}
> 2020-03-11 15:53:35,149 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- 
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, 
> PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) 
> (2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>   at 
> 

[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase

2020-02-11 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034562#comment-17034562
 ] 

Congxian Qiu(klion26) commented on FLINK-13632:
---

I'm on {{ScalaOptionSerializerSnapshotMigrationTest}}.

> Port serializer upgrade tests to TypeSerializerUpgradeTestBase
> --
>
> Key: FLINK-13632
> URL: https://issues.apache.org/jira/browse/FLINK-13632
> Project: Flink
>  Issue Type: Test
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) 
> for writing serializer upgrade tests. Now we need to migrate all existing 
> tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test 
> base and update them to restore from Flink 1.8 onward.
> It seems these are the subclasses:
> * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl)
> * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * PrimitiveArraySerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base.array)
> * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils)
> * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * BufferEntrySerializerMigrationTest 
> (org.apache.flink.streaming.api.operators.co)
> * TimerSerializerSnapshotMigrationTest 
> (org.apache.flink.streaming.api.operators)
> * StreamElementSerializerMigrationTest 
> (org.apache.flink.streaming.runtime.streamrecord)
> * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo)
> * BaseTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * NullableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state)
> * ScalaOptionSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * ScalaTrySerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state)
> * LockableTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.cep.nfa.sharedbuffer)
> * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep)
> * WindowSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.runtime.operators.windowing)
> * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream)
> * TwoPhaseCommitSinkStateSerializerMigrationTest 
> (org.apache.flink.streaming.api.functions.sink)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * ValueArraySerializerSnapshotMigrationTest 
> (org.apache.flink.graph.types.valuearray)
> * MapSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * CompositeTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils)
> * ListSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * EnumSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * PojoSerializerSnapshotMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state)
> * CopyableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * WritableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * ScalaEitherSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * LongValueWithProperHashCodeSerializerSnapshotMigrationTest 
> (org.apache.flink.graph.drivers.transform)
> * ScalaCaseClassSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * TraversableSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * EnumValueSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)





[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase

2020-01-31 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027476#comment-17027476
 ] 

Congxian Qiu(klion26) commented on FLINK-13632:
---

I'm on {{NullableSerializerMigrationTest}}, and 
{{VoidNamespacieSerializerSnapshotMigrationTest}}.

> Port serializer upgrade tests to TypeSerializerUpgradeTestBase
> --
>
> Key: FLINK-13632
> URL: https://issues.apache.org/jira/browse/FLINK-13632
> Project: Flink
>  Issue Type: Test
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) 
> for writing serializer upgrade tests. Now we need to migrate all existing 
> tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test 
> base and update them to restore from Flink 1.8 onward.
> It seems these are the subclasses:
> * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl)
> * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * PrimitiveArraySerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base.array)
> * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils)
> * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * BufferEntrySerializerMigrationTest 
> (org.apache.flink.streaming.api.operators.co)
> * TimerSerializerSnapshotMigrationTest 
> (org.apache.flink.streaming.api.operators)
> * StreamElementSerializerMigrationTest 
> (org.apache.flink.streaming.runtime.streamrecord)
> * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo)
> * BaseTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * NullableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state)
> * ScalaOptionSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * ScalaTrySerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state)
> * LockableTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.cep.nfa.sharedbuffer)
> * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep)
> * WindowSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.runtime.operators.windowing)
> * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream)
> * TwoPhaseCommitSinkStateSerializerMigrationTest 
> (org.apache.flink.streaming.api.functions.sink)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * ValueArraySerializerSnapshotMigrationTest 
> (org.apache.flink.graph.types.valuearray)
> * MapSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * CompositeTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils)
> * ListSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * EnumSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * PojoSerializerSnapshotMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state)
> * CopyableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * WritableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * ScalaEitherSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * LongValueWithProperHashCodeSerializerSnapshotMigrationTest 
> (org.apache.flink.graph.drivers.transform)
> * ScalaCaseClassSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * TraversableSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * EnumValueSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)





[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase

2020-01-29 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17026454#comment-17026454
 ] 

Congxian Qiu(klion26) commented on FLINK-13632:
---

I'm on {{BaseTypeSerializerSnapshotMigrationTest}}.

> Port serializer upgrade tests to TypeSerializerUpgradeTestBase
> --
>
> Key: FLINK-13632
> URL: https://issues.apache.org/jira/browse/FLINK-13632
> Project: Flink
>  Issue Type: Test
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) 
> for writing serializer upgrade tests. Now we need to migrate all existing 
> tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test 
> base and update them to restore from Flink 1.8 onward.
> It seems these are the subclasses:
> * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl)
> * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * PrimitiveArraySerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base.array)
> * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils)
> * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * BufferEntrySerializerMigrationTest 
> (org.apache.flink.streaming.api.operators.co)
> * TimerSerializerSnapshotMigrationTest 
> (org.apache.flink.streaming.api.operators)
> * StreamElementSerializerMigrationTest 
> (org.apache.flink.streaming.runtime.streamrecord)
> * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo)
> * BaseTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * NullableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state)
> * ScalaOptionSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * ScalaTrySerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state)
> * LockableTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.cep.nfa.sharedbuffer)
> * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep)
> * WindowSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.runtime.operators.windowing)
> * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream)
> * TwoPhaseCommitSinkStateSerializerMigrationTest 
> (org.apache.flink.streaming.api.functions.sink)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * ValueArraySerializerSnapshotMigrationTest 
> (org.apache.flink.graph.types.valuearray)
> * MapSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * CompositeTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils)
> * ListSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * EnumSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * PojoSerializerSnapshotMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state)
> * CopyableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * WritableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * ScalaEitherSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * LongValueWithProperHashCodeSerializerSnapshotMigrationTest 
> (org.apache.flink.graph.drivers.transform)
> * ScalaCaseClassSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * TraversableSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * EnumValueSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)





[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase

2020-01-21 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020738#comment-17020738
 ] 

Congxian Qiu(klion26) commented on FLINK-13632:
---

I'm on {{StreamElementSerializerMigrationTest}} now.

> Port serializer upgrade tests to TypeSerializerUpgradeTestBase
> --
>
> Key: FLINK-13632
> URL: https://issues.apache.org/jira/browse/FLINK-13632
> Project: Flink
>  Issue Type: Test
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) 
> for writing serializer upgrade tests. Now we need to migrate all existing 
> tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test 
> base and update them to restore from Flink 1.8 onward.
> It seems these are the subclasses:
> * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl)
> * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * PrimitiveArraySerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base.array)
> * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils)
> * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * BufferEntrySerializerMigrationTest 
> (org.apache.flink.streaming.api.operators.co)
> * TimerSerializerSnapshotMigrationTest 
> (org.apache.flink.streaming.api.operators)
> * StreamElementSerializerMigrationTest 
> (org.apache.flink.streaming.runtime.streamrecord)
> * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo)
> * BaseTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * NullableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state)
> * ScalaOptionSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * ScalaTrySerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state)
> * LockableTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.cep.nfa.sharedbuffer)
> * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep)
> * WindowSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.runtime.operators.windowing)
> * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream)
> * TwoPhaseCommitSinkStateSerializerMigrationTest 
> (org.apache.flink.streaming.api.functions.sink)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * ValueArraySerializerSnapshotMigrationTest 
> (org.apache.flink.graph.types.valuearray)
> * MapSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * CompositeTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils)
> * ListSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * EnumSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * PojoSerializerSnapshotMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state)
> * CopyableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * WritableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * ScalaEitherSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * LongValueWithProperHashCodeSerializerSnapshotMigrationTest 
> (org.apache.flink.graph.drivers.transform)
> * ScalaCaseClassSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * TraversableSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * EnumValueSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)





[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase

2020-01-21 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020734#comment-17020734
 ] 

Congxian Qiu(klion26) commented on FLINK-13632:
---

I'm on {{TimerSerializerSnapshotMigrationTest}} now.

> Port serializer upgrade tests to TypeSerializerUpgradeTestBase
> --
>
> Key: FLINK-13632
> URL: https://issues.apache.org/jira/browse/FLINK-13632
> Project: Flink
>  Issue Type: Test
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) 
> for writing serializer upgrade tests. Now we need to migrate all existing 
> tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test 
> base and update them to restore from Flink 1.8 onward.
> It seems these are the subclasses:
> * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl)
> * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * PrimitiveArraySerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base.array)
> * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils)
> * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * BufferEntrySerializerMigrationTest 
> (org.apache.flink.streaming.api.operators.co)
> * TimerSerializerSnapshotMigrationTest 
> (org.apache.flink.streaming.api.operators)
> * StreamElementSerializerMigrationTest 
> (org.apache.flink.streaming.runtime.streamrecord)
> * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo)
> * BaseTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * NullableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state)
> * ScalaOptionSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * ScalaTrySerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state)
> * LockableTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.cep.nfa.sharedbuffer)
> * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep)
> * WindowSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.runtime.operators.windowing)
> * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream)
> * TwoPhaseCommitSinkStateSerializerMigrationTest 
> (org.apache.flink.streaming.api.functions.sink)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * ValueArraySerializerSnapshotMigrationTest 
> (org.apache.flink.graph.types.valuearray)
> * MapSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * CompositeTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils)
> * ListSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * EnumSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * PojoSerializerSnapshotMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state)
> * CopyableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * WritableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * ScalaEitherSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * LongValueWithProperHashCodeSerializerSnapshotMigrationTest 
> (org.apache.flink.graph.drivers.transform)
> * ScalaCaseClassSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * TraversableSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * EnumValueSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)





[jira] [Created] (FLINK-15699) FirstValueAggFunctionWithoutOrderTest failed on Travis

2020-01-20 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15699:
-

 Summary: FirstValueAggFunctionWithoutOrderTest failed on Travis
 Key: FLINK-15699
 URL: https://issues.apache.org/jira/browse/FLINK-15699
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.11.0
Reporter: Congxian Qiu(klion26)


09:48:53.473 [ERROR] COMPILATION ERROR : 
09:48:53.473 [INFO] 
-
09:48:53.473 [ERROR] 
/home/travis/build/flink-ci/flink/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueAggFunctionWithoutOrderTest.java:[78,37]
 incompatible types: inference variable T has incompatible bounds
 equality constraints: java.lang.Byte,T,T,T,T,T,java.lang.Boolean,T
 lower bounds: java.lang.Boolean
09:48:53.473 [INFO] 1 error

[https://travis-ci.com/flink-ci/flink/jobs/277466696]





[jira] [Commented] (FLINK-15699) FirstValueAggFunctionWithoutOrderTest failed on Travis

2020-01-20 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019799#comment-17019799
 ] 

Congxian Qiu(klion26) commented on FLINK-15699:
---

cc [~godfreyhe]

> FirstValueAggFunctionWithoutOrderTest failed on Travis
> --
>
> Key: FLINK-15699
> URL: https://issues.apache.org/jira/browse/FLINK-15699
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.11.0
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> 09:48:53.473 [ERROR] COMPILATION ERROR : 
> 09:48:53.473 [INFO] 
> -
> 09:48:53.473 [ERROR] 
> /home/travis/build/flink-ci/flink/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueAggFunctionWithoutOrderTest.java:[78,37]
>  incompatible types: inference variable T has incompatible bounds
>  equality constraints: java.lang.Byte,T,T,T,T,T,java.lang.Boolean,T
>  lower bounds: java.lang.Boolean
> 09:48:53.473 [INFO] 1 error
> [https://travis-ci.com/flink-ci/flink/jobs/277466696]





[jira] [Commented] (FLINK-13632) Port serializer upgrade tests to TypeSerializerUpgradeTestBase

2020-01-20 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019532#comment-17019532
 ] 

Congxian Qiu(klion26) commented on FLINK-13632:
---

I'm porting {{PrimitiveArraySerializerSnapshotMigrationTest}}.

> Port serializer upgrade tests to TypeSerializerUpgradeTestBase
> --
>
> Key: FLINK-13632
> URL: https://issues.apache.org/jira/browse/FLINK-13632
> Project: Flink
>  Issue Type: Test
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) 
> for writing serializer upgrade tests. Now we need to migrate all existing 
> tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test 
> base and update them to restore from Flink 1.8 onward.
> It seems these are the subclasses:
> * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl)
> * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * PrimitiveArraySerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base.array)
> * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils)
> * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * BufferEntrySerializerMigrationTest 
> (org.apache.flink.streaming.api.operators.co)
> * TimerSerializerSnapshotMigrationTest 
> (org.apache.flink.streaming.api.operators)
> * StreamElementSerializerMigrationTest 
> (org.apache.flink.streaming.runtime.streamrecord)
> * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo)
> * BaseTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * NullableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state)
> * ScalaOptionSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * ScalaTrySerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state)
> * LockableTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.cep.nfa.sharedbuffer)
> * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep)
> * WindowSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.runtime.operators.windowing)
> * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream)
> * TwoPhaseCommitSinkStateSerializerMigrationTest 
> (org.apache.flink.streaming.api.functions.sink)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * ValueArraySerializerSnapshotMigrationTest 
> (org.apache.flink.graph.types.valuearray)
> * MapSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * CompositeTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils)
> * ListSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * EnumSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * PojoSerializerSnapshotMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state)
> * CopyableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * WritableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * ScalaEitherSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * LongValueWithProperHashCodeSerializerSnapshotMigrationTest 
> (org.apache.flink.graph.drivers.transform)
> * ScalaCaseClassSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * TraversableSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * EnumValueSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers

2020-01-20 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019463#comment-17019463
 ] 

Congxian Qiu(klion26) commented on FLINK-15637:
---

[~gjy] Yes, I'm working on this, please assign it to me.

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by 
> default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
>  
> We need to update the release notes to mention this.
>  
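For reference, the option being flipped here can already be set explicitly, both via {{flink-conf.yaml}} and programmatically. A minimal sketch, assuming the Flink 1.10 {{RocksDBStateBackend}} API (the checkpoint path is illustrative):

{code:java}
// A minimal sketch of configuring the timer store explicitly; this is not
// from the issue itself. With this change, ROCKSDB becomes the default, so
// the HEAP line below shows how a user would restore the old behavior.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimerStoreConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend backend =
                new RocksDBStateBackend("file:///tmp/checkpoints", true);

        // Keep timers on the JVM heap (the pre-change default); omit this
        // call to store timers in RocksDB once ROCKSDB is the default.
        backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);

        env.setStateBackend(backend);
    }
}
{code}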



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15685) org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis

2020-01-20 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-15685:
--
Affects Version/s: 1.11.0

> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis
> ---
>
> Key: FLINK-15685
> URL: https://issues.apache.org/jira/browse/FLINK-15685
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client, Tests
>Affects Versions: 1.11.0
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
>  
> 08:50:02.717 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time 
> elapsed: 76.395 s <<< FAILURE! - in 
> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase
>  
> 1897208:50:02.722 [ERROR] testKafka[0: kafka-version:0.10 
> kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
>  Time elapsed: 23.806 s <<< ERROR!
>  
> 18973java.io.IOException: 
>  
> 18974Process execution failed due error. Error output:Exception in thread 
> "main" org.apache.flink.table.client.SqlClientException: Unexpected 
> exception. This is a bug. Please consider filing an issue.
>  
> 18975 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
>  
> 18976Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
> Could not create execution context.
>  
> 18977 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
>  
> 18978 at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
>  
> 18979 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
>  
> 18980 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>  
> 18981Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
>  
> 18982 at 
> org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
>  
> 18983 at 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:285)
>  
> 18984 at 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:163)
>  
> 18985 at 
> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>  
> 18986 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:371)
>  
> 18987 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:552)
>  
> 18988 at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>  
> 18989 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:550)
>  
> 18990 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:487)
>  
> 18991 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
>  
> 18992 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
>  
> 18993 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:748)
>  
> 18994 ... 3 more
>  
> 18995Caused by: java.lang.ClassNotFoundException: 
> org.apache.avro.io.DatumReader
>  
> 18996 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>  
> 18997 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  
> 18998 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  
> 18999 ... 15 more
>  
> 19000
>  
> 19001 at 
> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
>  
> 19002 at 
> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)
>  
> 19003
>  
> 1900408:50:02.734 [ERROR] testKafka[1: kafka-version:0.11 
> kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
>  Time elapsed: 25.227 s <<< ERROR!
>  
> 19005java.io.IOException: 
>  
> 19006Process execution failed due error. Error output:Exception in thread 
> "main" org.apache.flink.table.client.SqlClientException: Unexpected 
> exception. This is a bug. Please consider filing an issue.
>  
> 19007 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
>  
> 19008Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
> Could not create execution context.
>  
> 19009 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
>  
> 19010 at 
> 

[jira] [Commented] (FLINK-15685) org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis

2020-01-20 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019365#comment-17019365
 ] 

Congxian Qiu(klion26) commented on FLINK-15685:
---

From the exception log, the exception was thrown in {{SqlClient.java}}, so I 
attached this issue to the {{Table SQL / Client}} component.

> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis
> ---
>
> Key: FLINK-15685
> URL: https://issues.apache.org/jira/browse/FLINK-15685
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client, Tests
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
>  
> 08:50:02.717 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time 
> elapsed: 76.395 s <<< FAILURE! - in 
> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase
>  
> 1897208:50:02.722 [ERROR] testKafka[0: kafka-version:0.10 
> kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
>  Time elapsed: 23.806 s <<< ERROR!
>  
> 18973java.io.IOException: 
>  
> 18974Process execution failed due error. Error output:Exception in thread 
> "main" org.apache.flink.table.client.SqlClientException: Unexpected 
> exception. This is a bug. Please consider filing an issue.
>  
> 18975 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
>  
> 18976Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
> Could not create execution context.
>  
> 18977 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
>  
> 18978 at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
>  
> 18979 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
>  
> 18980 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>  
> 18981Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
>  
> 18982 at 
> org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
>  
> 18983 at 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:285)
>  
> 18984 at 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:163)
>  
> 18985 at 
> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
>  
> 18986 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:371)
>  
> 18987 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:552)
>  
> 18988 at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>  
> 18989 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:550)
>  
> 18990 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:487)
>  
> 18991 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
>  
> 18992 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
>  
> 18993 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:748)
>  
> 18994 ... 3 more
>  
> 18995Caused by: java.lang.ClassNotFoundException: 
> org.apache.avro.io.DatumReader
>  
> 18996 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>  
> 18997 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  
> 18998 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  
> 18999 ... 15 more
>  
> 19000
>  
> 19001 at 
> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
>  
> 19002 at 
> org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)
>  
> 19003
>  
> 1900408:50:02.734 [ERROR] testKafka[1: kafka-version:0.11 
> kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
>  Time elapsed: 25.227 s <<< ERROR!
>  
> 19005java.io.IOException: 
>  
> 19006Process execution failed due error. Error output:Exception in thread 
> "main" org.apache.flink.table.client.SqlClientException: Unexpected 
> exception. This is a bug. Please consider filing an issue.
>  
> 19007 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
>  
> 19008Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
> Could not create execution context.
>  
> 19009 at 
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
>  
> 

[jira] [Created] (FLINK-15685) org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis

2020-01-20 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15685:
-

 Summary: org.apache.flink.tests.util.kafka.SQLClientKafkaITCase 
failed on Travis
 Key: FLINK-15685
 URL: https://issues.apache.org/jira/browse/FLINK-15685
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client, Tests
Reporter: Congxian Qiu(klion26)


 
08:50:02.717 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time 
elapsed: 76.395 s <<< FAILURE! - in 
org.apache.flink.tests.util.kafka.SQLClientKafkaITCase
 
1897208:50:02.722 [ERROR] testKafka[0: kafka-version:0.10 
kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
 Time elapsed: 23.806 s <<< ERROR!
 
18973java.io.IOException: 
 
18974Process execution failed due error. Error output:Exception in thread 
"main" org.apache.flink.table.client.SqlClientException: Unexpected exception. 
This is a bug. Please consider filing an issue.
 
18975 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
 
18976Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
Could not create execution context.
 
18977 at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
 
18978 at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
 
18979 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
 
18980 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
 
18981Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
 
18982 at 
org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
 
18983 at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:285)
 
18984 at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:163)
 
18985 at 
org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
 
18986 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:371)
 
18987 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:552)
 
18988 at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
 
18989 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:550)
 
18990 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:487)
 
18991 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:159)
 
18992 at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:118)
 
18993 at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:748)
 
18994 ... 3 more
 
18995Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumReader
 
18996 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 
18997 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 
18998 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 
18999 ... 15 more
 
19000
 
19001 at 
org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
 
19002 at 
org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)
 
19003
 
1900408:50:02.734 [ERROR] testKafka[1: kafka-version:0.11 
kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase)
 Time elapsed: 25.227 s <<< ERROR!
 
19005java.io.IOException: 
 
19006Process execution failed due error. Error output:Exception in thread 
"main" org.apache.flink.table.client.SqlClientException: Unexpected exception. 
This is a bug. Please consider filing an issue.
 
19007 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
 
19008Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
Could not create execution context.
 
19009 at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
 
19010 at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
 
19011 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
 
19012 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
 
19013Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
 
19014 at 
org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
 
19015 at 

[jira] [Commented] (FLINK-15247) Closing (Testing)MiniCluster may cause ConcurrentModificationException

2020-01-20 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019341#comment-17019341
 ] 

Congxian Qiu(klion26) commented on FLINK-15247:
---

This seems to be another instance: 
[https://travis-ci.org/klion26/flink/jobs/639304855?utm_medium=notification_source=github_status]
{code:java}
07:02:05.922 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.328 s <<< FAILURE! - in org.apache.flink.test.streaming.runtime.BackPressureITCase
07:02:05.922 [ERROR] operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase)  Time elapsed: 7.319 s  <<< ERROR!
org.apache.flink.util.FlinkException: Could not close resource.
    at org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:166)
Caused by: org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor down.
Caused by: org.apache.flink.util.FlinkException: Could not properly shut down the TaskManager services.
Caused by: java.util.ConcurrentModificationException
{code}
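
For illustration, the root {{ConcurrentModificationException}} is the classic failure of a collection being mutated while it is iterated, e.g. a shutdown path walking a set of services while a close callback unregisters one. A minimal standalone sketch (plain JDK, not the actual TaskExecutor code):

{code:java}
// Illustrative only -- not the actual TaskManager shutdown code. Iterating
// a plain HashSet while removing from it on the same thread fails exactly
// like the log above.
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;

public class ConcurrentModificationSketch {
    public static void main(String[] args) {
        Set<String> services = new HashSet<>();
        services.add("taskSlotTable");
        services.add("jobLeaderService");
        services.add("ioManager");

        try {
            for (String service : services) {
                // Simulates a close() that unregisters itself mid-iteration.
                services.remove(service);
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("shutdown failed: " + e);
        }
    }
}
{code}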

> Closing (Testing)MiniCluster may cause ConcurrentModificationException
> --
>
> Key: FLINK-15247
> URL: https://issues.apache.org/jira/browse/FLINK-15247
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {noformat}
> Test 
> operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase)
>  failed with:
> org.apache.flink.util.FlinkException: Could not close resource.
> at 
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
> at 

[jira] [Comment Edited] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers

2020-01-19 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15637 at 1/20/20 4:07 AM:


Ran the {{RocksDBStateBackend}} benchmark in the 
[flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks] repo; there 
is some regression after applying this change: 4.6% for non-incremental mode 
and 2.9% for incremental mode.
 * Result before applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ±  1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ±  5.179  ops/ms
{code}
 * Result after applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ±  2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ±  2.151  ops/ms
{code}
Steps for generating the results:
 # Check out the commit before applying the change and install.
 # Run the benchmark to get the "before" result.
 # Check out the commit after applying the change and install.
 # Run the benchmark to get the "after" result.

 We may need to add a release note to let users know about this.


was (Author: klion26):
Ran the {{RocksDBStateBackend}} benchmark in the 
[flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks] repo; there 
is some regression after applying this change: 4.6% for non-incremental mode 
and 2.9% for incremental mode.
 * Result before applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ±  1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ±  5.179  ops/ms
{code}
 * Result after applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ±  2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ±  2.151  ops/ms
{code}
Steps for generating the results:
 # Check out the commit before applying the change and install.
 # Run the benchmark to get the "before" result.
 # Check out the commit after applying the change and install.
 # Run the benchmark to get the "after" result.

 We may need to add a release note to let users know about this.

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by 
> default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
>  
> We need to update the release notes to mention this.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers

2020-01-19 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15637 at 1/20/20 4:07 AM:


Ran the {{RocksDBStateBackend}} benchmark in the 
[flink-benchmarks|https://github.com/dataArtisans/flink-benchmarks] repo; there 
is some regression after applying this change: 4.6% for non-incremental mode 
and 2.9% for incremental mode.
 * Result before applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ±  1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ±  5.179  ops/ms
{code}
 * Result after applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ±  2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ±  2.151  ops/ms
{code}
Steps for generating the results:
 # Check out the commit before applying the change and install.
 # Run the benchmark to get the "before" result.
 # Check out the commit after applying the change and install.
 # Run the benchmark to get the "after" result.

 We may need to add a release note to let users know about this.


was (Author: klion26):
Ran the {{RocksDBStateBackend}} benchmark; there is some regression after 
applying this change: 4.6% for non-incremental mode and 2.9% for incremental 
mode.
 * Result before applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ±  1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ±  5.179  ops/ms
{code}
 * Result after applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ±  2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ±  2.151  ops/ms
{code}
Steps for generating the results:
 # Check out the commit before applying the change and install.
 # Run the benchmark to get the "before" result.
 # Check out the commit after applying the change and install.
 # Run the benchmark to get the "after" result.

 We may need to add a release note to let users know about this.

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by 
> default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
>  
> We need to update the release notes to mention this.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers

2020-01-19 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15637 at 1/20/20 3:00 AM:


Ran the {{RocksDBStateBackend}} benchmark; there is some regression after 
applying this change: 4.6% for non-incremental mode and 2.9% for incremental 
mode.
 * Result before applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ±  1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ±  5.179  ops/ms
{code}
 * Result after applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ±  2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ±  2.151  ops/ms
{code}
Steps for generating the results:
 # Check out the commit before applying the change and install.
 # Run the benchmark to get the "before" result.
 # Check out the commit after applying the change and install.
 # Run the benchmark to get the "after" result.

 We may need to add a release note to let users know about this.


was (Author: klion26):
Ran the {{RocksDBStateBackend}} benchmark; there is some regression after 
applying this change: 4.6% for non-incremental mode and 2.9% for incremental 
mode.
 * Result before applying the change

 
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ±  1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ±  5.179  ops/ms
{code}
 * Result after applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ±  2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ±  2.151  ops/ms
{code}
Steps for generating the results:
 # Check out the commit before applying the change and install.
 # Run the benchmark to get the "before" result.
 # Check out the commit after applying the change and install.
 # Run the benchmark to get the "after" result.

 

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by 
> default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
>  
> We need to update the release notes to mention this.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15637) For RocksDBStateBackend, make RocksDB the default store for timers

2020-01-19 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019194#comment-17019194
 ] 

Congxian Qiu(klion26) commented on FLINK-15637:
---

Ran the {{RocksDBStateBackend}} benchmark; there is some regression after 
applying this change: 4.6% for non-incremental mode and 2.9% for incremental 
mode.
 * Result before applying the change

 
{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  203.893 ±  1.580  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  201.896 ±  5.179  ops/ms
{code}
 * Result after applying the change

{code:java}
Benchmark                                 (stateBackend)   Mode  Cnt    Score    Error   Units
RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  194.382 ±  2.256  ops/ms
RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  195.912 ±  2.151  ops/ms
{code}
Steps for generating the results (a JMH sketch follows this list):
 # Check out the commit before applying the change and install.
 # Run the benchmark to get the "before" result.
 # Check out the commit after applying the change and install.
 # Run the benchmark to get the "after" result.
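
For readers unfamiliar with the harness: the numbers above are JMH throughput scores (ops/ms). A minimal sketch of a benchmark in that style; the class name, parameters, and body are illustrative, not the actual RocksStateBackendBenchmark:

{code:java}
// Illustrative JMH throughput benchmark in the style of flink-benchmarks;
// NOT the actual RocksStateBackendBenchmark source.
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)   // reported as ops/ms, as above
@State(Scope.Thread)
public class TimerBenchmarkSketch {

    // JMH runs the benchmark once per parameter value, producing one
    // score line per backend, as in the tables above.
    @Param({"ROCKS", "ROCKS_INC"})
    public String stateBackend;

    private long counter;

    @Benchmark
    public long stateBackends() {
        // Stand-in for the per-record state update being measured;
        // returning the value prevents dead-code elimination.
        return counter++;
    }
}
{code}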

 

> For RocksDBStateBackend, make RocksDB the default store for timers
> --
>
> Key: FLINK-15637
> URL: https://issues.apache.org/jira/browse/FLINK-15637
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Set the {{state.backend.rocksdb.timer-service.factory}} to {{ROCKSDB}} by 
> default. Also ensure that the programmatic default value becomes the same.
> We need to update the performance tuning guide to mention this.
>  
> We need to update the release notes to mention this.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15661) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure failed because of Could not find Flink job

2020-01-19 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15661:
-

 Summary: 
JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure failed 
because of Could not find Flink job 
 Key: FLINK-15661
 URL: https://issues.apache.org/jira/browse/FLINK-15661
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.11.0
Reporter: Congxian Qiu(klion26)


2020-01-19T06:25:02.3856954Z [ERROR] 
JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:347 The 
program encountered a ExecutionException : 
org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (47fe3e8df0e59994938485f683d1410e)
 2020-01-19T06:25:02.3857171Z at 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:91)
 2020-01-19T06:25:02.3857571Z at 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:82)
 2020-01-19T06:25:02.3857866Z at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 2020-01-19T06:25:02.3857982Z at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 2020-01-19T06:25:02.3859852Z at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 2020-01-19T06:25:02.3860440Z at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 2020-01-19T06:25:02.3860732Z at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:872)
 2020-01-19T06:25:02.3860960Z at 
akka.dispatch.OnComplete.internal(Future.scala:263)
 2020-01-19T06:25:02.3861099Z at 
akka.dispatch.OnComplete.internal(Future.scala:261)
 2020-01-19T06:25:02.3861232Z at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
 2020-01-19T06:25:02.3861391Z at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
 2020-01-19T06:25:02.3861546Z at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 2020-01-19T06:25:02.3861712Z at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 2020-01-19T06:25:02.3861809Z at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
 2020-01-19T06:25:02.3861916Z at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
 2020-01-19T06:25:02.3862221Z at 
akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
 2020-01-19T06:25:02.3862475Z at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
 2020-01-19T06:25:02.3862626Z at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 2020-01-19T06:25:02.3862736Z at 
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
 2020-01-19T06:25:02.3862820Z at 
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
 2020-01-19T06:25:02.3867146Z at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 2020-01-19T06:25:02.3867318Z at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 2020-01-19T06:25:02.3867441Z at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 2020-01-19T06:25:02.3867552Z at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 2020-01-19T06:25:02.3867664Z at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 2020-01-19T06:25:02.3867763Z at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 2020-01-19T06:25:02.3867843Z at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
 2020-01-19T06:25:02.3867936Z at 
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
 2020-01-19T06:25:02.3868036Z at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 2020-01-19T06:25:02.3868145Z at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 2020-01-19T06:25:02.3868223Z at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 2020-01-19T06:25:02.3868313Z at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 2020-01-19T06:25:02.3868390Z at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 2020-01-19T06:25:02.3868520Z Caused by: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (47fe3e8df0e59994938485f683d1410e)
 2020-01-19T06:25:02.3868625Z at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$17(Dispatcher.java:516)
 

[jira] [Commented] (FLINK-13632) Update serializer upgrade tests to restore from 1.8 and beyond

2020-01-17 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018075#comment-17018075
 ] 

Congxian Qiu(klion26) commented on FLINK-13632:
---

I'm currently porting {{ValueSerializerMigrationTest}}

> Update serializer upgrade tests to restore from 1.8 and beyond
> --
>
> Key: FLINK-13632
> URL: https://issues.apache.org/jira/browse/FLINK-13632
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Blocker
> Fix For: 1.10.0
>
>
> FLINK-11767 introduced a new test base ({{TypeSerializerUpgradeTestBase}}) 
> for writing serializer upgrade tests. Now we need to migrate all existing 
> tests from {{TypeSerializerSnapshotMigrationTestBase}} to use the new test 
> base and update them to restore from Flink 1.8 onward.
> It seems these are the subclasses:
> * TtlSerializerStateMigrationTest (org.apache.flink.runtime.state.ttl)
> * ValueSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * PrimitiveArraySerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base.array)
> * AvroSerializerMigrationTest (org.apache.flink.formats.avro.typeutils)
> * TupleSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * BufferEntrySerializerMigrationTest 
> (org.apache.flink.streaming.api.operators.co)
> * TimerSerializerSnapshotMigrationTest 
> (org.apache.flink.streaming.api.operators)
> * StreamElementSerializerMigrationTest 
> (org.apache.flink.streaming.runtime.streamrecord)
> * KryoSnapshotMigrationTest (org.apache.flink.api.java.typeutils.runtime.kryo)
> * BaseTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * NullableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * VoidNamespacieSerializerSnapshotMigrationTest 
> (org.apache.flink.runtime.state)
> * ScalaOptionSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * ScalaTrySerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * JavaSerializerSnapshotMigrationTest (org.apache.flink.runtime.state)
> * LockableTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.cep.nfa.sharedbuffer)
> * NFASerializerSnapshotsMigrationTest (org.apache.flink.cep)
> * WindowSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.runtime.operators.windowing)
> * UnionSerializerMigrationTest (org.apache.flink.streaming.api.datastream)
> * TwoPhaseCommitSinkStateSerializerMigrationTest 
> (org.apache.flink.streaming.api.functions.sink)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * KafkaSerializerSnapshotsMigrationTest 
> (org.apache.flink.streaming.connectors.kafka)
> * RowSerializerMigrationTest (org.apache.flink.api.java.typeutils.runtime)
> * ValueArraySerializerSnapshotMigrationTest 
> (org.apache.flink.graph.types.valuearray)
> * MapSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * CompositeTypeSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils)
> * ListSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * EnumSerializerSnapshotMigrationTest 
> (org.apache.flink.api.common.typeutils.base)
> * PojoSerializerSnapshotMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ArrayListSerializerMigrationTest (org.apache.flink.runtime.state)
> * CopyableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * WritableSerializerMigrationTest 
> (org.apache.flink.api.java.typeutils.runtime)
> * ListViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * MapViewSerializerSnapshotMigrationTest (org.apache.flink.table.dataview)
> * ScalaEitherSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * LongValueWithProperHashCodeSerializerSnapshotMigrationTest 
> (org.apache.flink.graph.drivers.transform)
> * ScalaCaseClassSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * TraversableSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)
> * EnumValueSerializerSnapshotMigrationTest 
> (org.apache.flink.api.scala.typeutils)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15619) GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount failed on Azure

2020-01-16 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15619:
-

 Summary: 
GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount
 failed on Azure
 Key: FLINK-15619
 URL: https://issues.apache.org/jira/browse/FLINK-15619
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.10.0
Reporter: Congxian Qiu(klion26)


01-16T08:32:11.0214825Z [ERROR] 
testAllProcessingTimeTumblingGroupWindowOverCount[StateBackend=HEAP](org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase)
 Time elapsed: 2.213 s <<< ERROR! 2020-01-16T08:32:11.0223298Z 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
2020-01-16T08:32:11.0241857Z at 
org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount(GroupWindowTableAggregateITCase.scala:130)
 2020-01-16T08:32:11.0261909Z Caused by: org.apache.flink.runtime.JobException: 
Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, 
backoffTimeMS=0) 2020-01-16T08:32:11.0274347Z Caused by: java.lang.Exception: 
Artificial Failure 2020-01-16T08:32:11.0291664Z

 

[https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/4391/logs/16]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15603) Show "barrier lag" in checkpoint statistics

2020-01-15 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17016484#comment-17016484
 ] 

Congxian Qiu(klion26) commented on FLINK-15603:
---

We may also add some debug logging in {{CheckpointBarrierAligner}} when a 
barrier is received and when barrier alignment completes, as 
{{CheckpointBarrierTracker}} does; this can be helpful when debugging 
checkpoint problems.

> Show "barrier lag" in checkpoint statistics
> ---
>
> Key: FLINK-15603
> URL: https://issues.apache.org/jira/browse/FLINK-15603
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Stephan Ewen
>Priority: Critical
>  Labels: usability
> Fix For: 1.11.0
>
>
> One of the most important metrics is missing in the checkpoint stats: 
> "barrier lag", meaning the time between when the checkpoint was triggered 
> and when the barriers arrive at a task.
> That time is critical to identify if a checkpoint takes too long because of 
> backpressure or other contention.
> You can implicitly calculate this by "end_to_end_time - sync_time - 
> async_time", but it is much more obvious for users that something is up when 
> this number is explicitly shown.
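
Spelled out, the implicit calculation mentioned above is just a subtraction; a minimal sketch (the names are illustrative, not actual Flink metric identifiers):

{code:java}
// Barrier lag as it can be derived today from existing checkpoint stats.
// The parameter names are illustrative, not actual Flink identifiers.
public class BarrierLagSketch {
    static long barrierLagMillis(long endToEndMillis, long syncMillis, long asyncMillis) {
        return endToEndMillis - syncMillis - asyncMillis;
    }

    public static void main(String[] args) {
        // e.g. a 10 s checkpoint with 1 s sync and 2 s async parts spent
        // 7 s waiting for barriers -- a strong hint of backpressure.
        System.out.println(barrierLagMillis(10_000, 1_000, 2_000)); // 7000
    }
}
{code}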



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2020-01-10 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012786#comment-17012786
 ] 

Congxian Qiu(klion26) commented on FLINK-12785:
---

Already added the release note; closing this issue.

> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
> partitions were created over time) and I'm using daily windows. This makes a 
> lot of the windows buffer their contents before the watermark catches up to 
> "release" them. In turn, this gives me a lot of in-flight windows (200-300) 
> with very large state keys in rocksdb (on the order of 40-50mb per key).
> Running the pipeline tends to be mostly fine - it's not terribly fast when 
> appends happen but everything works. The problem comes when doing a savepoint 
> restore - specifically, the taskmanagers eat RAM until the kernel kills them 
> due to being out of memory. The extra memory isn't JVM heap since the memory 
> usage of the process is ~4x the -Xmx value and there aren't any 
> {{OutOfMemoryError}} exceptions.
> I traced the culprit of the memory growth to 
> [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
>  . Specifically, while the keys/values are deserialized on the Java heap, 
> {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
> buffers in unmanaged memory. That's not in itself an issue, but 
> {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not 
> a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
> flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
> of unmanaged memory before a flush.
> My suggestion would be to add an additional flush criteria to 
> {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
> records or 5mb buffered). This way large key writes would be immediately 
> flushed to RocksDB on recovery or even writes. I applied this approach and I 
> was able to complete a savepoint restore for my job. That said, I'm not 
> entirely sure what else this change would impact since I'm not very familiar 
> with Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2020-01-10 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) closed FLINK-12785.
-
Resolution: Fixed

> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
> partitions were created over time) and I'm using daily windows. This makes a 
> lot of the windows buffer their contents before the watermark catches up to 
> "release" them. In turn, this gives me a lot of in-flight windows (200-300) 
> with very large state keys in rocksdb (on the order of 40-50mb per key).
> Running the pipeline tends to be mostly fine - it's not terribly fast when 
> appends happen but everything works. The problem comes when doing a savepoint 
> restore - specifically, the taskmanagers eat RAM until the kernel kills them 
> due to being out of memory. The extra memory isn't JVM heap since the memory 
> usage of the process is ~4x the -Xmx value and there aren't any 
> {{OutOfMemoryError}} exceptions.
> I traced the culprit of the memory growth to 
> [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
>  . Specifically, while the keys/values are deserialized on the Java heap, 
> {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
> buffers in unmanaged memory. That's not in itself an issue, but 
> {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not 
> a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
> flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
> of unmanaged memory before a flush.
> My suggestion would be to add an additional flush criteria to 
> {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
> records or 5mb buffered). This way large key writes would be immediately 
> flushed to RocksDB on recovery or even writes. I applied this approach and I 
> was able to complete a savepoint restore for my job. That said, I'm not 
> entirely sure what else this change would impact since I'm not very familiar 
> with Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2020-01-10 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-12785:
--
Release Note: 
Before FLINK-12785, users might encounter OOM when restoring from a savepoint 
of the RocksDB state backend if it contained huge KV pairs. FLINK-12785 
introduces a size limit in RocksDBWriteBatchWrapper with a default value of 
2 MB; RocksDB's WriteBatch will flush once the consumed memory exceeds it. 
Users can tune the limit through the state.backend.rocksdb.write-batch-size 
property in flink-conf.yaml if needed.


  was:
Before FLINK-12785, users might encounter OOM when restoring from a savepoint 
of the RocksDB state backend if it contained huge KV pairs. FLINK-12785 
introduces a size limit in RocksDBWriteBatchWrapper with a default value of 
2 MB; RocksDB's WriteBatch will flush once the consumed memory exceeds it. 
Users can tune the limit through the state.backend.rocksdb.write-batch-size 
property in flink-conf.yaml if needed.
Users can use `state.backend.rocksdb.write-batch-size` to change the size of 
the WriteBatch if needed. 


> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years' worth of data, with a lot of watermark skew (i.e. new 
> partitions were created over time), and I'm using daily windows. This makes a 
> lot of the windows buffer their contents before the watermark catches up to 
> "release" them. In turn, this gives me a lot of in-flight windows (200-300) 
> with very large state keys in RocksDB (on the order of 40-50MB per key).
> Running the pipeline tends to be mostly fine - it's not terribly fast when 
> appends happen, but everything works. The problem comes when doing a savepoint 
> restore - specifically, the taskmanagers eat RAM until the kernel kills them 
> for being out of memory. The extra memory isn't JVM heap, since the memory 
> usage of the process is ~4x the -Xmx value and there aren't any 
> {{OutOfMemoryError}} exceptions.
> I traced the culprit of the memory growth to 
> [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212].
> Specifically, while the keys/values are deserialized on the Java heap, 
> {{RocksDBWriteBatchWrapper}} forwards them to RocksDB's {{WriteBatch}}, which 
> buffers them in unmanaged memory. That's not in itself an issue, but 
> {{RocksDBWriteBatchWrapper}} flushes based only on the number of records - not 
> the number of bytes in flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
> flush only once it has 500 records, and at 40MB per key, that's at least 20GB 
> of unmanaged memory before a flush.
> My suggestion would be to add an additional flush criterion to 
> {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
> records or 5MB buffered). This way, large key writes would be immediately 
> flushed to RocksDB during recovery or even regular writes. I applied this 
> approach and was able to complete a savepoint restore for my job. That said, 
> I'm not entirely sure what else this change would impact, since I'm not very 
> familiar with Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2020-01-10 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-12785:
--
Release Note: 
Before FLINK-12785, users may encounter an OOM if there are huge KV pairs when 
restoring from a savepoint of the RocksDB state backend. FLINK-12785 introduces 
a size limit in RocksDBWriteBatchWrapper with a default value of 2MB; RocksDB's 
WriteBatch will flush if the consumed memory exceeds it. Users can tune the 
limit through the state.backend.rocksdb.write-batch-size property in 
flink-conf.yaml if needed.
Users can use `state.backend.rocksdb.write-batch-size` to change the size of 
the WriteBatch if needed. 

> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years' worth of data, with a lot of watermark skew (i.e. new 
> partitions were created over time), and I'm using daily windows. This makes a 
> lot of the windows buffer their contents before the watermark catches up to 
> "release" them. In turn, this gives me a lot of in-flight windows (200-300) 
> with very large state keys in RocksDB (on the order of 40-50MB per key).
> Running the pipeline tends to be mostly fine - it's not terribly fast when 
> appends happen, but everything works. The problem comes when doing a savepoint 
> restore - specifically, the taskmanagers eat RAM until the kernel kills them 
> for being out of memory. The extra memory isn't JVM heap, since the memory 
> usage of the process is ~4x the -Xmx value and there aren't any 
> {{OutOfMemoryError}} exceptions.
> I traced the culprit of the memory growth to 
> [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212].
> Specifically, while the keys/values are deserialized on the Java heap, 
> {{RocksDBWriteBatchWrapper}} forwards them to RocksDB's {{WriteBatch}}, which 
> buffers them in unmanaged memory. That's not in itself an issue, but 
> {{RocksDBWriteBatchWrapper}} flushes based only on the number of records - not 
> the number of bytes in flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
> flush only once it has 500 records, and at 40MB per key, that's at least 20GB 
> of unmanaged memory before a flush.
> My suggestion would be to add an additional flush criterion to 
> {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
> records or 5MB buffered). This way, large key writes would be immediately 
> flushed to RocksDB during recovery or even regular writes. I applied this 
> approach and was able to complete a savepoint restore for my job. That said, 
> I'm not entirely sure what else this change would impact, since I'm not very 
> familiar with Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15529) Update upgrade compatibility table (docs/ops/upgrading.md) for 1.10.0

2020-01-10 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17012748#comment-17012748
 ] 

Congxian Qiu(klion26) commented on FLINK-15529:
---

Currently, I've tested the POJO serializer manually with the scenarios below, 
and all are OK (a minimal sketch of one such evolution follows the list).
 * RocksDBStateBackend
 ** use original pojo class
 ** add/delete some fields in pojo class
 ** reorder fields in pojo class
 ** change field type
 * HeapStateBackend
 ** use original pojo class
 ** add/delete some fields in pojo class
 ** reorder fields in pojo class
 ** change field type
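
For illustration, a minimal sketch of one such scenario (the class and field 
names are made up for the sketch; in practice the evolved definition replaces 
the original class on the classpath before restoring from the savepoint):
{code:java}
// Version 1 of the POJO: the savepoint was taken with this definition.
// POJO rules: public no-arg constructor plus public fields (or getters/setters).
public class SensorReading {
    public long sensorId;
    public double temperature;

    public SensorReading() {}
}
{code}
{code:java}
// Version 2 of the same class, deployed when restoring the savepoint:
// "temperature" was removed and "unit" was added. The POJO serializer matches
// fields by name, so the removed field's data is dropped on restore and the
// added field starts out with its default value.
public class SensorReading {
    public long sensorId;
    public String unit;

    public SensorReading() {}
}
{code}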

> Update upgrade compatibility table (docs/ops/upgrading.md) for 1.10.0
> -
>
> Key: FLINK-15529
> URL: https://issues.apache.org/jira/browse/FLINK-15529
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Yu Li
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Update upgrade compatibility table (docs/ops/upgrading.md) for 1.10.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15424) Make all AppendingState#add respect the java doc

2020-01-06 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17009446#comment-17009446
 ] 

Congxian Qiu(klion26) commented on FLINK-15424:
---

[~aljoscha] thanks for the reply.

{{AppendingState}} can be {{ListState}} or {{ValueState}}. I did some research 
on whether a collection should allow null elements, and found that
 * java.util.List allows null if the implementation allows it; {{ArrayList}} 
and {{LinkedList}} allow adding null elements
 * {{ListState#add}} does not allow adding a null element and will throw an 
exception; {{ListState}} can be created by users through {{RuntimeContext}}
 * the Guava collections recommend avoiding null in collections [1]

Summing all of this up, I'd like to propose changing the Javadoc to "do not 
allow null values" and making every implementation of {{AppendingState}} 
respect that doc (throw an exception if a null element is added) - what do 
you think? A minimal sketch follows the reference below.

[1][https://github.com/google/guava/wiki/UsingAndAvoidingNullExplained]
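
For illustration, a minimal sketch of the proposed contract (the class, the 
exact exception type, and the message are assumptions for the sketch; each 
real backend would keep its existing append logic after the guard):
{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an AppendingState implementation that rejects
// null up front, instead of clearing state (HeapReducingState today) or
// serializing null (RocksDBReducingState today).
public class NullRejectingListState<V> {
    private final List<V> backing = new ArrayList<>();

    public void add(V value) {
        if (value == null) {
            throw new NullPointerException(
                "You cannot add null to " + getClass().getSimpleName());
        }
        backing.add(value); // backend-specific append logic would go here
    }
}
{code}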

> Make all AppendingState#add respect the java doc
> 
>
> Key: FLINK-15424
> URL: https://issues.apache.org/jira/browse/FLINK-15424
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.3, 1.9.1
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> Currently, We have a java doc in 
> {{[AppendingState#add|https://github.com/apache/flink/blob/52fdee1d0c7af24d25c51caa073e29f11b07210b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java#L63]}}
> {code:java}
>  If null is passed in, the state value will remain unchanged.{code}
> but currently the implementations do not respect this; take 
> {{HeapReducingState}} as an example, where we'll clear the state if the 
> passed parameter is null
> {code:java}
> @Override
> public void add(V value) throws IOException {
>     if (value == null) {
>         clear();
>         return;
>     }
>     try {
>         stateTable.transform(currentNamespace, value, reduceTransformation);
>     } catch (Exception e) {
>         throw new IOException("Exception while applying ReduceFunction in reducing state", e);
>     }
> }
> {code}
> But in {{RocksDBReducingState}} we do not clear the state, and we put the 
> null value into the state if the serializer can serialize null.
> {code:java}
> @Override
> public void add(V value) throws Exception {
>     byte[] key = getKeyBytes();
>     V oldValue = getInternal(key);
>     V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
>     updateInternal(key, newValue);
> }
> {code}
> This issue wants to make every {{AppendingState}} implementation respect the 
> Javadoc of {{AppendingState}} and return directly if the passed-in parameter 
> is null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2020-01-03 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007378#comment-17007378
 ] 

Congxian Qiu(klion26) commented on FLINK-15152:
---

[~pnowojski] thanks for the confirmation. I can help to fix this; please 
assign it to me.

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Feng Jiajie
>Priority: Critical
>  Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpointing, but after one 
> week of running, I found there isn't any checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
> flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks switched to RUNNING (a matter of 
> seconds), I (actually a job control script) sent a "stop with savepoint" 
> command via the flink CLI:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 
> f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> log in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Socket Stream -> Map (1/1) of job 
> f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> {code}
> Then the job tasks (taskmanagers) *continue to run normally without* checkpoints.
> h2. The cause of the problem:
> 1. The "stop with savepoint" command calls 
> stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) 
> and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. But "before all the tasks switch to RUNNING", triggerSynchronousSavepoint 
> fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
> instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new 
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. Finally, "stop with savepoint" fails: 
> "checkpointCoordinator.stopCheckpointScheduler()" has run, but the job was 
> not terminated. The job is still running without periodic checkpointing. 
>  
> sample code for reproduce:
> {code:java}
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
>     RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
>     rocksdbBackend.enableTtlCompactionFilter();
>     rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
>     return rocksdbBackend;
>   }
>
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // 10 sec
>     env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
>     env.setStateBackend(makeRocksdbBackend());
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     checkpointConfig.enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     checkpointConfig.setFailOnCheckpointingErrors(true);
>     DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
>     text.map(new MapFunction<String, Tuple2<Long, Long>>() {
>       @Override
>       public Tuple2<Long, Long> map(String s) {
>         String[] s1 = s.split(" ");
>         return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
>       }
>     }).keyBy(0).flatMap(new CountWindowAverage()).print();
>     env.execute("Flink Streaming Java API Skeleton");
>   }
>
>   public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     private transient ValueState<Tuple2<Long, Long>> sum;
>
>     @Override
>     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>       Tuple2<Long, Long> currentSum = sum.value();
>       currentSum.f0 += 1;
>       currentSum.f1 += input.f1;
>       sum.update(currentSum);
>   

[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2020-01-02 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006650#comment-17006650
 ] 

Congxian Qiu(klion26) commented on FLINK-15152:
---

[~pnowojski] thanks for your reply.

Yes, it duplicates logic from 
{{org.apache.flink.runtime.scheduler.SchedulerBase#triggerSavepoint}}.

I think we should restart the {{CheckpointCoordinator}} in the two cases below:
 # {{CheckpointCoordinator#triggerSynchronousSavepoint}} failed
 # stopping the job failed (even if the synchronous savepoint succeeded)

 

Regarding the new issues/extra complexity that might be introduced: IMO, we 
should keep the {{CheckpointCoordinator}} running if the job is not stopped, 
so I think restarting the {{CheckpointCoordinator}} is needed.

That is why I proposed the previous change.

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Feng Jiajie
>Priority: Critical
>  Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpointing, but after one 
> week of running, I found there isn't any checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
> flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks switched to RUNNING (a matter of 
> seconds), I (actually a job control script) sent a "stop with savepoint" 
> command via the flink CLI:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 
> f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> log in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Socket Stream -> Map (1/1) of job 
> f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> {code}
> Then the job tasks (taskmanagers) *continue to run normally without* checkpoints.
> h2. The cause of the problem:
> 1. The "stop with savepoint" command calls 
> stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) 
> and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. But "before all the tasks switch to RUNNING", triggerSynchronousSavepoint 
> fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
> instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new 
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. Finally, "stop with savepoint" fails: 
> "checkpointCoordinator.stopCheckpointScheduler()" has run, but the job was 
> not terminated. The job is still running without periodic checkpointing. 
>  
> sample code for reproduce:
> {code:java}
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
>     RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
>     rocksdbBackend.enableTtlCompactionFilter();
>     rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
>     return rocksdbBackend;
>   }
>
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // 10 sec
>     env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
>     env.setStateBackend(makeRocksdbBackend());
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     checkpointConfig.enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     checkpointConfig.setFailOnCheckpointingErrors(true);
>     DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
>     text.map(new MapFunction<String, Tuple2<Long, Long>>() {
>       @Override
>       public Tuple2<Long, Long> map(String s) {
>         String[] s1 = s.split(" ");
>         return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
>       }

[jira] [Commented] (FLINK-15247) Closing (Testing)MiniCluster may cause ConcurrentModificationException

2020-01-01 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006575#comment-17006575
 ] 

Congxian Qiu(klion26) commented on FLINK-15247:
---

Seems to be another instance: [https://travis-ci.com/flink-ci/flink/jobs/271415113]

> Closing (Testing)MiniCluster may cause ConcurrentModificationException
> --
>
> Key: FLINK-15247
> URL: https://issues.apache.org/jira/browse/FLINK-15247
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {noformat}
> Test 
> operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase)
>  failed with:
> org.apache.flink.util.FlinkException: Could not close resource.
> at 
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
> at org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> Caused by: org.apache.flink.util.FlinkException: Error while shutting the 
> TaskExecutor down.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:397)
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$0(TaskExecutor.java:382)
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at 
> 

[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-01 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006540#comment-17006540
 ] 

Congxian Qiu(klion26) commented on FLINK-15447:
---

FYI, the JAAS files are located in the {{/tmp}} directory; if too many jobs 
run on the same machine, you may encounter this exception. The JAAS file has 
been moved to {{WORKING_DIR}} in FLINK-14433.

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to the "{{PWD}}/tmp" directory, or 
> something similar. "{{PWD}}" will be replaced by Yarn with the true working 
> directory of the JM/TM, which will be cleaned automatically.
>  
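
For illustration, a minimal sketch of how the JVM option could be built so 
that Yarn substitutes the working directory (the helper class is hypothetical; 
only {{ApplicationConstants.Environment.PWD}} is Yarn's actual API):
{code:java}
import org.apache.hadoop.yarn.api.ApplicationConstants;

// Hypothetical helper: render "-Djava.io.tmpdir={{PWD}}/tmp" for the container
// launch context. Yarn expands the {{PWD}} placeholder to the container's real
// working directory, which it cleans up automatically after the application.
public class TmpDirOptions {
    public static String javaIoTmpDirOption() {
        return "-Djava.io.tmpdir="
            + ApplicationConstants.Environment.PWD.$$()
            + "/tmp";
    }
}
{code}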



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15451) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on azure

2019-12-31 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-15451:
--
Summary: 
TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure 
failed on azure  (was: 
TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure)

> TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure 
> failed on azure
> --
>
> Key: FLINK-15451
> URL: https://issues.apache.org/jira/browse/FLINK-15451
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.9.1
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> 2019-12-31T02:43:39.4766254Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 42.801 s <<< FAILURE! - in 
> org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase 
> 2019-12-31T02:43:39.4768373Z [ERROR] 
> testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase)
>  Time elapsed: 2.699 s <<< ERROR! 2019-12-31T02:43:39.4768834Z 
> java.net.BindException: Address already in use 2019-12-31T02:43:39.4769096Z
>  
>  
> [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/3995/logs/15]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15451) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure

2019-12-31 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15451:
-

 Summary: 
TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure
 Key: FLINK-15451
 URL: https://issues.apache.org/jira/browse/FLINK-15451
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.9.1
Reporter: Congxian Qiu(klion26)


2019-12-31T02:43:39.4766254Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, 
Skipped: 0, Time elapsed: 42.801 s <<< FAILURE! - in 
org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase 
2019-12-31T02:43:39.4768373Z [ERROR] 
testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase)
 Time elapsed: 2.699 s <<< ERROR! 2019-12-31T02:43:39.4768834Z 
java.net.BindException: Address already in use 2019-12-31T02:43:39.4769096Z

 

 

[https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/3995/logs/15]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15247) Closing (Testing)MiniCluster may cause ConcurrentModificationException

2019-12-30 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005943#comment-17005943
 ] 

Congxian Qiu(klion26) commented on FLINK-15247:
---

another instance [https://travis-ci.com/flink-ci/flink/jobs/271335452]

> Closing (Testing)MiniCluster may cause ConcurrentModificationException
> --
>
> Key: FLINK-15247
> URL: https://issues.apache.org/jira/browse/FLINK-15247
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Gary Yao
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {noformat}
> Test 
> operatorsBecomeBackPressured(org.apache.flink.test.streaming.runtime.BackPressureITCase)
>  failed with:
> org.apache.flink.util.FlinkException: Could not close resource.
> at 
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)org.apache.flink.test.streaming.runtime.BackPressureITCase.tearDown(BackPressureITCase.java:165)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runners.Suite.runChild(Suite.java:128)
> at org.junit.runners.Suite.runChild(Suite.java:27)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
> at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> Caused by: org.apache.flink.util.FlinkException: Error while shutting the 
> TaskExecutor down.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:397)
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$0(TaskExecutor.java:382)
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at 
> 

[jira] [Commented] (FLINK-15427) State TTL RocksDb backend end-to-end test stalls on travis

2019-12-28 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004461#comment-17004461
 ] 

Congxian Qiu(klion26) commented on FLINK-15427:
---

This test itself passed, but it failed during the {{exceptions check}}:
{code:java}
Checking for errors...^M
Found error in log files:^M
{code}
After executing the command to find the exception, we'll get
{code:java}
2019-12-27 05:18:34,743 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Received 
CancelTaskException while we are not canceled. This is a bug and should be 
reported^M
org.apache.flink.runtime.execution.CancelTaskException: Consumed partition 
PipelinedSubpartitionView(index: 2) of ResultPartition 
745fd76b3c0327b1b0732bb14045de1c@2e06db5ab07dfc5dabc32576a9a40a0f has been 
released.^M
at 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:190)^M
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:509)^M
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:487)^M
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:475)^M
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:75)^M
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:125)^M
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)^M
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)^M
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)^M
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)^M
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)^M
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)^M
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)^M

{code}
This {{CancelTaskException}} was introduced by [~AHeise] in FLINK-15317, and I 
think FLINK-15403 is already tracking it.

As for this issue: the test wants to verify state TTL, and if the verification 
fails it prints something to stdout. We relied on the {{exceptions check}} 
(which checks whether the `.out` file is empty or not) to detect this.

So, I propose doing the check inside {{test_stream_state_ttl.sh}} rather than 
delegating to the exceptions check; the reasoning is similar to FLINK-15105.

We'll add some checking logic at the end of {{test_stream_state_ttl.sh}}, such 
as the snippet below, and skip the exception check for this test. [~azagrebin]
{code:bash}
if grep "TtlVerificationContext{" $FLINK_DIR/log/*.out > /dev/null; then
   exit 1 # the .out file contains verification output, i.e. verification failed
fi
{code}

> State TTL RocksDb backend end-to-end test stalls on travis
> --
>
> Key: FLINK-15427
> URL: https://issues.apache.org/jira/browse/FLINK-15427
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Congxian Qiu(klion26)
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The 'State TTL RocksDb backend end-to-end test' case stalls and finally 
> timedout with error message:
> {noformat}
> The job exceeded the maximum log length, and has been terminated.
> {noformat}
> https://api.travis-ci.org/v3/job/629699416/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15427) State TTL RocksDb backend end-to-end test stalls on travis

2019-12-27 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004128#comment-17004128
 ] 

Congxian Qiu(klion26) commented on FLINK-15427:
---

I'll take a look at this issue.

> State TTL RocksDb backend end-to-end test stalls on travis
> --
>
> Key: FLINK-15427
> URL: https://issues.apache.org/jira/browse/FLINK-15427
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The 'State TTL RocksDb backend end-to-end test' case stalls and finally 
> timedout with error message:
> {noformat}
> The job exceeded the maximum log length, and has been terminated.
> {noformat}
> https://api.travis-ci.org/v3/job/629699416/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15424) Make all AppendingState#add respect the java doc

2019-12-27 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-15424:
--
Component/s: Runtime / State Backends

> Make all AppendingState#add respect the java doc
> 
>
> Key: FLINK-15424
> URL: https://issues.apache.org/jira/browse/FLINK-15424
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.3, 1.9.1
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> Currently, We have a java doc in 
> {{[AppendingState#add|https://github.com/apache/flink/blob/52fdee1d0c7af24d25c51caa073e29f11b07210b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java#L63]}}
> {code:java}
>  If null is passed in, the state value will remain unchanged.{code}
> but currently the implementations do not respect this; take 
> {{HeapReducingState}} as an example, where we'll clear the state if the 
> passed parameter is null
> {code:java}
> @Override
> public void add(V value) throws IOException {
>     if (value == null) {
>         clear();
>         return;
>     }
>     try {
>         stateTable.transform(currentNamespace, value, reduceTransformation);
>     } catch (Exception e) {
>         throw new IOException("Exception while applying ReduceFunction in reducing state", e);
>     }
> }
> {code}
> But in {{RocksDBReducingState}} we do not clear the state, and we put the 
> null value into the state if the serializer can serialize null.
> {code:java}
> @Override
> public void add(V value) throws Exception {
>     byte[] key = getKeyBytes();
>     V oldValue = getInternal(key);
>     V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
>     updateInternal(key, newValue);
> }
> {code}
> This issue wants to make every {{AppendingState}} implementation respect the 
> Javadoc of {{AppendingState}} and return directly if the passed-in parameter 
> is null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15424) Make all AppendingState#add respect the java doc

2019-12-27 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15424:
-

 Summary: Make all AppendingState#add respect the java doc
 Key: FLINK-15424
 URL: https://issues.apache.org/jira/browse/FLINK-15424
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.1, 1.8.3
Reporter: Congxian Qiu(klion26)


Currently, We have a java doc in 
{{[AppendingState#add|https://github.com/apache/flink/blob/52fdee1d0c7af24d25c51caa073e29f11b07210b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java#L63]}}
{code:java}
 If null is passed in, the state value will remain unchanged.{code}
but currently the implementations do not respect this; take 
{{HeapReducingState}} as an example, where we'll clear the state if the passed 
parameter is null
{code:java}
@Override
public void add(V value) throws IOException {
    if (value == null) {
        clear();
        return;
    }
    try {
        stateTable.transform(currentNamespace, value, reduceTransformation);
    } catch (Exception e) {
        throw new IOException("Exception while applying ReduceFunction in reducing state", e);
    }
}
{code}
But in {{RocksDBReducingState}} we do not clear the state, and we put the null 
value into the state if the serializer can serialize null.
{code:java}
@Override
public void add(V value) throws Exception {
    byte[] key = getKeyBytes();
    V oldValue = getInternal(key);
    V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
    updateInternal(key, newValue);
}
{code}
This issue wants to make every {{AppendingState}} implementation respect the 
Javadoc of {{AppendingState}} and return directly if the passed-in parameter 
is null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15403) 'State Migration end-to-end test from 1.6' is unstable on travis.

2019-12-26 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003840#comment-17003840
 ] 

Congxian Qiu(klion26) commented on FLINK-15403:
---

The exception in the description is an error log; I'm not sure whether the 
test failed because of it (maybe because of the exception check?). This log 
was introduced by [~AHeise] in FLINK-15317; maybe he can share more about it.

> 'State Migration end-to-end test from 1.6' is unstable on travis.
> -
>
> Key: FLINK-15403
> URL: https://issues.apache.org/jira/browse/FLINK-15403
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: Xintong Song
>Priority: Critical
>  Labels: testability
> Fix For: 1.10.0
>
>
> https://api.travis-ci.org/v3/job/629576631/log.txt
> The test case fails because the log contains the following error message.
> {code}
> 2019-12-26 09:19:35,537 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Received 
> CancelTaskException while we are not canceled. This is a bug and should be 
> reported
> org.apache.flink.runtime.execution.CancelTaskException: Consumed partition 
> PipelinedSubpartitionView(index: 0) of ResultPartition 
> 3886657fb8cc980139fac67e32d6e380@8cfcbe851fe3bb3fa00e9afc370bd963 has been 
> released.
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:190)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:509)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:487)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:475)
>   at 
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:75)
>   at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:125)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15406) The savepoint is writted by "State Processor API" can't be restore by map or flatmap

2019-12-26 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003837#comment-17003837
 ] 

Congxian Qiu(klion26) commented on FLINK-15406:
---

Hi [~lintingbin], from the description it seems the exception is thrown when 
snapshotting rather than restoring, and the root cause is that the 
{{keySerializer}} used to construct the {{InternalTimersSnapshot}} is null; 
maybe this operator did not call {{InternalTimerServiceImpl#startTimerService}}?

Could you please share how you reproduce this problem? (A minimal demo that 
can reproduce it would be better.) Thanks.

> The savepoint is writted by "State Processor API" can't be restore by map or 
> flatmap
> 
>
> Key: FLINK-15406
> URL: https://issues.apache.org/jira/browse/FLINK-15406
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.9.1
>Reporter: Darcy Lin
>Priority: Major
>
> A savepoint written by the "State Processor API" can't be restored by map or 
> flatMap, but it can be restored by a KeyedProcessFunction.
>  Following is the error message:
> {code:java}
> java.lang.Exception: Could not write timer service of Flat Map -> Map -> 
> Sink: device_first_user_create (1/8) to checkpoint state 
> stream.java.lang.Exception: Could not write timer service of Flat Map -> Map 
> -> Sink: device_first_user_create (1/8) to checkpoint state stream. at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
>  at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>  at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>  at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>  at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301) 
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at 
> java.lang.Thread.run(Thread.java:748)Caused by: 
> java.lang.NullPointerException at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at 
> org.apache.flink.streaming.api.operators.InternalTimersSnapshot.(InternalTimersSnapshot.java:52)
>  at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.snapshotTimersForKeyGroup(InternalTimerServiceImpl.java:291)
>  at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:98)
>  at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
>  ... 19 more{code}
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-25 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003427#comment-17003427
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15152 at 12/26/19 2:27 AM:
-

 After having a look at the code, I think we should restart the 
{{CheckpointCoordinator}} if {{triggerSynchronousSavepoint}} or 
{{terminateJob}} failed. We can add a handler to restart the 
{{CheckpointCoordinator}} such as the one below - what do you think?  
[~pnowojski] [~kkloudas]

CC [~zhuzh]
{code:java}
// end of SchedulerBase#stopWithSavepoint
savepointFuture.thenCompose((path) ->
        terminationFuture.thenApply(jobStatus -> path))
    .handle(
        (path, throwable) -> {
            if (throwable != null) {
                // re-start the checkpoint coordinator when
                // triggerSynchronousSavepoint or terminateJob failed.
                checkpointCoordinator.startCheckpointScheduler();
                throw new CompletionException(
                    ExceptionUtils.stripException(throwable, CompletionException.class));
            }
            return path;
        });
{code}
 [1] 
[SchedulerBase#stopWithSavepoint|https://github.com/apache/flink/blob/0ba4a2b4cc2b48886d5a7948d631ea7da0068a0e/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L859]

 


was (Author: klion26):
 After having a look at the code, I think we should re-start 
checkpointCoordinator if {{triggerSynchronousSavepoint}} or {{terminateJob}} 
failed.  we can add a handler to re-start {{checheckpointCoordinator}} such as 
the below, what do you think?  [~pnowojski] [~kkloudas]

CC [~zhuzh]

 
{code:java}
// end of SchedulerBase#stopWithSavepoint
savepointFuture.thenCompose((path) ->
   terminationFuture.thenApply(jobStatus -> path))
   .handle(
   (path, throwable) -> {
   if (throwable != null) {
  throw new CompletionException(ExceptionUtils.stripException(throwable, 
CompletionException.class));
   }
   return path;
});{code}
 

[1] 
[SchedulerBase#stopWithSavepoint|https://github.com/apache/flink/blob/0ba4a2b4cc2b48886d5a7948d631ea7da0068a0e/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L859]

 

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Feng Jiajie
>Priority: Critical
>  Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpointing, but after one 
> week of running, I found there isn't any checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
> flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks switched to RUNNING (a matter of 
> seconds), I (actually a job control script) sent a "stop with savepoint" 
> command via the flink CLI:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 
> f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> log in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Socket Stream -> Map (1/1) of job 
> f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> {code}
> Then the job tasks (taskmanagers) *continue to run normally without* checkpoints.
> h2. The cause of the problem:
> 1. The "stop with savepoint" command calls 
> stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) 
> and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. But "before all the tasks switch to RUNNING", triggerSynchronousSavepoint 
> fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
> instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new 
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. Finally, "stop with savepoint" fails: 
> "checkpointCoordinator.stopCheckpointScheduler()" has run, but the job was 
> not terminated. The job is still running without periodic checkpointing. 
>  
> sample code for reproduce:
> 

[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-25 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003427#comment-17003427
 ] 

Congxian Qiu(klion26) commented on FLINK-15152:
---

 After having a look at the code, I think we should restart the 
{{CheckpointCoordinator}} if {{triggerSynchronousSavepoint}} or 
{{terminateJob}} failed. We can add a handler to restart the 
{{CheckpointCoordinator}} such as the one below - what do you think?  
[~pnowojski] [~kkloudas]

CC [~zhuzh]

 
{code:java}
// end of SchedulerBase#stopWithSavepoint
savepointFuture.thenCompose((path) ->
        terminationFuture.thenApply(jobStatus -> path))
    .handle(
        (path, throwable) -> {
            if (throwable != null) {
                throw new CompletionException(
                    ExceptionUtils.stripException(throwable, CompletionException.class));
            }
            return path;
        });
{code}
 

[1] 
[SchedulerBase#stopWithSavepoint|https://github.com/apache/flink/blob/0ba4a2b4cc2b48886d5a7948d631ea7da0068a0e/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L859]

 

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Feng Jiajie
>Priority: Critical
>  Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpointing, but after one 
> week of running, I found there isn't any checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
> flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks switched to RUNNING (a matter of 
> seconds), I (actually a job control script) sent a "stop with savepoint" 
> command via the flink CLI:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 
> f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> log in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Socket Stream -> Map (1/1) of job 
> f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> {code}
> Then the job tasks (taskmanagers) *continue to run normally without* checkpoints.
> h2. The cause of the problem:
> 1. The "stop with savepoint" command calls 
> stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) 
> and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. But "before all the tasks switch to RUNNING", triggerSynchronousSavepoint 
> fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
> instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new 
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. Finally, "stop with savepoint" failed after 
> "checkpointCoordinator.stopCheckpointScheduler()" had already run, but without 
> terminating the job. The job keeps running without periodic checkpoints. 
>  
> sample code to reproduce:
> {code:java}
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
>     RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
>     rocksdbBackend.enableTtlCompactionFilter();
>     rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
>     return rocksdbBackend;
>   }
>
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // 10 sec
>     env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
>     env.setStateBackend(makeRocksdbBackend());
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     checkpointConfig.enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     checkpointConfig.setFailOnCheckpointingErrors(true);
>     DataStream<String> text = 

[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-25 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003193#comment-17003193
 ] 

Congxian Qiu(klion26) commented on FLINK-15152:
---

Thanks for reporting the issue. From the description it seems to be a bug; I'll 
take a look at it.

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.1
>Reporter: Feng Jiajie
>Priority: Critical
>  Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpoints, but after one 
> week of running, I found there wasn't any checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m 
> flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks switched to RUNNING (a matter of 
> seconds), I (actually a job control script) sent a "stop with savepoint" 
> command via the flink cli:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 
> f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> log in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
> triggering task Source: Socket Stream -> Map (1/1) of job 
> f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED 
> instead. Aborting checkpoint.
> {code}
> Then the job task (taskmanager) *continues to run normally without* checkpoints.
> h2. The cause of the problem:
> 1. "stop with savepoint" command call the code 
> stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
>  and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint 
> failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} 
> instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new 
> CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. Finally, "stop with savepoint" failed after 
> "checkpointCoordinator.stopCheckpointScheduler()" had already run, but without 
> terminating the job. The job keeps running without periodic checkpoints. 
>  
> sample code to reproduce:
> {code:java}
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
>     RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
>     rocksdbBackend.enableTtlCompactionFilter();
>     rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
>     return rocksdbBackend;
>   }
>
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // 10 sec
>     env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
>     env.setStateBackend(makeRocksdbBackend());
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     checkpointConfig.enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     checkpointConfig.setFailOnCheckpointingErrors(true);
>     DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
>     text.map(new MapFunction<String, Tuple2<Long, Long>>() {
>       @Override
>       public Tuple2<Long, Long> map(String s) {
>         String[] s1 = s.split(" ");
>         return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
>       }
>     }).keyBy(0).flatMap(new CountWindowAverage()).print();
>     env.execute("Flink Streaming Java API Skeleton");
>   }
>
>   public static class CountWindowAverage extends 
>       RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     private transient ValueState<Tuple2<Long, Long>> sum;
>
>     @Override
>     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>       Tuple2<Long, Long> currentSum = sum.value();
>       currentSum.f0 += 1;
>       currentSum.f1 += input.f1;
>       sum.update(currentSum);
>   

[jira] [Commented] (FLINK-13553) KvStateServerHandlerTest.readInboundBlocking unstable on Travis

2019-12-19 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999881#comment-16999881
 ] 

Congxian Qiu(klion26) commented on FLINK-13553:
---

Currently, I can't reproduce it and it hasn't appeared in the last month, so 
I'm closing it now. Please reopen it if it appears in the future.

> KvStateServerHandlerTest.readInboundBlocking unstable on Travis
> ---
>
> Key: FLINK-13553
> URL: https://issues.apache.org/jira/browse/FLINK-13553
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Queryable State
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Congxian Qiu(klion26)
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{KvStateServerHandlerTest.readInboundBlocking}} and 
> {{KvStateServerHandlerTest.testQueryExecutorShutDown}} fail on Travis with a 
> {{TimeoutException}}.
> https://api.travis-ci.org/v3/job/566420641/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-13553) KvStateServerHandlerTest.readInboundBlocking unstable on Travis

2019-12-19 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) closed FLINK-13553.
-
Resolution: Cannot Reproduce

> KvStateServerHandlerTest.readInboundBlocking unstable on Travis
> ---
>
> Key: FLINK-13553
> URL: https://issues.apache.org/jira/browse/FLINK-13553
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Queryable State
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Congxian Qiu(klion26)
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{KvStateServerHandlerTest.readInboundBlocking}} and 
> {{KvStateServerHandlerTest.testQueryExecutorShutDown}} fail on Travis with a 
> {{TimeoutException}}.
> https://api.travis-ci.org/v3/job/566420641/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15280) Checkpoint end-to-end test failed

2019-12-16 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997279#comment-16997279
 ] 

Congxian Qiu(klion26) commented on FLINK-15280:
---

FYI: FLINK-15105 wants to disable the exception check logic.

If the problem here is the exception check, then I think this issue can benefit 
once FLINK-15105 is resolved.

> Checkpoint end-to-end test failed
> -
>
> Key: FLINK-15280
> URL: https://issues.apache.org/jira/browse/FLINK-15280
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: wangxiyuan
>Priority: Major
>
> The Checkpoint e2e test failed. The error log is very long. Please check the 
> link to find the error message.
> Some logs in these two days:
> [https://api.travis-ci.org/v3/job/624983426/log.txt]
> Running 'Resuming Externalized Checkpoint (rocks, incremental, no parallelism 
> change) end-to-end test'
>  
> [https://api.travis-ci.org/v3/job/625281873/log.txt]
> Running 'Resuming Externalized Checkpoint (file, sync, scale down) end-to-end 
> test'
>  
> And arm as well:
> [https://logs.openlabtesting.org/logs/periodic-20-flink-mail/github.com/apache/flink/master/flink-end-to-end-test-arm64-checkpoints-and-sticky/3da575a/job-output.txt.gz]
> Running 'Resuming Externalized Checkpoint (rocks, incremental, no parallelism 
> change) end-to-end test'
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13808) Checkpoints expired by timeout may leak RocksDB files

2019-12-12 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995375#comment-16995375
 ] 

Congxian Qiu(klion26) commented on FLINK-13808:
---

[~sewen] FYI, create an issue to track this FLINK-15236

> Checkpoints expired by timeout may leak RocksDB files
> -
>
> Key: FLINK-13808
> URL: https://issues.apache.org/jira/browse/FLINK-13808
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.8.0, 1.8.1
> Environment: So far only reliably reproducible on a 4-node cluster 
> with parallelism ≥ 100. But do try 
> https://github.com/jcaesar/flink-rocksdb-file-leak
>Reporter: Julius Michaelis
>Priority: Minor
>
> A RocksDB state backend with HDFS checkpoints, with or without local 
> recovery, may leak files in {{io.tmp.dirs}} on checkpoint expiry by timeout.
> If the size of a checkpoint crosses what can be transferred during one 
> checkpoint timeout, checkpoints will continue to fail forever. If this is 
> combined with a quick rollover of SST files (e.g. due to a high density of 
> writes), this may quickly exhaust available disk space (or memory, as /tmp is 
> the default location).
> As a workaround, the jobmanager's REST API can be frequently queried for 
> failed checkpoints, and associated files deleted accordingly.
> I've tried investigating the cause a little bit, but I'm stuck:
>  * {{Checkpoint 19 of job ac7efce3457d9d73b0a4f775a6ef46f8 expired before 
> completing.}} and similar messages get printed, so
>  * [{{abortExpired}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L547-L549],
>  so
>  * [{{dispose}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L416],
>  so
>  * [{{cancelCaller}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L488],
>  so
>  * [the canceler is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L497]
>  ([through one more 
> layer|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L129]),
>  so
>  * [{{cleanup}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L95],
>  (possibly [not from 
> {{cancel}}|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L84]),
>  so
>  * [{{cleanupProvidedResources}} is 
> invoked|https://github.com/apache/flink/blob/release-1.8.1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncSnapshotCallable.java#L162]
>  (this is the indirection that made me give up), so
>  * [this trace 
> log|https://github.com/apache/flink/blob/release-1.8.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L372]
>  should be printed, but it isn't.
> I have some time to further investigate, but I'd appreciate help on finding 
> out where in this chain things go wrong.
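
As a concrete illustration of the workaround mentioned in the description above 
(frequently querying the JobManager's REST API for failed checkpoints), here is 
a minimal hedged sketch. It assumes Flink's monitoring REST endpoint 
/jobs/<jobId>/checkpoints and a JobManager on localhost:8081; the job id is a 
placeholder, the JSON handling is a deliberately naive string check, and the 
actual file deletion is left as a comment:
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: poll the checkpoint history and react to failed checkpoints.
public class CheckpointLeakWatcher {
    public static void main(String[] args) throws Exception {
        String jobId = "ac7efce3457d9d73b0a4f775a6ef46f8"; // placeholder job id
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8081/jobs/" + jobId + "/checkpoints"))
            .build();
        String body = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString())
            .body();
        // Naive check; a real implementation would parse the JSON history and
        // map each FAILED checkpoint to its leftover files under io.tmp.dirs
        // before deleting them.
        if (body.contains("\"status\":\"FAILED\"")) {
            System.out.println("Found failed checkpoints; clean up leaked local files.");
        }
    }
}
{code}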



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15236) Add a safety net to limit the number of concurrent checkpoints on TM side

2019-12-12 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-15236:
--
Summary: Add a safety net to limit the number of concurrent checkpoints on 
TM side  (was: Add a safety net for concurrent checkpoints on TM side)

> Add a safety net to limit the number of concurrent checkpoints on TM side
> -
>
> Key: FLINK-15236
> URL: https://issues.apache.org/jira/browse/FLINK-15236
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> As discussed in FLINK-13808, we can add an additional config option, 
> {{taskmanager.checkpoints.max-concurrent}}, that limits the number of 
> concurrent checkpoints on the TM as a safety net.
> The default value of {{taskmanager.checkpoints.max-concurrent}} depends on 
> the job's maxConcurrentCheckpoints setting:
>  * If maxConcurrentCheckpoints = 1, the default 
> {{taskmanager.checkpoints.max-concurrent}} is 1.
>  * If maxConcurrentCheckpoints > 1, the default value for 
> {{taskmanager.checkpoints.max-concurrent}} is unlimited.
> The limit should not take manually triggered checkpoints/savepoints into account.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15236) Add a safety net for concurrent checkpoints on TM side

2019-12-12 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15236:
-

 Summary: Add a safety net for concurrent checkpoints on TM side
 Key: FLINK-15236
 URL: https://issues.apache.org/jira/browse/FLINK-15236
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Congxian Qiu(klion26)


As discussed in FLINK-13808, we can add an additional config option, 
{{taskmanager.checkpoints.max-concurrent}}, that limits the number of concurrent 
checkpoints on the TM as a safety net.

The default value of {{taskmanager.checkpoints.max-concurrent}} depends on the 
job's maxConcurrentCheckpoints setting:
 * If maxConcurrentCheckpoints = 1, the default 
{{taskmanager.checkpoints.max-concurrent}} is 1.
 * If maxConcurrentCheckpoints > 1, the default value for 
{{taskmanager.checkpoints.max-concurrent}} is unlimited.

The limit should not take manually triggered checkpoints/savepoints into 
account; a resolution sketch follows below.
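
A minimal sketch of how this default resolution could look. This is purely 
illustrative: the option is only proposed above and does not exist in Flink, 
and the helper class, method name, and key handling are my assumptions:
{code:java}
import org.apache.flink.configuration.Configuration;

// Hypothetical helper; not part of Flink.
public final class TmCheckpointLimits {

    /** Resolve the effective TM-side concurrent-checkpoint limit. */
    public static int resolveMaxConcurrentOnTm(Configuration tmConfig, int maxConcurrentCheckpoints) {
        // Proposed default: 1 when the job allows only one concurrent
        // checkpoint, otherwise effectively unlimited.
        int defaultLimit = (maxConcurrentCheckpoints == 1) ? 1 : Integer.MAX_VALUE;
        return tmConfig.getInteger("taskmanager.checkpoints.max-concurrent", defaultLimit);
    }
}
{code}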



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-12 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16994704#comment-16994704
 ] 

Congxian Qiu(klion26) commented on FLINK-15105:
---

Agree that disabling the exception check here is the easiest way to fix this 
issue, and we don't need to touch any existing code. Then we'll disable the 
exception check for all tests running 
{{test_resume_externalized_checkpoints.sh}}. I can help fix this; could 
someone assign this ticket to me?

> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test stalls on travis
> -
>
> Key: FLINK-15105
> URL: https://issues.apache.org/jira/browse/FLINK-15105
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test on the release-1.9 nightly build stalls with "The job 
> exceeded the maximum log length, and has been terminated".
> https://api.travis-ci.org/v3/job/621090394/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-11 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993554#comment-16993554
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/11/19 1:52 PM:
-

First, to answer the last question: we can't just remove the "error" message in 
the {{RuntimeException}}; we'll still fail in 
{{common.sh#check_logs_for_exceptions()}} because of the {{RuntimeException}}.

Then I'll try to describe more about how {{FailureMapper}} is used:
 # {{FailureMapper}} is only used in {{DataStreamAllroundTestProgram}}.
 # we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram}} only if we 
[enabled 
TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]
 # we'll throw an Exception in {{FailureMapper#map}} and 
{{FailureMapper#notifyCheckpointComplete}}
 # we'll enable {{TEST_SIMULATE_FAILURE}} in {{test_ha_datastream.sh}}, 
{{test_ha_per_job_cluster_datastream.sh}} and 
{{test_resume_externalized_checkpoints.sh}}

IIUC, all the above tests want to verify whether the job can successfully 
restore (with a checkpoint) from the last failed job; we do not care where the 
exception comes from, so an Exception thrown from {{FailureMapper#map}} or 
{{FailureMapper#notifyCheckpointComplete}} has the same effect. If we want to 
verify that a failure of notifyCheckpointComplete can fail the task, maybe we 
can add a unit test for it, as sketched below.

 

 


was (Author: klion26):
First, answer the last question: we can't just remove "error" message in 
{{RuntimeException}},  we'll fail in 
{{common.sh#}}{{check_logs_for_exceptions()}} because of the 
{{RuntimeException}}.

Then I'll try to describe more about the things about {{FailureMapper}}.
 # {{FailureMapper is only used in {{DataStreamAllroundTestProgram.
 # we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram only if we 
[enabled 
TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]}}
 in {{DataStreamAllroundTestProgram}}
 # {{We'll throw Exception in {{FailureMapper#map and 
{{FailureMapper#notifyCheckpointComplete}}
 # {{we'll enable {{TEST_SIMULATE_FAILURE}} }} in {{test_ha_datastream.sh}}, 
{{test_ha_per_job_cluster_datastream.sh}} and 
{{test_resume_externalized_checkpoints.sh}}

IIUC, all the above tests are wanna test whether the job can restore 
from(restore with checkpoint) the last failed job successfully(but we do not 
care where the exception come from, then Exception thrown from 
FailureMapper#mapper or FailureMapper#notifyCheckpointComplete have the same 
effect, please correct me if I miss anything here). If we want to verify that 
`failure of notifyCheckpointComplete can fail task`, maybe we can add a ut for 
it.

 

 

> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test stalls on travis
> -
>
> Key: FLINK-15105
> URL: https://issues.apache.org/jira/browse/FLINK-15105
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test on the release-1.9 nightly build stalls with "The job 
> exceeded the maximum log length, and has been terminated".
> https://api.travis-ci.org/v3/job/621090394/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-11 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993554#comment-16993554
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/11/19 1:46 PM:
-

First, to answer the last question: we can't just remove the "error" message in 
the {{RuntimeException}}; we'll still fail in 
{{common.sh#check_logs_for_exceptions()}} because of the {{RuntimeException}}.

Then I'll try to describe more about how {{FailureMapper}} is used:
 # {{FailureMapper}} is only used in {{DataStreamAllroundTestProgram}}.
 # we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram}} only if we 
[enabled 
TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]
 # we'll throw an Exception in {{FailureMapper#map}} and 
{{FailureMapper#notifyCheckpointComplete}}
 # we'll enable {{TEST_SIMULATE_FAILURE}} in {{test_ha_datastream.sh}}, 
{{test_ha_per_job_cluster_datastream.sh}} and 
{{test_resume_externalized_checkpoints.sh}}

IIUC, all the above tests want to verify whether the job can successfully 
restore (with a checkpoint) from the last failed job; we do not care where the 
exception comes from, so an Exception thrown from {{FailureMapper#map}} or 
{{FailureMapper#notifyCheckpointComplete}} has the same effect, please correct 
me if I miss anything here. If we want to verify that a failure of 
notifyCheckpointComplete can fail the task, maybe we can add a unit test for it.

 

 


was (Author: klion26):
First, answer the last question: we can't just remove "error" message in 
{{RuntimeException}},  we'll fail in 
{{common.sh#}}{{check_logs_for_exceptions()}} because of the 
{{RuntimeException}}.

Then I'll try to describe more about the things about {{FailureMapper}}.
 # {{FailureMapper is only used in {{DataStreamAllroundTestProgram.
 # we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram only }}if 
we [enabled 
TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]{{}}
 in {{DataStreamAllroundTestProgram}}
 # {{We'll throw Exception in }}{{FailureMapper#map}} and 
{{FailureMapper#notifyCheckpointComplete}}
 # {{we'll enable }}{{TEST_SIMULATE_FAILURE}} in {{test_ha_datastream.sh}}, 
{{test_ha_per_job_cluster_datastream.sh}} and 
{{test_resume_externalized_checkpoints.sh}}

IIUC, all the above tests are wanna test whether the job can restore 
from(restore with checkpoint) the last failed job successfully(but we do not 
care where the exception come from, then Exception thrown from 
FailureMapper#mapper or FailureMapper#notifyCheckpointComplete have the same 
effect, please correct me if I miss anything here). If we want to verify that 
`failure of notifyCheckpointComplete can fail task`, maybe we can add a ut for 
it.


 

 

> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test stalls on travis
> -
>
> Key: FLINK-15105
> URL: https://issues.apache.org/jira/browse/FLINK-15105
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test on the release-1.9 nightly build stalls with "The job 
> exceeded the maximum log length, and has been terminated".
> https://api.travis-ci.org/v3/job/621090394/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-11 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993554#comment-16993554
 ] 

Congxian Qiu(klion26) commented on FLINK-15105:
---

First, to answer the last question: we can't just remove the "error" message in 
the {{RuntimeException}}; we'll still fail in 
{{common.sh#check_logs_for_exceptions()}} because of the {{RuntimeException}}.

Then I'll try to describe more about how {{FailureMapper}} is used:
 # {{FailureMapper}} is only used in {{DataStreamAllroundTestProgram}}.
 # we'll add a {{FailureMapper}} in {{DataStreamAllroundTestProgram}} only if we 
[enabled 
TEST_SIMULATE_FAILURE|https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java#L173]
 # we'll throw an Exception in {{FailureMapper#map}} and 
{{FailureMapper#notifyCheckpointComplete}}
 # we'll enable {{TEST_SIMULATE_FAILURE}} in {{test_ha_datastream.sh}}, 
{{test_ha_per_job_cluster_datastream.sh}} and 
{{test_resume_externalized_checkpoints.sh}}

IIUC, all the above tests want to verify whether the job can successfully 
restore (with a checkpoint) from the last failed job; we do not care where the 
exception comes from, so an Exception thrown from {{FailureMapper#map}} or 
{{FailureMapper#notifyCheckpointComplete}} has the same effect, please correct 
me if I miss anything here. If we want to verify that a failure of 
notifyCheckpointComplete can fail the task, maybe we can add a unit test for it.


 

 

> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test stalls on travis
> -
>
> Key: FLINK-15105
> URL: https://issues.apache.org/jira/browse/FLINK-15105
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test on the release-1.9 nightly build stalls with "The job 
> exceeded the maximum log length, and has been terminated".
> https://api.travis-ci.org/v3/job/621090394/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-10 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992673#comment-16992673
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/10/19 4:09 PM:
-

[~trohrmann]  In the previous comment, I didn't want to remove the whole 
{{FailureMapper}}, but just wanted to remove the {{Artificial failure}} throw 
in {{FailureMapper#notifyCheckpointComplete}}, just as the comment in the code 
block below indicates.
{code:java}
public T map(T value) throws Exception {
    numProcessedRecords++;

    if (isReachedFailureThreshold()) {
        throw new Exception("Artificial failure.");
    }

    return value;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    numCompleteCheckpoints++;

    if (isReachedFailureThreshold()) { // <=== just want to remove this ===>
        throw new Exception("Artificial failure.");
    }
}
{code}
I think the problem here is that we throw the Artificial failure when 
completing a checkpoint.

After throwing the {{Artificial failure}} in 
{{FailureMapper#notifyCheckpointComplete}}

---> we get the following exception (attached below)

---> the test fails when {{check_logs_for_errors}} runs the commands in 
{{common.sh}}.
{code:java}
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
... 5 more

{code}
If we remove the {{Artificial failure}} throw in 
{{FailureMapper#notifyCheckpointComplete}}, we can still throw the {{Artificial 
failure}} in {{FailureMapper#map}}. IMHO, the throw in 
{{notifyCheckpointComplete}} is only needed when the source is finite, but the 
test job uses 
[SequenceGeneratorSource|https://github.com/apache/flink/blob/171020749f7fccfa7781563569e2c88ea5e8b6a1/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java#L102],
 which is an infinite source.


was (Author: klion26):
[~trohrmann]  In the previous comment, I'm not want to remove the whole 
{{FailureMapper}}, but just want to remove the {{Artificial failure}} throwing 
statement in {{FailureMapper}}#{{notifyCheckpointComplete just as the comment 
in the below code block.}}
{code:java}
public T map(T value) throws Exception {
   numProcessedRecords++;

   if (isReachedFailureThreshold()) {
  throw new Exception("Artificial failure.");
   }

   return value;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
   numCompleteCheckpoints++;

   if (isReachedFailureThreshold()) { // === just want to remove 
this ===
  throw new Exception("Artificial failure.");
   }
}
{code}
I think the problem here is that we throw Artifical failure when completing 
checkpoint

After throwing {{Artificial failure}} in 
{{FailureMapper#notifyCheckpointComplete}}

---> we got the following exception(attached below)

---> test failed when {{check_logs_for_errors}} using the commands in 
{{common.sh}}.
{code:java}
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
at 

[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-10 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992673#comment-16992673
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/10/19 3:49 PM:
-

[~trohrmann]  In the previous comment, I didn't want to remove the whole 
{{FailureMapper}}, but just wanted to remove the {{Artificial failure}} throw 
in {{FailureMapper#notifyCheckpointComplete}}, just as the comment in the code 
block below indicates.
{code:java}
public T map(T value) throws Exception {
    numProcessedRecords++;

    if (isReachedFailureThreshold()) {
        throw new Exception("Artificial failure.");
    }

    return value;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    numCompleteCheckpoints++;

    if (isReachedFailureThreshold()) { // <=== just want to remove this ===>
        throw new Exception("Artificial failure.");
    }
}
{code}
I think the problem here is that we throw the Artificial failure when 
completing a checkpoint.

After throwing the {{Artificial failure}} in 
{{FailureMapper#notifyCheckpointComplete}}

---> we get the following exception (attached below)

---> the test fails when {{check_logs_for_errors}} runs the commands in 
{{common.sh}}.
{code:java}
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
... 5 more

{code}
If we remove the {{Artificial failure}} throw in 
{{FailureMapper#notifyCheckpointComplete}}, we can still throw the {{Artificial 
failure}} in {{FailureMapper#map}}. IMHO, the throw in 
{{notifyCheckpointComplete}} is only needed when the source is finite, but the 
test job uses 
[SequenceGeneratorSource|https://github.com/apache/flink/blob/171020749f7fccfa7781563569e2c88ea5e8b6a1/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java#L102],
 which is an infinite source.


was (Author: klion26):
[~trohrmann]  In the previous comment, I'm not want to remove the whole 
{{FailureMapper}}, but just want to remove the {{Artificial failure}} throwing 
statement in {{FailureMapper}}#{{notifyCheckpointComplete just as the comment 
in the below code block.}}

I think the problem here is that we throw Artifical failure when completing 
checkpoint(we'll throw Artifical failure in two places  in {{FailureMapper}})
{code:java}
public T map(T value) throws Exception {
   numProcessedRecords++;

   if (isReachedFailureThreshold()) {
  throw new Exception("Artificial failure.");
   }

   return value;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
   numCompleteCheckpoints++;

   if (isReachedFailureThreshold()) { // === just want to remove 
this ===
  throw new Exception("Artificial failure.");
   }
}
{code}
After throwing {{Artificial failure}} in 
{{FailureMapper#notifyCheckpointComplete}}

---> we got the following exception(attached below)

---> test failed when {{check_logs_for_errors}} using the commands in 
{{common.sh}}.
{code:java}
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at 

[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-10 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992673#comment-16992673
 ] 

Congxian Qiu(klion26) commented on FLINK-15105:
---

[~trohrmann]  In the previous comment, I didn't want to remove the whole 
{{FailureMapper}}, but just wanted to remove the {{Artificial failure}} throw 
in {{FailureMapper#notifyCheckpointComplete}}, just as the comment in the code 
block below indicates.

I think the problem here is that we throw the Artificial failure when 
completing a checkpoint (we throw the Artificial failure in two places in 
{{FailureMapper}}):
{code:java}
public T map(T value) throws Exception {
    numProcessedRecords++;

    if (isReachedFailureThreshold()) {
        throw new Exception("Artificial failure.");
    }

    return value;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    numCompleteCheckpoints++;

    if (isReachedFailureThreshold()) { // <=== just want to remove this ===>
        throw new Exception("Artificial failure.");
    }
}
{code}
After throwing the {{Artificial failure}} in 
{{FailureMapper#notifyCheckpointComplete}}

---> we get the following exception (attached below)

---> the test fails when {{check_logs_for_errors}} runs the commands in 
{{common.sh}}.
{code:java}
java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
... 5 more

{code}
If we remove the {{Artificial failure}} throw in 
{{FailureMapper#notifyCheckpointComplete}}, we can still throw the {{Artificial 
failure}} in {{FailureMapper#map}}. IMHO, the throw in 
{{notifyCheckpointComplete}} is only needed when the source is finite, but the 
test job uses 
[SequenceGeneratorSource|https://github.com/apache/flink/blob/171020749f7fccfa7781563569e2c88ea5e8b6a1/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java#L102],
 which is an infinite source.

> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test stalls on travis
> -
>
> Key: FLINK-15105
> URL: https://issues.apache.org/jira/browse/FLINK-15105
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> Resuming Externalized Checkpoint after terminal failure (rocks, incremental) 
> end-to-end test on the release-1.9 nightly build stalls with "The job 
> exceeded the maximum log length, and has been terminated".
> https://api.travis-ci.org/v3/job/621090394/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15150) ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange failed on Travis

2019-12-09 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15150:
-

 Summary: 
ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange failed 
on Travis
 Key: FLINK-15150
 URL: https://issues.apache.org/jira/browse/FLINK-15150
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.10.0
Reporter: Congxian Qiu(klion26)


 
06:37:18.423 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 14.014 s <<< FAILURE! - in 
org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase
06:37:18.423 [ERROR] 
testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase)
 Time elapsed: 14.001 s <<< ERROR!
java.util.concurrent.ExecutionException: 
org.apache.flink.util.FlinkException: JobMaster has been shut down.
	at 
org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.lambda$testJobExecutionOnClusterWithLeaderChange$1(ZooKeeperLeaderElectionITCase.java:131)
	at 
org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:131)
Caused by: org.apache.flink.util.FlinkException: JobMaster has been shut down.
 
[https://travis-ci.com/flink-ci/flink/jobs/264972218]
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15097) flink can not use user specified hdfs conf when submitting app in client node

2019-12-08 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991192#comment-16991192
 ] 

Congxian Qiu(klion26) commented on FLINK-15097:
---

If these two are truly the same, could we close this issue as a duplicate and 
link it to FLINK-11135?

> flink can not use user specified hdfs conf when submitting app in client node
> -
>
> Key: FLINK-15097
> URL: https://issues.apache.org/jira/browse/FLINK-15097
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.9.1
>Reporter: qian wang
>Priority: Major
> Attachments: 0001-adjust-read-hdfs-conf-order.patch
>
>
> Now, if a cluster node has the env HADOOP_CONF_DIR set, Flink forces the use 
> of the hdfs-site.xml in the corresponding dir, so a user who submits an app 
> from the client node cannot supply a custom hdfs-site.xml/hdfs-default.xml 
> through fs.hdfs.hdfssite or fs.hdfs.hdfsdefault in order to set a custom 
> blocksize or replication num. For example, using yarnship to upload my hdfs 
> conf dir and pointing fs.hdfs.hdfssite at \{conf dir}/hdfs-site.xml is useless.
> Deep in the code, this is due to the order of choosing the conf in 
> HadoopUtils.java: the conf in HADOOP_CONF_DIR overrides the user's uploaded 
> conf. I think this order is not sensible, so I reversed the order in which 
> Flink reads the hdfs conf, to let the user's uploaded custom conf override 
> HADOOP_CONF_DIR (see the sketch below).
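
A minimal hedged sketch of the resolution order the reporter proposes, relying 
only on plain Hadoop Configuration semantics (resources added later override 
earlier ones). The class and method names here are illustrative assumptions; 
this is not the actual HadoopUtils code:
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Sketch only: later addResource() calls override earlier ones, so loading
// HADOOP_CONF_DIR first and the user-specified file last lets the user win.
public class HdfsConfOrderSketch {
    public static Configuration loadHdfsConf(String hadoopConfDir, String userHdfsSite) {
        Configuration conf = new Configuration(false);
        // lowest precedence: cluster-wide conf from HADOOP_CONF_DIR
        conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
        // highest precedence: the conf the user passed via fs.hdfs.hdfssite
        if (userHdfsSite != null) {
            conf.addResource(new Path(userHdfsSite));
        }
        return conf;
    }
}
{code}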



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-08 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991179#comment-16991179
 ] 

Congxian Qiu(klion26) edited comment on FLINK-15105 at 12/9/19 6:38 AM:


The test completed a checkpoint successfully in the first job, resumed from 
the checkpoint successfully in the second job, and could complete checkpoints 
in the second job successfully:
{code:java}
// log for first job
2019-12-05 20:12:17,410 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
SlidingWindowOperator (1/2) (5a5c73dd041a0145bc02dc017e46bf1f) switched from DE
PLOYING to RUNNING.
2019-12-05 20:12:17,970 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 1 @ 1575576737956 for job 39a292088648857cac5f7e110547c18
0.
2019-12-05 20:12:21,095 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 1 for job 39a292088648857cac5f7e110547c180 (261564 bytes in 3114 ms).
2019-12-05 20:12:21,113 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 2 @ 1575576741094 for job 39a292088648857cac5f7e110547c180.
2019-12-05 20:12:22,002 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - FailureMapper 
(1/1) (4d273da136346ef3ff6e1a54d197f00b) switched from RUNNING to
 FAILED.
java.lang.RuntimeException: Error while confirming checkpoint
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
        at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
        ... 5 more
2019-12-05 20:12:22,014 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding 
checkpoint 2 of job 39a292088648857cac5f7e110547c180.
java.lang.RuntimeException: Error while confirming checkpoint
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
        at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
        ... 5 more

// log for second job
2019-12-05 20:12:27,190 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 
7c862506012fb04c0d565bfda7cc9595 from savepoint 
file:///home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-02075534631/externalized-chckpt-e2e-backend-dir/39a292088648857cac5f7e110547c180/chk-1
 ()
2019-12-05 20:12:27,213 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the 
checkpoint ID of job 7c862506012fb04c0d565bfda7cc9595 to 2.
2019-12-05 20:12:27,220 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job 
7c862506012fb04c0d565bfda7cc9595 from latest valid checkpoint: Checkpoint 1 @ 0 
for 7c862506012fb04c0d565bfda7cc9595.
2019-12-05 20:12:27,231 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state 
to restore
2019-12-05 20:12:27,232 INFO  
org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager 
runner for job General purpose test job (7c862506012fb04c0d565bfda7cc9595) was 
granted leadership with session id ---- at 
akka.tcp://flink@localhost:6123/user/jobmanager_1.
2019-12-05 20:12:27,233 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
            - Starting execution of job General purpose 

[jira] [Commented] (FLINK-15105) Resuming Externalized Checkpoint after terminal failure (rocks, incremental) end-to-end test stalls on travis

2019-12-08 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991179#comment-16991179
 ] 

Congxian Qiu(klion26) commented on FLINK-15105:
---

The test completed a checkpoint successfully in the first job, resumed from 
the checkpoint successfully in the second job, and could complete checkpoints 
in the second job successfully:

 
{code:java}
// log for first job
2019-12-05 20:12:17,410 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
SlidingWindowOperator (1/2) (5a5c73dd041a0145bc02dc017e46bf1f) switched from DE
PLOYING to RUNNING.
2019-12-05 20:12:17,970 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 1 @ 1575576737956 for job 39a292088648857cac5f7e110547c18
0.
2019-12-05 20:12:21,095 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 1 for job 39a292088648857cac5f7e110547c180 (261564 bytes in 3114 ms).
2019-12-05 20:12:21,113 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 2 @ 1575576741094 for job 39a292088648857cac5f7e110547c180.
2019-12-05 20:12:22,002 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - FailureMapper 
(1/1) (4d273da136346ef3ff6e1a54d197f00b) switched from RUNNING to
 FAILED.
java.lang.RuntimeException: Error while confirming checkpoint
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
        at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
        ... 5 more
2019-12-05 20:12:22,014 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding 
checkpoint 2 of job 39a292088648857cac5f7e110547c180.
java.lang.RuntimeException: Error while confirming checkpoint
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Artificial failure.
        at 
org.apache.flink.streaming.tests.FailureMapper.notifyCheckpointComplete(FailureMapper.java:70)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
        at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
        ... 5 more

// log for second job
2019-12-05 20:12:27,190 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 
7c862506012fb04c0d565bfda7cc9595 from savepoint 
file:///home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-02075534631/externalized-chckpt-e2e-backend-dir/39a292088648857cac5f7e110547c180/chk-1
 ()
2019-12-05 20:12:27,213 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the 
checkpoint ID of job 7c862506012fb04c0d565bfda7cc9595 to 2.
2019-12-05 20:12:27,220 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job 
7c862506012fb04c0d565bfda7cc9595 from latest valid checkpoint: Checkpoint 1 @ 0 
for 7c862506012fb04c0d565bfda7cc9595.
2019-12-05 20:12:27,231 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state 
to restore
2019-12-05 20:12:27,232 INFO  
org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager 
runner for job General purpose test job (7c862506012fb04c0d565bfda7cc9595) was 
granted leadership with session id ---- at 
akka.tcp://flink@localhost:6123/user/jobmanager_1.
2019-12-05 20:12:27,233 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
            - Starting execution of job General purpose test job 
(7c862506012fb04c0d565bfda7cc9595) 

[jira] [Commented] (FLINK-14898) Enable background cleanup of state with TTL by default

2019-12-08 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991103#comment-16991103
 ] 

Congxian Qiu(klion26) commented on FLINK-14898:
---

[~azagrebin] I have created FLINK-15136 to track this.

> Enable background cleanup of state with TTL by default
> --
>
> Key: FLINK-14898
> URL: https://issues.apache.org/jira/browse/FLINK-14898
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> So far, we were conservative about enabling background cleanup strategies for 
> state with TTL. In general, if state is configured to have TTL, most users 
> would expect the background cleanup to kick in. As there were no reported 
> issues so far since the release of backend specific cleanups and that should 
> not affect any state without TTL, this issue suggests to enable default 
> background cleanup for backends.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15136) Update the Chinese version of "Working with state"

2019-12-08 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991070#comment-16991070
 ] 

Congxian Qiu(klion26) commented on FLINK-15136:
---

As I translated the original "Working with State" page, I'm glad to fix this. 
[~jark] could you please assign this ticket to me?
cc [~azagrebin]

> Update the Chinese version of "Working with state"
> --
>
> Key: FLINK-15136
> URL: https://issues.apache.org/jira/browse/FLINK-15136
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> In FLINK-14898 we enabled background cleanup of state with TTL by default, 
> and we should update the Chinese version to reflect this.
>  
> documentation location : docs/dev/stream/state/state.zh.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15136) Update the Chinese version of "Working with state"

2019-12-08 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-15136:
--
Affects Version/s: 1.10.0

> Update the Chinese version of "Working with state"
> --
>
> Key: FLINK-15136
> URL: https://issues.apache.org/jira/browse/FLINK-15136
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.10.0
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> In FLINK-14898 we enabled background cleanup of state with TTL by default, 
> and we should update the Chinese version to reflect this.
>  
> documentation location : docs/dev/stream/state/state.zh.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15136) Update the Chinese version of "Working with state"

2019-12-08 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-15136:
-

 Summary: Update the Chinese version of "Working with state"
 Key: FLINK-15136
 URL: https://issues.apache.org/jira/browse/FLINK-15136
 Project: Flink
  Issue Type: Sub-task
  Components: chinese-translation, Documentation
Reporter: Congxian Qiu(klion26)


In FLINK-14898 we enabled background cleanup of state with TTL by default, and 
we should update the Chinese version to reflect this.

 

documentation location : docs/dev/stream/state/state.zh.md



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11937) Resolve small file problem in RocksDB incremental checkpoint

2019-12-03 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986861#comment-16986861
 ] 

Congxian Qiu(klion26) commented on FLINK-11937:
---

Retargeting to 1.11.0 to give more time for review and testing. There is an 
early version 
[here|https://github.com/klion26/flink/tree/small_files_with_pluggable_sharedregistry];
 please refer to it if you meet the same problem and would like a quick fix. 
Thanks.
 

> Resolve small file problem in RocksDB incremental checkpoint
> 
>
> Key: FLINK-11937
> URL: https://issues.apache.org/jira/browse/FLINK-11937
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, when incremental checkpointing is enabled in RocksDBStateBackend, a 
> separate file will be generated on DFS for each sst file. This may cause a 
> “file flood” when running intensive workloads (many jobs with high 
> parallelism) in a big cluster. According to our observation in Alibaba 
> production, such a file flood introduces at least two drawbacks when using HDFS 
> as the checkpoint storage FileSystem: 1) a huge number of RPC requests issued 
> to the NN, which may burst its response queue; 2) a huge number of files 
> putting big pressure on the NN’s on-heap memory.
> In Flink we previously noticed a similar small-file flood problem and tried to 
> resolve it by introducing ByteStreamStateHandle (FLINK-2808), but this 
> solution has its limitation: if we configure the threshold too low there 
> will still be too many small files, while if it is too high the JM will 
> eventually OOM; thus it can hardly resolve the issue when using 
> RocksDBStateBackend with the incremental snapshot strategy.
> We propose a new OutputStream called 
> FileSegmentCheckpointStateOutputStream (FSCSOS) to fix the problem. FSCSOS 
> will reuse the same underlying distributed file until its size exceeds a 
> preset threshold. We plan to complete the work in 3 steps: first introduce 
> FSCSOS, then resolve the specific storage amplification issue on FSCSOS, and 
> lastly add an option to reuse FSCSOS across multiple checkpoints to further 
> reduce the DFS file number.
> For more details please refer to the attached design doc.
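
To make the FSCSOS idea described above concrete, here is a hypothetical minimal sketch (illustrative names and signatures, not the actual implementation from the linked branch): state writes share one underlying DFS file and are handed back as (offset, length) segments instead of one DFS file per sst file.
{code:java}
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of a segment-reusing checkpoint output stream.
class FileSegmentOutputStream {

    static final class Segment {
        final long offset;
        final long length;
        Segment(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    private final OutputStream underlying; // one shared DFS file
    private long position;

    FileSegmentOutputStream(OutputStream underlying) {
        this.underlying = underlying;
    }

    // Writes one logical state "file" into the shared file and returns the
    // segment that locates it. Once the file grows past a preset threshold,
    // the real proposal would roll over to a new underlying file; rolling is
    // omitted from this sketch.
    Segment writeSegment(byte[] data) throws IOException {
        long start = position;
        underlying.write(data);
        position += data.length;
        return new Segment(start, data.length);
    }
}
{code}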



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13553) KvStateServerHandlerTest.readInboundBlocking unstable on Travis

2019-12-03 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986855#comment-16986855
 ] 

Congxian Qiu(klion26) commented on FLINK-13553:
---

I'll take a look at this issue. I currently can't reproduce it locally, so I 
will try to run it on Travis.

> KvStateServerHandlerTest.readInboundBlocking unstable on Travis
> ---
>
> Key: FLINK-13553
> URL: https://issues.apache.org/jira/browse/FLINK-13553
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Queryable State
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> The {{KvStateServerHandlerTest.readInboundBlocking}} and 
> {{KvStateServerHandlerTest.testQueryExecutorShutDown}} fail on Travis with a 
> {{TimeoutException}}.
> https://api.travis-ci.org/v3/job/566420641/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13861) No new checkpoint will be trigged when canceling an expired checkpoint failed

2019-12-03 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986763#comment-16986763
 ] 

Congxian Qiu(klion26) commented on FLINK-13861:
---

[~pnowojski] [~SleePy] thanks for your replies, and sorry for my late reply (I 
thought that I had replied after the offline discussion with [~SleePy]).

Yes, I'm also interested in the root cause, but we did not encounter it 
anymore. After the offline discussion with [~SleePy], we reached the agreement 
that ignoring the exception directly may not be a good solution: it would 
require us to pay attention to more and more corner cases, so maybe failing 
the whole job when encountering such a case is more reasonable.

As 1.10 will reach code freeze soon and I have some other work on hand now, I 
have just removed the fix version temporarily, and will come back soon :)

> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -
>
> Key: FLINK-13861
> URL: https://issues.apache.org/jira/browse/FLINK-13861
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> I encountered this problem in our private fork of Flink; after taking a look 
> at the current master branch of Apache Flink, I think the problem exists 
> there as well.
> Problem Detail:
>  1. The checkpoint is canceled because of expiration, so the canceller below 
> will be called:
> {code:java}
> final Runnable canceller = () -> {
>synchronized (lock) {
>   // only do the work if the checkpoint is not discarded anyways
>   // note that checkpoint completion discards the pending checkpoint 
> object
>   if (!checkpoint.isDiscarded()) {
>  LOG.info("Checkpoint {} of job {} expired before completing.", 
> checkpointID, job);
>  failPendingCheckpoint(checkpoint, 
> CheckpointFailureReason.CHECKPOINT_EXPIRED);
>  pendingCheckpoints.remove(checkpointID);
>  rememberRecentCheckpointId(checkpointID);
>  triggerQueuedRequests();
>   }
>}
> };{code}
>  
>  But failPendingCheckpoint may throw exceptions because it will call
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> ->  {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initialize a FailedCheckpointStates, which may throw an exception via 
> {{checkArgument}}.
> I did not find out why the {{checkArgument}} ever failed (this problem did 
> not reproduce frequently); I will create an issue for that if I have more 
> findings.
>  
> 2. When triggering the next checkpoint, we'll first check whether there are 
> already too many checkpoints, as below:
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>   triggerRequestQueued = true;
>   if (currentPeriodicTrigger != null) {
>  currentPeriodicTrigger.cancel(false);
>  currentPeriodicTrigger = null;
>   }
>   throw new 
> CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>}
> }
> {code}
> the check {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} 
> will always be true.
> 3. No checkpoint will ever be triggered from then on.
>  Because {{failPendingCheckpoint}} may throw an exception, we should place 
> the pending-checkpoint removal logic in a finally block.
> I'd like to file a PR for this if it really needs a fix.
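
For illustration, a minimal sketch of the finally-block idea from point 3, applied to the canceller shown above (my proposal, not the merged fix):
{code:java}
final Runnable canceller = () -> {
   synchronized (lock) {
      if (!checkpoint.isDiscarded()) {
         LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
         try {
            failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
         } finally {
            // Always clean up, even if failPendingCheckpoint throws, so that
            // checkConcurrentCheckpoints() cannot block all future triggers.
            pendingCheckpoints.remove(checkpointID);
            rememberRecentCheckpointId(checkpointID);
            triggerQueuedRequests();
         }
      }
   }
};
{code}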



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-13861) No new checkpoint will be trigged when canceling an expired checkpoint failed

2019-12-03 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-13861:
--
Fix Version/s: (was: 1.10.0)

> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -
>
> Key: FLINK-13861
> URL: https://issues.apache.org/jira/browse/FLINK-13861
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.2, 1.8.1, 1.9.0
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>
> I encountered this problem in our private fork of Flink; after taking a look 
> at the current master branch of Apache Flink, I think the problem exists 
> there as well.
> Problem Detail:
>  1. The checkpoint is canceled because of expiration, so the canceller below 
> will be called:
> {code:java}
> final Runnable canceller = () -> {
>synchronized (lock) {
>   // only do the work if the checkpoint is not discarded anyways
>   // note that checkpoint completion discards the pending checkpoint 
> object
>   if (!checkpoint.isDiscarded()) {
>  LOG.info("Checkpoint {} of job {} expired before completing.", 
> checkpointID, job);
>  failPendingCheckpoint(checkpoint, 
> CheckpointFailureReason.CHECKPOINT_EXPIRED);
>  pendingCheckpoints.remove(checkpointID);
>  rememberRecentCheckpointId(checkpointID);
>  triggerQueuedRequests();
>   }
>}
> };{code}
>  
>  But failPendingCheckpoint may throw exceptions because it will call
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> ->  {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initialize a FailedCheckpointStates, which may throw an exception via 
> {{checkArgument}}.
> I did not find out why the {{checkArgument}} ever failed (this problem did 
> not reproduce frequently); I will create an issue for that if I have more 
> findings.
>  
> 2. When triggering the next checkpoint, we'll first check whether there are 
> already too many checkpoints, as below:
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>   triggerRequestQueued = true;
>   if (currentPeriodicTrigger != null) {
>  currentPeriodicTrigger.cancel(false);
>  currentPeriodicTrigger = null;
>   }
>   throw new 
> CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>}
> }
> {code}
> the check {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} 
> will always be true.
> 3. No checkpoint will ever be triggered from then on.
>  Because {{failPendingCheckpoint}} may throw an exception, we should place 
> the pending-checkpoint removal logic in a finally block.
> I'd like to file a PR for this if it really needs a fix.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-11937) Resolve small file problem in RocksDB incremental checkpoint

2019-12-03 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-11937:
--
Fix Version/s: (was: 1.10.0)
   1.11.0

> Resolve small file problem in RocksDB incremental checkpoint
> 
>
> Key: FLINK-11937
> URL: https://issues.apache.org/jira/browse/FLINK-11937
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, when incremental checkpointing is enabled in RocksDBStateBackend, a 
> separate file will be generated on DFS for each sst file. This may cause a 
> “file flood” when running intensive workloads (many jobs with high 
> parallelism) in a big cluster. According to our observation in Alibaba 
> production, such a file flood introduces at least two drawbacks when using HDFS 
> as the checkpoint storage FileSystem: 1) a huge number of RPC requests issued 
> to the NN, which may burst its response queue; 2) a huge number of files 
> putting big pressure on the NN’s on-heap memory.
> In Flink we previously noticed a similar small-file flood problem and tried to 
> resolve it by introducing ByteStreamStateHandle (FLINK-2808), but this 
> solution has its limitation: if we configure the threshold too low there 
> will still be too many small files, while if it is too high the JM will 
> eventually OOM; thus it can hardly resolve the issue when using 
> RocksDBStateBackend with the incremental snapshot strategy.
> We propose a new OutputStream called 
> FileSegmentCheckpointStateOutputStream (FSCSOS) to fix the problem. FSCSOS 
> will reuse the same underlying distributed file until its size exceeds a 
> preset threshold. We plan to complete the work in 3 steps: first introduce 
> FSCSOS, then resolve the specific storage amplification issue on FSCSOS, and 
> lastly add an option to reuse FSCSOS across multiple checkpoints to further 
> reduce the DFS file number.
> For more details please refer to the attached design doc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14966) one error in document

2019-11-27 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983411#comment-16983411
 ] 

Congxian Qiu(klion26) edited comment on FLINK-14966 at 11/27/19 11:12 AM:
--

I think this is a typo as you described. Could you please update the issue 
title so that others can tell the problem from the title (where the problem 
is and what the problem is)?


was (Author: klion26):
I think this is a typo as you described. Could you please update the issue 
title so that others can tell the problem from the title.

> one error in document 
> --
>
> Key: FLINK-14966
> URL: https://issues.apache.org/jira/browse/FLINK-14966
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.9.0
>Reporter: yuzhemin
>Priority: Trivial
>
> Document url is 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html]
> error location: _Application Development > Basic API Concepts > Supported Data 
> Types > Values_
> I guess _org.apache.flinktypes.Value_ and 
> _org.apache.flinktypes.CopyableValue_ referred to in the paragraph mean 
> _org.apache.flink.types.XXX_



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14966) one error in document

2019-11-27 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983411#comment-16983411
 ] 

Congxian Qiu(klion26) commented on FLINK-14966:
---

I think this is a typo as you described. Could you please update the issue 
title so that others can tell the problem from the title.

> one error in document 
> --
>
> Key: FLINK-14966
> URL: https://issues.apache.org/jira/browse/FLINK-14966
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.9.0
>Reporter: yuzhemin
>Priority: Trivial
>
> Document url is 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html]
> error location: _Application Development > Basic API Concepts > Supported Data 
> Types > Values_
> I guess _org.apache.flinktypes.Value_ and 
> _org.apache.flinktypes.CopyableValue_ referred to in the paragraph mean 
> _org.apache.flink.types.XXX_



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14110) Deleting state.backend.rocksdb.localdir causes silent failure

2019-11-22 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980708#comment-16980708
 ] 

Congxian Qiu(klion26) commented on FLINK-14110:
---

[~liyu] thanks for the reminder; I will take a look at this issue.

> Deleting state.backend.rocksdb.localdir causes silent failure
> -
>
> Key: FLINK-14110
> URL: https://issues.apache.org/jira/browse/FLINK-14110
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.1, 1.9.0
> Environment: Flink {{1.8.1}} and {{1.9.0}}.
> JVM 8
>Reporter: Aaron Levin
>Priority: Minor
>
> Suppose {{state.backend.rocksdb.localdir}} is configured as:
> {noformat}
> state.backend.rocksdb.localdir: /flink/tmp
> {noformat}
> If I then run {{rm -rf /flink/tmp/job_*}} on a host while a Flink 
> application is running, I will observe the following:
>  * throughput of my operators running on that host will drop to zero
>  * the application will not fail or restart
>  * the task manager will not fail or restart
>  * in most cases there is nothing in the logs to indicate a failure (I've run 
> this several times and only once seen an exception - I believe I was lucky 
> and deleted those directories during a checkpoint or something)
> The desired behaviour here would be to throw an exception and crash, instead 
> of silently dropping throughput to zero. Restarting the Task Manager will 
> resolve the issues.
> I only tried this on Flink {{1.8.1}} and {{1.9.0}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14892) Add documentation for checkpoint directory layout

2019-11-22 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980652#comment-16980652
 ] 

Congxian Qiu(klion26) commented on FLINK-14892:
---

[~pnowojski] thanks for the reply and for assigning. As the checkpoint 
directory (in contrast to savepoints) is a Flink-internal concept, I will add 
this information when filing a PR.

> Add documentation for checkpoint directory layout
> -
>
> Key: FLINK-14892
> URL: https://issues.apache.org/jira/browse/FLINK-14892
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> In FLINK-8531, we changed the checkpoint directory layout to
> {code:java}
> /user-defined-checkpoint-dir
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}
> But the directory layout is not currently described in the docs, and I found 
> some users confused about this, such as [1][2], so I propose to add a 
> description of the checkpoint directory layout to the documentation, maybe 
> on the {{checkpoints#DirectoryStructure}} page [3]
>  [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751]
>  [2] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html]
>  [3] 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure]
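
As a concrete companion to the layout above, a minimal sketch of pointing a job at a user-defined checkpoint directory (standard DataStream API; the HDFS path is a placeholder):
{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
// Checkpoint data ends up under this directory, in the shared/, taskowned/
// and chk-x/ subdirectories shown in the layout above.
env.setStateBackend(new FsStateBackend("hdfs:///user-defined-checkpoint-dir"));
{code}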



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14928) Documentation links check nightly run failed on travis

2019-11-22 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980151#comment-16980151
 ] 

Congxian Qiu(klion26) commented on FLINK-14928:
---

FLINK-14866 fixed most of them, but there are still a few broken links on the 
systemFunctions.html page.

> Documentation links check nightly run failed on travis
> --
>
> Key: FLINK-14928
> URL: https://issues.apache.org/jira/browse/FLINK-14928
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Congxian Qiu(klion26)
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This test stage fails stably, with below error:
> {noformat}
> [2019-11-22 03:10:45] ERROR `/dev/table/udfs.html' not found.
> [2019-11-22 03:10:46] ERROR `/dev/table/functions.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/zh/getting-started/tutorials/datastream_api.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/dev/table/functions/streaming/query_configuration.html' not found.
> [2019-11-22 03:10:49] ERROR `/dev/table/functions/sql.html' not found.
> [2019-11-22 03:10:49] ERROR `/dev/table/functions/tableApi.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/udfs.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/zh/dev/table/functions/streaming/query_configuration.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/sql.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/tableApi.html' not found.
> http://localhost:4000/dev/table/udfs.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/functions.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/getting-started/tutorials/datastream_api.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/functions/streaming/query_configuration.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/functions/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/functions/tableApi.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/udfs.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/functions.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/functions/streaming/query_configuration.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/zh/dev/table/functions/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/zh/dev/table/functions/tableApi.html:
> Remote file does not exist -- broken link!!!
> ---
> Found 11 broken links.
> {noformat}
> And here is the latest instance: 
> https://api.travis-ci.org/v3/job/615032410/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14928) Documentation links check nightly run failed on travis

2019-11-22 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979973#comment-16979973
 ] 

Congxian Qiu(klion26) commented on FLINK-14928:
---

The reason here is that FLINK-14638 moved some docs to another place; I'll 
file a PR for this.

> Documentation links check nightly run failed on travis
> --
>
> Key: FLINK-14928
> URL: https://issues.apache.org/jira/browse/FLINK-14928
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Congxian Qiu(klion26)
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> This test stage fails stably, with below error:
> {noformat}
> [2019-11-22 03:10:45] ERROR `/dev/table/udfs.html' not found.
> [2019-11-22 03:10:46] ERROR `/dev/table/functions.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/zh/getting-started/tutorials/datastream_api.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/dev/table/functions/streaming/query_configuration.html' not found.
> [2019-11-22 03:10:49] ERROR `/dev/table/functions/sql.html' not found.
> [2019-11-22 03:10:49] ERROR `/dev/table/functions/tableApi.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/udfs.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/zh/dev/table/functions/streaming/query_configuration.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/sql.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/tableApi.html' not found.
> http://localhost:4000/dev/table/udfs.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/functions.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/getting-started/tutorials/datastream_api.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/functions/streaming/query_configuration.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/functions/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/functions/tableApi.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/udfs.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/functions.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/functions/streaming/query_configuration.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/zh/dev/table/functions/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/zh/dev/table/functions/tableApi.html:
> Remote file does not exist -- broken link!!!
> ---
> Found 11 broken links.
> {noformat}
> And here is the latest instance: 
> https://api.travis-ci.org/v3/job/615032410/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14928) Documentation links check nightly run failed on travis

2019-11-22 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979967#comment-16979967
 ] 

Congxian Qiu(klion26) commented on FLINK-14928:
---

I'll take a look at this issue.

> Documentation links check nightly run failed on travis
> --
>
> Key: FLINK-14928
> URL: https://issues.apache.org/jira/browse/FLINK-14928
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.10.0
>Reporter: Yu Li
>Assignee: Yu Li
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.10.0
>
>
> This test stage fails stably, with below error:
> {noformat}
> [2019-11-22 03:10:45] ERROR `/dev/table/udfs.html' not found.
> [2019-11-22 03:10:46] ERROR `/dev/table/functions.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/zh/getting-started/tutorials/datastream_api.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/dev/table/functions/streaming/query_configuration.html' not found.
> [2019-11-22 03:10:49] ERROR `/dev/table/functions/sql.html' not found.
> [2019-11-22 03:10:49] ERROR `/dev/table/functions/tableApi.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/udfs.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions.html' not found.
> [2019-11-22 03:10:49] ERROR 
> `/zh/dev/table/functions/streaming/query_configuration.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/sql.html' not found.
> [2019-11-22 03:10:49] ERROR `/zh/dev/table/functions/tableApi.html' not found.
> http://localhost:4000/dev/table/udfs.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/functions.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/getting-started/tutorials/datastream_api.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/functions/streaming/query_configuration.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/functions/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/functions/tableApi.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/udfs.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/functions.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/zh/dev/table/functions/streaming/query_configuration.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/zh/dev/table/functions/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/zh/dev/table/functions/tableApi.html:
> Remote file does not exist -- broken link!!!
> ---
> Found 11 broken links.
> {noformat}
> And here is the latest instance: 
> https://api.travis-ci.org/v3/job/615032410/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14892) Add documentation for checkpoint directory layout

2019-11-21 Thread Congxian Qiu(klion26) (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-14892:
--
Summary: Add documentation for checkpoint directory layout  (was: Add 
description for checkpoint directory layout)

> Add documentation for checkpoint directory layout
> -
>
> Key: FLINK-14892
> URL: https://issues.apache.org/jira/browse/FLINK-14892
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> In FLINK-8531, we changed the checkpoint directory layout to
> {code:java}
> /user-defined-checkpoint-dir
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}
> But the directory layout is not currently described in the docs, and I found 
> some users confused about this, such as [1][2], so I propose to add a 
> description of the checkpoint directory layout to the documentation, maybe 
> on the {{checkpoints#DirectoryStructure}} page [3]
>  [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751]
>  [2] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html]
>  [3] 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14892) Add description for checkpoint directory layout

2019-11-21 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979239#comment-16979239
 ] 

Congxian Qiu(klion26) commented on FLINK-14892:
---

[~sewen] as you're the owner of FLINK-8531, what do you think about this? 
Please assign it to me if it is valid, thanks.

> Add description for checkpoint directory layout
> ---
>
> Key: FLINK-14892
> URL: https://issues.apache.org/jira/browse/FLINK-14892
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> In FLINK-8531, we changed the checkpoint directory layout to
> {code:java}
> /user-defined-checkpoint-dir
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}
> But the directory layout is not currently described in the docs, and I found 
> some users confused about this, such as [1][2], so I propose to add a 
> description of the checkpoint directory layout to the documentation, maybe 
> on the {{checkpoints#DirectoryStructure}} page [3]
>  [1] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751]
>  [2] 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html]
>  [3] 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14892) Add description for checkpoint directory layout

2019-11-21 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-14892:
-

 Summary: Add description for checkpoint directory layout
 Key: FLINK-14892
 URL: https://issues.apache.org/jira/browse/FLINK-14892
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Checkpointing
Reporter: Congxian Qiu(klion26)
 Fix For: 1.10.0


In FLINK-8531, we changed the checkpoint directory layout to
{code:java}
/user-defined-checkpoint-dir
|
+ --shared/
+ --taskowned/
+ --chk-1/
+ --chk-2/
+ --chk-3/
...
{code}
But the directory layout is not currently described in the docs, and I found 
some users confused about this, such as [1][2], so I propose to add a 
description of the checkpoint directory layout to the documentation, maybe on 
the {{checkpoints#DirectoryStructure}} page [3]

 [1] 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751]

 [2] 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html]

 [3] 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI

2019-11-19 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977991#comment-16977991
 ] 

Congxian Qiu(klion26) commented on FLINK-14264:
---

kindly ping [~aljoscha]

> Expose CheckpointBackend in checkpoint config RestAPI
> -
>
> Key: FLINK-14264
> URL: https://issues.apache.org/jira/browse/FLINK-14264
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, we can get the checkpoint config from the REST API [1]; the 
> response contains the information below:
>  * timeout
>  * min_pause
>  * max_concurrent
>  * externalization
> But it does not contain the type of the CheckpointBackend. In some scenarios 
> we want to get the CheckpointBackend type via REST, so this issue wants to 
> add the simple name of the CheckpointBackend to the {{checkpoints/config}} 
> REST response under the key {{checkpoint_backend}}, so the response will 
> contain the information below:
>  * {{timeout}}
>  * {{min_pause}}
>  * {{max_concurrent}}
>  * checkpoint_backend 
>  * externalization
>  
>  [1] 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config]
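
For illustration, a hypothetical response shape after the proposed change (the {{checkpoint_backend}} field and its value are the proposal under discussion, not a released API):
{noformat}
{
  "timeout": 600000,
  "min_pause": 0,
  "max_concurrent": 1,
  "checkpoint_backend": "RocksDBStateBackend",
  "externalization": { "enabled": false, "delete_on_cancellation": true }
}
{noformat}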



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14830) Correct the link for chinese version stream_checkpointing page

2019-11-16 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975905#comment-16975905
 ] 

Congxian Qiu(klion26) commented on FLINK-14830:
---

[~jark], I agree that changing all the links in the {{.zh.md}} pages is a lot 
of work. I think we can apply this rule to newly added {{.zh.md}} documents: 
when adding a new {{.zh.md}} document we need to change the links to the 
Chinese version. For the existing {{.zh.md}} pages, we can update the wrong 
links we find, for a better user experience.

> Correct the link for chinese version stream_checkpointing page 
> ---
>
> Key: FLINK-14830
> URL: https://issues.apache.org/jira/browse/FLINK-14830
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, in the Chinese version of the stream_checkpointing page, some 
> links are not correctly set to the Chinese version but point to the English version.
> Such as 
> {{[site.baseurl }}/dev/stream/state/index.html 
> |https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md]
> _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_.
> [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html)
> [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) 
>  
> This issue wants to fix the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14830) Correct the link for chinese version stream_checkpointing page

2019-11-16 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975901#comment-16975901
 ] 

Congxian Qiu(klion26) commented on FLINK-14830:
---

[~jark] I think this is a little tricky, because the target page has already 
been translated, but the link does not respect that yet.

The page [1] has not been translated yet; when we click the link {{[Restart 
Strategies|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/restart_strategies.html]
 }} we are currently redirected to the English version, even though the target 
page has already been translated [2].

I think this needs to be improved: we should update all links to the Chinese 
version, even for pages that have not been translated yet, to prevent this 
problem. What do you think?

 [1] 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/internals/stream_checkpointing.html#recovery]

 [2] 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/task_failure_recovery.html]

> Correct the link for chinese version stream_checkpointing page 
> ---
>
> Key: FLINK-14830
> URL: https://issues.apache.org/jira/browse/FLINK-14830
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, in the Chinese version of the stream_checkpointing page, some 
> links are not correctly set to the Chinese version but point to the English version.
> Such as 
> {{[site.baseurl }}/dev/stream/state/index.html 
> |https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md]
> _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_.
> [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html)
> [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) 
>  
> This issue wants to fix the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14830) Correct the link for chinese version stream_checkpointing page

2019-11-16 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975685#comment-16975685
 ] 

Congxian Qiu(klion26) commented on FLINK-14830:
---

[~jark] what do you think about this? Please assign it to me if it is valid, 
thanks.

> Correct the link for chinese version stream_checkpointing page 
> ---
>
> Key: FLINK-14830
> URL: https://issues.apache.org/jira/browse/FLINK-14830
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, in the Chinese version of the stream_checkpointing page, some 
> links are not correctly set to the Chinese version but point to the English version.
> Such as 
> {{[site.baseurl }}/dev/stream/state/index.html 
> |https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md]
> _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_.
> [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html)
> [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) 
>  
> This issue wants to fix the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14830) Correct the link for chinese version stream_checkpointing page

2019-11-16 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-14830:
-

 Summary: Correct the link for chinese version stream_checkpointing 
page 
 Key: FLINK-14830
 URL: https://issues.apache.org/jira/browse/FLINK-14830
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.9.1
Reporter: Congxian Qiu(klion26)
 Fix For: 1.10.0


Currently, in the Chinese version of the stream_checkpointing page, some links 
are not correctly set to the Chinese version but point to the English version.

Such as 

{{[site.baseurl }}/dev/stream/state/index.html 
|https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md]

_[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_.

[State Backends](\{{ site.baseurl }}/ops/state/state_backends.html)

[Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html) 

 

This issue wants to fix the problem.
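
For illustration, the fix would rewrite such links in the {{.zh.md}} sources to point at the {{/zh/}} pages, along these lines (a sketch; {{/zh/}} is the path prefix the Chinese docs are served under):
{noformat}
Before (points to the English page):
[State Backends]({{ site.baseurl }}/ops/state/state_backends.html)

After (points to the Chinese page):
[State Backends]({{ site.baseurl }}/zh/ops/state/state_backends.html)
{noformat}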



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14776) Migrate duration and memory size ConfigOptions in RocksDBConfigurableOptions

2019-11-14 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16974892#comment-16974892
 ] 

Congxian Qiu(klion26) commented on FLINK-14776:
---

[~dwysakowicz] Can I help with this ticket?
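
For context, a hypothetical sketch of the kind of migration this ticket asks for, assuming the typed {{ConfigOptions}} builder introduced for 1.10 (the option keys and defaults below are illustrative, not actual {{RocksDBConfigurableOptions}} fields):
{code:java}
import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.MemorySize;
import static org.apache.flink.configuration.ConfigOptions.key;

public class IllustrativeRocksDBOptions {

    // Before the migration such options were string-typed and parsed by hand
    // ("1 h", "64 kb", ...); the typed builder parses and validates instead.
    public static final ConfigOption<Duration> EXAMPLE_INTERVAL =
            key("state.backend.rocksdb.example.interval")
                    .durationType()
                    .defaultValue(Duration.ofHours(1));

    public static final ConfigOption<MemorySize> EXAMPLE_BLOCK_SIZE =
            key("state.backend.rocksdb.example.block-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("64 kb"));
}
{code}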

> Migrate duration and memory size ConfigOptions in RocksDBConfigurableOptions
> 
>
> Key: FLINK-14776
> URL: https://issues.apache.org/jira/browse/FLINK-14776
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Reporter: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI

2019-11-13 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973360#comment-16973360
 ] 

Congxian Qiu(klion26) commented on FLINK-14264:
---

[~aljoscha] thanks for your reply. At first I wrote it as 
{{checkpoint_backend}}; after your question, I think maybe {{state_backend}} 
is better because we describe them as {{state_backend}} in the doc [1]

 [1] 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html]

> Expose CheckpointBackend in checkpoint config RestAPI
> -
>
> Key: FLINK-14264
> URL: https://issues.apache.org/jira/browse/FLINK-14264
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, we can get the checkpoint config from the REST API [1]; the 
> response contains the information below:
>  * timeout
>  * min_pause
>  * max_concurrent
>  * externalization
> But it does not contain the type of the CheckpointBackend. In some scenarios 
> we want to get the CheckpointBackend type via REST, so this issue wants to 
> add the simple name of the CheckpointBackend to the {{checkpoints/config}} 
> REST response under the key {{checkpoint_backend}}, so the response will 
> contain the information below:
>  * {{timeout}}
>  * {{min_pause}}
>  * {{max_concurrent}}
>  * checkpoint_backend 
>  * externalization
>  
>  [1] 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI

2019-11-13 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973360#comment-16973360
 ] 

Congxian Qiu(klion26) edited comment on FLINK-14264 at 11/13/19 1:57 PM:
-

[~aljoscha] thanks for your reply. At first I wrote it as 
{{checkpoint_backend}}; after your question, I think maybe {{state_backend}} 
is better because we describe them as {{state_backend}} in the doc [1]. What 
do you think? Thanks.

 [1] 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html]


was (Author: klion26):
[~aljoscha] thanks for your reply. At first I wrote it as 
{{checkpoint_backend}}; after your question, I think maybe {{state_backend}} 
is better because we describe them as {{state_backend}} in the doc [1]

 [1] 
[https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html]

> Expose CheckpointBackend in checkpoint config RestAPI
> -
>
> Key: FLINK-14264
> URL: https://issues.apache.org/jira/browse/FLINK-14264
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, we can get the checkpoint config from the REST API [1]; the 
> response contains the information below:
>  * timeout
>  * min_pause
>  * max_concurrent
>  * externalization
> But it does not contain the type of the CheckpointBackend. In some scenarios 
> we want to get the CheckpointBackend type via REST, so this issue wants to 
> add the simple name of the CheckpointBackend to the {{checkpoints/config}} 
> REST response under the key {{checkpoint_backend}}, so the response will 
> contain the information below:
>  * {{timeout}}
>  * {{min_pause}}
>  * {{max_concurrent}}
>  * checkpoint_backend 
>  * externalization
>  
>  [1] 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14433) Move generated Jaas conf file from /tmp directory to Job specific directory

2019-11-13 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973211#comment-16973211
 ] 

Congxian Qiu(klion26) commented on FLINK-14433:
---

[~aljoscha] thanks for your reply; I will prepare a PR for this.

> Move generated Jaas conf file from /tmp directory to Job specific directory
> ---
>
> Key: FLINK-14433
> URL: https://issues.apache.org/jira/browse/FLINK-14433
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, we’ll generate a jaas file under the /tmp directory [1]; the jaas 
> files generated by the jobs which run on the same machine will all go into 
> the same directory /tmp, which may be problematic for these reasons:
>  * Running out of inodes on the disk which the /tmp directory is on
>  * The performance of the /tmp directory will affect the read/write 
> performance of the jaas file.
>  
> So we propose to place the jaas file under the {{CoreOptions.TMP_DIRS}} 
> directory rather than the /tmp directory.
>  
> [1] 
> https://github.com/apache/flink/blob/dbe1bfa31db4a561b6faa9c1235f02dc130825ca/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java#L143
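
A minimal sketch of the proposed direction ({{CoreOptions.TMP_DIRS}} is the real option; the helper itself is hypothetical, not the merged change):
{code:java}
import java.io.File;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

// Hypothetical helper: create the jaas file under the first configured
// io.tmp.dirs entry instead of the hard-coded /tmp directory.
static File createJaasFile(Configuration config) throws IOException {
    String tmpDirs = config.getString(CoreOptions.TMP_DIRS);
    File jaasDir = new File(tmpDirs.split(",|" + File.pathSeparator)[0].trim());
    return File.createTempFile("jaas-", ".conf", jaasDir);
}
{code}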



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

