Flink DataStream KeyedStream and AggregateFunction
A few questions:
1. After keyBy(userID) on a DataStream, will all records with the same user ID end up in the same partition?
2. Assuming 1 holds, this can lead to data skew. How can that be solved?
3. Assuming 1 holds, in DataStream.keyBy(userID).timeWindow().aggregate(new AggregateFunction(...)), why does the AggregateFunction still need merge()? Since all data for one key is computed in the same partition, merge() seems unnecessary.
Could anyone help clear these three points up? Thanks!
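On question 2: a common mitigation for skew is two-stage aggregation with a salted key: first keyBy a salted key (the userID plus a bucket suffix) to pre-aggregate, then keyBy the original userID to combine the partial results. A minimal sketch of the salting idea in plain Java (the helper names and bucket count are made up for illustration, not a Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class SaltedKey {
    // Append a deterministic bucket suffix so one hot userID is spread
    // over `buckets` sub-keys in the first aggregation stage.
    // Assumes the userID itself contains no '#'.
    static String salt(String userId, long recordSeq, int buckets) {
        return userId + "#" + (recordSeq % buckets);
    }

    // Strip the suffix again for the second (global) aggregation stage.
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }

    public static void main(String[] args) {
        // One hot user, 6 records, spread over 3 sub-keys.
        Map<String, Integer> partial = new HashMap<>();
        for (long seq = 0; seq < 6; seq++) {
            partial.merge(salt("user42", seq, 3), 1, Integer::sum);
        }
        System.out.println(partial.size());      // 3 sub-keys

        // Second stage: combine partials under the original key.
        Map<String, Integer> total = new HashMap<>();
        partial.forEach((k, v) -> total.merge(unsalt(k), v, Integer::sum));
        System.out.println(total.get("user42")); // 6
    }
}
```

In a Flink job the two stages would be two keyBy/aggregate steps; the salted stage absorbs the hot key across several parallel subtasks.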
Flink DataStream ElasticsearchSink Checkpoint Error
Flink writes to Elasticsearch, but the checkpoint stays in IN_PROGRESS state indefinitely, so data cannot be written to ES. See the attached screenshot: [image: image.png] Please take a look, thanks!
Flink DataStream Broadcast variable
A question: is there a broadcast-variable concept in the Flink DataStream API? For example, I have a global ArrayList that is updated on the JobManager side and should then be shared with the TaskManagers. Can that be done? Thanks!
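For reference: Flink has no JobManager-side mutable global variable, but since Flink 1.5 the broadcast state pattern covers this use case: stream the updates in as a second, broadcast stream, and every parallel task receives them. A sketch (not compiled here; the `Event` type, the stream variables, and the descriptor name are placeholders):

```java
// A control stream carrying the list/config updates, broadcast to all tasks.
MapStateDescriptor<String, String> desc =
    new MapStateDescriptor<>("config", String.class, String.class);
BroadcastStream<String> control = controlStream.broadcast(desc);

dataStream
    .connect(control)
    .process(new BroadcastProcessFunction<Event, String, Event>() {
        @Override
        public void processBroadcastElement(String update, Context ctx,
                                            Collector<Event> out) throws Exception {
            // Every parallel instance sees the update and stores it.
            ctx.getBroadcastState(desc).put("latest", update);
        }

        @Override
        public void processElement(Event e, ReadOnlyContext ctx,
                                   Collector<Event> out) throws Exception {
            String latest = ctx.getBroadcastState(desc).get("latest");
            out.collect(e); // use `latest` to enrich or filter e
        }
    });
```

The updates originate from a source (e.g. a Kafka control topic), not from JobManager code, which is the idiomatic replacement for a "global variable updated centrally".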
Flink DataStream Micro-Batch
Can micro-batching be implemented on a Flink DataStream, and if so, how? For example, based on processing time: consume data from Kafka, form a batch every 5 s, then process that batch. Thanks!
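For reference: a processing-time tumbling window of 5 s is effectively a micro-batch; something like `stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(...)` hands you all elements of each 5-second batch at once. Flink assigns an element to the window starting at `timestamp - (timestamp % windowSize)` (ignoring the optional offset and negative timestamps); the alignment rule in plain Java:

```java
public class WindowAlign {
    // Start of the tumbling window that contains `timestampMs`,
    // for windows of size `sizeMs` aligned to the epoch (no offset).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second batches
        System.out.println(windowStart(12_345, size)); // 10000
        System.out.println(windowStart(15_000, size)); // 15000
    }
}
```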
Re: Reply: Flink consuming Kafka, windows not firing
Is there somewhere to mark the Kafka source as an idling source? I could not find an entry point for it.

Utopia wrote on Mon, Oct 21, 2019 at 2:58 PM:
> Idling sources only solve the case where Kafka has no data at all. What if, in the business logic, no new data has arrived for some user for a long time? One workaround I can think of: when a record arrives, register a ProcessingTimer in the Trigger's onElement method to fire the window computation, and delete the previous ProcessingTimer whenever new data comes in.
> On Oct 21, 2019, 01:38 +0800, Ever <439674...@qq.com> wrote:
> > See:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
> > the section on idling sources
> >
> > -- Original message --
> > From: "王佩";
> > Sent: Sunday, Oct 20, 2019, 10:38 AM
> > To: "user-zh";
> > Subject: Flink consuming Kafka, windows not firing
> >
> > Flink consumes Kafka; some parallel instances receive no data, so the watermark does not advance and windows never fire.
> >
> > What are the common ways to solve this?
> >
> > Thanks!
Flink 1.9 "Failed to take leadership with session id" exception
A Flink 1.9 DataStream job fails with the following error after running for a while:

2019-10-09 21:07:44 INFO org.apache.flink.runtime.jobmaster.JobMaster dissolveResourceManagerConnection 1010 Close ResourceManager connection be4e0b96b331165ff9f4bd7ef4868d94: JobManager is no longer the leader..
2019-10-09 21:07:44 INFO org.apache.flink.runtime.jobmaster.JobMaster onStop 335 Stopping the JobMaster for job flinkx_liveme_microservice_rawdata(d2f6aa0115f4116f690451ef64c56fd4).
2019-10-09 21:07:44 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService stop 154 Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/d2f6aa0115f4116f690451ef64c56fd4/job_manager_lock'}.
2019-10-09 21:07:44 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore recoverJobGraph 197 Recovered SubmittedJobGraph(d2f6aa0115f4116f690451ef64c56fd4).
2019-10-09 21:07:44 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint onFatalError 374 Fatal error occurred in the cluster entrypoint. org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id a2a48489-b1b4-4f70-a721-edeaf543e1da.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.dispatcher.DispatcherException: Termination of previous JobManager for job d2f6aa0115f4116f690451ef64c56fd4 failed. Cannot submit job under the same job id. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJobManager$33(Dispatcher.java:949) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:884) at java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2196) at org.apache.flink.runtime.dispatcher.Dispatcher.waitForTerminatingJobManager(Dispatcher.java:946) at org.apache.flink.runtime.dispatcher.Dispatcher.tryAcceptLeadershipAndRunJobs(Dispatcher.java:933) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$28(Dispatcher.java:892) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 23 more Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not properly shut down the JobManagerRunner at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) at
Flink DataStream ElasticsearchSink to ES: ConnectionClosedException
When Flink writes to ES and the ES cluster is busy, the following exception occurs:

2019-09-17 16:01:02 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase afterBulk 430 Failed Elasticsearch bulk request: Connection closed org.apache.http.ConnectionClosedException: Connection closed at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345) at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) at java.lang.Thread.run(Thread.java:745)
2019-09-17 16:01:02 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase afterBulk 430 Failed Elasticsearch bulk request: Connection closed org.apache.http.ConnectionClosedException: Connection closed at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345) at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588) at java.lang.Thread.run(Thread.java:745)

When the ElasticsearchSink hits this exception, Flink does not re-establish the ES connection automatically.

What is known so far: failure handling in ElasticsearchSink is configured through its setFailureHandler method, which lets you define how failures are handled (e.g. drop the record or re-queue it), but this hook cannot reopen the ES connection when a ConnectionClosedException occurs.

I would like Flink to reopen the ES connection automatically after this kind of exception. How can that be achieved? Thanks
Re: Split a stream into any number of streams
Here is the situation: there are 1000 event types (distinguished by a field, and new types keep being added), all in one Kafka topic. After Flink reads from Kafka I have a single DataStream; I want to split each event type into its own DataStream, so that each one can then be processed in its own way, e.g. DataStream async I/O, or a DataStream sink to Parquet.
1. With split...select, select(eventName) requires a fixed, known event name.
2. With side outputs, the OutputTags must be defined up front; with 1000 event types (and growing) that means defining 1000+ OutputTags.
Thanks!
cai yi wrote on Tue, Sep 17, 2019 at 1:33 PM:
> You can use side outputs: route the input stream to custom OutputTags according to your criteria, then use DataStream.getSideOutput(outputTag) to get the stream you need and process it.
>
> On 2019/9/17 10:05 AM, "Wesley Peng" wrote:
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split stream one by one.
> I think that should be easy done. refer to:
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
> regards.
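For reference, one way to avoid hand-writing 1000+ tags is to build the OutputTags from a (possibly growing) list of event names, since a tag is just a name plus a type, and let unknown event types fall through to the main output. A hypothetical sketch (not compiled here; `Event`, `stream`, and `knownEventNames` are placeholders):

```java
// Build one OutputTag per known event name up front.
Map<String, OutputTag<Event>> tags = new HashMap<>();
for (String name : knownEventNames) {
    // Anonymous subclass so Flink can capture the type information.
    tags.put(name, new OutputTag<Event>(name) {});
}

SingleOutputStreamOperator<Event> main = stream.process(
    new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) {
            OutputTag<Event> tag = tags.get(e.getName());
            if (tag != null) {
                ctx.output(tag, e);   // route to the per-event side output
            } else {
                out.collect(e);       // new/unknown event types
            }
        }
    });

for (Map.Entry<String, OutputTag<Event>> entry : tags.entrySet()) {
    DataStream<Event> perEvent = main.getSideOutput(entry.getValue());
    // attach per-event processing / sinks here
}
```

This does not remove the need to know the names at job-build time; truly dynamic event types would still need a keyBy on the event-name field or a job restart when the list changes.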
Split a stream into any number of streams
Hi All, I want to split a stream into any number of streams according to a field, and then process the split stream one by one. Can this be achieved? What should I do? Regards, Pei
Re: Flink writing to ES: ConcurrentModificationException
It is not a problem in my code; the code does not remove elements while iterating a List. The error is thrown from org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer, and that class is not thread-safe.
wang jinhai wrote on Tue, Sep 10, 2019 at 4:36 PM:
> This doesn't look like a Flink problem. Your code removed elements while iterating a List, which causes this. The fix is to iterate with an Iterator and call iterator.remove().
>
> On 2019/9/10 4:18 PM, 王佩 wrote:
>
> Writing to ES with Flink, I hit the following ConcurrentModificationException:
>
> 2019-09-10 08:13:14 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
> 1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from
> state RUNNING to FAILING.
> java.util.ConcurrentModificationException
> at
> java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at
> org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at org.apache.flink.streaming.runtime.io > .StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thr
Re: Flink continuously monitoring a directory for new files misses some files
The files being monitored are on S3.
王佩 wrote on Fri, Aug 23, 2019 at 9:25 AM:
> On Flink 1.8.0, using env.readFile to continuously monitor a directory for new files and process them: out of 5000+ files, 25 were missed.
>
> The setup:
>
> 1. One Flink job continuously writes small files into directory A.
> 2. Another Flink job continuously monitors directory A via env.readFile in PROCESS_CONTINUOUSLY mode, then does further processing.
>
> The second job occasionally misses files.
>
> Question: why are files lost, and what could be the cause? Parallelism?
>
>
Re: PathIsNotEmptyDirectoryException
The checkpoint configuration is:

env.setStateBackend((StateBackend) new RocksDBStateBackend(checkpointDirectory, true));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(30 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

王佩 wrote on Sat, Aug 24, 2019 at 4:49 PM:
> Flink version 1.8.0, using RocksDBStateBackend.
>
> After the job went to production, the following exception occurs frequently:
> 2019-08-24 00:34:05,192 WARN
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a:////a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> 
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 02:58:03,572 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: > `s3a:////a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory > is not empty > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) > at > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) > at > org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) > at > org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) > at > org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) > at > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at 
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-08-24 03:21:00,784 WARN > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Fail to subsume the old checkpoint. > org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNot
PathIsNotEmptyDirectoryException
Flink version 1.8.0, using RocksDBStateBackend.

After the job went to production, the following exception occurs frequently:

2019-08-24 00:34:05,192 WARN org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Fail to subsume the old checkpoint. org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `s3a:////a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory is not empty at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2019-08-24 
02:58:03,572 WARN org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Fail to subsume the old checkpoint. org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `s3a:////a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory is not empty at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2019-08-24 03:21:00,784 WARN 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Fail to subsume the old checkpoint. org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `s3a:////a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory is not empty at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697) at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147) at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264) at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219) at
Flink continuously monitoring a directory for new files misses some files
On Flink 1.8.0, using env.readFile to continuously monitor a directory for new files and process them: out of 5000+ files, 25 were missed.

The setup:

1. One Flink job continuously writes small files into directory A.
2. Another Flink job continuously monitors directory A via env.readFile in PROCESS_CONTINUOUSLY mode, then does further processing.

The second job occasionally misses files.

Question: why are files lost, and what could be the cause? Parallelism?
Re: AsyncIO with Redis as a cache
With a cache, this kind of join should in theory be faster. Why does it get slower?
王佩 wrote on Tue, Aug 6, 2019 at 10:09 PM:
> Requirement: a fact stream joins dimension tables in Kudu in real time to fill in dimensions.
>
> To speed up lookups: query Kudu first, put the result into a Redis cache, and on subsequent lookups read from Redis first, falling back to Kudu on a miss.
>
> Problems encountered:
> 1. Without the Redis cache, checkpoints are fast and throughput is high.
> 2. With the Redis cache via Jedis without a connection pool, throughput is very low.
> 3. With a Jedis connection pool, throughput is even lower.
>
> Questions:
> 1. Is it acceptable to read from Kudu without a cache?
> 2. In AsyncIO, the lettuce async client is incompatible with Flink's netty.
> 3. Why are Flink checkpoints so slow with a Jedis connection pool in AsyncIO?
> 4. For this scenario (a stream joining dimension tables in Kudu), how can performance be improved?
>
> Thanks a lot.
>
>
AsyncIO with Redis as a cache
Requirement: a fact stream joins dimension tables in Kudu in real time to fill in dimensions.

To speed up lookups: query Kudu first, put the result into a Redis cache, and on subsequent lookups read from Redis first, falling back to Kudu on a miss.

Problems encountered:
1. Without the Redis cache, checkpoints are fast and throughput is high.
2. With the Redis cache via Jedis without a connection pool, throughput is very low.
3. With a Jedis connection pool, throughput is even lower.

Questions:
1. Is it acceptable to read from Kudu without a cache?
2. In AsyncIO, the lettuce async client is incompatible with Flink's netty.
3. Why are Flink checkpoints so slow with a Jedis connection pool in AsyncIO?
4. For this scenario (a stream joining dimension tables in Kudu), how can performance be improved?

Thanks a lot.
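For reference, the cache-aside lookup order described above (Redis first, Kudu on a miss, then populate Redis), sketched with CompletableFutures and in-memory maps standing in for Redis and Kudu (all names here are hypothetical). Inside a Flink AsyncFunction the same future would complete the ResultFuture; a blocking Jedis call inside asyncInvoke serializes the lookups and may explain the slow checkpoints:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CacheAside {
    static final Map<String, String> redis = new ConcurrentHashMap<>(); // stand-in cache
    static final Map<String, String> kudu  = new ConcurrentHashMap<>(); // stand-in dim table

    static CompletableFuture<String> lookup(String key) {
        String cached = redis.get(key);
        if (cached != null) {
            // Cache hit: no round trip to the dimension store.
            return CompletableFuture.completedFuture(cached);
        }
        return CompletableFuture.supplyAsync(() -> {
            String value = kudu.get(key);  // miss: query the dimension store
            if (value != null) {
                redis.put(key, value);     // populate the cache for next time
            }
            return value;
        });
    }

    public static void main(String[] args) {
        kudu.put("dim1", "beijing");
        System.out.println(lookup("dim1").join());     // first call reads Kudu
        System.out.println(redis.containsKey("dim1")); // true: cache populated
    }
}
```

The key design point is that the whole lookup stays non-blocking from the operator's point of view; the future completes on another thread pool.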
StreamingFileSink: customizing output file names
Files written to HDFS with StreamingFileSink.forBulkFormat get the default name part-subtaskID-bucketID, e.g. part-3-62529. I would like to customize the output file name instead of the part-subtaskID-bucketID pattern, but could not find an entry point in Flink for this. How can it be done? Thanks!
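For reference: in later Flink versions (1.10+), StreamingFileSink gained OutputFileConfig, which lets you customize the part-file prefix and suffix (the subtask index and counter are still appended by the sink). A sketch (not compiled here; the path, `Event` type, and writer factory are placeholders):

```java
// Flink 1.10+ sketch: customize the prefix/suffix of part files.
OutputFileConfig fileConfig = OutputFileConfig.builder()
    .withPartPrefix("events")     // instead of "part"
    .withPartSuffix(".parquet")
    .build();

StreamingFileSink<Event> sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs:///output"), parquetWriterFactory)
    .withOutputFileConfig(fileConfig)
    .build();
```

Fully arbitrary names are still not supported, because the sink relies on the subtask/counter parts to keep file names unique across parallel writers and restarts.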
Re: Flink 1.8 run options differ
Problem solved, many thanks!

What fixed it:
1. I did find the "Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli." exception under log/.
2. Set export HADOOP_CONF_DIR=`hadoop classpath`.
3. Re-ran bin/flink run --help, and the "Options for yarn-cluster mode" section appeared.

Thanks a lot! ❤❤❤

Zili Chen wrote on Wed, Jul 24, 2019 at 9:51 AM:
> Hi, check whether the logs under log/ contain a section like this:
>
> 2019-07-24 09:34:36,507 WARN org.apache.flink.client.cli.CliFrontend
> - Could not load CLI class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/yarn/exceptions/YarnException
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.yarn.exceptions.YarnException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 5 more
>
> If it does: Flink now releases the core distribution and the pre-bundled Hadoop jars separately, so you need to download the pre-bundled Hadoop jar and put it into the lib/ folder.
>
> Specifically, please read carefully the text above "Apache Flink 1.8.1" on the downloads page[1].
>
> Best,
> tison.
>
> [1] https://flink.apache.org/downloads.html
>
> 王佩 wrote on Wed, Jul 24, 2019 at 9:30 AM:
> > The Flink 1.8 I downloaded earlier showed yarn-cluster options when running bin/flink run --help, e.g.:
> > Options for yarn-cluster mode:
> > -d,--detached If present, runs the job in detached mode
> > -m,--jobmanager Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
> > -sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
> > -yD use value for given property
> > -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
> > -yh,--yarnhelp Help for the Yarn session CLI.
> > -yid,--yarnapplicationId Attach to running YARN session
> > -yj,--yarnjar Path to Flink jar file
> > -yjm,--yarnjobManagerMemory Memory for JobManager Container with optional unit (default: MB)
> > -yn,--yarncontainer Number of YARN container to allocate (=Number of Task Managers)
> > -ynl,--yarnnodeLabel Specify YARN node label for the YARN application
> > -ynm,--yarnname Set a custom name for the application on YARN
> > -yq,--yarnquery Display available YARN resources (memory, cores)
> > -yqu,--yarnqueue Specify YARN queue.
> > -ys,--yarnslots Number of slots per TaskManager
> > -yst,--yarnstreaming Start Flink in streaming mode
> > -yt,--yarnship Ship files in the specified directory (t for transfer)
> > -ytm,--yarntaskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
> > -yz,--yarnzookeeperNamespace Namespace to create the Zookeepe
Flink 1.8 run options differ
The Flink 1.8 I downloaded earlier shows yarn-cluster options when running bin/flink run --help:

Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached mode
-m,--jobmanager Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
-yD use value for given property
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId Attach to running YARN session
-yj,--yarnjar Path to Flink jar file
-yjm,--yarnjobManagerMemory Memory for JobManager Container with optional unit (default: MB)
-yn,--yarncontainer Number of YARN container to allocate (=Number of Task Managers)
-ynl,--yarnnodeLabel Specify YARN node label for the YARN application
-ynm,--yarnname Set a custom name for the application on YARN
-yq,--yarnquery Display available YARN resources (memory, cores)
-yqu,--yarnqueue Specify YARN queue.
-ys,--yarnslots Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship Ship files in the specified directory (t for transfer)
-ytm,--yarntaskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
-yz,--yarnzookeeperNamespace Namespace to create the Zookeeper sub-paths for high availability mode
-z,--zookeeperNamespace Namespace to create the Zookeeper sub-paths for high availability mode

The Flink 1.8 I downloaded now only has the following options when running bin/flink run --help; the yarn-cluster section is missing:

Action "run" compiles and runs a program.

Syntax: run [OPTIONS] 

"run" action options:
-c,--class Class with the program entry point ("main" method or "getPlan()" method. Only needed if the JAR file does not specify the class in its manifest.
-C,--classpath Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached mode
-n,--allowNonRestoredState Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
-p,--parallelism The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
-q,--sysoutLogging If present,
Getting the number of input/output records for each operator
Given this pipeline: Kafka Source => Map => Filter => StreamingFileSink.forBulkFormat. Goal: each time a file is written, obtain the number of records each operator in the pipeline consumed and emitted for the current batch, and write those counts to MySQL. Is this achievable? Thanks.
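For reference: Flink already tracks numRecordsIn/numRecordsOut per operator and exposes them through the REST API and through configured metrics reporters, though as running totals rather than per-written-file batches. If custom counts are needed, a user-defined counter inside a rich function is one option; a sketch (not compiled here; `Event` and the metric name are made up):

```java
// Sketch: count records passing through a map operator with a user metric.
public class CountingMap extends RichMapFunction<Event, Event> {
    private transient Counter outCount;

    @Override
    public void open(Configuration parameters) {
        // Registered under this operator's metric group; visible in the
        // web UI and to any configured metrics reporter.
        outCount = getRuntimeContext().getMetricGroup().counter("myRecordsOut");
    }

    @Override
    public Event map(Event value) {
        outCount.inc();
        return value;
    }
}
```

Correlating these totals with individual part files would still require extra logic (e.g. snapshotting the counters on a schedule and diffing), since the sink does not expose a per-file callback here.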