Can flink cdc be configured so that the full snapshot read does not block checkpoints (at-least-once), and then, combined with hudi's upsert semantics, achieve exactly-once?

2021-04-17 Thread
Can flink cdc be configured so that the full snapshot read does not block checkpoints (at-least-once), and then, combined with hudi's upsert semantics, achieve exactly-once?

Problem:
When writing to hudi with flink cdc, hudi's flush to storage is only triggered by a checkpoint. During the full sync of a large table no checkpoint completes, so all the data accumulates in memory until the job fails with: Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded.

Expectation:
Could flink cdc offer a configuration so that the full read does not block checkpoints, giving at-least-once delivery, and then rely on hudi's upsert semantics to arrive at exactly-once?
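
A note for context, hedged since it postdates this thread: flink-cdc 2.0 later introduced an incremental-snapshot mysql source that splits the table into chunks and lets checkpoints complete between chunks; chunk reads are at-least-once and deduplicated on merge, so together with an upsert sink such as hudi the end result is effectively exactly-once. A minimal sketch assuming flink-cdc 2.x on the classpath; the schema, host, and credentials are placeholders:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    // Sketch only: assumes flink-cdc 2.x; names and credentials are placeholders.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60 * 1000L) // checkpoints can now complete mid-snapshot
    val tEnv = StreamTableEnvironment.create(env)

    tEnv.executeSql(
      """CREATE TABLE orders_cdc (
        |  id BIGINT,
        |  amount DECIMAL(10, 2),
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'mysql-host',
        |  'port' = '3306',
        |  'username' = 'user',
        |  'password' = 'pass',
        |  'database-name' = 'db',
        |  'table-name' = 'orders',
        |  'scan.incremental.snapshot.enabled' = 'true'
        |)""".stripMargin)

With checkpoints completing during the snapshot phase, the hudi writer can flush on every checkpoint instead of buffering the entire table on the heap.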




How to avoid OOM, and otherwise optimize, a regular join over the full historical data of several fact tables with hundreds of millions of rows each, ingested via mysql cdc

2021-03-31 Thread
For a regular join over the full historical data of several fact tables with hundreds of millions of rows each, ingested through mysql cdc, how can OOM be avoided, and what can be optimized?

Any advice would be much appreciated.
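
Two mitigations commonly suggested for this situation, sketched here rather than taken from the thread: keep the join state in RocksDB instead of the JVM heap, and bound it with idle-state retention so keys that stop updating eventually expire. Flink 1.11-era APIs; the checkpoint path and retention window are placeholders:

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Spill the (potentially huge) join state to disk rather than the heap;
    // incremental checkpoints keep checkpoint sizes proportional to changes.
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))
    val tEnv = StreamTableEnvironment.create(env)

    // A regular join over full history otherwise retains every input row forever;
    // expire state for keys that have been idle between 12 and 24 hours.
    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

Retention is only safe if updates arriving for long-idle keys may be dropped; if the full history genuinely must stay joinable, the remaining levers are RocksDB, more parallelism, and pre-filtering or pre-aggregating the inputs.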




Lock wait timeout during flink cdc full sync

2021-03-21 Thread
2021-03-22 09:33:19.554 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (e0f59fb577ea3d275451163cf5cc479b) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=1)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:392) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at sun.reflect.GeneratedMethodAccessor64.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_232]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_232]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; try restarting transaction Error code: 1205; SQLSTATE: 40001.
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) ~[1616376681494-flink-connector-mysql-cdc.jar:?]
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) ~[1616376681494-flink-connector-mysql-cdc.jar:?]
    at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) ~[1616376681494-flink-connector-mysql-cdc.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_232]
    at
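
Context, not from the thread itself: the trace shows the failure originating in Debezium's SnapshotReader, which acquires table locks for its consistent snapshot, so a long-running transaction on the same tables can push the lock acquisition past MySQL's innodb_lock_wait_timeout (error 1205). If the table schemas are guaranteed not to change while the snapshot runs, one commonly used relaxation is the pass-through Debezium option snapshot.locking.mode=none; a sketch with hypothetical connection details:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // Sketch only: host, credentials, and table names are placeholders.
    // Disabling snapshot locking is safe only if no DDL runs during the snapshot.
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())
    tEnv.executeSql(
      """CREATE TABLE big_table_cdc (
        |  id BIGINT,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'mysql-host',
        |  'port' = '3306',
        |  'username' = 'user',
        |  'password' = 'pass',
        |  'database-name' = 'db',
        |  'table-name' = 'big_table',
        |  'debezium.snapshot.locking.mode' = 'none'
        |)""".stripMargin)

Alternatives on the database side: raise innodb_lock_wait_timeout, or run the snapshot during a quiet window so it does not contend with long transactions.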

When reading postgres-cdc from the flink sql cli: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 Thread
Running in standalone mode, after starting the sql cli I get the error below. My lib directory does include flink-sql-connector-postgres-cdc-1.1.0.jar, and I have restarted the cluster. The same setup works fine with mysql cdc.

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

Available factory identifiers are:

blackhole
jdbc
kafka
print
---

So what did I misconfigure?
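
A guess at the cause, not confirmed in the thread: flink-connector-postgres-cdc only ships a source factory, so this error appears when the job uses the postgres-cdc table as a sink, i.e. as the target of an INSERT INTO; that would also explain why only sink identifiers (blackhole, jdbc, kafka, print) are listed as available. A sketch of the intended direction with a placeholder schema; the same DDL can be typed directly into the SQL CLI:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // Sketch only: postgres-cdc tables can be read, not written.
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())
    tEnv.executeSql(
      """CREATE TABLE pg_source (
        |  id INT,
        |  name STRING,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'postgres-cdc',
        |  'hostname' = 'pg-host',
        |  'port' = '5432',
        |  'username' = 'user',
        |  'password' = 'pass',
        |  'database-name' = 'db',
        |  'schema-name' = 'public',
        |  'table-name' = 'my_table'
        |)""".stripMargin)
    tEnv.executeSql(
      "CREATE TABLE print_sink (id INT, name STRING) WITH ('connector' = 'print')")
    // Reading FROM pg_source is fine; INSERT INTO pg_source would raise
    // exactly the DynamicTableSinkFactory error quoted above.
    tEnv.executeSql("INSERT INTO print_sink SELECT id, name FROM pg_source")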



Re: When using async I/O, once the number of in-flight records reaches capacity, the job stalls and stops consuming from the source.

2020-09-29 Thread
Yes, exactly. Following your approach, it worked. Thanks again!




When using async I/O, once the number of in-flight records reaches capacity, the job stalls and stops consuming from the source.

2020-09-29 Thread
    AsyncDataStream
      // ordered async I/O
      .orderedWait(input, new AsyncDatabaseRequest(), 5000, TimeUnit.MILLISECONDS, 1000)

When I do not override the timeout method, the default error path runs:
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."))

When I override the timeout method as below, the program just stalls. Could someone explain why?

    override def timeout(input: String, resultFuture: ResultFuture[Int]): Unit = {
      println("time out ... ")
    }
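
For later readers, the likely explanation behind the fix in the reply above (hedged, since the thread does not spell it out): the default timeout implementation completes the future exceptionally, which releases the in-flight slot (and fails the job). An override that only prints never completes the future, so every timed-out record permanently occupies one of the capacity slots (1000 here); once all are taken, the operator back-pressures and the source stops. A sketch of a corrected function; the lookup logic in asyncInvoke is a placeholder:

    import java.util.concurrent.TimeoutException
    import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

    class AsyncDatabaseRequest extends AsyncFunction[String, Int] {
      override def asyncInvoke(input: String, resultFuture: ResultFuture[Int]): Unit = {
        // Placeholder: issue the real asynchronous lookup here and complete
        // the future from its callback.
        resultFuture.complete(Iterable(input.length))
      }

      // The key point: timeout() must also complete (or fail) the future,
      // otherwise its in-flight slot is never freed and the operator stalls
      // once `capacity` requests are outstanding.
      override def timeout(input: String, resultFuture: ResultFuture[Int]): Unit = {
        println("time out ... ")
        resultFuture.complete(Iterable.empty) // emit nothing for this record
        // or: resultFuture.completeExceptionally(
        //       new TimeoutException(s"Async call for $input timed out"))
      }
    }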



