你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone 是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没 start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。
Best, tison. 郑 洁锋 <zjfpla...@hotmail.com> 于2020年1月16日周四 下午2:27写道: > 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试 > > ________________________________ > zjfpla...@hotmail.com > > 发件人: 郑 洁锋<mailto:zjfpla...@hotmail.com> > 发送时间: 2020-01-16 14:24 > 收件人: user-zh<mailto:user-zh@flink.apache.org> > 主题: Re: Re: MiniCluster问题 > 这是完整的到启动的代码 > > public class ClusterClientFactory { > > public static ClusterClient createClusterClient(Options > launcherOptions) throws Exception { > String mode = launcherOptions.getMode(); > if(mode.equals(ClusterMode.standalone.name())) { > return createStandaloneClient(launcherOptions); > } else if(mode.equals(ClusterMode.yarn.name())) { > return createYarnClient(launcherOptions,mode); > } > throw new IllegalArgumentException("Unsupported cluster client > type: "); > } > > public static ClusterClient createStandaloneClient(Options > launcherOptions) throws Exception { > String flinkConfDir = launcherOptions.getFlinkconf(); > Configuration config = > GlobalConfiguration.loadConfiguration(flinkConfDir); > MiniClusterConfiguration.Builder configBuilder = new > MiniClusterConfiguration.Builder(); > configBuilder.setConfiguration(config); > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster); > LeaderConnectionInfo connectionInfo = > clusterClient.getClusterConnectionInfo(); > InetSocketAddress address = > AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); > config.setString(JobManagerOptions.ADDRESS, > address.getAddress().getHostName()); > config.setInteger(JobManagerOptions.PORT, address.getPort()); > clusterClient.setDetached(true); > return clusterClient; > } > > > 启动类中: > > ClusterClient clusterClient = > ClusterClientFactory.createClusterClient(launcherOptions); > clusterClient.run(program, 1); > clusterClient.shutdown(); > > ________________________________ > zjfpla...@hotmail.com > > 发件人: tison<mailto:wander4...@gmail.com> > 发送时间: 2020-01-16 13:31 > 收件人: user-zh<mailto:user-zh@flink.apache.org> > 主题: Re: Re: MiniCluster问题 > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > > miniCluster.start(); > > > MiniClusterClient clusterClient = new MiniClusterClient(config, > miniCluster) > ; > > Best, > tison. > > > tison <wander4...@gmail.com> 于2020年1月16日周四 下午1:30写道: > > > 跟集群无关 > > Best, > > tison. > > > > > > tison <wander4...@gmail.com> 于2020年1月16日周四 下午1:30写道: > > > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖 > >> > >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊 > >> > >> Best, > >> tison. > >> > >> > >> 郑 洁锋 <zjfpla...@hotmail.com> 于2020年1月16日周四 下午1:18写道: > >> > >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容? > >>> 我是通过bin/start-cluster.sh启动的flink standalone集群 > >>> > >>> > >>> ________________________________ > >>> zjfpla...@hotmail.com > >>> > >>> 发件人: tison<mailto:wander4...@gmail.com> > >>> 发送时间: 2020-01-16 12:39 > >>> 收件人: user-zh<mailto:user-zh@flink.apache.org> > >>> 主题: Re: MiniCluster问题 > >>> 你 MiniCluster 要 start 啊(x > >>> > >>> Best, > >>> tison. > >>> > >>> > >>> 郑 洁锋 <zjfpla...@hotmail.com> 于2020年1月16日周四 上午11:38写道: > >>> > >>> > MiniCluster代码执行过程中报错: > >>> > > >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation > >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for > >>> further details. > >>> > Exception in thread "main" java.lang.IllegalStateException: > >>> MiniCluster is not yet running. > >>> > at > >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > >>> > at > >>> > org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223) > >>> > at > >>> > org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61) > >>> > at > >>> > com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82) > >>> > at > >>> > com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69) > >>> > at > >>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99) > >>> > > >>> > 报错段代码如下: > >>> > > >>> > Configuration config = > >>> GlobalConfiguration.loadConfiguration(flinkConfDir); > >>> > MiniClusterConfiguration.Builder configBuilder = new > >>> MiniClusterConfiguration.Builder(); > >>> > configBuilder.setConfiguration(config); > >>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build()); > >>> > MiniClusterClient clusterClient = new MiniClusterClient(config, > >>> miniCluster); > >>> > > >>> > 其中flinkConfDir为/opt/flink/conf > >>> > > >>> > > >>> > flink standalone HA集群信息如下: > >>> > ------------------------------ > >>> > zjfpla...@hotmail.com > >>> > > >>> > > >>> > > >>> > >> >