请问如何将已有字段设置为rowtime属性

2019-10-28 文章
各位好,我想使用kafka消息中的某个字段作为rowtime属性,遇到了以下问题,使用flink版本为1.9.1。
以下是我尝试的两种用法,都会报错。请问大家有没有遇到过类似的问题,怎么解决的,谢谢!
代码一:

tEnv.connect(
new Kafka()
.version("universal")
.topic("flink-test-dept-1")
.startFromGroupOffsets()
.property("bootstrap.servers", "192.168.129.101:9192")
.property("group.id", "flink-test-consumer-group")
).withFormat(new Json()
.failOnMissingField(false)
.deriveSchema()
).withSchema(new Schema()
.field("dept_id", Types.INT)
.field("dept_name", Types.STRING)
.field("crt_time", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("crt_time")
.watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000))
)
.field("proc_time", Types.SQL_TIMESTAMP).proctime()
).inAppendMode()
.registerTableSource("dept");

报错:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Field 'crt_time' could not be resolved by the field mapping.
  at 
org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
   at 
org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
   at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
   at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
   at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
   at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
   at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
   at 
org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
   at 
org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
   at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
   at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
   at com.sean.TimeWindowExample.main(TimeWindowExample.java:47)


代码二:

tEnv.connect(
new Kafka()
.version("universal")
.topic("flink-test-dept-1")
.startFromGroupOffsets()
.property("bootstrap.servers", "192.168.129.101:9192")
.property("group.id", "flink-test-consumer-group")
).withFormat(new Json()
.failOnMissingField(false)
.deriveSchema()
).withSchema(new Schema()
.field("dept_id", Types.INT)
.field("dept_name", Types.STRING)
.field("crt_time", Types.SQL_TIMESTAMP)
.field("row_time", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("crt_time")
.watermarksFromStrategy(new BoundedOutOfOrderTimestamps(1000))
)
.field("proc_time", Types.SQL_TIMESTAMP).proctime()
).inAppendMode()
.registerTableSource("dept");

报错:
Exception in thread "main" org.apache.flink.table.api.TableException: 
findAndCreateTableSource failed.
   at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
   at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
   at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
   at com.sean.TimeWindowExample.main(TimeWindowExample.java:48)
Caused by: org.apache.flink.table.api.TableException: Field names must be 
unique.
List of duplicate fields: [crt_time]
List of all fields: [dept_id, dept_name, crt_time, crt_time]
   at org.apache.flink.table.api.TableSchema.(TableSchema.java:94)
   at org.apache.flink.table.api.TableSchema.(TableSchema.java:49)
   at 
org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:352)
   at 
org.apache.flink.table.factories.TableFormatFactoryBase.deriveSchema(TableFormatFactoryBase.java:163)
   at 
org.apache.flink.formats.json.JsonRowFormatFactory.createTypeInformation(JsonRowFormatFactory.java:88)
   at 
org.apache.flink.formats.json.JsonRowFormatFactory.createDeserializationSchema(JsonRowFormatFactory.java:62)
   at 

在json schema中如何定义数组字段

2019-09-19 文章
我是用的是flink-1.9版本,现在想消费带数组字段的json数据,类似{“field1:” “aaa”, “field2”: [“bbb, “ccc”]} 
这种数据格式。


注册schema的时候我使用了field("field2", 
Types.OBJECT_ARRAY(Types.STRING))这种方式。EnvironmentSetting指定为flink-planner和blink-planner都有问题。


在flink-planner下,会报下面的错误,我跟了下代码,发现是StringArraySerializer.copy方法接收的是String[],传过去的却是Object[]。
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Ljava.lang.String;
   at 
org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
   at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
   at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
   at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
   ... 13 more

在blink-planner下,程序一启动就会报下面的错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo) of table field 'field2' does not match with 
type BasicArrayTypeInfo of the field ' field2' of the TableSource 
return type.
   at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
   at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

请问这个问题有没有人知道怎么解决,定义数组字段有没有其他的方法呢?

发送自 Windows 10 版邮件应用



在where条件中使用汉字导致查询出的字段出现unicode编码

2019-09-17 文章
我使用的是flink 
1.9版本,在sql中将where条件的一个字段传入了汉字,返回的字段值却是unicode编码。如果不按照汉字做条件,返回的字段值则是汉字。请问有没有人遇到过这个问题?

测试代码:

