Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

2021-11-01 文章 Tony Wei
Hi yidan,

你可以試試 SQL Hints [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/


yidan zhao  於 2021年11月2日 週二 下午1:03寫道:

> 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。
> 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在
> flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。
> 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'sink.partition-commit.delay'='1 h',
> >   'sink.partition-commit.policy.kind'='metastore,success-file');
>
> Caizhi Weng  于2021年11月2日周二 上午10:47写道:
>
> > Hi!
> >
> > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
> > hive 里表的结构等信息。但 kafka 的表定义仍然要写。
> >
> > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。
> >
> > yidan zhao  于2021年11月1日周一 下午3:05写道:
> >
> > > 如题,我看了官方文档,定义好kafka和hive表。
> > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
> > >
> > > SET table.sql-dialect=hive;CREATE TABLE hive_table (
> > >   user_id STRING,
> > >   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> > > parquet TBLPROPERTIES (
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'sink.partition-commit.delay'='1 h',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file');
> > > SET table.sql-dialect=default;CREATE TABLE kafka_table (
> > >   user_id STRING,
> > >   order_amount DOUBLE,
> > >   log_ts TIMESTAMP(3),
> > >   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> > > watermark on TIMESTAMP column) WITH (...);
> > >
> > >
> > >
> >
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
> > >
> > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
> > >
> > >
> > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
> > >
> >
>


Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

2021-11-01 文章 yidan zhao
嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。
因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在
flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。
或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.delay'='1 h',
>   'sink.partition-commit.policy.kind'='metastore,success-file');

Caizhi Weng  于2021年11月2日周二 上午10:47写道:

> Hi!
>
> hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
> hive 里表的结构等信息。但 kafka 的表定义仍然要写。
>
> 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。
>
> yidan zhao  于2021年11月1日周一 下午3:05写道:
>
> > 如题,我看了官方文档,定义好kafka和hive表。
> > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
> >
> > SET table.sql-dialect=hive;CREATE TABLE hive_table (
> >   user_id STRING,
> >   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> > parquet TBLPROPERTIES (
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'sink.partition-commit.delay'='1 h',
> >   'sink.partition-commit.policy.kind'='metastore,success-file');
> > SET table.sql-dialect=default;CREATE TABLE kafka_table (
> >   user_id STRING,
> >   order_amount DOUBLE,
> >   log_ts TIMESTAMP(3),
> >   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> > watermark on TIMESTAMP column) WITH (...);
> >
> >
> >
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
> >
> > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
> >
> >
> > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
> >
>


flink yarn-per-job dubug

2021-11-01 文章 GCAM
hello,

??flink 1.14.0?? maven 3.2.5 ??


1. ??mvn clean install -DskipTests -Dfast


2. ??flink-1.14.0/flink-dist/target/flink-1.14.0-bin/ ??


3.  conf/flink-conf.yaml
  ??env.java.opts: 
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5003"


4. ??idea(windows )?? ??Remote ??
  Trantsport: Socket
  Host: xxx.xxx.xx.xx 
Port: 5003


5. ??yarn-per-job ??
 bin/flink run -t yarn-per-job -c 
org.apache.flink.streaming.examples.socket.SocketWindowWordCount 
examples/streaming/SocketWindowWordCount.jar --hostname xxx.xxx.xx.xx --port 



6. ??idea ??debug


??debug flink yarn-per-job 
jobmanager??YarnJobClusterEntrypoint  
main
??YarnJobClusterEntrypoint debug?

Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

2021-11-01 文章 Caizhi Weng
Hi!

hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
hive 里表的结构等信息。但 kafka 的表定义仍然要写。

你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。

yidan zhao  于2021年11月1日周一 下午3:05写道:

> 如题,我看了官方文档,定义好kafka和hive表。
> 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
>
> SET table.sql-dialect=hive;CREATE TABLE hive_table (
>   user_id STRING,
>   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> parquet TBLPROPERTIES (
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.delay'='1 h',
>   'sink.partition-commit.policy.kind'='metastore,success-file');
> SET table.sql-dialect=default;CREATE TABLE kafka_table (
>   user_id STRING,
>   order_amount DOUBLE,
>   log_ts TIMESTAMP(3),
>   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> watermark on TIMESTAMP column) WITH (...);
>
>
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
>
> 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
>
>
> 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
>


Re: Re: Re: 公司数据密文,实现group by和join

2021-11-01 文章 godfrey he
上传的图片没法显示,通过图床工具或纯文本方式重新发一遍

lyh1067341434  于2021年11月1日周一 上午10:42写道:

> 您好!
>
> 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组;
> 为了更清楚表达,下面为图示:
>
> 谢谢您!
>
>
>
>
>
>
>
> 在 2021-10-29 10:49:35,"Caizhi Weng"  写道:
> >Hi!
> >
> >你是不是想写这样的 SQL:
> >
> >SELECT id, sum(price) AS total_price FROM (
> >  SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 ON
> >decrypt_udf(T1.id, T2.id) = 1
> >) GROUP BY id
> >
> >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。
> >
> >lyh1067341434  于2021年10月29日周五 上午9:41写道:
> >
> >> 您好!
> >>
> >>
> >>   感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了;
> >>
> >>
> >> 比如 有一张表 a, 表a有
> >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA");
> >>
> >>
> >> 如果我想求 不同id组的price之和:
> >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确;
> >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确;
> >>
> >>
> >>
> >>
> >> 问题:
> >>
> >>
> >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确;
> >>我看到api只能指定分组的key是哪一个;
> >>
> >>
> >> 谢谢您!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-10-28 11:09:26,"Caizhi Weng"  写道:
> >> >Hi!
> >> >
> >> >没太明白你的需求。你的需求是不是
> >> >
> >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。
> >> >
> >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。
> >> >
> >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。
> >> >
> >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by
> >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。
> >> >
> >> >[1]
> >> >
> >> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/
> >> >
> >> >lyh1067341...@163.com  于2021年10月27日周三 下午5:20写道:
> >> >
> >> >> 您好:
> >> >>
> >> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group
> >> >> by,对于使用spark和flink算子不知道如何实现。
> >> >>
> >> >> 问题:
> >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口)
> >> >>
> >> >> 谢谢您。
> >> >>
> >> >>
> >> >>
> >> >> 发自 网易邮箱大师
> >>
>
>
>
>
>


关于FlinkSQL从kafka读取数据写到hive的一些问题

2021-11-01 文章 yidan zhao
如题,我看了官方文档,定义好kafka和hive表。
写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。

SET table.sql-dialect=hive;CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file');
SET table.sql-dialect=default;CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
watermark on TIMESTAMP column) WITH (...);


如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。

其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。


此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。


Re: standalone集群重启后自动回复任务,任务的jobmaster如果失败会导致JM进程失败

2021-11-01 文章 yidan zhao
补充个更完整的日志:

2021-11-01 14:15:15,849 INFO  [78-cluster-io-thread-1]
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:181)
- Recovered JobGraph(jobId: dfced635fd8c224222a9cbaaf1c5054f).
2021-11-01 14:15:15,849 INFO  [78-cluster-io-thread-1]
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
- Successfully recovered 1 persisted job graphs.
2021-11-01 14:15:15,856 INFO  [78-cluster-io-thread-1]
org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:232)
- Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
2021-11-01 14:15:22,867 INFO  [30-flink-akka.actor.default-dispatcher-3]
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.start(DefaultLeaderElectionService.java:93)
- Starting DefaultLeaderElectionService with
ZooKeeperLeaderElectionDriver{leaderPath='/leader/dfced635fd8c224222a9cbaaf1c5054f/job_manager_lock'}.

