Flink DataStream KeyedStream and AggregateFunction

2019-11-08 Thread
A few questions:

1. After keying a DataStream by user ID, will all records with the same user ID end up in the same partition?

2. Assuming 1 holds, this can lead to data skew. How should that be handled?

3. Assuming 1 holds, in a pipeline like

   DataStream
     .keyBy(userID)
     .timeWindow()
     .aggregate(new AggregateFunction(...))

   why does the AggregateFunction still need merge()? Since all records for one key are processed in the same partition, merge() seems unnecessary.

Could someone help clear up these three questions?
Thanks!
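
Regarding question 3, for reference: merge() is part of the AggregateFunction interface because the same function is also used with merging window assigners (e.g. session windows), where the partial accumulators of windows that get merged have to be combined; for plain tumbling/sliding time windows it is normally not called. A minimal sketch (the per-user Long "amount" input is a hypothetical example):

import org.apache.flink.api.common.functions.AggregateFunction;

public class SumPerUser implements AggregateFunction<Long, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;                    // start with an empty sum
    }

    @Override
    public Long add(Long value, Long accumulator) {
        return accumulator + value;   // fold one element into the accumulator
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;           // emit the final sum for the window
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;                 // only needed when windows themselves are merged
    }
}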


Flink DataStream ElasticsearchSink Checkpoint Error

2019-10-22 Thread
When writing from Flink to Elasticsearch, checkpoints stay in the IN_PROGRESS state, which in turn prevents data from being written to ES.

See the attached screenshot:

[image: image.png]

Could someone take a look?
Thanks!


Flink DataStream Broadcast variable

2019-10-21 Thread
Does the Flink DataStream API have a notion of broadcast variables?

For example, I have a global ArrayList that is updated on the JobManager side and then shared with the TaskManagers. Is that possible?

Thanks!
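
The DataStream API has no DataSet-style broadcast variable; the closest mechanism is the broadcast state pattern: a second (control) stream is broadcast to all parallel instances and stored in broadcast state, which the main stream can read. A minimal sketch, assuming hypothetical socket sources for the main and control streams:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> mainStream = env.socketTextStream("localhost", 9000);  // hypothetical main stream
        DataStream<String> ruleStream = env.socketTextStream("localhost", 9001);  // hypothetical control stream

        final MapStateDescriptor<String, String> ruleDescriptor =
                new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

        BroadcastStream<String> broadcastRules = ruleStream.broadcast(ruleDescriptor);

        mainStream
            .connect(broadcastRules)
            .process(new BroadcastProcessFunction<String, String, String>() {
                @Override
                public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // read-only view of the shared state on every parallel instance
                    String latest = ctx.getBroadcastState(ruleDescriptor).get("latest");
                    out.collect(value + " / rule=" + latest);
                }

                @Override
                public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                    // each parallel instance receives the broadcast element and updates its local copy
                    ctx.getBroadcastState(ruleDescriptor).put("latest", rule);
                }
            })
            .print();

        env.execute("broadcast-state-sketch");
    }
}

Note that the state is updated from a stream rather than from the JobManager; JobManager-side updates would have to be turned into events on such a control stream.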


Flink DataStream Micro-Batch

2019-10-21 Thread
Is it possible to implement micro-batches on a Flink DataStream? If so, how?

For example, based on processing time: consume from Kafka, form one batch every 5 seconds, and then process that batch.

Thanks!
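
One way to approximate this is a non-keyed processing-time window of 5 seconds, so every 5 seconds the collected elements are handed over as one "batch". A minimal sketch, assuming the universal Kafka connector and a hypothetical topic/broker (note that a non-keyed windowAll runs with parallelism 1):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class FiveSecondBatchSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker
        props.setProperty("group.id", "micro-batch-demo");         // hypothetical group id

        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           // every 5 seconds of processing time forms one "batch"
           .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
           .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
               @Override
               public void process(Context ctx, Iterable<String> batch, Collector<String> out) {
                   int count = 0;
                   for (String ignored : batch) {
                       count++;                                    // batch-level processing goes here
                   }
                   out.collect("batch of " + count + " records, window end=" + ctx.window().getEnd());
               }
           })
           .print();

        env.execute("micro-batch-sketch");
    }
}

For per-key batches, keyBy(...) followed by window(TumblingProcessingTimeWindows.of(Time.seconds(5))) keeps the parallelism.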


Re: Re: Flink consumes Kafka but windows do not trigger

2019-10-21 Thread
Is there somewhere to mark the Kafka source as an idling source? I could not find an entry point for it.

Utopia wrote on Monday, October 21, 2019 at 2:58 PM:

> Idling sources only cover the case where Kafka itself has no data. What if
> the business logic simply has not received new data for a given user for a
> long time? One workaround I can think of: when a record arrives, register a
> ProcessingTimer in the Trigger's onElement method to force the window to
> fire, and delete the previous ProcessingTimer whenever new data comes in.
> (A sketch of this appears after the quoted thread below.)
> On October 21, 2019 at 01:38 +0800, Ever <439674...@qq.com> wrote:
> > For reference:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
> >
> > the section on idling sources
> >
> >
> >
> > -- Original Message --
> > From: "王佩";
> > Sent: Sunday, October 20, 2019, 10:38 AM
> > To: "user-zh";
> > Subject: Flink consumes Kafka but windows do not trigger
> >
> > Flink consumes from Kafka; some parallel subtasks receive no data, so the
> > watermark does not advance and the windows never trigger.
> >
> > A question:
> >
> > What are the usual ways to handle this situation?
> >
> > Thanks!
>
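
A minimal sketch of the Trigger-based workaround described in the quote above: fire the window a fixed time after the last element was seen, so it still gets evaluated even when the watermark stalls. The 60-second timeout and the Object/TimeWindow generics are assumptions; it would be attached with .trigger(new IdleTimeoutTrigger()) after the window assigner.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class IdleTimeoutTrigger extends Trigger<Object, TimeWindow> {

    private static final long FIRE_TIMEOUT_MS = 60_000L;  // hypothetical idle timeout

    private final ValueStateDescriptor<Long> timerState =
            new ValueStateDescriptor<>("last-idle-timer", Long.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // keep the normal event-time firing at the end of the window
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // re-arm the processing-time "idle" timer, deleting the previous one
        ValueState<Long> lastTimer = ctx.getPartitionedState(timerState);
        if (lastTimer.value() != null) {
            ctx.deleteProcessingTimeTimer(lastTimer.value());
        }
        long fireAt = ctx.getCurrentProcessingTime() + FIRE_TIMEOUT_MS;
        ctx.registerProcessingTimeTimer(fireAt);
        lastTimer.update(fireAt);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;  // idle timeout reached: evaluate the window with what it has
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> lastTimer = ctx.getPartitionedState(timerState);
        if (lastTimer.value() != null) {
            ctx.deleteProcessingTimeTimer(lastTimer.value());
        }
        lastTimer.clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}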


Flink 1.9 "Failed to take leadership with session id" exception

2019-10-12 Thread
A Flink 1.9 DataStream job reports the following error after running for a while:

2019-10-09 21:07:44 INFO org.apache.flink.runtime.jobmaster.JobMaster
dissolveResourceManagerConnection 1010 Close ResourceManager
connection be4e0b96b331165ff9f4bd7ef4868d94: JobManager is no longer
the leader..
2019-10-09 21:07:44 INFO org.apache.flink.runtime.jobmaster.JobMaster
onStop 335 Stopping the JobMaster for job
flinkx_liveme_microservice_rawdata(d2f6aa0115f4116f690451ef64c56fd4).
2019-10-09 21:07:44 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService
stop 154 Stopping ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/d2f6aa0115f4116f690451ef64c56fd4/job_manager_lock'}.
2019-10-09 21:07:44 INFO
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore
recoverJobGraph 197 Recovered
SubmittedJobGraph(d2f6aa0115f4116f690451ef64c56fd4).
2019-10-09 21:07:44 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint onFatalError 374
Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Failed to
take leadership with session id a2a48489-b1b4-4f70-a721-edeaf543e1da.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.dispatcher.DispatcherException:
Termination of previous JobManager for job
d2f6aa0115f4116f690451ef64c56fd4 failed. Cannot submit job under the
same job id.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJobManager$33(Dispatcher.java:949)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:884)
at 
java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2196)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.waitForTerminatingJobManager(Dispatcher.java:946)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.tryAcceptLeadershipAndRunJobs(Dispatcher.java:933)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$28(Dispatcher.java:892)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 23 more
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkException: Could not properly shut down the
JobManagerRunner
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at 

Flink DataStream: ConnectionClosedException when writing to ES with ElasticsearchSink

2019-09-17 Thread
When Flink writes to ES and the ES cluster is busy, the following exception occurs:

2019-09-17 16:01:02 ERROR
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase
afterBulk 430 Failed Elasticsearch bulk request: Connection closed
org.apache.http.ConnectionClosedException: Connection closed
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345)
at
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)
2019-09-17 16:01:02 ERROR
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase
afterBulk 430 Failed Elasticsearch bulk request: Connection closed
org.apache.http.ConnectionClosedException: Connection closed
at
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:345)
at
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)

After hitting this exception, the ElasticsearchSink does not recover the connection automatically.

What I know so far:

Failure handling in ElasticsearchSink is configured through setFailureHandler, which lets you define how to react to failures (e.g. drop the record or re-queue it), but it does not offer a way to reopen the ES connection on a ConnectionClosedException.

I would like Flink to reopen the ES connection automatically after this exception. How can that be done?

Thanks
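
The failure handler cannot reopen the underlying REST client, but it can re-queue the failed request so that it is retried with the next bulk flush, by which time the connection may have been re-established; anything else can be rethrown so the job fails and restarts from the last checkpoint. A minimal sketch of such a handler (assuming the ES 6.x connector; it would be passed via ElasticsearchSink.Builder#setFailureHandler):

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.ConnectionClosedException;
import org.elasticsearch.action.ActionRequest;

public class RetryOnConnectionClosedHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, ConnectionClosedException.class).isPresent()) {
            // re-queue the request; it is retried with the next bulk flush
            indexer.add(action);
        } else {
            // unknown failure: fail the sink and let the job restart from the last checkpoint
            throw failure;
        }
    }
}

Note that endless re-queuing will hold back checkpoints if the connection never recovers, so this is a mitigation rather than a substitute for a healthy cluster.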


Re: Split a stream into any number of streams

2019-09-17 Thread
Here is the situation: there are roughly 1000 event types (distinguished by a field, and the number keeps growing), all in one Kafka topic.

After Flink reads from Kafka we have a single DataStream. I want to split it so that each event type becomes its own DataStream; downstream, each of those streams gets its own processing, e.g. async I/O on the DataStream, or a DataStream sink to Parquet.

1. With split...select, select(eventName) requires the event name to be known in advance.
2. With side outputs, the OutputTags must be defined up front; with 1000+ event types (and growing) that means defining 1000+ OutputTags.

Thanks!
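
An alternative that avoids per-event streams entirely: keep a single DataStream, carry the event name in the record (e.g. as a Tuple2<String, String> of event name and payload), and let the sink route by event name. For the Parquet/StreamingFileSink case this can be done with a custom BucketAssigner, so new event types are picked up automatically without new OutputTags. A minimal sketch (the Tuple2 layout is an assumption); it should be possible to plug it in via StreamingFileSink.forBulkFormat(...).withBucketAssigner(new EventNameBucketAssigner()):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class EventNameBucketAssigner implements BucketAssigner<Tuple2<String, String>, String> {

    @Override
    public String getBucketId(Tuple2<String, String> element, Context context) {
        // one sub-directory per event name, e.g. .../output/<eventName>/part-...
        return element.f0;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}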

cai yi wrote on Tuesday, September 17, 2019 at 1:33 PM:

> You can use side outputs: route the input stream to different custom OutputTags
> as needed, and then use DataStream.getSideOutput(outputTag) to retrieve the stream you want to process.
>
> On 2019/9/17 at 10:05 AM, "Wesley Peng" wrote:
>
>
>
>     on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a
> field,
> > and then process the split stream one by one.
>
> I think that should be easy done. refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>
>


Split a stream into any number of streams

2019-09-16 Thread
Hi All,

I want to split a stream into any number of streams according to a field,
and then process the split stream one by one.

Can this be achieved? What should I do?

Regards,
Pei


Re: ConcurrentModificationException when Flink writes to ES

2019-09-11 Thread
不是代码的问题,代码里边没有遍历List时进行了remove。看报错是从org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer类里报出来的。
BufferingNoOpRequestIndexer 类不是线程安全的。

wang jinhai wrote on Tuesday, September 10, 2019 at 4:36 PM:

> This doesn't look like a Flink problem. Your code removes elements while iterating a List, which causes this. The fix is to iterate with an Iterator and call iterator.remove().
>
> On 2019/9/10 at 4:18 PM, "王佩" wrote:
>
> When writing to ES with Flink, I hit the following ConcurrentModificationException:
>
> 2019-09-10 08:13:14 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
> 1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from
> state RUNNING to FAILING.
> java.util.ConcurrentModificationException
> at
> java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at
> org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
> at
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thr

Re: Flink misses some new files when continuously monitoring a directory

2019-08-24 Thread
The monitored files are on S3.

王佩 wrote on Friday, August 23, 2019 at 9:25 AM:

> Under Flink 1.8.0, env.readFile is used to continuously monitor a directory for new files and process them. Out of more than 5,000 files, 25 were missed.
>
> The setup:
>
> 1. One Flink job continuously writes small files into directory A.
> 2. A second Flink job monitors directory A with env.readFile in PROCESS_CONTINUOUSLY mode and then does further processing.
>
> The second job occasionally misses files.
>
> Question: why are files lost? What could the cause be? Parallelism?
>
>


Re: PathIsNotEmptyDirectoryException

2019-08-24 Thread
The checkpoint configuration is as follows:
env.setStateBackend((StateBackend) new
RocksDBStateBackend(checkpointDirectory,true));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(30 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

王佩 wrote on Saturday, August 24, 2019 at 4:49 PM:

> Flink version 1.8.0
> Using RocksDBStateBackend
>
> After the job went to production, the following exception occurs frequently at runtime:
> 2019-08-24 00:34:05,192 WARN
>  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a:////a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-08-24 02:58:03,572 WARN
>  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a:////a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-08-24 03:21:00,784 WARN
>  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNot

PathIsNotEmptyDirectoryException

2019-08-24 Thread
Flink version 1.8.0
Using RocksDBStateBackend

After the job went to production, the following exception occurs frequently at runtime:
2019-08-24 00:34:05,192 WARN
 org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
Fail to subsume the old checkpoint.
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`s3a:////a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory
is not empty
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at
org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
at
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
at
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
at
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2019-08-24 02:58:03,572 WARN
 org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
Fail to subsume the old checkpoint.
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`s3a:////a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory
is not empty
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at
org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
at
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
at
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
at
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2019-08-24 03:21:00,784 WARN
 org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
Fail to subsume the old checkpoint.
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`s3a:////a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory
is not empty
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
at
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at
org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
at
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
at
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
at

Flink misses some new files when continuously monitoring a directory

2019-08-22 Thread
Under Flink 1.8.0, env.readFile is used to continuously monitor a directory for new files and process them. Out of more than 5,000 files, 25 were missed.

The setup:

1. One Flink job continuously writes small files into directory A.
2. A second Flink job monitors directory A with env.readFile in PROCESS_CONTINUOUSLY mode and then does further processing.

The second job occasionally misses files.

Question: why are files lost? What could the cause be? Parallelism?


Re: AsyncIO with Redis as a cache

2019-08-06 Thread
For this join scenario, adding a cache should in theory make things faster, so why does it become slower?

王佩 wrote on Tuesday, August 6, 2019 at 10:09 PM:

> Requirement: a fact stream joins dimension tables in Kudu in real time to enrich dimensions.
>
> To speed up lookups, data is first queried from Kudu and then put into a Redis cache; subsequent lookups go to Redis first and fall back to Kudu on a miss.
>
> Problems encountered:
> 1. Without the Redis cache, checkpoints are fast and throughput is good.
> 2. With the Redis cache using Jedis without a connection pool, throughput is very poor.
> 3. With the Redis cache using a Jedis connection pool, throughput is even worse.
>
> Questions:
> 1. Is it acceptable to read from Kudu without any cache?
> 2. In AsyncIO, the lettuce async client is incompatible with Flink's netty.
> 3. In AsyncIO, why are Flink checkpoints so slow when using a Jedis connection pool?
> 4. For this scenario (a stream joining dimension tables in Kudu), how can performance be improved?
>
> Thanks a lot for your help.
>
>
>


AsyncIO with Redis as a cache

2019-08-06 Thread
Requirement: a fact stream joins dimension tables in Kudu in real time to enrich dimensions.

To speed up lookups, data is first queried from Kudu and then put into a Redis cache; subsequent lookups go to Redis first and fall back to Kudu on a miss.

Problems encountered:
1. Without the Redis cache, checkpoints are fast and throughput is good.
2. With the Redis cache using Jedis without a connection pool, throughput is very poor.
3. With the Redis cache using a Jedis connection pool, throughput is even worse.

Questions:
1. Is it acceptable to read from Kudu without any cache?
2. In AsyncIO, the lettuce async client is incompatible with Flink's netty.
3. In AsyncIO, why are Flink checkpoints so slow when using a Jedis connection pool?
4. For this scenario (a stream joining dimension tables in Kudu), how can performance be improved?

Thanks a lot for your help.
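
One alternative worth trying before tuning Redis: cache the Kudu lookups inside the async function itself (one local cache per subtask), so the hot path has no network hop at all. A minimal sketch of the structure; the Kudu client call is a hypothetical placeholder, and error handling is left out. It would be attached with AsyncDataStream.unorderedWait(factStream, new DimLookupFunction(), 30, TimeUnit.SECONDS, 100).

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class DimLookupFunction extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient Map<String, String> cache;   // per-subtask cache, no network round-trip
    private transient ExecutorService executor;    // runs the blocking Kudu lookup off the main thread

    @Override
    public void open(Configuration parameters) {
        cache = new ConcurrentHashMap<>();
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
        String cached = cache.get(key);
        if (cached != null) {
            resultFuture.complete(Collections.singleton(Tuple2.of(key, cached)));
            return;
        }
        CompletableFuture
            .supplyAsync(() -> lookupFromKudu(key), executor)
            .thenAccept(dim -> {
                cache.put(key, dim);
                resultFuture.complete(Collections.singleton(Tuple2.of(key, dim)));
            });
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    private String lookupFromKudu(String key) {
        return "dim-for-" + key;                   // placeholder for the real Kudu query
    }
}

An unbounded map will grow forever; in practice the cache needs a size bound or a TTL.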


Customizing output file names with StreamingFileSink

2019-07-29 Thread
Files written to HDFS with StreamingFileSink.forBulkFormat get the default name pattern
part-<subtaskId>-<counter>, e.g. part-3-62529.

I would like to customize the output file name instead of using this pattern, but I could not find an entry point in Flink for doing so.

How can this be achieved?


Thanks!
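
In Flink 1.8/1.9 the part-file name is not customizable. Later releases (1.10 and up) added OutputFileConfig, which lets you set a prefix and a suffix, although the subtask index and a rolling counter remain part of the name. A sketch under that assumption (paths and prefix are hypothetical; the same withOutputFileConfig call is also available on the forBulkFormat builder):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// produces files like orders-3-62529.txt instead of part-3-62529
OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("orders")
        .withPartSuffix(".txt")
        .build();

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
        .withOutputFileConfig(fileConfig)
        .build();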


Re: Flink 1.8 run options differ

2019-07-24 Thread
Problem solved, thanks a lot!

What fixed it:

1. I did indeed find the "Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli" exception under log/.

2. Set export HADOOP_CONF_DIR=`hadoop classpath`

3. Re-ran bin/flink run --help, and the `Options for yarn-cluster mode` section appeared.

Many thanks! ❤❤❤

Zili Chen wrote on Wednesday, July 24, 2019 at 9:51 AM:

> Hi, could you check whether the logs under log/ contain a section like this:
>
> 2019-07-24 09:34:36,507 WARN  org.apache.flink.client.cli.CliFrontend
> - Could not load CLI class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
>
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/yarn/exceptions/YarnException
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:264)
>
> at
>
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
>
> at
>
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.yarn.exceptions.YarnException
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 5 more
>
>
> If so: Flink now releases the core distribution and the pre-bundled Hadoop package separately, so you need to
> download the pre-bundled Hadoop jar yourself and put it into the lib/ folder.
>
>
> Specifically, please read carefully the text above the "Apache Flink 1.8.1" entry on the download page [1].
>
> Best,
> tison.
>
> [1] https://flink.apache.org/downloads.html
>
>
王佩 wrote on Wednesday, July 24, 2019 at 9:30 AM:
>
> > With the Flink 1.8 I downloaded earlier, running bin/flink run --help showed some yarn-cluster options, like this:
> > Options for yarn-cluster mode:
> >  -d,--detachedIf present, runs the job in
> > detached
> >   mode
> >  -m,--jobmanager Address of the JobManager
> > (master) to
> >   which to connect. Use this flag
> > to
> >   connect to a different
> JobManager
> > than
> >   the one specified in the
> >   configuration.
> >  -sae,--shutdownOnAttachedExitIf the job is submitted in
> > attached
> >   mode, perform a best-effort
> > cluster
> >   shutdown when the CLI is
> > terminated
> >   abruptly, e.g., in response to
> a
> > user
> >   interrupt, such as typing Ctrl
> +
> > C.
> >  -yD  use value for given property
> >  -yd,--yarndetached   If present, runs the job in
> > detached
> >   mode (deprecated; use non-YARN
> >   specific option instead)
> >  -yh,--yarnhelp   Help for the Yarn session CLI.
> >  -yid,--yarnapplicationIdAttach to running YARN session
> >  -yj,--yarnjar   Path to Flink jar file
> >  -yjm,--yarnjobManagerMemory Memory for JobManager Container
> > with
> >   optional unit (default: MB)
> >  -yn,--yarncontainer Number of YARN container to
> > allocate
> >   (=Number of Task Managers)
> >  -ynl,--yarnnodeLabelSpecify YARN node label for the
> > YARN
> >   application
> >  -ynm,--yarnname Set a custom name for the
> > application
> >   on YARN
> >  -yq,--yarnquery  Display available YARN
> resources
> >   (memory, cores)
> >  -yqu,--yarnqueueSpecify YARN queue.
> >  -ys,--yarnslots Number of slots per TaskManager
> >  -yst,--yarnstreaming Start Flink in streaming mode
> >  -yt,--yarnship  Ship files in the specified
> > directory
> >   (t for transfer)
> >  -ytm,--yarntaskManagerMemoryMemory per TaskManager
> Container
> > with
> >   optional unit (default: MB)
> >  -yz,--yarnzookeeperNamespaceNamespace to create the
> Zookeepe

Flink 1.8 run options differ

2019-07-23 Thread
With the Flink 1.8 I downloaded earlier, running bin/flink run --help showed some yarn-cluster options, like this:
Options for yarn-cluster mode:
 -d,--detachedIf present, runs the job in
detached
  mode
 -m,--jobmanager Address of the JobManager
(master) to
  which to connect. Use this flag to
  connect to a different JobManager
than
  the one specified in the
  configuration.
 -sae,--shutdownOnAttachedExitIf the job is submitted in
attached
  mode, perform a best-effort
cluster
  shutdown when the CLI is
terminated
  abruptly, e.g., in response to a
user
  interrupt, such as typing Ctrl +
C.
 -yD  use value for given property
 -yd,--yarndetached   If present, runs the job in
detached
  mode (deprecated; use non-YARN
  specific option instead)
 -yh,--yarnhelp   Help for the Yarn session CLI.
 -yid,--yarnapplicationIdAttach to running YARN session
 -yj,--yarnjar   Path to Flink jar file
 -yjm,--yarnjobManagerMemory Memory for JobManager Container
with
  optional unit (default: MB)
 -yn,--yarncontainer Number of YARN container to
allocate
  (=Number of Task Managers)
 -ynl,--yarnnodeLabelSpecify YARN node label for the
YARN
  application
 -ynm,--yarnname Set a custom name for the
application
  on YARN
 -yq,--yarnquery  Display available YARN resources
  (memory, cores)
 -yqu,--yarnqueueSpecify YARN queue.
 -ys,--yarnslots Number of slots per TaskManager
 -yst,--yarnstreaming Start Flink in streaming mode
 -yt,--yarnship  Ship files in the specified
directory
  (t for transfer)
 -ytm,--yarntaskManagerMemoryMemory per TaskManager Container
with
  optional unit (default: MB)
 -yz,--yarnzookeeperNamespaceNamespace to create the Zookeeper
  sub-paths for high availability
mode
 -z,--zookeeperNamespace Namespace to create the Zookeeper
  sub-paths for high availability
mode


With the Flink 1.8 I downloaded now, bin/flink run --help only shows the following options; the yarn-cluster section is missing:
Action "run" compiles and runs a program.

  Syntax: run [OPTIONS]  
  "run" action options:
 -c,--classClass with the program entry point
  ("main" method or "getPlan()"
method.
  Only needed if the JAR file does
not
  specify the class in its manifest.
 -C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by
means
  of a NFS share). You can use this
  option multiple times for
specifying
  more than one URL. The protocol
must
  be supported by the {@link
  java.net.URLClassLoader}.
 -d,--detachedIf present, runs the job in
detached
  mode
 -n,--allowNonRestoredState   Allow to skip savepoint state that
  cannot be restored. You need to
allow
  this if you removed an operator
from
  your program that was part of the
  program when the savepoint was
  triggered.
 -p,--parallelismThe parallelism with which to run
the
  program. Optional flag to
override the
  default value specified in the
  configuration.
 -q,--sysoutLogging   If present, 

Getting the number of input/output records for each operator

2019-07-16 Thread
There is the following pipeline:

Kafka Source => Map => Filter => StreamingFileSink.forBulkFormat

Goal:

   Every time files are written, obtain the number of input and output records of each operator in this pipeline for the current batch, and write those counts to MySQL.

   Is this possible? Thanks.
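
Flink already tracks cumulative numRecordsIn / numRecordsOut counters per operator and exposes them through the metrics reporters and the JobManager REST API, so one option is to poll those counters from a small side process and store the per-interval deltas in MySQL instead of instrumenting the pipeline itself. A minimal sketch of such a poller (host, port and job id are assumptions; JSON parsing and the MySQL write are left out):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class OperatorRecordCountPoller {

    public static void main(String[] args) throws Exception {
        String jobId = "<job-id>";                                   // replace with the real job id
        URL url = new URL("http://localhost:8081/jobs/" + jobId);    // JobManager REST endpoint

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }
        conn.disconnect();

        // The response lists every job vertex (chain of operators) with a "metrics"
        // object containing cumulative "read-records" and "write-records"; subtracting
        // the values of two successive polls gives the counts for that interval.
        System.out.println(body);
    }
}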