tEnv.connect(
new Kafka()
.version("universal")
.topic("flink-test-topic-1")
.startFromGroupOffsets()
.property("bootstrap.servers", "192.168.129.101:9192")
.property("group.id", "flink-test-consumer-group")
).withFormat(new Json()
.failOnMissingField(false)
.deriveSchema()
).withSchema(new Schema()
.field("TI", Types.STRING)
.field("EV", Types.STRING)
.field("CS_HOST", Types.STRING)
.field("DCS_ID", Types.STRING)
.field("complex_row",
Types.ROW_NAMED(new String[]{"first_level_row", 
"first_level_int"},
Types.ROW_NAMED(new String[]{"second_level_str1", 
"second_level_str2"}, Types.STRING, Types.STRING),
Types.INT))
.field("proc", Types.SQL_TIMESTAMP).proctime()
).inAppendMode().registerTableSource("kafka_src");

Table table1 = tEnv.sqlQuery("select * from kafka_src where TI = '会话登录'");
Table table2 = tEnv.sqlQuery("select * from kafka_src where EV = 'view'");

tEnv.toAppendStream(table1, Row.class).print();
tEnv.toAppendStream(table2, Row.class).print();

输出结果:
\u4F1A\u8BDD\u767B\u5F55,view,-,hCOsDjIKi8pcW0VmFASlY4bTMw7yZG,aaa,bbb,100,2019-09-17T09:42:43.731
会话登录,view,-,hCOsDjIKi8pcW0VmFASlY4bTMw7yZG,aaa,bbb,100,2019-09-17T09:42:43.731

发送自 Windows 10 版邮件应用



答复: flink sql中怎么表达窗口的提前触发或延迟触发

2019-09-11 文章
感谢大佬解答,对于处理窗口迟到数据的话是不是可以通过setIdleStateRetentionTime方法来设置?



发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用




发件人: Benchao Li 
发送时间: Wednesday, September 11, 2019 6:38:44 PM
收件人: user-zh@flink.apache.org 
主题: Re: flink sql中怎么表达窗口的提前触发或延迟触发

目前社区的1.9版本的blink-planner在parser层面还不支持,可以通过全局config来配置:
table.exec.emit.early-fire.enabled
table.exec.emit.early-fire.delay

可以尝试一下。

苏 欣  于2019年9月11日周三 上午11:45写道:

