Re: Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but the checkpoint fails
Hi, this log is quite a headache to read...

It just occurred to me that, besides a task restart, there is another possibility: the task was never scheduled successfully.

Can you see the task states in the Flink web UI? Are they all RUNNING?
If a task stays in SCHEDULED, the cluster most likely lacks the resources to schedule it. Check the slots offered by the task managers against the resources the job requires.
If a task keeps switching between RUNNING, FAILED, and SCHEDULED, check the task manager logs; there should be WARN entries.

On Mon, Aug 10, 2020 at 6:12 PM, Bruce wrote:

> Below is the content of the attachments. What is causing the restart?
>
> Two-phase commit demo:
>
> // NOTE: the generic parameters were stripped in the archive; the element
> // type of LinkedList (Object) and the parameter name colList are restored
> // from how the value is used in invoke() and are an assumption.
> @Slf4j
> public class CommonOracleSink
>         extends TwoPhaseCommitSinkFunction<LinkedList<Object>, CommonOracleSink.ConnectionState, Void> {
>
>     private transient String sinkSQL;
>
>     public CommonOracleSink() {
>         super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()),
>                 VoidSerializer.INSTANCE);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ParameterTool params = (ParameterTool)
>                 getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         sinkSQL = params.getRequired("sinkSQL");
>     }
>
>     @Override
>     protected void invoke(ConnectionState connectionState, LinkedList<Object> colList, Context context) {
>         try {
>             System.err.println("start invoke...");
>             Connection connection = connectionState.connection;
>             log.info("colList-->{}", JSON.toJSONString(colList));
>             TKQueryRunner runner = new TKQueryRunner();
>             Object[] params = colList.toArray();
>             System.err.println("params size->" + params.length);
>             runner.update(connection, sinkSQL, params);
>         } catch (Exception e) {
>             log.error(e.getMessage(), e);
>             System.err.println(e.getMessage());
>         }
>     }
>
>     /**
>      * Obtains a connection and starts a manually committed transaction.
>      *
>      * @return
>      * @throws Exception
>      */
>     @Override
>     protected ConnectionState beginTransaction() throws Exception {
>         Connection connection = HikariOUtils.getConnection();
>         log.info("start beginTransaction..." + connection);
>         return new ConnectionState(connection);
>     }
>
>     /**
>      * Pre-commit. The pre-commit logic lives in the invoke method.
>      *
>      * @param connectionState
>      * @throws Exception
>      */
>     @Override
>     protected void preCommit(ConnectionState connectionState) throws Exception {
>         log.info("start preCommit..." + connectionState);
>     }
>
>     /**
>      * Commits the transaction if invoke ran normally.
>      *
>      * @param connectionState
>      */
>     @Override
>     protected void commit(ConnectionState connectionState) {
>         log.info("start commit..." + connectionState);
>         Connection connection = connectionState.connection;
>         try {
>             connection.commit();
>             connection.close();
>         } catch (SQLException e) {
>             throw new RuntimeException("Failed to commit the transaction");
>         }
>     }
>
>     /**
>      * Rolls back the transaction if invoke failed; the next checkpoint
>      * will not run either.
>      *
>      * @param connectionState
>      */
>     @Override
>     protected void abort(ConnectionState connectionState) {
>         log.error("start abort rollback..." + connectionState);
>         Connection connection = connectionState.connection;
>         try {
>             connection.rollback();
>             connection.close();
>         } catch (SQLException e) {
>             throw new RuntimeException("Failed to roll back the transaction");
>         }
>     }
>
>     static class ConnectionState {
>         private final transient Connection connection;
>
>         ConnectionState(Connection connection) {
>             this.connection = connection;
>         }
>     }
> }
>
> jobmanager log
>
> 2020-08-10 16:37:31,892 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
> 2020-08-10 16:37:31,897 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
> 2020-08-10 16:37:31,898 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - OS current user: root
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Current Hadoop/Kerberos user: root
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.121-b13
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Maximum heap size: 3166 MiBytes
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JAVA_HOME: /home/xxx/app/jdk1.8.0_121
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Hadoop version: 2.7.7
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JVM Options:
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Xmx3462817376
> 2020-08-10 16:37:32,297 INFO  org.apache.
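The triage suggested in the reply above (all RUNNING, stuck in SCHEDULED, or cycling through RUNNING, FAILED, and SCHEDULED) can be written down as a small decision rule. The following is only a sketch: the class `TaskStateTriage`, the `Diagnosis` enum, and the idea of feeding it a list of states sampled by hand from the web UI are all hypothetical and not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: classifies successive state samples of one task. */
public class TaskStateTriage {

    public enum Diagnosis { HEALTHY, MISSING_SLOTS, RESTART_LOOP, UNKNOWN }

    /**
     * @param observedStates states of a single task, oldest first,
     *                       e.g. as read repeatedly from the Flink web UI
     */
    public static Diagnosis diagnose(List<String> observedStates) {
        if (observedStates.isEmpty()) {
            return Diagnosis.UNKNOWN;
        }
        boolean sawRunning = false;
        boolean onlyScheduled = true;
        for (String state : observedStates) {
            // A FAILED sample, or falling back to SCHEDULED after having
            // been RUNNING, means the task is being restarted.
            if (state.equals("FAILED") || (sawRunning && state.equals("SCHEDULED"))) {
                return Diagnosis.RESTART_LOOP;
            }
            if (state.equals("RUNNING")) {
                sawRunning = true;
            }
            if (!state.equals("SCHEDULED")) {
                onlyScheduled = false;
            }
        }
        if (onlyScheduled) {
            // Never left SCHEDULED: most likely no slot could be allocated.
            return Diagnosis.MISSING_SLOTS;
        }
        String last = observedStates.get(observedStates.size() - 1);
        return last.equals("RUNNING") ? Diagnosis.HEALTHY : Diagnosis.UNKNOWN;
    }

    public static void main(String[] args) {
        // prints MISSING_SLOTS: check task manager slots vs. job requirements
        System.out.println(diagnose(Arrays.asList("SCHEDULED", "SCHEDULED", "SCHEDULED")));
        // prints RESTART_LOOP: check the task manager logs for WARN entries
        System.out.println(diagnose(Arrays.asList("RUNNING", "FAILED", "SCHEDULED", "RUNNING")));
    }
}
```

A normal startup sequence such as SCHEDULED, DEPLOYING, RUNNING is classified as HEALTHY, so only a task that leaves RUNNING again is treated as restarting.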
Re: Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but the checkpoint fails
Below is the content of the attachments. What is causing the restart?

Two-phase commit demo:

@Slf4j public class CommonOracleSink extends TwoPhaseCommitSinkFunction
[...]
http://node3:39469 was granted leadership with leaderSessionID=----
2020-08-10 16:37:34,312 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Web frontend listening at http://node3:39469.
2020-08-10 16:37:34,403 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.yarn.YarnResourceManager at akka://flink/user/rpc/resourcemanager_0 .
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 4 gb
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.jobgraph-path, job.graph
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.cluster-id, application_1591335931326_0024
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.target, yarn-per-job
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 4 gb
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.savepoint.ignore-unclaimed-state, false
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.attached, true
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.shutdown-on-attached-exit, false
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: pipeline.jars, file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 8
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-08-10 16:37:34,418 WARN  org.apache.flink.configuration.GlobalConfiguration [] - Error while trying to split key and value in configuration file /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_01/flink-conf.yaml:16: "pipeline.classpaths: "
2020-08-10 16:37:34,419 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf
2020-08-10 16:37:34,419 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.yarn.log-config-file, /home/xxx/app/flink-1.11.1/conf/log4j.properties
2020-08-10 16:37:34,450 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2020-08-10 16:37:34,519 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Start JobDispatcherLeaderProcess.
2020-08-10 16:37:34,527 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.MiniDispatcher at akka://flink/user/rpc/dispatcher_1 .
2020-08-10 16:37:34,572 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
2020-08-10 16:37:34,582 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job empJOB (eb447d27efb8134da40c0c1dd19fffdf).
2020-08-10 16:37:34,615 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for empJOB (eb447d27efb8134da40c0c1dd19fffdf).
2020-08-10 16:37:34
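The configuration values in this log (parallelism.default 8, taskmanager.numberOfTaskSlots 1, and 4 gb process size for both the task manager and the job manager) make it possible to estimate what the job asks of YARN. The sketch below assumes that the job runs at the default parallelism and uses default slot sharing, so that the number of slots needed equals the parallelism; the class and method names are made up for illustration, and YARN container rounding is ignored.

```java
/** Hypothetical back-of-the-envelope calculator for slot and memory needs. */
public class SlotBudget {

    /** Number of task managers needed to provide one slot per parallel subtask. */
    public static int requiredTaskManagers(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        // Ceiling division: a partially used task manager still costs a full container.
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    /** Total process memory in GB for all task managers plus one job manager. */
    public static long requiredMemoryGb(int taskManagers, long taskManagerGb, long jobManagerGb) {
        return taskManagers * taskManagerGb + jobManagerGb;
    }

    public static void main(String[] args) {
        // Values taken from the log above.
        int parallelism = 8;          // parallelism.default
        int slotsPerTaskManager = 1;  // taskmanager.numberOfTaskSlots
        long taskManagerGb = 4;       // taskmanager.memory.process.size
        long jobManagerGb = 4;        // jobmanager.memory.process.size

        int taskManagers = requiredTaskManagers(parallelism, slotsPerTaskManager);
        long memoryGb = requiredMemoryGb(taskManagers, taskManagerGb, jobManagerGb);

        System.out.println("Task managers needed: " + taskManagers); // 8
        System.out.println("Memory needed (GB): " + memoryGb);       // 36
    }
}
```

Under these assumptions the job needs eight 4 GB task manager containers in addition to the job manager. If the YARN queue cannot grant that much, tasks would stay in SCHEDULED, which matches the checkpoint abort message reported in this thread.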
Re: Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but the checkpoint fails
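Hi, the attachments did not come through, so I could not see them. Consider uploading them as a gist and posting the link.

Judging from this message, though, the checkpoint failed because the task state changed (the task was probably restarted, so it went from RUNNING to SCHEDULED).
I suggest investigating along the lines of a task restart.

On Mon, Aug 10, 2020 at 5:01 PM, Bruce wrote:

> Hello, I would like to report a problem.
>
> I read data from a Hive table, convert it to a stream, and write it to an
> Oracle database using JDBC two-phase commit. No exception is thrown, but
> the checkpoint fails:
> job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> Attachments:
> 1. flink.log is the pseudo-log printed by the YARN jobmanager
> 2. Job.txt is the pseudocode of the job
> 3. The pseudocode of the JDBC two-phase commit
> --
> Sent from my iPhone
>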
Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but the checkpoint fails
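Hello, I would like to report a problem.

I read data from a Hive table, convert it to a stream, and write it to an Oracle database using JDBC two-phase commit. No exception is thrown, but the checkpoint fails:

job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.

Attachments:
1. flink.log is the pseudo-log printed by the YARN jobmanager
2. Job.txt is the pseudocode of the job
3. The pseudocode of the JDBC two-phase commit

Sent from my iPhone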