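Can Flink CDC be configured not to block checkpoints during the full (snapshot) read, giving at-least-once delivery that, combined with Hudi's upsert semantics, ultimately yields exactly-once?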
Problem: When using Flink CDC to write into Hudi, Hudi only flushes data to storage when a checkpoint is triggered. During the full sync of a large table no checkpoint runs, so all of the data accumulates in memory, which leads to Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded.
Expectation: Can Flink CDC be configured not to block checkpoints during the full read, giving at-least-once delivery that, combined with Hudi's upsert semantics, ultimately yields exactly-once results?
-- Sent from: http://apache-flink.147419.n8.nabble.com/
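The reasoning in the expectation (at-least-once delivery plus idempotent upserts gives an exactly-once end state) can be illustrated with a small Scala model. It is a sketch under stated assumptions, not Hudi's or Flink CDC's implementation: the table is a plain in-memory Map keyed by primary key, a None value stands in for a delete, and a replay after failure is assumed to re-deliver, in order, every change after the last completed checkpoint. The names UpsertReplayModel, Change and applyAll are hypothetical.

```scala
// Hypothetical model: an upsert sink represented as a Map keyed by primary key.
object UpsertReplayModel {
  // One change-log record; value = None models a delete of that key.
  final case class Change(key: Int, value: Option[String])

  // Applies changes with upsert semantics: the last write per key wins.
  def applyAll(table: Map[Int, String], changes: Seq[Change]): Map[Int, String] =
    changes.foldLeft(table) {
      case (t, Change(k, Some(v))) => t.updated(k, v)
      case (t, Change(k, None))    => t - k
    }

  def main(args: Array[String]): Unit = {
    val log = Seq(
      Change(1, Some("a")),
      Change(2, Some("b")),
      Change(1, Some("a2")),
      Change(2, None)
    )
    // Every change applied exactly once.
    val once = applyAll(Map.empty, log)
    // At-least-once: everything was applied, the job failed, and the source
    // replayed all changes after the last checkpoint (here: after the first one).
    val withReplay = applyAll(once, log.drop(1))
    println(once)               // Map(1 -> a2)
    println(once == withReplay) // true
  }
}
```

The argument only holds because a replay re-delivers a contiguous, ordered suffix of the change log; applying a stale update on its own after a newer one would leave the wrong value.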
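Regular join over the full historical data of several fact tables with hundreds of millions of rows each via MySQL CDC: how to avoid OOM, and how to optimize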
Any advice from the experts would be much appreciated.
Flink CDC: Lock wait timeout during full sync
2021-03-22 09:33:19.554 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (e0f59fb577ea3d275451163cf5cc479b) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=1)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:392) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at sun.reflect.GeneratedMethodAccessor64.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_232]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_232]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; try restarting transaction Error code: 1205; SQLSTATE: 40001.
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) ~[1616376681494-flink-connector-mysql-cdc.jar:?]
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) ~[1616376681494-flink-connector-mysql-cdc.jar:?]
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) ~[1616376681494-flink-connector-mysql-cdc.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_232]
    at
Reading postgres-cdc with the Flink SQL CLI fails: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
I am running in standalone mode, and after starting the SQL CLI I get the error below. However, my lib directory does contain flink-sql-connector-postgres-cdc-1.1.0.jar, and I have restarted the cluster. Using mysql cdc the same way works.
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath. Available factory identifiers are: blackhole jdbc kafka print
So what have I not configured correctly?
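The error names DynamicTableSinkFactory, which suggests that the postgres-cdc table was resolved as a sink (for example as the target of an INSERT INTO) rather than as a source; as far as I know, the postgres-cdc and mysql-cdc connectors of this era ship only a source factory. The simplified Scala model below mirrors how factory discovery behaves in this situation: it first keeps only the factories implementing the requested interface, then matches the identifier, and lists the remaining identifiers in the error. The Factory case class, its boolean flags, the onClasspath list and discover are hypothetical stand-ins, not Flink's FactoryUtil API.

```scala
// Simplified, hypothetical model of factory discovery by interface, then identifier.
object FactoryLookupModel {
  // Stand-in for a discovered factory; the flags model which interfaces it implements.
  final case class Factory(identifier: String, isSource: Boolean, isSink: Boolean)

  // Illustrative set of factories that jars in lib/ might expose.
  val onClasspath: Seq[Factory] = Seq(
    Factory("blackhole", isSource = false, isSink = true),
    Factory("print", isSource = false, isSink = true),
    Factory("jdbc", isSource = true, isSink = true),
    Factory("kafka", isSource = true, isSink = true),
    Factory("postgres-cdc", isSource = true, isSink = false)
  )

  // Filter by the requested interface first, then look up the identifier.
  def discover(factories: Seq[Factory], id: String, asSink: Boolean): Either[String, Factory] = {
    val byInterface = factories.filter(f => if (asSink) f.isSink else f.isSource)
    val kind = if (asSink) "DynamicTableSinkFactory" else "DynamicTableSourceFactory"
    byInterface.find(_.identifier == id).toRight(
      s"Could not find any factory for identifier '$id' that implements '$kind'. " +
        "Available factory identifiers are: " + byInterface.map(_.identifier).sorted.mkString(" ")
    )
  }

  def main(args: Array[String]): Unit = {
    // Used as a source: the factory is found.
    println(discover(onClasspath, "postgres-cdc", asSink = false).isRight)
    // Used as a sink: only sink factories are candidates, so the lookup fails.
    discover(onClasspath, "postgres-cdc", asSink = true) match {
      case Left(msg) => println(msg)
      case Right(f)  => println(s"found ${f.identifier}")
    }
  }
}
```

Note that the sorted sink identifiers in this model (blackhole jdbc kafka print) match the list in the reported error, which is what one would expect if the lookup was for a sink.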
Re: When using async I/O, the job gets stuck once the number of records written reaches the capacity, and stops consuming data from the source
Yes, exactly. Following your method, it did work. Thanks again!
When using async I/O, the job gets stuck once the number of records written reaches the capacity, and stops consuming data from the source
AsyncDataStream // ordered async I/O
  .orderedWait(input, new AsyncDatabaseRequest(), 5000, TimeUnit.MILLISECONDS, 1000)

When I did not override the timeout method, the job failed with this error:
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."))

When I overrode the timeout method as shown below, the job got stuck. Could someone explain why?

override def timeout(input: String, resultFuture: ResultFuture[Int]): Unit = {
  println("time out ... ")
}
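A likely explanation, consistent with the title: with orderedWait and a capacity of 1000, each input occupies a slot in the operator's queue until its ResultFuture is completed. The default timeout completes it exceptionally (hence the error above), while the overridden timeout only prints, so timed-out entries never leave the queue; once 1000 of them pile up, the operator stops taking input and backpressures the source. In the Flink Scala API the override would need to complete the future, for example resultFuture.complete(Iterable.empty) to emit nothing for that element. The pure-Scala model below illustrates the effect; it is not Flink's internal queue, its names are hypothetical, and time is compressed so that every request times out immediately.

```scala
import scala.collection.mutable

// Hypothetical model of an ordered async-wait queue; not Flink's implementation.
object OrderedAsyncQueueModel {
  // One in-flight request; `done` becomes true once its result future is completed.
  final class Entry(val input: String) {
    var done: Boolean = false
  }

  /** Feeds `inputs` into a queue with `capacity` slots, assuming every request
    * times out. Returns how many inputs were taken from upstream before stalling. */
  def consumed(inputs: Seq[String], capacity: Int, completeOnTimeout: Boolean): Int = {
    val queue = mutable.Queue.empty[Entry]
    val it = inputs.iterator
    var count = 0
    var stalled = false
    while (it.hasNext && !stalled) {
      // Ordered mode: only completed entries at the head may be emitted.
      while (queue.nonEmpty && queue.head.done) queue.dequeue()
      if (queue.size >= capacity) {
        // Queue full and, in this model, nothing will ever complete: stop reading input.
        stalled = true
      } else {
        val entry = new Entry(it.next())
        queue.enqueue(entry)
        count += 1
        // Timeout handler: either completes the future (e.g. with an empty result) or only logs.
        if (completeOnTimeout) entry.done = true
      }
    }
    count
  }

  def main(args: Array[String]): Unit = {
    val inputs = (1 to 5000).map(_.toString)
    println(consumed(inputs, capacity = 1000, completeOnTimeout = false)) // 1000
    println(consumed(inputs, capacity = 1000, completeOnTimeout = true))  // 5000
  }
}
```

With the handler that only logs, consumption stops at exactly the capacity, which matches the symptom described in the title.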