Hi,这个日志全是&nbsp。。。。有点头大。。。

我刚想到,除了task重启外,还有一种情况是task没有调度成功。
你能通过flink web ui观察到task的状态吗,都是RUNNING吗?
如果一直是schedule,那应该是缺少对应的资源进行调度,需要检查下task manager提供的slot资源以及任务所需的资源。
如果是running、failed、schedule的不断切换,那需要检查task manager的日志,应该有warn。

Bruce <bruceleeof1...@qq.com> 于2020年8月10日周一 下午6:12写道:

> 下面是附件的内容,请问是因为什么导致重启呢?
>
>
> 2阶段提交demo:
>
>
> @Slf4j public class CommonOracleSink extends
> TwoPhaseCommitSinkFunction<LinkedList<Object&gt;,
> CommonOracleSink.ConnectionState, Void&gt; {  &nbsp; &nbsp; private
> transient String sinkSQL;  &nbsp; &nbsp; public CommonOracleSink() {
> &nbsp; &nbsp; &nbsp; &nbsp; super(new
> KryoSerializer<&gt;(ConnectionState.class, new ExecutionConfig()),
> VoidSerializer.INSTANCE);  &nbsp; &nbsp; }  &nbsp; &nbsp; @Override &nbsp;
> &nbsp; public void open(Configuration parameters) throws Exception { &nbsp;
> &nbsp; &nbsp; &nbsp; super.open(parameters); &nbsp; &nbsp; &nbsp; &nbsp;
> ParameterTool params = (ParameterTool)
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); &nbsp;
> &nbsp; &nbsp; &nbsp; sinkSQL = params.getRequired("sinkSQL"); &nbsp; &nbsp;
> }  &nbsp; &nbsp; @Override &nbsp; &nbsp; protected void
> invoke(ConnectionState connectionState, LinkedList<Object&gt; colList,
> Context context){ &nbsp; &nbsp; &nbsp; &nbsp; try { &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; System.err.println("start invoke......."); &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Connection connection =
> connectionState.connection; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> log.info("colList----------------------&gt;",
> JSON.toJSONString(colList)); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> TKQueryRunner runner = new TKQueryRunner(); &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; Object[] params = colList.toArray(); &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; System.err.println("params
> size-----&gt;"+params.length); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> runner.update(connection,sinkSQL,params); &nbsp; &nbsp; &nbsp; &nbsp;
> }catch (Exception e){ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> log.error(e.getMessage(),e); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> System.err.println(e.getMessage());  &nbsp; &nbsp; &nbsp; &nbsp; }  &nbsp;
> &nbsp; }  &nbsp; &nbsp; /** &nbsp; &nbsp; &nbsp;* 获取连接,开启手动提交事物 &nbsp;
> &nbsp; &nbsp;* &nbsp; &nbsp; &nbsp;* @return &nbsp; &nbsp; &nbsp;* @throws
> Exception &nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp; @Override &nbsp; &nbsp;
> protected ConnectionState beginTransaction() throws Exception {  &nbsp;
> &nbsp; &nbsp; &nbsp; Connection connection = HikariOUtils.getConnection();
> &nbsp; &nbsp; &nbsp; &nbsp; log.info("start beginTransaction......." +
> connection);  &nbsp; &nbsp; &nbsp; &nbsp; return new
> ConnectionState(connection); &nbsp; &nbsp; }  &nbsp; &nbsp; /** &nbsp;
> &nbsp; &nbsp;* 预提交,这里预提交的逻辑在invoke方法中 &nbsp; &nbsp; &nbsp;* &nbsp; &nbsp;
> &nbsp;* @param connectionState &nbsp; &nbsp; &nbsp;* @throws Exception
> &nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp; @Override &nbsp; &nbsp; protected void
> preCommit(ConnectionState connectionState) throws Exception { &nbsp; &nbsp;
> &nbsp; &nbsp; log.info("start preCommit......." + connectionState);
> &nbsp; &nbsp; }  &nbsp; &nbsp; /** &nbsp; &nbsp; &nbsp;*
> 如果invoke方法执行正常,则提交事务 &nbsp; &nbsp; &nbsp;* &nbsp; &nbsp; &nbsp;* @param
> connectionState &nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp; @Override &nbsp;
> &nbsp; protected void commit(ConnectionState connectionState) { &nbsp;
> &nbsp; &nbsp; &nbsp; log.info("start commit......." + connectionState);
> &nbsp; &nbsp; &nbsp; &nbsp; Connection connection =
> connectionState.connection;  &nbsp; &nbsp; &nbsp; &nbsp; try { &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.commit(); &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; connection.close(); &nbsp; &nbsp; &nbsp; &nbsp;
> } catch (SQLException e) { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw
> new RuntimeException("提交事物异常"); &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp;
> }  &nbsp; &nbsp; /** &nbsp; &nbsp; &nbsp;*
> 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行 &nbsp; &nbsp; &nbsp;* &nbsp; &nbsp;
> &nbsp;* @param connectionState &nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp;
> @Override &nbsp; &nbsp; protected void abort(ConnectionState
> connectionState) { &nbsp; &nbsp; &nbsp; &nbsp; log.error("start abort
> rollback......." + connectionState); &nbsp; &nbsp; &nbsp; &nbsp; Connection
> connection = connectionState.connection; &nbsp; &nbsp; &nbsp; &nbsp; try {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.rollback(); &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.close(); &nbsp; &nbsp; &nbsp;
> &nbsp; } catch (SQLException e) { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> throw new RuntimeException("回滚事物异常"); &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp;
> &nbsp; }  &nbsp; &nbsp; static class ConnectionState {  &nbsp; &nbsp;
> &nbsp; &nbsp; private final transient Connection connection;  &nbsp; &nbsp;
> &nbsp; &nbsp; ConnectionState(Connection connection) {  &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; this.connection = connection; &nbsp; &nbsp;
> &nbsp; &nbsp; }  &nbsp; &nbsp; }   }
>
> jobmanager日志
>
> 2020-08-10 16:37:31,892 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] -
> --------------------------------------------------------------------------------2020-08-10
> 16:37:31,897 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;Starting YarnJobClusterEntrypoint (Version: 1.11.1,
> Scala: 2.11, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)2020-08-10
> 16:37:31,898 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;OS current user: root2020-08-10 16:37:32,295 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;Current Hadoop/Kerberos user: root2020-08-10
> 16:37:32,295 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle
> Corporation - 1.8/25.121-b132020-08-10 16:37:32,295 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;Maximum heap size: 3166 MiBytes2020-08-10
> 16:37:32,295 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;JAVA_HOME: /home/xxx/app/jdk1.8.0_1212020-08-10
> 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;Hadoop version: 2.7.72020-08-10 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;JVM Options:2020-08-10 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp; &nbsp; -Xmx34628173762020-08-10 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp; &nbsp; -Xms34628173762020-08-10 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp; &nbsp; -XX:MaxMetaspaceSize=2684354562020-08-10
> 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp; &nbsp;
> -Dlog.file=/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log2020-08-10
> 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp; &nbsp;
> -Dlog4j.configuration=file:log4j.properties2020-08-10 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp; &nbsp;
> -Dlog4j.configurationFile=file:log4j.properties2020-08-10 16:37:32,297 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - &nbsp;Program Arguments: (none)2020-08-10 16:37:32,297
> INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp;
> &nbsp; &nbsp; &nbsp;[] - &nbsp;Classpath:
> :UnifyCompFlink-1.0.jar:lib/flink-csv-1.11.1.jar:lib/flink-json-1.11.1.jar:lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:lib/flink-shaded-zookeeper-3.4.14.jar:lib/flink-table-blink_2.11-1.11.1.jar:lib/flink-table_2.11-1.11.1.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.11-1.11.1.jar:job.graph:flink-conf.yaml::/home/xxx/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/logredactor-1.0.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/junit-4.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-configuration-1.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-api-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpclient-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/avro-1.7.6-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/mockito-all-1.8.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-auth-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-net-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsch-0.1.42.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-annotations-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-recipes-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/snappy-java-1.0.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/gson-2.2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/paranamer-2.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hamcrest-core-1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-math3-3.1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-digester-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-compiler-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jets3t-0.9.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpcore-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-1.9.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-client-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-httpclient-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-i18n-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-framework-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xml-apis-1.3.04.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xercesImpl-2.9.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-registry-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-nodemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-tests-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-api-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-client-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/aopalliance-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/javax.inject-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-client-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jline-2.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-servlet-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-guice-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/asm-3.2.jar2020-08-10
> 16:37:32,299 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] -
> --------------------------------------------------------------------------------2020-08-10
> 16:37:32,301 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - Registered UNIX signal handlers for [TERM, HUP,
> INT]2020-08-10 16:37:32,306 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - YARN daemon is running as: root Yarn client user
> obtainer: root2020-08-10 16:37:32,311 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> taskmanager.memory.process.size, 4 gb2020-08-10 16:37:32,311 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> internal.jobgraph-path, job.graph2020-08-10 16:37:32,311 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.execution.failover-strategy, region2020-08-10 16:37:32,312 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> high-availability.cluster-id, application_1591335931326_00242020-08-10
> 16:37:32,312 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.rpc.address, localhost2020-08-10 16:37:32,312 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property: execution.target,
> yarn-per-job2020-08-10 16:37:32,312 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.memory.process.size, 4 gb2020-08-10 16:37:32,312 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.rpc.port, 61232020-08-10 16:37:32,312 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.savepoint.ignore-unclaimed-state, false2020-08-10 16:37:32,313
> INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.attached, true2020-08-10 16:37:32,313 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> internal.cluster.execution-mode, NORMAL2020-08-10 16:37:32,313 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.shutdown-on-attached-exit, false2020-08-10 16:37:32,313 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property: pipeline.jars,
> file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar2020-08-10
> 16:37:32,313 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> parallelism.default, 82020-08-10 16:37:32,313 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> taskmanager.numberOfTaskSlots, 12020-08-10 16:37:32,313 WARN
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Error while trying to split key and value in
> configuration file
> /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16:
> "pipeline.classpaths: "2020-08-10 16:37:32,314 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> $internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf2020-08-10
> 16:37:32,314 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> $internal.yarn.log-config-file,
> /home/xxx/app/flink-1.11.1/conf/log4j.properties2020-08-10 16:37:32,347
> WARN &nbsp;org.apache.flink.configuration.Configuration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Config uses deprecated
> configuration key 'web.port' instead of proper key
> 'rest.bind-port'2020-08-10 16:37:32,362 INFO
> &nbsp;org.apache.flink.runtime.clusterframework.BootstrapTools &nbsp;
> &nbsp; [] - Setting directories for temporary files to:
> /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_00242020-08-10
> 16:37:32,368 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - Starting YarnJobClusterEntrypoint.2020-08-10 16:37:32,413
> INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp;
> &nbsp; &nbsp; &nbsp;[] - Install default filesystem.2020-08-10 16:37:32,461
> INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp;
> &nbsp; &nbsp; &nbsp;[] - Install security context.2020-08-10 16:37:32,520
> INFO &nbsp;org.apache.flink.runtime.security.modules.HadoopModule &nbsp;
> &nbsp; &nbsp; [] - Hadoop user set to root (auth:SIMPLE)2020-08-10
> 16:37:32,529 INFO
> &nbsp;org.apache.flink.runtime.security.modules.JaasModule &nbsp; &nbsp;
> &nbsp; &nbsp; [] - Jaas file will be created as
> /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/jaas-1114046375892877617.conf.2020-08-10
> 16:37:32,539 INFO
> &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp;
> &nbsp; &nbsp;[] - Initializing cluster services.2020-08-10 16:37:32,556
> INFO &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils &nbsp;
> &nbsp; &nbsp; &nbsp;[] - Trying to start actor system, external address
> node3:0, bind address 0.0.0.0:0.2020-08-10 16:37:33,191 INFO
> &nbsp;akka.event.slf4j.Slf4jLogger &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; [] - Slf4jLogger started2020-08-10 16:37:33,218 INFO
> &nbsp;akka.remote.Remoting &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Starting remoting2020-08-10 16:37:33,378 INFO
> &nbsp;akka.remote.Remoting &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Remoting started; listening on addresses
> :[akka.tcp://flink@node3:40657]2020-08-10 16:37:33,506 INFO
> &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils &nbsp; &nbsp;
> &nbsp; &nbsp;[] - Actor system started at 
> akka.tcp://flink@node3:406572020-08-10
> 16:37:33,539 WARN &nbsp;org.apache.flink.configuration.Configuration &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Config uses
> deprecated configuration key 'web.port' instead of proper key
> 'rest.port'2020-08-10 16:37:33,551 INFO
> &nbsp;org.apache.flink.runtime.blob.BlobServer &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Created BLOB server storage
> directory
> /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/blobStore-15a573e2-a671-4eb9-975b-b5229cec6bde2020-08-10
> 16:37:33,555 INFO &nbsp;org.apache.flink.runtime.blob.BlobServer &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Started
> BLOB server at 0.0.0.0:34380 - max concurrent requests: 50 - max backlog:
> 10002020-08-10 16:37:33,570 INFO
> &nbsp;org.apache.flink.runtime.metrics.MetricRegistryImpl &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp;[] - No metrics reporter configured, no metrics will be
> exposed/reported.2020-08-10 16:37:33,574 INFO
> &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils &nbsp; &nbsp;
> &nbsp; &nbsp;[] - Trying to start actor system, external address node3:0,
> bind address 0.0.0.0:0.2020-08-10 16:37:33,591 INFO
> &nbsp;akka.event.slf4j.Slf4jLogger &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; [] - Slf4jLogger started2020-08-10 16:37:33,597 INFO
> &nbsp;akka.remote.Remoting &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Starting remoting2020-08-10 16:37:33,606 INFO
> &nbsp;akka.remote.Remoting &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Remoting started; listening on addresses
> :[akka.tcp://flink-metrics@node3:43096]2020-08-10 16:37:33,642 INFO
> &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils &nbsp; &nbsp;
> &nbsp; &nbsp;[] - Actor system started at 
> akka.tcp://flink-metrics@node3:430962020-08-10
> 16:37:33,659 INFO &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for
> org.apache.flink.runtime.metrics.dump.MetricQueryService at
> akka://flink-metrics/user/rpc/MetricQueryService .2020-08-10 16:37:33,721
> WARN &nbsp;org.apache.flink.configuration.Configuration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Config uses deprecated
> configuration key 'web.port' instead of proper key
> 'rest.bind-port'2020-08-10 16:37:33,723 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Upload directory
> /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload does
> not exist.&nbsp;2020-08-10 16:37:33,724 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Created directory
> /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload for
> file uploads.2020-08-10 16:37:33,748 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Starting rest endpoint.2020-08-10 16:37:34,110 INFO
> &nbsp;org.apache.flink.runtime.webmonitor.WebMonitorUtils &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp;[] - Determined location of main cluster component log
> file:
> /home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log2020-08-10
> 16:37:34,111 INFO &nbsp;org.apache.flink.runtime.webmonitor.WebMonitorUtils
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Determined location of main cluster
> component stdout file:
> /home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.out2020-08-10
> 16:37:34,309 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Rest endpoint listening at node3:394692020-08-10 16:37:34,311 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> http://node3:39469 was granted leadership with
> leaderSessionID=00000000-0000-0000-0000-0000000000002020-08-10 16:37:34,312
> INFO &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint []
> - Web frontend listening at http://node3:39469.2020-08-10 16:37:34,403
> INFO &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for
> org.apache.flink.yarn.YarnResourceManager at
> akka://flink/user/rpc/resourcemanager_0 .2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> taskmanager.memory.process.size, 4 gb2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> internal.jobgraph-path, job.graph2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.execution.failover-strategy, region2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> high-availability.cluster-id, application_1591335931326_00242020-08-10
> 16:37:34,417 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.rpc.address, localhost2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property: execution.target,
> yarn-per-job2020-08-10 16:37:34,417 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.memory.process.size, 4 gb2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> jobmanager.rpc.port, 61232020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.savepoint.ignore-unclaimed-state, false2020-08-10 16:37:34,418
> INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.attached, true2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> internal.cluster.execution-mode, NORMAL2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> execution.shutdown-on-attached-exit, false2020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property: pipeline.jars,
> file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar2020-08-10
> 16:37:34,418 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> parallelism.default, 82020-08-10 16:37:34,418 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> taskmanager.numberOfTaskSlots, 12020-08-10 16:37:34,418 WARN
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Error while trying to split key and value in
> configuration file
> /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16:
> "pipeline.classpaths: "2020-08-10 16:37:34,419 INFO
> &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> $internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf2020-08-10
> 16:37:34,419 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property:
> $internal.yarn.log-config-file,
> /home/xxx/app/flink-1.11.1/conf/log4j.properties2020-08-10 16:37:34,450
> INFO &nbsp;org.apache.flink.runtime.externalresource.ExternalResourceUtils
> [] - Enabled external resources: []2020-08-10 16:37:34,519 INFO
> &nbsp;org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess
> [] - Start JobDispatcherLeaderProcess.2020-08-10 16:37:34,527 INFO
> &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for
> org.apache.flink.runtime.dispatcher.MiniDispatcher at
> akka://flink/user/rpc/dispatcher_1 .2020-08-10 16:37:34,572 INFO
> &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for
> org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/rpc/jobmanager_2 .2020-08-10 16:37:34,582 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Initializing job empJOB
> (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,615 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Using restart back off time
> strategy
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
> backoffTimeMS=1000) for empJOB
> (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,667 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Running initialization on master
> for job empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,806
> INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Successfully ran
> initialization on master in 139 ms.2020-08-10 16:37:34,876 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Recovered 0 containers from
> previous attempts ([]).2020-08-10 16:37:34,877 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Register application master
> response contains scheduler resource types: [MEMORY, CPU].2020-08-10
> 16:37:34,877 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] -
> Container matching strategy: MATCH_VCORE.2020-08-10 16:37:34,887 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - ResourceManager
> akka.tcp://flink@node3:40657/user/rpc/resourcemanager_0 was granted
> leadership with fencing token 000000000000000000000000000000002020-08-10
> 16:37:34,891 INFO
> &nbsp;org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
> [] - Starting the SlotManager.2020-08-10 16:37:35,466 INFO
> &nbsp;org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology
> [] - Built 1 pipelined regions in 2 ms2020-08-10 16:37:35,483 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - No state backend has been
> configured, using default (Memory / JobManager) MemoryStateBackend (data in
> heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints:
> 'null', asynchronous: TRUE, maxStateSize: 5242880)2020-08-10 16:37:35,503
> INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3915bc20
> for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:35,509 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl &nbsp; &nbsp;
> &nbsp;[] - JobManager runner for job empJOB
> (eb447d27efb8134da40c0c1dd19fffdf) was granted leadership with session id
> 00000000-0000-0000-0000-000000000000 at 
> akka.tcp://flink@node3:40657/user/rpc/jobmanager_2.2020-08-10
> 16:37:35,514 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting execution of
> job empJOB (eb447d27efb8134da40c0c1dd19fffdf) under job master id
> 00000000000000000000000000000000.2020-08-10 16:37:35,517 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting scheduling with scheduling
> strategy
> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]2020-08-10
> 16:37:35,518 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Job empJOB (eb447d27efb8134da40c0c1dd19fffdf) switched from
> state CREATED to RUNNING.2020-08-10 16:37:35,535 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (1/6)
> (5a6410258857c02ebd1b5ec03a78be4b) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (2/6)
> (299de0d4a8affe02a999edeb84957c41) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (3/6)
> (1b98df27c9019f64835b55fa3de3f363) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (4/6)
> (a7612608772c018d819741ce4d9320bd) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (5/6)
> (b19828c85fc0e92e62f2a7241b610f5b) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
> comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
> PartitionNums: null -&gt; SinkConversionToRow (6/6)
> (c2178a51eda2db900d3212e4f488d00f) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (1/8)
> (2a8db3a2b4cd65fd7cd3e6bac031a971) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (2/8)
> (7aa8dd779d4ff75e4c985be75a52c427) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (3/8)
> (867c814978ea302537065f51516ed766) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,536 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (4/8)
> (4e186575ab42cc6c1d599ae027bf99b8) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (5/8)
> (b107b8bfb0a08c5e7937400c43a0f9ff) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (6/8)
> (28e1f0fa1b9ebed59e4c67b0598864b9) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (7/8)
> (e27e60ff7dcd5245dfd21b23bbd49985) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,537 INFO
> &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp;
> &nbsp; [] - Process -&gt; Sink: Unnamed (8/8)
> (c0f6b9e623c68fd7e9205a8ad686d4e5) switched from CREATED to
> SCHEDULED.2020-08-10 16:37:35,558 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}]2020-08-10 16:37:35,565
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}]2020-08-10 16:37:35,565
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}]2020-08-10 16:37:35,566
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{2447496efd24d542bce06de1b69ec70d}]2020-08-10 16:37:35,566
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{2ab761d21cd4368751f3187f122705fa}]2020-08-10 16:37:35,566
> INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Cannot serve slot request, no ResourceManager connected. Adding
> as pending request
> [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}]2020-08-10 16:37:35,574
> INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Connecting to
> ResourceManager 
> akka.tcp://flink@node3:40657/user/rpc/resourcemanager_*(00000000000000000000000000000000)2020-08-10
> 16:37:35,579 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Resolved
> ResourceManager address, beginning registration2020-08-10 16:37:35,584 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering job manager
> 00000000000000000000000000000...@akka.tcp://flink@node3:40657/user/rpc/jobmanager_2
> for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,589 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registered job manager
> 00000000000000000000000000000...@akka.tcp://flink@node3:40657/user/rpc/jobmanager_2
> for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,593 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - JobManager successfully registered
> at ResourceManager, leader id: 00000000000000000000000000000000.2020-08-10
> 16:37:35,594 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id e490d3208119fe28d97f4f0fe94cab28.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{2447496efd24d542bce06de1b69ec70d}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{2ab761d21cd4368751f3187f122705fa}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO
> &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp;
> &nbsp; [] - Requesting new slot
> [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}] and profile
> ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,612 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 1.2020-08-10 16:37:35,614 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id b74299e8619de93adec5869d1fa79d73.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 2.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id 046a3dcf1af40e0539f15fcddfbddf77.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 3.2020-08-10 16:37:35,615 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id 90870250ae0f3bef44cbdd675dede57b.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 4.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id f8063a6fc86162712215a92533532b65.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 5.2020-08-10 16:37:35,616 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile
> ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
> allocation id cfe671ec5448d440838f02145cb6267f.2020-08-10 16:37:35,617 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor
> container with resource WorkerResourceSpec {cpuCores=1.0,
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb
> (1438814063 bytes)}. Number pending workers of this resource is
> 6.2020-08-10 16:37:38,391 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:37:40,933 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Received 1
> containers.2020-08-10 16:37:40,940 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Received 1 containers with
> resource <memory:4096, vCores:1&gt;, 6 pending container
> requests.2020-08-10 16:37:40,953 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - TaskExecutor
> container_1591335931326_0024_01_000003 will be started on node1 with
> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb
> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
> taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=343.040mb (359703515 bytes), managedMemorySize=1.340gb
> (1438814063 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
> jvmOverheadSize=409.600mb (429496736 bytes)}.2020-08-10 16:37:40,976 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Creating container launch
> context for TaskManagers2020-08-10 16:37:40,978 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Starting
> TaskManagers2020-08-10 16:37:40,995 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Removing container request
> Capability[<memory:4096, vCores:1&gt;]Priority[1].2020-08-10 16:37:40,995
> INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Accepted 1 requested
> containers, returned 0 excess containers, 5 pending container requests of
> resource <memory:4096, vCores:1&gt;.2020-08-10 16:37:46,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:37:47,712 INFO
> &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering TaskManager with
> ResourceID container_1591335931326_0024_01_000003 
> (akka.tcp://flink@node1:40857/user/rpc/taskmanager_0)
> at ResourceManager2020-08-10 16:37:54,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:02,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:10,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:18,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:26,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:34,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:42,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:50,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:38:58,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:06,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:14,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:22,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:30,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:38,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:46,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.2020-08-10 16:39:54,389 INFO
> &nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp;
> &nbsp;[] - Checkpoint triggering task Source: HiveTableSource(empno, ename,
> job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp,
> PartitionPruned: false, PartitionNums: null -&gt; SinkConversionToRow (1/6)
> of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but
> SCHEDULED instead. Aborting checkpoint.
>
>
>
> job代码
>
> public class PlatJobExecution {
> &nbsp; &nbsp; private volatile ParameterTool parameters;
> &nbsp; &nbsp; public PlatJobExecution(ParameterTool parameters) {&nbsp;
> &nbsp; &nbsp; &nbsp; this.parameters = parameters;&nbsp; &nbsp; }
> &nbsp; &nbsp; public void execute() throws Exception {
>
> &nbsp; &nbsp; &nbsp; &nbsp; //目标数据源:&nbsp; &nbsp; &nbsp; &nbsp; //目标数据表:
> &nbsp; &nbsp; &nbsp; &nbsp; //1.读取数据 kafka /oracle
> &nbsp;把流注册成一张表【这个过程可以手动完成】 &nbsp; &nbsp; &nbsp;--hive
> &nbsp; &nbsp; &nbsp; &nbsp; //2.执行sql,返回结果
> &nbsp; &nbsp; &nbsp; &nbsp; //3.把结果写入目标数据表 / 写入redis / 写入kafka
> &nbsp; &nbsp; &nbsp; &nbsp; InputStream is =
> ReadKafkaPrint.class.getClassLoader().getResourceAsStream("config.properties");
> &nbsp; &nbsp; &nbsp; &nbsp; ParameterTool parameters2 =
> ParameterTool.fromPropertiesFile(is);
>
>
>
> &nbsp; &nbsp; &nbsp; &nbsp; String targetDatabase =
> parameters.get("sourceDatabase");&nbsp; &nbsp; &nbsp; &nbsp; String
> executiveSql = parameters.get("executiveSql");&nbsp; &nbsp; &nbsp; &nbsp;
> String sinkSQL = parameters.get("sinkSQL");&nbsp; &nbsp; &nbsp; &nbsp;
> String jobName = parameters.get("jobName");
> &nbsp; &nbsp; &nbsp; &nbsp; Map<String, String&gt; pMap =
> Maps.newHashMap();&nbsp; &nbsp; &nbsp; &nbsp;
> pMap.putAll(parameters2.toMap());&nbsp; &nbsp; &nbsp; &nbsp;
> pMap.put("sinkSQL",sinkSQL);
> &nbsp; &nbsp; &nbsp; &nbsp; parameters2 = ParameterTool.fromMap(pMap);
>
> &nbsp; &nbsp; &nbsp; &nbsp; //1.创建执行环境&nbsp; &nbsp; &nbsp; &nbsp;
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> &nbsp; &nbsp; &nbsp; &nbsp; //全局参数设置&nbsp; &nbsp; &nbsp; &nbsp;
> streamEnv.getConfig().setGlobalJobParameters(parameters2);
> &nbsp; &nbsp; &nbsp; &nbsp; streamEnv.enableCheckpointing(8000,
> CheckpointingMode.EXACTLY_ONCE);//每隔5s进行一次checkpoint
> &nbsp; &nbsp; &nbsp; &nbsp; EnvironmentSettings tableEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> &nbsp; &nbsp; &nbsp; &nbsp; //2.流式的TableEnv&nbsp; &nbsp; &nbsp; &nbsp;
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings);&nbsp; &nbsp; &nbsp; &nbsp;
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE);&nbsp; &nbsp; &nbsp; &nbsp;
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(8));
> &nbsp; &nbsp; &nbsp; &nbsp; //3.注册HiveCatalog&nbsp; &nbsp; &nbsp; &nbsp;
> String name &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;=
> targetDatabase;&nbsp; &nbsp; &nbsp; &nbsp; String defaultDatabase =
> targetDatabase;&nbsp; &nbsp; &nbsp; &nbsp; String hiveConfDir &nbsp; &nbsp;
> = "/home/xxx/app/flink-1.11.1/jobcfg";&nbsp; &nbsp; &nbsp; &nbsp; String
> version &nbsp; &nbsp; &nbsp; &nbsp; = "1.1.0";
> &nbsp; &nbsp; &nbsp; &nbsp; HiveCatalog catalog = new HiveCatalog(name,
> defaultDatabase, hiveConfDir, version);&nbsp; &nbsp; &nbsp; &nbsp;
> tableEnv.registerCatalog(name, catalog);&nbsp; &nbsp; &nbsp; &nbsp;
> tableEnv.useCatalog(name);
> &nbsp; &nbsp; &nbsp; &nbsp; //4.流式读取Hive&nbsp; &nbsp; &nbsp; &nbsp;
> tableEnv.getConfig().getConfiguration().setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,
> true);
> &nbsp; &nbsp; &nbsp; &nbsp; //query&nbsp; &nbsp; &nbsp; &nbsp; Table table
> = tableEnv.sqlQuery(executiveSql);
> &nbsp; &nbsp; &nbsp; &nbsp; // CREATE/INSERT&nbsp; &nbsp; &nbsp; &nbsp; //
> tableEnv.executeSql()
> // &nbsp; &nbsp; &nbsp; &nbsp;tableEnv.toRetractStream(table,
> Row.class).print().setParallelism(1);
> &nbsp; &nbsp; &nbsp; &nbsp;
> SingleOutputStreamOperator<LinkedList<Object&gt;&gt; colList =
> tableEnv.toAppendStream(table, Row.class).process(new ProcessFunction<Row,
> LinkedList<Object&gt;&gt;() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void
> processElement(Row row, Context context,
> Collector<LinkedList<Object&gt;&gt; collector) throws Exception {&nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; LinkedList<Object&gt;
> linkedList = Lists.newLinkedList();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; &nbsp; for (int i = 0; i < row.getArity(); i++) {&nbsp;
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> linkedList.add(row.getField(i));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> collector.collect(linkedList);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> }&nbsp; &nbsp; &nbsp; &nbsp; });
> &nbsp; &nbsp; &nbsp; &nbsp; colList.addSink(new CommonOracleSink());
>
> &nbsp; &nbsp; &nbsp; &nbsp; //sink to Oracle&nbsp; &nbsp; &nbsp; &nbsp;
> streamEnv.execute(jobName);
>
> &nbsp; &nbsp; }}
>
>
> 发自我的iPhone
>
>
> ------------------ 原始邮件 ------------------
> 发件人: shizk233 <wangwangdaxian...@gmail.com&gt;
> 发送时间: 2020年8月10日 18:04
> 收件人: user-zh@flink.apache.org <user-zh@flink.apache.org&gt;
> 主题: 回复:读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,没有异常但是checkpoint失败
>
>
>
> hi,附件挂了,没看到饿。可以考虑挂gist放链接出来。
>
> 不过看这个信息,checkpoint的失败是由于任务状态变更导致的(应该是任务重启了,所以running变scheduled了)。
> 建议往任务重启的方向排查一下。
>
> Bruce&nbsp;<bruceleeof1...@qq.com&gt;&nbsp;于2020年8月10日周一&nbsp;下午5:01写道:
>
> &gt;&nbsp;您好,这里有个问题反馈下!
> &gt;
> &gt;&nbsp;读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,
> &gt;&nbsp;没有抛任何异常但是checkpoint失败:
>
> &gt;&nbsp;job&nbsp;eb447d27efb8134da40c0c1dd19fffdf&nbsp;is&nbsp;not&nbsp;in&nbsp;state&nbsp;RUNNING&nbsp;but&nbsp;SCHEDULED
> &gt;&nbsp;instead.&nbsp;Aborting&nbsp;checkpoint.
> &gt;&nbsp;附件
> &gt;&nbsp;1.flink.log是yarn&nbsp;jobmanager打印的伪日志
> &gt;&nbsp;2.Job.txt是job的伪代码
> &gt;&nbsp;3.jdbc两阶段提交的伪代码附件
> &gt;&nbsp;------------------------------
> &gt;&nbsp;发自我的iPhone
> &gt;

Reply via email to