> Blink文档中有介绍到EMIT Strategy,可以用WITH DELAY '1' MINUTE BEFORE WATERMARK或者EMIT
> WITHOUT DELAY AFTER WATERMARK等类似的语法来控制窗口触发。
> 但是我使用这种语法作业运行就会报SQL解析错误,请问有没有办法可以在sql中实现控制窗口触发的操作?
> Table result = tEnv.sqlQuery("select " +
> "count(*) " +
> "from dept group by tumble(crt_time, INTERVAL '10' SECOND)
> WITH DELAY '1' MINUTE BEFORE WATERMARK");
> 报错:
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> 
> ERR_ID:
>  SQL-00120001
> CAUSE:
>  SQL parse failed:
>  Encountered "WITH" at line 1, column 75.
>  Was expecting one of:
>  
>  "ORDER" ...
>  "LIMIT" ...
>  "OFFSET" ...
>  "FETCH" ...
>  "," ...
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


flink sql中怎么表达窗口的提前触发或延迟触发

2019-09-10 文章
Blink文档中有介绍到EMIT Strategy,可以用WITH DELAY '1' MINUTE BEFORE WATERMARK或者EMIT 
WITHOUT DELAY AFTER WATERMARK等类似的语法来控制窗口触发。
但是我使用这种语法作业运行就会报SQL解析错误,请问有没有办法可以在sql中实现控制窗口触发的操作?
Table result = tEnv.sqlQuery("select " +
"count(*) " +
"from dept group by tumble(crt_time, INTERVAL '10' SECOND) WITH 
DELAY '1' MINUTE BEFORE WATERMARK");
报错:
Exception in thread "main" org.apache.flink.table.api.SqlParserException:

ERR_ID:
 SQL-00120001
CAUSE:
 SQL parse failed:
 Encountered "WITH" at line 1, column 75.
 Was expecting one of:
 
 "ORDER" ...
 "LIMIT" ...
 "OFFSET" ...
 "FETCH" ...
 "," ...

发送自 Windows 10 版邮件应用



回复: flink-1.9 打包问题

2019-08-23 文章
settings.xml加上这个镜像试试

confluent
confluent
confluent
http://packages.confluent.io/maven



sean...@live.com

发件人: Jimmy Wong
发送时间: 2019-08-23 16:56
收件人: user-zh
主题: flink-1.9 打包问题
Hi,大家好,我用阿里的 settings.xml 打包 flink-1.9 的时候,使用的命令如下
> mvn clean install -DskipTests


但是,在打包结束是报错如下:
> [ERROR] Failed to execute goal on project flink-avro-confluent-registry: 
> Could not resolve dependencies for project 
> org.apache.flink:flink-avro-confluent-registry:jar:1.9-SNAPSHOT: Failure to 
> find io.confluent:kafka-schema-registry-client:jar:3.3.1 in 
> http://maven.aliyun.com/nexus/content/groups/public was cached in the local 
> repository, resolution will not be reattempted until the update interval of 
> nexus-aliyun has elapsed or updates are forced -> [Help 1]


看上去是阿里云的 Maven 仓库 http://maven.aliyun.com/nexus 中没有 
io.confluent:kafka-schema-registry-client:jar:3.3.1。


有同学知道具体的原因麽?


谢谢


答复: flink1.10版本连接hive报错

2019-08-12 文章
感谢各位大佬提供思路,我增加了lzo的jar后不再报这种错而且能取到hive表的数据了。

我以为在flink-shaded-hadoop-2-uber里面包含了所有hadoop相关的包所以没去考虑缺包的问题

附下缺少的pom内容:


   org.apache.hadoop
   hadoop-lzo
   0.4.13






发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用




发件人: zhisheng 
发送时间: Saturday, August 10, 2019 5:58:02 PM
收件人: user-zh 
主题: Re: flink1.10版本连接hive报错

hi 苏欣:
建议先检查一下最后打的 jar 包里面是否包含了  com.hadoop.compression.lzo.LzoCodec 和
com.hadoop.compression.lzo.LzoCodec

苏 欣  于2019年8月9日周五 下午5:41写道:

> 使用flink版本为1.10-snapshot,连接hive版本为1.1.0-cdh5.4.7,大数据集群有kerberos认证。
>
> 我是用1.2.1的方式连接hive的。hiveCatalog可以取到表结构,但在启动作业的时候报错,Standalone模式和yarn模式都报同样的错。
> 请问有人遇到过这种问题吗?
>
> 报错信息如下:
> 
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 3f3033f7076c332529f3ac8250713889)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:243)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> at com.sean.HiveCatalogExample.main(HiveCatalogExample.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:211)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.

blink访问带kerberos认证kafka的问题

2019-05-24 文章
大家好,我使用blink集群访问带认证的kafka,按照文档上的方式配置flink-conf.yaml文件,配置如下:
 security.kerberos.login.use-ticket-cache: false
 security.kerberos.login.keytab: /home/kafka/kafka.keytab
 security.kerberos.login.principal: kafka/slave2@MYCDH
 security.kerberos.login.contexts: Client,KafkaClient

blink集群总共三台节点,把配置文件依次覆盖后,启动集群出现只能启动standalone主节点而启动不了taskManager的情况。
票据每台机器上都有,去掉上述配置后集群可正常启动。
请问有人遇到过这种情况吗?

sean...@live.com


附件好像发不过去,补充部分日志//回复: 回复: blink提交yarn卡在一直重复分配container

2019-04-08 文章
e/.flink/application_1554366508934_0084/taskmanager-conf.yaml
 (the corresponding local path: 
file:/home/yarn/nm/usercache/hive/appcache/application_1554366508934_0084/container_1554366508934_0084_01_01/taskmanager-conf.yaml).
 Visibility: APPLICATION.
2019-04-09 09:58:30.661 [pool-1-thread-6] INFO  
org.apache.flink.yarn.YarnSessionResourceManager  - Creating container launch 
context for TaskManagers
2019-04-09 09:58:30.661 [pool-1-thread-7] INFO  
org.apache.flink.yarn.YarnSessionResourceManager  - Creating container launch 
context for TaskManagers
2019-04-09 09:58:30.661 [pool-1-thread-3] INFO  
org.apache.flink.yarn.YarnSessionResourceManager  - Creating container launch 
context for TaskManagers
2019-04-09 09:58:30.662 [pool-1-thread-6] INFO  
org.apache.flink.yarn.YarnSessionResourceManager  - Starting TaskManagers
2019-04-09 09:58:30.662 [pool-1-thread-7] INFO  
org.apache.flink.yarn.YarnSessionResourceManager  - Starting TaskManagers
2019-04-09 09:58:30.662 [pool-1-thread-3] INFO  
org.apache.flink.yarn.YarnSessionResourceManager  - Starting TaskManagers


