[jira] [Created] (FLINK-19325) Optimize the consumed time for checkpoint completion
Congxian Qiu(klion26) created FLINK-19325: - Summary: Optimize the consumed time for checkpoint completion Key: FLINK-19325 URL: https://issues.apache.org/jira/browse/FLINK-19325 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26)

Currently, when completing a checkpoint, we write out each state handle in {{MetadataV2V3SerializerBase.java#serializeStreamStateHandle}}:

{code:java}
static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
    if (stateHandle == null) {
        dos.writeByte(NULL_HANDLE);
    } else if (stateHandle instanceof RelativeFileStateHandle) {
        dos.writeByte(RELATIVE_STREAM_STATE_HANDLE);
        RelativeFileStateHandle relativeFileStateHandle = (RelativeFileStateHandle) stateHandle;
        dos.writeUTF(relativeFileStateHandle.getRelativePath());
        dos.writeLong(relativeFileStateHandle.getStateSize());
    } else if (stateHandle instanceof FileStateHandle) {
        dos.writeByte(FILE_STREAM_STATE_HANDLE);
        FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
        dos.writeLong(stateHandle.getStateSize());
        dos.writeUTF(fileStateHandle.getFilePath().toString());
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        dos.writeByte(BYTE_STREAM_STATE_HANDLE);
        ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
        dos.writeUTF(byteStreamStateHandle.getHandleName());
        byte[] internalData = byteStreamStateHandle.getData();
        dos.writeInt(internalData.length);
        dos.write(byteStreamStateHandle.getData());
    } else {
        throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
    }

    dos.flush();
}
{code}

We call {{dos.flush()}} after every state handle is written out. This may consume too much time and is not needed, because we close the output stream after everything has been written out. I propose to remove the {{dos.flush()}} here to optimize the time consumed by checkpoint completion.
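As a rough illustration of the proposed pattern (hypothetical names, not the actual Flink serializer), the idea is to flush once after all handles have been serialized rather than once per handle:

{code:java}
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Illustrative only: flush a single time after serializing all handles,
// instead of flushing inside the per-handle serialization method.
final class MetadataWriter {

    static void serializeAllHandles(Iterable<byte[]> serializedHandles, OutputStream out) throws IOException {
        DataOutputStream dos = new DataOutputStream(out);
        for (byte[] handle : serializedHandles) {
            dos.writeInt(handle.length); // length-prefixed payload, no flush here
            dos.write(handle);
        }
        dos.flush(); // one flush before the stream is closed by the caller
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)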
[jira] [Created] (FLINK-19249) Job would wait some time (~10 min) before failover if a connection is broken
Congxian Qiu(klion26) created FLINK-19249: - Summary: Job would wait some time (~10 min) before failover if a connection is broken Key: FLINK-19249 URL: https://issues.apache.org/jira/browse/FLINK-19249 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Congxian Qiu(klion26)

{quote}I encountered this error on 1.7; after going through the master code, I think the problem is still there.{quote}

When the network environment is not so good, the connection between the server and the client may be broken unexpectedly. After the disconnection, the server will receive an IOException such as below:

{code:java}
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:51)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:367)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
    at java.lang.Thread.run(Thread.java:748)
{code}

and then release the view reader. But the job does not fail until the downstream detects the disconnection via {{channelInactive}} later (~10 min). During that time, the job can still process data, but the broken channel can't transfer any data or events, so snapshots fail. This causes the job to replay a lot of data after failover.
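A sketch of the direction a fix could take (illustrative only; {{ConnectionErrorListener}} is a hypothetical callback, not Flink API, and plain io.netty imports are used for brevity although Flink shades Netty under org.apache.flink.shaded.netty4): surface the IOException on the server side so failover can start immediately instead of waiting for the downstream's {{channelInactive}}:

{code:java}
import java.io.IOException;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Illustrative sketch: in addition to releasing the view reader, report the
// broken connection upward so the job can fail over right away.
public class EagerFailureHandler extends ChannelInboundHandlerAdapter {

    /** Hypothetical callback for notifying the scheduler; not Flink API. */
    public interface ConnectionErrorListener {
        void onConnectionBroken(Throwable cause);
    }

    private final ConnectionErrorListener listener;

    public EagerFailureHandler(ConnectionErrorListener listener) {
        this.listener = listener;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof IOException) {
            // report instead of only releasing the reader silently
            listener.onConnectionBroken(cause);
        }
        ctx.close();
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)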
[jira] [Created] (FLINK-18748) Savepoint would be queued unexpectedly
Congxian Qiu(klion26) created FLINK-18748: - Summary: Savepoint would be queued unexpectedly Key: FLINK-18748 URL: https://issues.apache.org/jira/browse/FLINK-18748 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.1, 1.11.0 Reporter: Congxian Qiu(klion26)

After FLINK-17342, when triggering a checkpoint/savepoint, we check whether the request can be triggered in {{CheckpointRequestDecider#chooseRequestToExecute}}. The logic is as follows:

{code:java}
Preconditions.checkState(Thread.holdsLock(lock));

// 1. nothing to trigger
if (isTriggering || queuedRequests.isEmpty()) {
    return Optional.empty();
}

// 2. too many ongoing checkpoints/savepoints
if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
    return Optional.of(queuedRequests.first())
        .filter(CheckpointTriggerRequest::isForce)
        .map(unused -> queuedRequests.pollFirst());
}

// 3. check the timestamp of the last completed checkpoint
long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
if (nextTriggerDelayMillis > 0) {
    return onTooEarly(nextTriggerDelayMillis);
}

return Optional.of(queuedRequests.pollFirst());
{code}

But if currently {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}} and the request is a savepoint, the savepoint will still wait some time in step 3. I think we should trigger the savepoint immediately if {{pendingCheckpointsSizeSupplier.get()}} < {{maxConcurrentCheckpointAttempts}}.
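A sketch of the proposed behavior (a fragment in the style of the decider above, not the actual implementation): let a forced request such as a savepoint bypass the min-pause delay in step 3, since the concurrency limit was already checked in step 2:

{code:java}
// Illustrative fragment: before applying the min-pause check in step 3,
// poll a forced request (e.g. a savepoint) right away.
if (queuedRequests.first().isForce()) {
    return Optional.of(queuedRequests.pollFirst());
}

long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
if (nextTriggerDelayMillis > 0) {
    return onTooEarly(nextTriggerDelayMillis);
}
return Optional.of(queuedRequests.pollFirst());
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)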
[jira] [Created] (FLINK-17926) Can't build flink-web docker image because of EOL of Ubuntu:18.10
Congxian Qiu(klion26) created FLINK-17926: - Summary: Can't build flink-web docker image because of EOL of Ubuntu:18.10 Key: FLINK-17926 URL: https://issues.apache.org/jira/browse/FLINK-17926 Project: Flink Issue Type: Bug Components: Project Website Reporter: Congxian Qiu(klion26)

Currently, the Dockerfile[1] in the flink-web project is broken because of the EOL of Ubuntu 18.10[2]; we encounter an error such as the one below when executing {{./run.sh}}:

{code:java}
Err:3 http://security.ubuntu.com/ubuntu cosmic-security Release
  404  Not Found [IP: 91.189.88.152 80]
Ign:4 http://archive.ubuntu.com/ubuntu cosmic-updates InRelease
Ign:5 http://archive.ubuntu.com/ubuntu cosmic-backports InRelease
Err:6 http://archive.ubuntu.com/ubuntu cosmic Release
  404  Not Found [IP: 91.189.88.142 80]
Err:7 http://archive.ubuntu.com/ubuntu cosmic-updates Release
  404  Not Found [IP: 91.189.88.142 80]
Err:8 http://archive.ubuntu.com/ubuntu cosmic-backports Release
  404  Not Found [IP: 91.189.88.142 80]
Reading package lists...
{code}

The current LTS versions can be found on the release website[2]. The Apache Flink docker image uses fedora:28[3], so it is unaffected. As Fedora does not have LTS releases[4], I propose to keep using Ubuntu for the website here and change the version from {{18.10}} to the closest LTS version {{18.04}}; tried locally, it works successfully.

[1] [https://github.com/apache/flink-web/blob/bc66f0f0f463ab62a22e81df7d7efd301b76a6b4/docker/Dockerfile#L17]
[2] [https://wiki.ubuntu.com/Releases]
[3] https://github.com/apache/flink/blob/e92b2bf19bdf03ad3bae906dc5fa3781aeddb3ee/docs/docker/Dockerfile#L17
[4] https://fedoraproject.org/wiki/Fedora_Release_Life_Cycle#Maintenance_Schedule

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17571) A better way to show the files used in current checkpoints
Congxian Qiu(klion26) created FLINK-17571: - Summary: A better way to show the files used in current checkpoints Key: FLINK-17571 URL: https://issues.apache.org/jira/browse/FLINK-17571 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26)

Inspired by the [user mail|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Shared-Checkpoint-Cleanup-and-S3-Lifecycle-Policy-tt34965.html]. Currently, there are three types of directories for a checkpoint. The files in the TASKOWNED and EXCLUSIVE directories can be deleted safely, but users can't delete the files in the SHARED directory safely (the files may have been created a long time ago). I think it's better to give users a way to know which files are currently in use (so the others are not). Maybe a command-line command such as the one below is enough to support such a feature:

{{./bin/flink checkpoint list $checkpointDir # list all the files used in the checkpoint}}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17458) TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers
Congxian Qiu(klion26) created FLINK-17458: - Summary: TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers Key: FLINK-17458 URL: https://issues.apache.org/jira/browse/FLINK-17458 Project: Flink Issue Type: Test Components: Tests Affects Versions: 1.10.0 Reporter: Congxian Qiu(klion26)

When verifying the RC of release-1.10.1, I found that {{TaskExecutorSubmissionTest#testFailingScheduleOrUpdateConsumers}} sometimes fails because of a timeout. I ran this test locally in IDEA and found the following exception (locally it only occurred 2 times in 1000 runs):

{code:java}
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at akka.event.LoggingBus$class.akka$event$LoggingBus$$addLogger(Logging.scala:182)
    at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:117)
    at akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:116)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:116)
    at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:113)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
    at akka.event.LoggingBus$class.startDefaultLoggers(Logging.scala:113)
    at akka.event.EventStream.startDefaultLoggers(EventStream.scala:22)
    at akka.actor.LocalActorRefProvider.init(ActorRefProvider.scala:662)
    at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:874)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:870)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:870)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:891)
    at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
    at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
    at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
    at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
    at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:113)
    at org.apache.flink.runtime.akka.AkkaUtils$.createLocalActorSystem(AkkaUtils.scala:68)
    at org.apache.flink.runtime.akka.AkkaUtils.createLocalActorSystem(AkkaUtils.scala)
    at org.apache.flink.runtime.rpc.TestingRpcService.<init>(TestingRpcService.java:74)
    at org.apache.flink.runtime.rpc.TestingRpcService.<init>(TestingRpcService.java:67)
    at org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment$Builder.build(TaskSubmissionTestEnvironment.java:349)
    at org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest.testFailingScheduleOrUpdateConsumers(TaskExecutorSubmissionTest.java:544)
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)

org.junit.runners.model.TestTimedOutException: test timed out after 1 milliseconds
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at
{code}
[jira] [Created] (FLINK-17043) Taking more information into account when failing a job in FailureManager
Congxian Qiu(klion26) created FLINK-17043: - Summary: Taking more information into account when failing a job in FailureManager Key: FLINK-17043 URL: https://issues.apache.org/jira/browse/FLINK-17043 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26)

Currently, we only fail the job when we receive continuous "CHECKPOINT_DECLINED" messages, but ignore "timeout"/"task_failure"/"task_checkpoint_failure"/"finalize_checkpoint_failure" and so on. In my opinion, we should take the checkpoint failure reasons above into account when failing a job (not only the "CHECKPOINT_DECLINED" reason).

This issue is inspired by a [user mailing list thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Making-job-fail-on-Checkpoint-Expired-tt34051.html].
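For illustration, a minimal, self-contained sketch of the proposed accounting (the enum values mirror Flink's CheckpointFailureReason, but {{FailureCounter}} itself is a hypothetical stand-in, not the actual FailureManager):

{code:java}
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: count several failure reasons toward the tolerable
// failure threshold instead of only CHECKPOINT_DECLINED.
final class FailureCounter {

    enum FailureReason {
        CHECKPOINT_DECLINED,
        CHECKPOINT_EXPIRED,
        TASK_FAILURE,
        TASK_CHECKPOINT_FAILURE,
        FINALIZE_CHECKPOINT_FAILURE
    }

    // Previously only CHECKPOINT_DECLINED would have been in this set.
    private static final Set<FailureReason> COUNTED = EnumSet.of(
            FailureReason.CHECKPOINT_DECLINED,
            FailureReason.CHECKPOINT_EXPIRED,
            FailureReason.TASK_FAILURE,
            FailureReason.TASK_CHECKPOINT_FAILURE,
            FailureReason.FINALIZE_CHECKPOINT_FAILURE);

    private final AtomicInteger continuousFailures = new AtomicInteger();
    private final int tolerableFailures;

    FailureCounter(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Returns true if the job should be failed. */
    boolean onCheckpointFailure(FailureReason reason) {
        return COUNTED.contains(reason)
                && continuousFailures.incrementAndGet() > tolerableFailures;
    }

    /** A successful checkpoint resets the continuous-failure counter. */
    void onCheckpointSuccess() {
        continuousFailures.set(0);
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)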
[jira] [Created] (FLINK-17007) Add section "How to handle application parameters" in DataStream documentation
Congxian Qiu(klion26) created FLINK-17007: - Summary: Add section "How to handle application parameters" in DataStream documentation Key: FLINK-17007 URL: https://issues.apache.org/jira/browse/FLINK-17007 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Congxian Qiu(klion26) Fix For: 1.11.0

This issue wants to add a section "How to handle application parameters" to the DataStream page.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16916) The logic of NullableSerializer#copy is wrong
Congxian Qiu(klion26) created FLINK-16916: - Summary: The logic of NullableSerializer#copy is wrong Key: FLINK-16916 URL: https://issues.apache.org/jira/browse/FLINK-16916 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.10.0, 1.9.2, 1.8.3 Reporter: Congxian Qiu(klion26) Fix For: 1.11.0

When debugging the problem reported in FLINK-16724, I found that the logic of {{NullableSerializer#copy}} is wrong. Currently, the logic is as follows:

{code:java}
public void copy(DataInputView source, DataOutputView target) throws IOException {
    boolean isNull = source.readBoolean();
    target.writeBoolean(isNull);
    if (isNull) {
        target.write(padding);
    } else {
        originalSerializer.copy(source, target);
    }
}
{code}

We forget to skip {{padding.length}} bytes of the source if the {{padding}}'s length is not 0. We can correct the logic as below:

{code:java}
public void copy(DataInputView source, DataOutputView target) throws IOException {
    boolean isNull = deserializeNull(source); // this will skip the padding bytes
    target.writeBoolean(isNull);
    if (isNull) {
        target.write(padding);
    } else {
        originalSerializer.copy(source, target);
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15699) FirstValueAggFunctionWithoutOrderTest failed on Travis
Congxian Qiu(klion26) created FLINK-15699: - Summary: FirstValueAggFunctionWithoutOrderTest failed on Travis Key: FLINK-15699 URL: https://issues.apache.org/jira/browse/FLINK-15699 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.11.0 Reporter: Congxian Qiu(klion26)

09:48:53.473 [ERROR] COMPILATION ERROR :
09:48:53.473 [INFO] -
09:48:53.473 [ERROR] /home/travis/build/flink-ci/flink/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueAggFunctionWithoutOrderTest.java:[78,37] incompatible types: inference variable T has incompatible bounds
    equality constraints: java.lang.Byte,T,T,T,T,T,java.lang.Boolean,T
    lower bounds: java.lang.Boolean
09:48:53.473 [INFO] 1 error

[https://travis-ci.com/flink-ci/flink/jobs/277466696]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15685) org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis
Congxian Qiu(klion26) created FLINK-15685: - Summary: org.apache.flink.tests.util.kafka.SQLClientKafkaITCase failed on Travis Key: FLINK-15685 URL: https://issues.apache.org/jira/browse/FLINK-15685 Project: Flink Issue Type: Bug Components: Table SQL / Client, Tests Reporter: Congxian Qiu(klion26)

08:50:02.717 [ERROR] Tests run: 3, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 76.395 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.SQLClientKafkaITCase
08:50:02.722 [ERROR] testKafka[0: kafka-version:0.10 kafka-sql-version:.*kafka-0.10.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 23.806 s <<< ERROR!
java.io.IOException:
Process execution failed due error. Error output: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
    at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:285)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:163)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:371)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:552)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:550)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:487)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:748)
    ... 3 more
Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumReader
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 15 more

    at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.insertIntoAvroTable(SQLClientKafkaITCase.java:178)
    at org.apache.flink.tests.util.kafka.SQLClientKafkaITCase.testKafka(SQLClientKafkaITCase.java:151)

08:50:02.734 [ERROR] testKafka[1: kafka-version:0.11 kafka-sql-version:.*kafka-0.11.jar](org.apache.flink.tests.util.kafka.SQLClientKafkaITCase) Time elapsed: 25.227 s <<< ERROR!
java.io.IOException:
Process execution failed due error. Error output: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:759)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: java.lang.NoClassDefFoundError: org/apache/avro/io/DatumReader
    at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:64)
    at
[jira] [Created] (FLINK-15661) JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure failed because of Could not find Flink job
Congxian Qiu(klion26) created FLINK-15661: - Summary: JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure failed because of Could not find Flink job Key: FLINK-15661 URL: https://issues.apache.org/jira/browse/FLINK-15661 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.11.0 Reporter: Congxian Qiu(klion26)

[ERROR] JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:347 The program encountered a ExecutionException : org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (47fe3e8df0e59994938485f683d1410e)
    at org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.propagateException(JobExecutionResultHandler.java:91)
    at org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler.lambda$handleRequest$1(JobExecutionResultHandler.java:82)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:872)
    at akka.dispatch.OnComplete.internal(Future.scala:263)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (47fe3e8df0e59994938485f683d1410e)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$17(Dispatcher.java:516)
[jira] [Created] (FLINK-15619) GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount failed on Azure
Congxian Qiu(klion26) created FLINK-15619: - Summary: GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount failed on Azure Key: FLINK-15619 URL: https://issues.apache.org/jira/browse/FLINK-15619 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.10.0 Reporter: Congxian Qiu(klion26)

[ERROR] testAllProcessingTimeTumblingGroupWindowOverCount[StateBackend=HEAP](org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase) Time elapsed: 2.213 s <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase.testAllProcessingTimeTumblingGroupWindowOverCount(GroupWindowTableAggregateITCase.scala:130)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=0)
Caused by: java.lang.Exception: Artificial Failure

[https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/4391/logs/16]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15451) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure
Congxian Qiu(klion26) created FLINK-15451: - Summary: TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure Key: FLINK-15451 URL: https://issues.apache.org/jira/browse/FLINK-15451 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.9.1 Reporter: Congxian Qiu(klion26)

[ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 42.801 s <<< FAILURE! - in org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase
[ERROR] testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase) Time elapsed: 2.699 s <<< ERROR!
java.net.BindException: Address already in use

[https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/3995/logs/15]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15424) Make all AppendingState#add respect the javadoc
Congxian Qiu(klion26) created FLINK-15424: - Summary: Make all AppendingState#add respect the javadoc Key: FLINK-15424 URL: https://issues.apache.org/jira/browse/FLINK-15424 Project: Flink Issue Type: Bug Affects Versions: 1.9.1, 1.8.3 Reporter: Congxian Qiu(klion26)

Currently, we have a javadoc on {{[AppendingState#add|https://github.com/apache/flink/blob/52fdee1d0c7af24d25c51caa073e29f11b07210b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java#L63]}}:

{code:java}
If null is passed in, the state value will remain unchanged.
{code}

but currently the implementations do not respect this. Take {{HeapReducingState}} as an example: we clear the state if the passed parameter is null

{code:java}
@Override
public void add(V value) throws IOException {
    if (value == null) {
        clear();
        return;
    }
    try {
        stateTable.transform(currentNamespace, value, reduceTransformation);
    } catch (Exception e) {
        throw new IOException("Exception while applying ReduceFunction in reducing state", e);
    }
}
{code}

But in {{RocksDBReducingState}} we do not clear the state, and we put the null value into the state if the serializer can serialize null:

{code:java}
@Override
public void add(V value) throws Exception {
    byte[] key = getKeyBytes();
    V oldValue = getInternal(key);
    V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
    updateInternal(key, newValue);
}
{code}

This issue wants to make all {{AppendingState}} implementations respect the javadoc of {{AppendingState}} and return directly if the passed-in parameter is null.
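A sketch of the proposed fix for the RocksDB side (a direct rearrangement of the snippet above, not a tested patch):

{code:java}
@Override
public void add(V value) throws Exception {
    // proposed: respect the AppendingState#add javadoc and leave the
    // state unchanged when null is passed in
    if (value == null) {
        return;
    }
    byte[] key = getKeyBytes();
    V oldValue = getInternal(key);
    V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
    updateInternal(key, newValue);
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)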
[jira] [Created] (FLINK-15236) Add a safety net for concurrent checkpoints on TM side
Congxian Qiu(klion26) created FLINK-15236: - Summary: Add a safety net for concurrent checkpoints on TM side Key: FLINK-15236 URL: https://issues.apache.org/jira/browse/FLINK-15236 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26)

As discussed in FLINK-13808, we can add an additional config option {{taskmanager.checkpoints.max-concurrent}} that limits the number of concurrent checkpoints on the TM as a safety net. The proposed defaults:
* If maxConcurrentCheckpoints = 1, the default {{taskmanager.checkpoints.max-concurrent}} is 1.
* If maxConcurrentCheckpoints > 1, the default value for {{taskmanager.checkpoints.max-concurrent}} is unlimited.

The limit should not take manually triggered checkpoints/savepoints into account.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15150) ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange failed on Travis
Congxian Qiu(klion26) created FLINK-15150: - Summary: ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange failed on Travis Key: FLINK-15150 URL: https://issues.apache.org/jira/browse/FLINK-15150 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.10.0 Reporter: Congxian Qiu(klion26)

06:37:18.423 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 14.014 s <<< FAILURE! - in org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase
06:37:18.423 [ERROR] testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase) Time elapsed: 14.001 s <<< ERROR!
java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkException: JobMaster has been shut down.
    at org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.lambda$testJobExecutionOnClusterWithLeaderChange$1(ZooKeeperLeaderElectionITCase.java:131)
    at org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:131)
Caused by: org.apache.flink.util.FlinkException: JobMaster has been shut down.

[https://travis-ci.com/flink-ci/flink/jobs/264972218]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15136) Update the Chinese version of "Working with state"
Congxian Qiu(klion26) created FLINK-15136: - Summary: Update the Chinese version of "Working with state" Key: FLINK-15136 URL: https://issues.apache.org/jira/browse/FLINK-15136 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Congxian Qiu(klion26)

In FLINK-14898, we enabled background cleanup of state with TTL by default, and we should update the Chinese version accordingly.

Documentation location: docs/dev/stream/state/state.zh.md

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14892) Add description for checkpoint directory layout
Congxian Qiu(klion26) created FLINK-14892: - Summary: Add description for checkpoint directory layout Key: FLINK-14892 URL: https://issues.apache.org/jira/browse/FLINK-14892 Project: Flink Issue Type: Improvement Components: Documentation, Runtime / Checkpointing Reporter: Congxian Qiu(klion26) Fix For: 1.10.0

In FLINK-8531, we changed the checkpoint directory layout to:

{code:java}
/user-defined-checkpoint-dir
    |
    + --shared/
    + --taskowned/
    + --chk-1/
    + --chk-2/
    + --chk-3/
    ...
{code}

But the directory layout is currently not described in the documentation, and I found some users confused about this, such as [1][2]. So I propose to add a description of the checkpoint directory layout to the documentation, maybe on the {{checkpoints#DirectoryStructure}} page[3].

[1] [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-checkpointing-behavior-td30749.html#a30751]
[2] [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Operator-name-and-uuid-best-practices-td31031.html]
[3] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#directory-structure]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14830) Correct the links for the Chinese version of the stream_checkpointing page
Congxian Qiu(klion26) created FLINK-14830: - Summary: Correct the links for the Chinese version of the stream_checkpointing page Key: FLINK-14830 URL: https://issues.apache.org/jira/browse/FLINK-14830 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.9.1 Reporter: Congxian Qiu(klion26) Fix For: 1.10.0

Currently, in the [Chinese version of the stream_checkpointing page|https://github.com/apache/flink/blob/master/docs/internals/stream_checkpointing.zh.md], some links are not correctly set to the Chinese version but point to the English version, such as:
* \{{ site.baseurl }}/dev/stream/state/index.html
* _[state backend](\{{ site.baseurl }}/ops/state/state_backends.html)_
* [State Backends](\{{ site.baseurl }}/ops/state/state_backends.html)
* [Restart Strategies](\{{ site.baseurl }}/dev/restart_strategies.html)

This issue wants to fix the problem.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14433) Move generated Jaas conf file from /tmp directory to Job specific directory
Congxian Qiu(klion26) created FLINK-14433: - Summary: Move generated Jaas conf file from /tmp directory to Job specific directory Key: FLINK-14433 URL: https://issues.apache.org/jira/browse/FLINK-14433 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Congxian Qiu(klion26) Fix For: 1.10.0

Currently, we generate a jaas file under the /tmp directory[1]; the files generated by jobs running on the same machine all go into the same /tmp directory. This may be problematic for the following reasons:
* We may run out of inodes on the disk that /tmp is on.
* The performance of the /tmp directory affects the read/write performance of the jaas file.

So we propose to place the jaas file under the {{CoreOptions.TMP_DIRS}} directory rather than the /tmp directory.

[1] https://github.com/apache/flink/blob/dbe1bfa31db4a561b6faa9c1235f02dc130825ca/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java#L143
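A minimal sketch of the proposed relocation (illustrative only; {{JaasFileLocator}} is a hypothetical helper, and the splitting of {{TMP_DIRS}} is simplified compared to Flink's actual parsing):

{code:java}
import java.io.File;
import java.io.IOException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

// Hypothetical helper: create the jaas file under the first configured
// temporary directory (CoreOptions.TMP_DIRS) instead of the global /tmp.
final class JaasFileLocator {

    static File createJaasFile(Configuration config) throws IOException {
        // Simplified: Flink's TMP_DIRS may use several separators; we only
        // split on commas here for illustration.
        String firstTmpDir = config.getString(CoreOptions.TMP_DIRS).split(",")[0];
        File targetDir = new File(firstTmpDir);
        // assumes the directory exists; real code would create it if needed
        return File.createTempFile("jaas-", ".conf", targetDir);
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)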
[jira] [Created] (FLINK-14340) Specify a unique DFSClient name for Hadoop FileSystem
Congxian Qiu(klion26) created FLINK-14340: - Summary: Specify a unique DFSClient name for Hadoop FileSystem Key: FLINK-14340 URL: https://issues.apache.org/jira/browse/FLINK-14340 Project: Flink Issue Type: Improvement Components: FileSystems Reporter: Congxian Qiu(klion26) Fix For: 1.10.0

Currently, when Flink reads from or writes to HDFS, we do not set the DFSClient name for the connections, so we can't distinguish the connections and can't find the specific Job or TM quickly. This issue wants to add the {{container_id}} as a unique name when initializing the Hadoop FileSystem, so we can easily distinguish which Job/TM the connections belong to.

The core change is adding a line such as below in {{org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create}}:

{code:java}
hadoopConfig.set("mapreduce.task.attempt.id", System.getenv().getOrDefault(CONTAINER_KEY_IN_ENV, DEFAULT_CONTAINER_ID));
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14264) Expose CheckpointBackend in checkpoint config RestAPI
Congxian Qiu(klion26) created FLINK-14264: - Summary: Expose CheckpointBackend in checkpoint config RestAPI Key: FLINK-14264 URL: https://issues.apache.org/jira/browse/FLINK-14264 Project: Flink Issue Type: Improvement Components: Runtime / REST Affects Versions: 1.9.0 Reporter: Congxian Qiu(klion26)

Currently, we can get the checkpoint config from the REST API[1]; the response contains the information below:
* timeout
* min_pause
* max_concurrent
* externalization

But it does not contain the type of the CheckpointBackend. In some scenarios, we want to get the CheckpointBackend type from REST, so this issue wants to add the simple name of the CheckpointBackend to the {{checkpoints/config}} REST response under the key {{checkpoint_backend}}, so that the response will contain:
* timeout
* min_pause
* max_concurrent
* checkpoint_backend
* externalization

[1] [https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints-config]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14035) Introduce/Change some logs for snapshot to better analyze checkpoint problems
Congxian Qiu(klion26) created FLINK-14035: - Summary: Introduce/Change some logs for snapshot to better analyze checkpoint problems Key: FLINK-14035 URL: https://issues.apache.org/jira/browse/FLINK-14035 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.10.0 Reporter: Congxian Qiu(klion26)

Currently, the information about checkpoints is mostly logged at debug level (especially on the TM side). If we want to track the checkpoint steps and the time consumed by each step when we have a failed checkpoint or the checkpoint time is too long, we need to restart the job with debug logging enabled. This issue wants to improve the situation by changing some existing logs from debug to info and adding some more debug logs. We have changed these log levels in our production at Alibaba, and it has caused no problem so far.

Details: change the logs below from debug level to info:
* log about {{Starting checkpoint xxx}} on the TM side
* log about sync complete on the TM side
* log about async complete on the TM side

Add debug log:
* log about receiving the barrier for exactly-once mode, aligned with at-least-once mode

If this issue is valid, I'm happy to contribute it.

-- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-13861) No new checkpoint triggered when canceling an expired checkpoint fails
Congxian Qiu(klion26) created FLINK-13861: - Summary: No new checkpoint triggered when canceling an expired checkpoint fails Key: FLINK-13861 URL: https://issues.apache.org/jira/browse/FLINK-13861 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.9.0, 1.8.1, 1.7.2 Reporter: Congxian Qiu(klion26) Fix For: 1.10.0

I encountered this problem in our private fork of Flink; after taking a look at the current master branch of Apache Flink, I think the problem exists there as well.

Problem detail:

1. A checkpoint is canceled because of expiration, which calls the canceller below:

{code:java}
final Runnable canceller = () -> {
    synchronized (lock) {
        // only do the work if the checkpoint is not discarded anyways
        // note that checkpoint completion discards the pending checkpoint object
        if (!checkpoint.isDiscarded()) {
            LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

            failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
            pendingCheckpoints.remove(checkpointID);
            rememberRecentCheckpointId(checkpointID);

            triggerQueuedRequests();
        }
    }
};
{code}

But {{failPendingCheckpoint}} may throw an exception, because it calls {{CheckpointCoordinator#failPendingCheckpoint}} -> {{PendingCheckpoint#abort}} -> {{PendingCheckpoint#reportFailedCheckpoint}} -> initializes a FailedCheckpointStates, which may throw an exception via {{checkArgument}}. I have not yet found out why the {{checkArgument}} ever failed (the problem does not reproduce frequently); I will create an issue for that if I have more findings.

2. When triggering the next checkpoint, we first check whether there are already too many checkpoints, as below:

{code:java}
private void checkConcurrentCheckpoints() throws CheckpointException {
    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
        triggerRequestQueued = true;
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
        throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
    }
}
{code}

Here {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} will always be true.

3. No checkpoint will ever be triggered from then on.

Because {{failPendingCheckpoint}} may throw an exception, we may place the pending-checkpoint removal logic in a finally block. I'd like to file a PR for this if it really needs to be fixed.
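A sketch of the finally-block fix proposed above (a direct rearrangement of the quoted canceller, not a tested patch):

{code:java}
final Runnable canceller = () -> {
    synchronized (lock) {
        if (!checkpoint.isDiscarded()) {
            LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
            try {
                failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
            } finally {
                // runs even if failPendingCheckpoint throws, so the pending
                // checkpoint can no longer block future triggers
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);
                triggerQueuedRequests();
            }
        }
    }
};
{code}

-- This message was sent by Atlassian Jira (v8.3.2#803003)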
[jira] [Created] (FLINK-13164) Add a putAndGet benchmark for StateBenchmark
Congxian Qiu(klion26) created FLINK-13164: - Summary: Add a putAndGet benchmark for StateBenchmark Key: FLINK-13164 URL: https://issues.apache.org/jira/browse/FLINK-13164 Project: Flink Issue Type: Improvement Components: Benchmarks Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

As discussed in the [thread|https://github.com/dataArtisans/flink-benchmarks/issues/19], we made sure that there is only one sst file in all list state benchmarks, so they do not cover the compaction scenario; this ticket wants to add a benchmark for it.

Note: the per-iteration results may not be stable for the newly introduced benchmark, but we have to make sure that the final result is stable.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13148) Expose WindowedStream.sideOutputLateData() from CoGroupedStreams
Congxian Qiu(klion26) created FLINK-13148: - Summary: Expose WindowedStream.sideOutputLateData() from CoGroupedStreams Key: FLINK-13148 URL: https://issues.apache.org/jira/browse/FLINK-13148 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

FLINK-10050 supported {{allowedLateness}}, but we cannot get a side output containing the late data; this issue wants to fix that. For the implementation, I want to add an input parameter {{OutputTag}} to {{WithWindow}} as follows:

{code:java}
protected WithWindow(DataStream<T1> input1,
        DataStream<T2> input2,
        KeySelector<T1, KEY> keySelector1,
        KeySelector<T2, KEY> keySelector2,
        TypeInformation<KEY> keyType,
        WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
        Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
        Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
        Time allowedLateness,
        OutputTag<TaggedUnion<T1, T2>> outputTag) {
    ...
}
{code}

and add a function {{sideOutputLateData(OutputTag outputTag)}} in {{WithWindow}}:

{code:java}
public WithWindow<T1, T2, KEY, W> sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag) {
    ...
}
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
Congxian Qiu(klion26) created FLINK-13009: - Summary: YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots Key: FLINK-13009 URL: https://issues.apache.org/jira/browse/FLINK-13009 Project: Flink Issue Type: Test Components: Tests Reporter: Congxian Qiu(klion26)

The test {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} throws an NPE in the Travis build. From the code of hadoop-2.8.3, it seems the {{rmContext}} is null:

{code:java}
// code from https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128
// Only add in the running containers if this is the active attempt.
RMAppAttempt currentAttempt = rmContext.getRMApps()
    .get(attemptId.getApplicationId()).getCurrentAppAttempt();
{code}

log: [https://api.travis-ci.org/v3/job/550689578/log.txt]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12994) Improve the buffer processing performance in SpilledBufferOrEventSequence#getNext
Congxian Qiu(klion26) created FLINK-12994: - Summary: Improve the buffer processing performance in SpilledBufferOrEventSequence#getNext Key: FLINK-12994 URL: https://issues.apache.org/jira/browse/FLINK-12994 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

This is a follow-up issue of FLINK-12536; please see the benchmark there for more information.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12880) Support triggering checkpoint automatically when cancel a job
Congxian Qiu(klion26) created FLINK-12880: - Summary: Support triggering checkpoint automatically when cancel a job Key: FLINK-12880 URL: https://issues.apache.org/jira/browse/FLINK-12880 Project: Flink Issue Type: New Feature Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

Currently, we have FLINK-11458 for {{stop-with-savepoint}} and FLINK-12619 for {{stop-with-checkpoint}}. This issue wants to add a configuration {{autoCheckpoint}} (true or false); when {{autoCheckpoint}} is set to true, we'll take a checkpoint automatically when canceling a job. More discussion can be found [here|https://github.com/apache/flink/pull/8617#issuecomment-502721271].

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12733) Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint
Congxian Qiu(klion26) created FLINK-12733: - Summary: Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint Key: FLINK-12733 URL: https://issues.apache.org/jira/browse/FLINK-12733 Project: Flink Issue Type: Sub-task Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint
Congxian Qiu(klion26) created FLINK-12619: - Summary: Support TERMINATE/SUSPEND Job with Checkpoint Key: FLINK-12619 URL: https://issues.apache.org/jira/browse/FLINK-12619 Project: Flink Issue Type: New Feature Components: Runtime / State Backends Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

Inspired by the idea of FLINK-11458, we propose to support terminating/suspending a job with a checkpoint. This improvement cooperates with the incremental and external checkpoint features: if the checkpoint is retained and this feature is configured, we will trigger a checkpoint before the job stops. It could accelerate job recovery a lot since:
1. No source rewinding is required anymore.
2. It's much faster than taking a savepoint when incremental checkpointing is enabled.

Please note that conceptually savepoints differ from checkpoints in a similar way that backups differ from recovery logs in traditional database systems. So we suggest using this feature only for job recovery, while sticking with FLINK-11458 for the upgrading/cross-cluster-job-migration/state-backend-switch cases.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12296) Silent data loss in RocksDBStateBackend when more than one operator is chained in a single task
Congxian Qiu(klion26) created FLINK-12296: - Summary: Silent data loss in RocksDBStateBackend when more than one operator is chained in a single task Key: FLINK-12296 URL: https://issues.apache.org/jira/browse/FLINK-12296 Project: Flink Issue Type: Test Components: Runtime / State Backends Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

As the mailing list thread said[1], there may be a problem when more than one operator is chained in a single task and all the operators have state: data may be lost silently.

[1] https://app.smartmailcloud.com/web-share/MDkE4DArUT2eoSv86xq772I1HDgMNTVhLEmsnbQ7

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12281) SortDistinctAggregateITCase>DistinctAggregateITCaseBase.testSomeColumnsBothInDistinctAggAndGroupBy Failed
Congxian Qiu(klion26) created FLINK-12281: - Summary: SortDistinctAggregateITCase>DistinctAggregateITCaseBase.testSomeColumnsBothInDistinctAggAndGroupBy Failed Key: FLINK-12281 URL: https://issues.apache.org/jira/browse/FLINK-12281 Project: Flink Issue Type: Test Components: Tests Reporter: Congxian Qiu(klion26)

01:07:26.060 [INFO] Running org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase
01:08:38.157 [ERROR] Tests run: 23, Failures: 0, Errors: 1, Skipped: 2, Time elapsed: 72.093 s <<< FAILURE! - in org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase
01:08:38.157 [ERROR] testSomeColumnsBothInDistinctAggAndGroupBy(org.apache.flink.table.runtime.batch.sql.agg.SortDistinctAggregateITCase) Time elapsed: 5.972 s <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 64 pages. Only 60 pages are remaining.
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 64 pages. Only 60 pages are remaining.

[https://travis-ci.org/apache/flink/jobs/522602981]
[https://travis-ci.org/apache/flink/jobs/522603433]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11941) Reuse single FileSegmentCheckpointStateOutputStream for multiple checkpoints
Congxian Qiu(klion26) created FLINK-11941: - Summary: Reuse single FileSegmentCheckpointStateOutputStream for multiple checkpoints Key: FLINK-11941 URL: https://issues.apache.org/jira/browse/FLINK-11941 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

After the previous sub-tasks have been resolved, we'll solve the small-file problem by writing all states of one checkpoint into a single physical file. In this sub-task, we'll try to share the underlying physical file across multiple checkpoints. This is configurable and disabled by default.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11940) Reduce storage amplification for FSCSOS
Congxian Qiu(klion26) created FLINK-11940: - Summary: Reduce storage amplification for FSCSOS Key: FLINK-11940 URL: https://issues.apache.org/jira/browse/FLINK-11940 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26) In this sub-task, we'll solve the storage amplification problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11939) Adjust components that need to be modified
Congxian Qiu(klion26) created FLINK-11939: - Summary: Adjust components that need to be modified Key: FLINK-11939 URL: https://issues.apache.org/jira/browse/FLINK-11939 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

In this sub-task, we'll adjust all the components that need to be modified, such as {{SharedStateRegistry}} and {{PlaceholderStreamStateHandle}}.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11938) Introduce all the new components
Congxian Qiu(klion26) created FLINK-11938: - Summary: Introduce all the new components Key: FLINK-11938 URL: https://issues.apache.org/jira/browse/FLINK-11938 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26) In this sub-task, we'll introduce all the new components, such as {{FileSegmentCheckpointStreamFactory}}, {{FileSegmentCheckpointStateOutputStream}}, {{FileSegmentCheckpointLocationStorage}}, {{FileSegmentStateHandle}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11937) Resolve small file problem in RocksDB incremental checkpoint
Congxian Qiu(klion26) created FLINK-11937: - Summary: Resolve small file problem in RocksDB incremental checkpoint Key: FLINK-11937 URL: https://issues.apache.org/jira/browse/FLINK-11937 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing Affects Versions: 1.7.2 Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

Currently, when incremental checkpointing is enabled in RocksDBStateBackend, a separate file is generated on DFS for each sst file. This may cause a "file flood" when running intensive workloads (many jobs with high parallelism) in a big cluster. According to our observation in Alibaba production, such a file flood introduces at least two drawbacks when using HDFS as the checkpoint storage FileSystem: 1) a huge number of RPC requests issued to the NN, which may burst its response queue; 2) a huge number of files causes big pressure on the NN's on-heap memory.

In Flink we noticed a similar small-file flood problem before and tried to resolve it by introducing ByteStreamStateHandle (FLINK-2818), but this solution has its limitation: if we configure the threshold too low there will still be too many small files, while if too high the JM will eventually OOM. Thus it could hardly resolve the issue when using RocksDBStateBackend with the incremental snapshot strategy.

We propose a new OutputStream called FileSegmentCheckpointStateOutputStream (FSCSOS) to fix the problem. FSCSOS will reuse the same underlying distributed file until its size exceeds a preset threshold. We plan to complete the work in 3 steps: firstly introduce FSCSOS, secondly resolve the specific storage amplification issue of FSCSOS, and lastly add an option to reuse FSCSOS across multiple checkpoints to further reduce the DFS file number.

For more details please refer to the attached design doc.

[1] [https://www.slideshare.net/dataArtisans/stephan-ewen-experiences-running-flink-at-very-large-scale]
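To illustrate the core idea of FSCSOS, here is a highly simplified sketch under stated assumptions: the class name, the rollover policy, and the {{Supplier}}-based segment factory are all illustrative, not the proposed Flink classes:

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

// Sketch: keep appending writes to the same underlying (DFS) file and only
// roll over to a new file once the size threshold is exceeded, so many
// small state writes share one physical file.
final class SegmentedOutputStream extends OutputStream {

    private final long fileSizeThreshold;
    private final Supplier<OutputStream> segmentFactory; // opens the next file
    private OutputStream current;
    private long written;

    SegmentedOutputStream(Supplier<OutputStream> segmentFactory, long fileSizeThreshold) {
        this.segmentFactory = segmentFactory;
        this.fileSizeThreshold = fileSizeThreshold;
        this.current = segmentFactory.get();
    }

    @Override
    public void write(int b) throws IOException {
        if (written >= fileSizeThreshold) {
            current.close();
            current = segmentFactory.get(); // roll over to a new segment
            written = 0;
        }
        current.write(b);
        written++;
    }

    @Override
    public void close() throws IOException {
        current.close();
    }
}
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)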
[jira] [Created] (FLINK-11904) Improve MemoryStateBackendTest by using JUnit's Parameterized
Congxian Qiu(klion26) created FLINK-11904: - Summary: Improve MemoryStateBackendTest by using JUnit's Parameterized Key: FLINK-11904 URL: https://issues.apache.org/jira/browse/FLINK-11904 Project: Flink Issue Type: Test Components: Tests Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

Currently, there are two classes, {{MemoryStateBackendTest}} and {{AsyncMemoryStateBackendTest}}; the only difference is whether async mode is used. We can improve this by using JUnit's Parameterized.
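A small sketch of the suggested structure (a hypothetical test class; the real merge would carry over the existing assertions):

{code:java}
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Hypothetical merged test: run the shared state backend tests once with
// async snapshots enabled and once disabled, replacing the two classes.
@RunWith(Parameterized.class)
public class MemoryStateBackendParameterizedTest {

    @Parameterized.Parameters(name = "async = {0}")
    public static Collection<Boolean> asyncModes() {
        return Arrays.asList(true, false);
    }

    @Parameterized.Parameter
    public boolean useAsyncMode;

    @Test
    public void testSnapshot() {
        // build the MemoryStateBackend with `useAsyncMode` here and run the
        // assertions shared by both former test classes
    }
}
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)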
[jira] [Created] (FLINK-11903) Improve FileStateBackendTest by using JUnit's Parameterized
Congxian Qiu(klion26) created FLINK-11903: - Summary: Improve FileStateBackendTest by using JUnit's Parameterized Key: FLINK-11903 URL: https://issues.apache.org/jira/browse/FLINK-11903 Project: Flink Issue Type: Test Components: Tests Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

Currently, there is a test base class called {{FileStateBackendTest}} and a subclass {{AsyncFileStateBackendTest}}; the only difference is whether to use async mode. We can improve the test code by using JUnit's Parameterized.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11850) ZooKeeperHaServicesTest#testSimpleCloseAndCleanupAllData fails on Travis
Congxian Qiu(klion26) created FLINK-11850: - Summary: ZooKeeperHaServicesTest#testSimpleCloseAndCleanupAllData fails on Travis Key: FLINK-11850 URL: https://issues.apache.org/jira/browse/FLINK-11850 Project: Flink Issue Type: Test Components: Tests Reporter: Congxian Qiu(klion26)

org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest
08:20:01.694 [ERROR] testSimpleCloseAndCleanupAllData(org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest) Time elapsed: 0.076 s <<< ERROR!
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /foo/bar/flink/default/leaderlatch/resource_manager_lock/_c_477d0124-92f3-4069-98aa-a71b8243250c-latch-00
    at org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest.runCleanupTest(ZooKeeperHaServicesTest.java:203)
    at org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest.testSimpleCloseAndCleanupAllData(ZooKeeperHaServicesTest.java:128)

Travis link: https://travis-ci.org/apache/flink/jobs/502960186

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11797) Implement empty unit tests in CheckpointCoordinatorMasterHooksTest
Congxian Qiu(klion26) created FLINK-11797: - Summary: Implement empty unit tests in CheckpointCoordinatorMasterHooksTest Key: FLINK-11797 URL: https://issues.apache.org/jira/browse/FLINK-11797 Project: Flink Issue Type: Test Components: Tests Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

Currently, there are some empty tests (without implementation) such as {{testSerializationFailsOnTrigger}}, {{testHookCallFailsOnTrigger}}, {{testDeserializationFailsOnRestore}}, {{testHookCallFailsOnRestore}}, {{testTypeIncompatibleWithSerializerOnStore}} and {{testTypeIncompatibleWithHookOnRestore}} in class {{CheckpointCoordinatorMasterHooksTest}}[1]. If implementing them makes sense, I'll provide a patch.

[1] https://github.com/apache/flink/blob/e3248d844c728c714857c5d69520f08ddf4e4c85/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java#L393

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11704) Improve AbstractCheckpointStateOutputStreamTestBase with Parameterized
Congxian Qiu(klion26) created FLINK-11704: - Summary: Improve AbstractCheckpointStateOutputStreamTestBase with Parameterized Key: FLINK-11704 URL: https://issues.apache.org/jira/browse/FLINK-11704 Project: Flink Issue Type: Improvement Components: Tests Reporter: Congxian Qiu(klion26) Assignee: Congxian Qiu(klion26)

In the current version, there is an {{AbstractCheckpointStateOutputStreamTestBase}} and two implementation classes, {{FileBasedStateOutputStreamTest}} and {{FsCheckpointMetadataOutputStreamTest}}; the differences are returning a different {{FSDataOutputStream}} and the specified {{FileStateHandle}}. We can improve this by using JUnit's Parameterized.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)