2021-11-01 14:15:22,892 ERROR [30-flink-akka.actor.default-dispatcher-3]
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.onFatalError(ClusterEntrypoint.java:454)
- Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job
dfced635fd8c224222a9cbaaf1c5054f failed.
at
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:418)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
~[?:1.8.0_152]
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
~[?:1.8.0_152]
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
~[?:1.8.0_152]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]
at

standalone集群重启后自动回复任务,任务的jobmaster如果失败会导致JM进程失败

2021-11-01 文章 yidan zhao
如题,这个问题之前遇到过,当时我email问的是集群不断重启。
这次也是这个问题,集群不断重启,但分析下原因如题。看日志片段如下:

2021-11-01 14:05:36,954 INFO  [78-cluster-io-thread-1]
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:181)
- Recovered JobGraph(jobId: dfced635fd8c224222a9cbaaf1c5054f).
2021-11-01 14:05:36,954 INFO  [78-cluster-io-thread-1]
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
- Successfully recovered 1 persisted job graphs.
2021-11-01 14:05:36,962 INFO  [78-cluster-io-thread-1]
org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:232)
- Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
2021-11-01 14:05:44,810 INFO  [94-flink-akka.actor.default-dispatcher-30]
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.start(DefaultLeaderElectionService.java:93)
- Starting DefaultLeaderElectionService with
ZooKeeperLeaderElectionDriver{leaderPath='/leader/dfced635fd8c224222a9cbaaf1c5054f/job_manager_lock'}.
2021-11-01 14:05:44,836 ERROR [94-flink-akka.actor.default-dispatcher-30]
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.onFatalError(ClusterEntrypoint.java:454)
- Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job
dfced635fd8c224222a9cbaaf1c5054f failed.
at
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
~[flink-dist_2.11-1.13.0.1-sc-SNAPSHOT.jar:1.13.0.1-sc-SNAPSHOT]

如上,恢复了jobgraph,开启 leader 选举(看起来像是jobmaster的leader选举服务),然后jobmaster 挂了。


如上,我想知道为什么jobmaster挂了就会导致 standalone JM 进程失败呢?
JM进程是所有任务公用,即使启动后之前的某个job无法恢复,也没必要因此就挂掉吧。