sean...@live.com

发件人: 苏 欣<mailto:sean...@live.com>
发送时间: 2019-04-09 10:36
收件人: user-zh<mailto:user-zh@flink.apache.org>
主题: 回复: 答复: blink提交yarn卡在一直重复分配container



sean...@live.com

发件人: 苏 欣<mailto:sean...@live.com>
发送时间: 2019-04-09 10:30
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: 答复: blink提交yarn卡在一直重复分配container
不好意思,已补充yarn的日志文件。

出现问题的原因我已经找到了,在配置flink-conf.yaml中的下面三项后,会出现分配不了资源的问题
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/hive.keytab
security.kerberos.login.principal: hive/cdh129135@MYCDH
如果在客户机使用kinit命令后再提交,yarn资源可以正常分配。
现在我有几个问题请教大佬们:

1、 提交作业到带有kerberos认证的yarn,除了kinit方式之外还有其他方式吗,为什么读配置文件中的票据会出现code 31?

2、  taskmanager.cpu.core与slot数量在yarn上面他们是相等的吗?有没有一个core分配多个slot的情况?


发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: Zili Chen<mailto:wander4...@gmail.com>
发送时间: 2019年4月8日 19:29
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: Re: blink提交yarn卡在一直重复分配container

你好,apache 的邮件列表不支持内嵌图片,请以附件或链接方式引用。

Best,
tison.


苏 欣  于2019年4月8日周一 上午10:17写道:

> 我以per-job方式提交了一个作业到yarn上面,发现会出现不断重复分配container的现象。
>
> 现象为从yarn的web ui上看一瞬间tm的container分配成功了,但是立刻变为只剩一个jm的container,接着会继续分配tm的
> container。不断的重复这个过程直到作业调度不到资源而失败。
>
> 我查了一下exit code没找到31代表是什么意思,有没有大佬帮忙分析下,非常感谢!
>
>
>
> 发送自 Windows 10 版邮件 <https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>
>



回复: 答复: blink提交yarn卡在一直重复分配container

2019-04-08 文章



sean...@live.com

发件人: 苏 欣<mailto:sean...@live.com>
发送时间: 2019-04-09 10:30
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: 答复: blink提交yarn卡在一直重复分配container
不好意思,已补充yarn的日志文件。

出现问题的原因我已经找到了,在配置flink-conf.yaml中的下面三项后,会出现分配不了资源的问题
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/hive.keytab
security.kerberos.login.principal: hive/cdh129135@MYCDH
如果在客户机使用kinit命令后再提交,yarn资源可以正常分配。
现在我有几个问题请教大佬们:

1、 提交作业到带有kerberos认证的yarn,除了kinit方式之外还有其他方式吗,为什么读配置文件中的票据会出现code 31?

2、  taskmanager.cpu.core与slot数量在yarn上面他们是相等的吗?有没有一个core分配多个slot的情况?


发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: Zili Chen<mailto:wander4...@gmail.com>
发送时间: 2019年4月8日 19:29
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: Re: blink提交yarn卡在一直重复分配container

你好,apache 的邮件列表不支持内嵌图片,请以附件或链接方式引用。

Best,
tison.


苏 欣  于2019年4月8日周一 上午10:17写道:

> 我以per-job方式提交了一个作业到yarn上面,发现会出现不断重复分配container的现象。
>
> 现象为从yarn的web ui上看一瞬间tm的container分配成功了,但是立刻变为只剩一个jm的container,接着会继续分配tm的
> container。不断的重复这个过程直到作业调度不到资源而失败。
>
> 我查了一下exit code没找到31代表是什么意思,有没有大佬帮忙分析下,非常感谢!
>
>
>
> 发送自 Windows 10 版邮件 <https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>
>



答复: blink提交yarn卡在一直重复分配container

2019-04-08 文章
不好意思,已补充yarn的日志文件。

