Re: Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but checkpoint fails

2020-08-10 Post by shizk233
Hi, this log is all ..., which is a bit of a headache to read.

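It just occurred to me that, besides a task restart, there is another possibility: the task was never scheduled successfully.
Can you see the task states in the Flink web UI? Are they all RUNNING?
If a task stays in SCHEDULED, there are probably not enough resources to schedule it. Check the slots offered by the task managers against the resources the job needs.
If the state keeps cycling between RUNNING, FAILED and SCHEDULED, check the task manager logs; there should be WARN entries.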
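
The slot check can be sketched with the values that appear in the jobmanager log in this thread (parallelism.default = 8, taskmanager.numberOfTaskSlots = 1, taskmanager.memory.process.size = 4 gb). This is only a back-of-the-envelope estimate, assuming every operator runs with the default parallelism and all operators stay in the default slot sharing group; the class and method names are made up for illustration.

```java
public class SlotEstimate {

    /**
     * With default slot sharing, a job needs as many slots as its
     * highest operator parallelism (assumption: one slot sharing group).
     */
    public static int requiredSlots(int maxOperatorParallelism) {
        return maxOperatorParallelism;
    }

    /** Number of task managers needed: ceil(requiredSlots / slotsPerTaskManager). */
    public static int requiredTaskManagers(int requiredSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int parallelism = 8;   // parallelism.default from the log
        int slotsPerTm = 1;    // taskmanager.numberOfTaskSlots from the log
        int tmMemoryGb = 4;    // taskmanager.memory.process.size from the log

        int taskManagers = requiredTaskManagers(requiredSlots(parallelism), slotsPerTm);
        System.out.println("Task managers needed: " + taskManagers);
        System.out.println("Total task manager memory (GB): " + taskManagers * tmMemoryGb);
    }
}
```

If the YARN queue cannot provide that many containers of that size, some tasks would stay in SCHEDULED, which matches the first case above.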

Bruce wrote on Monday, August 10, 2020 at 6:12 PM:

> Below is the content of the attachments. What is causing the restart?
>
>
> Two-phase commit demo:
>
>
> // The generic type arguments were stripped by the mail archive.
> // LinkedList<Object> is assumed here, based on colList.toArray() below.
> @Slf4j
> public class CommonOracleSink
>         extends TwoPhaseCommitSinkFunction<LinkedList<Object>, CommonOracleSink.ConnectionState, Void> {
>
>     private transient String sinkSQL;
>
>     public CommonOracleSink() {
>         super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()),
>                 VoidSerializer.INSTANCE);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ParameterTool params = (ParameterTool)
>                 getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         sinkSQL = params.getRequired("sinkSQL");
>     }
>
>     @Override
>     protected void invoke(ConnectionState connectionState, LinkedList<Object> colList, Context context) {
>         try {
>             System.err.println("start invoke...");
>             Connection connection = connectionState.connection;
>             log.info("colList-->{}", JSON.toJSONString(colList));
>             TKQueryRunner runner = new TKQueryRunner();
>             Object[] params = colList.toArray();
>             System.err.println("params size->" + params.length);
>             runner.update(connection, sinkSQL, params);
>         } catch (Exception e) {
>             log.error(e.getMessage(), e);
>             System.err.println(e.getMessage());
>         }
>     }
>
>     /**
>      * Get a connection and switch to manual transaction commit.
>      *
>      * @return
>      * @throws Exception
>      */
>     @Override
>     protected ConnectionState beginTransaction() throws Exception {
>         Connection connection = HikariOUtils.getConnection();
>         log.info("start beginTransaction..." + connection);
>         return new ConnectionState(connection);
>     }
>
>     /**
>      * Pre-commit. The pre-commit logic here lives in the invoke method.
>      *
>      * @param connectionState
>      * @throws Exception
>      */
>     @Override
>     protected void preCommit(ConnectionState connectionState) throws Exception {
>         log.info("start preCommit..." + connectionState);
>     }
>
>     /**
>      * If invoke ran normally, commit the transaction.
>      *
>      * @param connectionState
>      */
>     @Override
>     protected void commit(ConnectionState connectionState) {
>         log.info("start commit..." + connectionState);
>         Connection connection = connectionState.connection;
>         try {
>             connection.commit();
>             connection.close();
>         } catch (SQLException e) {
>             throw new RuntimeException("Failed to commit transaction");
>         }
>     }
>
>     /**
>      * If invoke failed, roll back the transaction; the next checkpoint will not run either.
>      *
>      * @param connectionState
>      */
>     @Override
>     protected void abort(ConnectionState connectionState) {
>         log.error("start abort rollback..." + connectionState);
>         Connection connection = connectionState.connection;
>         try {
>             connection.rollback();
>             connection.close();
>         } catch (SQLException e) {
>             throw new RuntimeException("Failed to roll back transaction");
>         }
>     }
>
>     static class ConnectionState {
>         private final transient Connection connection;
>
>         ConnectionState(Connection connection) {
>             this.connection = connection;
>         }
>     }
> }
>
> jobmanager log:
>
> 2020-08-10 16:37:31,892 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
> 2020-08-10 16:37:31,897 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
> 2020-08-10 16:37:31,898 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  OS current user: root
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Current Hadoop/Kerberos user: root
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.121-b13
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Maximum heap size: 3166 MiBytes
> 2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JAVA_HOME: /home/xxx/app/jdk1.8.0_121
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  Hadoop version: 2.7.7
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -  JVM Options:
> 2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -     -Xmx3462817376
> 2020-08-10 16:37:32,297 INFO  org.apache.

Re: Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but checkpoint fails

2020-08-10 Post by Bruce
Below is the content of the attachments. What is causing the restart?


Two-phase commit demo:


@Slf4j public class CommonOracleSink extends TwoPhaseCommitSinkFunction

http://node3:39469 was granted leadership with leaderSessionID=----
2020-08-10 16:37:34,312 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Web frontend listening at http://node3:39469.
2020-08-10 16:37:34,403 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.yarn.YarnResourceManager at akka://flink/user/rpc/resourcemanager_0 .
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 4 gb
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.jobgraph-path, job.graph
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.cluster-id, application_1591335931326_0024
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.target, yarn-per-job
2020-08-10 16:37:34,417 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 4 gb
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.savepoint.ignore-unclaimed-state, false
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.attached, true
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.shutdown-on-attached-exit, false
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: pipeline.jars, file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 8
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-08-10 16:37:34,418 WARN  org.apache.flink.configuration.GlobalConfiguration [] - Error while trying to split key and value in configuration file /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_01/flink-conf.yaml:16: "pipeline.classpaths: "
2020-08-10 16:37:34,419 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf
2020-08-10 16:37:34,419 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: $internal.yarn.log-config-file, /home/xxx/app/flink-1.11.1/conf/log4j.properties
2020-08-10 16:37:34,450 INFO  org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled external resources: []
2020-08-10 16:37:34,519 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Start JobDispatcherLeaderProcess.
2020-08-10 16:37:34,527 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.MiniDispatcher at akka://flink/user/rpc/dispatcher_1 .
2020-08-10 16:37:34,572 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
2020-08-10 16:37:34,582 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job empJOB (eb447d27efb8134da40c0c1dd19fffdf).
2020-08-10 16:37:34,615 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for empJOB (eb447d27efb8134da40c0c1dd19fffdf).
2020-08-10 16:37:34

Re: Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but checkpoint fails

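2020-08-10 Post by shizk233
Hi, the attachments did not come through, so I could not see them. Consider putting them in a gist and posting the link.

Judging from this message, though, the checkpoint failed because of a task state change (most likely the task restarted, so it went from RUNNING back to SCHEDULED).
I suggest investigating why the task restarted.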
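
To illustrate why a state change aborts the checkpoint, here is a deliberately simplified model of the rule "a checkpoint is only triggered when every task is RUNNING". It is not Flink's implementation: the class, the method, and the reduced set of states are invented for this sketch, and only the wording of the abort message follows the one quoted in this thread.

```java
import java.util.Arrays;
import java.util.List;

public class CheckpointGate {

    /** A reduced, illustrative subset of task states. */
    public enum TaskState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FAILED }

    /**
     * Returns "checkpoint triggered" if all tasks are RUNNING; otherwise returns
     * an abort message naming the first non-RUNNING state that was found.
     */
    public static String tryTrigger(String jobId, List<TaskState> taskStates) {
        for (TaskState state : taskStates) {
            if (state != TaskState.RUNNING) {
                return "job " + jobId + " is not in state RUNNING but "
                        + state + " instead. Aborting checkpoint.";
            }
        }
        return "checkpoint triggered";
    }

    public static void main(String[] args) {
        String jobId = "eb447d27efb8134da40c0c1dd19fffdf";

        // One task has fallen back to SCHEDULED, e.g. after a restart.
        System.out.println(tryTrigger(jobId,
                Arrays.asList(TaskState.RUNNING, TaskState.SCHEDULED)));

        // All tasks are RUNNING, so the checkpoint can start.
        System.out.println(tryTrigger(jobId,
                Arrays.asList(TaskState.RUNNING, TaskState.RUNNING)));
    }
}
```

In this model the abort is a symptom rather than the fault itself, which is why the useful question is what moved the task out of RUNNING.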

Bruce wrote on Monday, August 10, 2020 at 5:01 PM:

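> Hello, I would like to report a problem.
>
> I read data from a Hive table, convert it to a stream, and write it to an Oracle database using a JDBC two-phase commit.
> No exception is thrown, but the checkpoint fails:
> job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> Attachments:
> 1. flink.log is the pseudo-log printed by the YARN jobmanager
> 2. Job.txt is the pseudocode of the job
> 3. The pseudocode of the JDBC two-phase commit
> --
> Sent from my iPhone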
>


Reading Hive table data as a stream and writing it to Oracle via JDBC two-phase commit: no exception, but checkpoint fails

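2020-08-10 Post by Bruce
Hello, I would like to report a problem.


I read data from a Hive table, convert it to a stream, and write it to an Oracle database using a JDBC two-phase commit.
No exception is thrown, but the checkpoint fails:
job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED 
instead. Aborting checkpoint.
Attachments:
1. flink.log is the pseudo-log printed by the YARN jobmanager
2. Job.txt is the pseudocode of the job
3. The pseudocode of the JDBC two-phase commit

Sent from my iPhone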