出现问题的原因我已经找到了,在配置flink-conf.yaml中的下面三项后,会出现分配不了资源的问题
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/hive.keytab
security.kerberos.login.principal: hive/cdh129135@MYCDH
如果在客户机使用kinit命令后再提交,yarn资源可以正常分配。
现在我有几个问题请教大佬们:

1、 提交作业到带有kerberos认证的yarn,除了kinit方式之外还有其他方式吗,为什么读配置文件中的票据会出现code 31?

2、  taskmanager.cpu.core与slot数量在yarn上面他们是相等的吗?有没有一个core分配多个slot的情况?


发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: Zili Chen<mailto:wander4...@gmail.com>
发送时间: 2019年4月8日 19:29
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: Re: blink提交yarn卡在一直重复分配container

你好,apache 的邮件列表不支持内嵌图片,请以附件或链接方式引用。

Best,
tison.


苏 欣  于2019年4月8日周一 上午10:17写道:

> 我以per-job方式提交了一个作业到yarn上面,发现会出现不断重复分配container的现象。
>
> 现象为从yarn的web ui上看一瞬间tm的container分配成功了,但是立刻变为只剩一个jm的container,接着会继续分配tm的
> container。不断的重复这个过程直到作业调度不到资源而失败。
>
> 我查了一下exit code没找到31代表是什么意思,有没有大佬帮忙分析下,非常感谢!
>
>
>
> 发送自 Windows 10 版邮件 <https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>
>



blink提交yarn卡在一直重复分配container

2019-04-07 文章
我以per-job方式提交了一个作业到yarn上面,发现会出现不断重复分配container的现象。
现象为从yarn的web 
ui上看一瞬间tm的container分配成功了,但是立刻变为只剩一个jm的container,接着会继续分配tm的container。不断的重复这个过程直到作业调度不到资源而失败。
我查了一下exit code没找到31代表是什么意思,有没有大佬帮忙分析下,非常感谢!
[cid:image003.png@01D4EDF4.496DE910]

发送自 Windows 10 版邮件应用



请教大佬们,blink提交yarn集群的问题

2019-04-02 文章
我在fink-conf.yaml文件中配置了principal和keytab,可以提交到带有kerberos认证的yarn集群中,现在我有两个问题:
1.同一客户机切换到不同的yarn集群时,提交作业之前需要改变HADOOP_CONF_DIR,krb5.conf和fink-conf.yaml的配置。这样做有点不太方便,也不太好处理同时提交的问题。
blink目前能否通过提交命令传参的方式来切换票据,或者有没有什么使用上的建议呢?
2.我看到文档上说,缓存票据目前只支持在yarn上的独立集群,这句话的意思是指目前缓存票据只能用在flink yarn session模式中吗?

发送自 Windows 10 版邮件应用



答复: blink开源版本维表关联时开启缓存方式

2019-03-29 文章
感谢大佬解答,我尝试了一下已经基本实现了缓存。期待能早日用上flink原生的维表缓存功能



发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用




发件人: Kurt Young 
发送时间: Friday, March 29, 2019 5:33:57 PM
收件人: user-zh@flink.apache.org
主题: Re: blink开源版本维表关联时开启缓存方式

当时没有想清楚如何把Cache当成一个public的接口向外提供,它更像是一些实现上的特定优化。
后续我们在flink master上实现维表join的时候,会把这个问题考虑进去。

Best,
Kurt


On Fri, Mar 29, 2019 at 5:09 PM moxian  wrote:

> 这么好的一个优化,为啥被拿掉了呢?
>
> Kurt Young  于2019年3月29日周五 上午9:39写道:
>
> > Hi,
> >
> > Blink开源的时候把Cache的实现暂时拿掉了,你可以根据自己的需要自己实现一个cache。
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Mar 27, 2019 at 4:44 PM 苏 欣  wrote:
> >
> > > 我在ppt里面看到这些内容,但是在开源的blink里面没有找到相关的配置,请问各位老师应该如何开启缓存策略?
> > >
> > >
> > >
> > > 发送自 Windows 10 版邮件 <https://go.microsoft.com/fwlink/?LinkId=550986>应用
> > >
> > >
> > >
> >
>


blink开源版本维表关联时开启缓存方式

2019-03-27 文章
我在ppt里面看到这些内容,但是在开源的blink里面没有找到相关的配置,请问各位老师应该如何开启缓存策略?
[cid:image001.png@01D4E4BC.02091040]

发送自 Windows 10 版邮件应用