Re: flink1.11.0: NullPointerException when executing sqlQuery

2020-07-27 Thread godfrey he
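I still suspect this is caused by multithreading.
Could you describe in detail how your gateway behaves? Is it a web server?

As a temporary fix, you can add the following statement before the table env
executes the query:
RelMetadataQueryBase.THREAD_PROVIDERS
    .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));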
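
For readers wondering why the statement has to run on the thread that executes the query: Calcite's THREAD_PROVIDERS is a java.lang.ThreadLocal, so a value set on one thread is not visible on another. The sketch below uses plain Java only. The class ThreadLocalDemo and the field PROVIDER are hypothetical stand-ins for illustration, not Flink or Calcite code, and it is only an analogy for how a gateway that runs queries on worker threads could hit an unset provider.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a per-thread provider slot; not Flink or Calcite code.
final class ThreadLocalDemo {
    static final ThreadLocal<String> PROVIDER = new ThreadLocal<>();

    /** Reads the slot on a fresh worker thread, optionally setting it there first. */
    static String readOnWorker(boolean setInWorker) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = pool.submit(() -> {
                if (setInWorker) {
                    PROVIDER.set("set-in-worker");
                }
                // Stays null unless this very thread called set() before.
                return PROVIDER.get();
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("worker task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        PROVIDER.set("set-in-main");
        System.out.println("main thread:        " + PROVIDER.get());
        System.out.println("worker, no set():   " + readOnWorker(false));
        System.out.println("worker, with set(): " + readOnWorker(true));
    }
}
```

The worker that never calls set() reads null even though the main thread set a value, which is why the workaround is placed immediately before the query on the same thread.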

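wind.fly@outlook.com  wrote on Tue, Jul 28, 2020 at 11:02 AM:

> It is not multiple threads operating on one TableEnvironment at the same time; a new TableEnvironment is created for every execution.
> 
> From: godfrey he 
> Sent: July 28, 2020 9:58
> To: user-zh 
> Subject: Re: flink1.11.0: NullPointerException when executing sqlQuery
>
> Are you using TableEnvironment in a multithreaded environment?
> TableEnvironment is not thread-safe; using it from multiple threads can cause puzzling problems.
>
> godfrey he  wrote on Tue, Jul 28, 2020 at 9:55 AM:
>
> > hi, could you provide the detailed schema information?
> >
> > wind.fly@outlook.com  wrote on Mon, Jul 27, 2020 at 7:02 PM:
> >
> >> One addition: the SQL being executed is as follows: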
> >>
> >> select order_no, order_time from
> >> x.ods.ods_binlog_test_trip_create_t_order_1
> >>

Re: flink1.11.1: executeSql error when using the Table API with the Hive dialect

2020-07-27 Thread godfrey he
Is your package a complete flink-1.11.1 distribution?
For example, check whether the ClusterClientJobClientAdapter class implements
CoordinationRequestGateway.
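
One way to run such a check is with reflection against the same classpath the job uses. The sketch below is only an illustration under assumptions: the helper class ClasspathCheck is hypothetical, and the fully qualified names of the two Flink classes are not given in this thread, so they are passed in as arguments and have to be looked up in your Flink distribution. It also prints where the class was loaded from, which can help spot a jar from a different Flink version.

```java
import java.security.CodeSource;

// Hypothetical helper: checks a subtype relation by class name and reports the jar a class came from.
final class ClasspathCheck {
    private static Class<?> load(String name) {
        try {
            // initialize=false: inspect the class without running its static initializers
            return Class.forName(name, false, ClasspathCheck.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("class not on classpath: " + name, e);
        }
    }

    /** Returns true if the class named implName implements or extends the type named typeName. */
    static boolean isSubtype(String implName, String typeName) {
        return load(typeName).isAssignableFrom(load(implName));
    }

    /** Returns the jar or directory a class was loaded from, or "bootstrap/unknown". */
    static String locationOf(String className) {
        CodeSource source = load(className).getProtectionDomain().getCodeSource();
        return source == null ? "bootstrap/unknown" : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java ClasspathCheck <implementation class> <interface>");
            return;
        }
        System.out.println(args[0] + " is a " + args[1] + ": " + isSubtype(args[0], args[1]));
        System.out.println(args[0] + " loaded from: " + locationOf(args[0]));
    }
}
```

If the check prints false, or the location points at a jar from another Flink version, the classpath is probably mixing versions.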


Re: How to stream CSV from S3?

2020-07-27 Thread Jingsong Li
Hi John,

Do you mean you want to read S3 CSV files using partition/bucket pruning?

If just using the DataSet API, you can use CsvInputFormat to read csv files.

If you want to use the Table/SQL API: in 1.10, the CSV format does not
support partitioned tables, so the only way is to specify the
partition/bucket path explicitly and read a single directory.

In 1.11, the Table/SQL filesystem connector with the CSV format supports
partitioned tables, with complete support for partition semantics [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html

Best,
Jingsong

On Mon, Jul 27, 2020 at 10:54 PM John Smith  wrote:

> Hi, using Flink 1.10
>
> 1- How do we go about reading CSV files that are copied to s3 buckets?
> 2- Is there a source that can tail S3 and start reading a CSV when it is
> copied to S3?
> 3- Is that part of the table APIs?
>


-- 
Best, Jingsong Lee


Flink-1.10 on yarn: question about TaskManager startup parameters

2020-07-27 Thread Roc Marshal
Hi, all.


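 Is G1 the default JVM garbage collector for a TaskManager started by Flink-1.10 on yarn?
 Basic cluster environment: hadoop-2.7.5, flink-1.10, jdk-1.8_61; none of the JVM-related parameters have been set explicitly.

Thanks.
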

Best,
Roc Marshal.

Improving the performance of a Flink SQL job that looks up 40+ tables

2020-07-27 Thread snack white
Hi,
  My Flink version is 1.10, running in per-job mode. My SQL looks like this:

```
select
  column1, t2.xx2, t3.xx3,t4.xx4
  …  t40.xx40
from
  main_table AS t1
  left join lookup_1 FOR SYSTEM_TIME AS OF t1.proc_time AS t2 on t1.xx= t2.xx
  left join lookup_2 FOR SYSTEM_TIME AS OF t1.proc_time AS t3 on t1.xx= t3.xx
  left join lookup_3 FOR SYSTEM_TIME AS OF t1.proc_time AS t4 on t1.xx= t4.xx
  left join lookup_4 FOR SYSTEM_TIME AS OF t1.proc_time AS t5 on t1.xx= t5.xx
  left join lookup_5 FOR SYSTEM_TIME AS OF t1.proc_time AS t6 on t1.xx= t6.xx
  left join lookup_6 FOR SYSTEM_TIME AS OF t1.proc_time AS t7 on t1.xx= t7.xx
...

  left join lookup_40 FOR SYSTEM_TIME AS OF t1.proc_time AS t40 on t1.xx= t40.xx
```

I have implemented the async lookup feature, but that is not enough. Could it
be that the lookups currently run serially rather than in parallel?

I need help with how to improve the performance of my SQL job.

Best 
White 




flink1.11.1: executeSql error when using the Table API with the Hive dialect

2020-07-27 Thread shimin huang
Hi, all:
  I am using the Hive dialect with the Flink 1.11.1 Table API. Calling the executeSql method fails with the following stack trace:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_242]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_242]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_242]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_242]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:747) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_242]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_242]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:97) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:84) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase.setJobClient(SelectTableSinkBase.java:81) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:737) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
    at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_242]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_242]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]


* Core error
`Job client must be a CoordinationRequestGateway. This is a bug.`
Is this a bug?


Re: Flink Session TM Logs

2020-07-27 Thread 范超
Hi Richard,
Maybe you can try the CLI command
"yarn logs -applicationId yourYarnAppId"
to check your logs, or find your app's logs in the YARN web UI.

From: Richard Moorhead [mailto:richard.moorh...@gmail.com]
Sent: Friday, July 24, 2020 23:55
To: user 
Subject: Flink Session TM Logs

When running a flink session on YARN, task manager logs for a job are not 
available after completion. How do we locate these logs?



Re: flink1.11.0: NullPointerException when executing sqlQuery

2020-07-27 Thread wind.fly....@outlook.com
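It is not multiple threads operating on one TableEnvironment at the same time; a new TableEnvironment is created for every execution.

From: godfrey he 
Sent: July 28, 2020 9:58
To: user-zh 
Subject: Re: flink1.11.0: NullPointerException when executing sqlQuery

Are you using TableEnvironment in a multithreaded environment?
TableEnvironment is not thread-safe; using it from multiple threads can cause puzzling problems.

godfrey he  wrote on Tue, Jul 28, 2020 at 9:55 AM:

> hi, could you provide the detailed schema information?
>
> wind.fly@outlook.com  wrote on Mon, Jul 27, 2020 at 7:02 PM:
>
>> One addition: the SQL being executed is as follows: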
>>
>> select order_no, order_time from
>> x.ods.ods_binlog_test_trip_create_t_order_1
>>
>> 
>> From: wind.fly@outlook.com 
>> Sent: July 27, 2020 18:49
>> To: user-zh@flink.apache.org 
>> Subject: flink1.11.0: NullPointerException when executing sqlQuery
>>
>> Hi, all:
>>  I am upgrading our company's gateway, previously built on flink 1.10, to flink 1.11, using the hive
>> catalog. After creating a table, calling the sqlQuery method throws a NullPointerException. I would appreciate troubleshooting suggestions. The error details are as follows:
>> Caused by: java.lang.NullPointerException
>>   at java.util.Objects.requireNonNull(Objects.java:203)
>>   at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:141)
>>   at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:106)
>>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
>>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
>>   at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>>   at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>>   at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>>   at org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
>>   at org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
>>   at org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
>>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
>>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
>>   at org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
>>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
>>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>>   at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>   at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
>>   at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
>>   at org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
>>   at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>>   at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>>   at org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>>   at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>>   at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>>   at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>>   at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>>   at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>>   at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>>   at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>>   at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>>   at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>>   at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
>>   at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>>   at
>> 

Re: flink1.11.0: NullPointerException when executing sqlQuery

2020-07-27 Thread wind.fly....@outlook.com
The schema is as follows:
CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
  _type STRING,
  order_no STRING,
  order_time STRING,
  dt as TO_TIMESTAMP(order_time),
  proctime as PROCTIME(),
  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.properties.bootstrap.servers' = '*',
  'connector.properties.zookeeper.connect' = '*',
  'connector.version' = 'universal',
  'format.type' = 'json',
  'connector.properties.group.id' = 'testGroup',
  'connector.startup-mode' = 'group-offsets',
  'connector.topic' = 'ods-test_trip_create-t_order'
)

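From: godfrey he 
Sent: July 28, 2020 9:55
To: user-zh 
Subject: Re: flink1.11.0: NullPointerException when executing sqlQuery

hi, could you provide the detailed schema information?

wind.fly@outlook.com  wrote on Mon, Jul 27, 2020 at 7:02 PM:

> One addition: the SQL being executed is as follows: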
>
> select order_no, order_time from
> x.ods.ods_binlog_test_trip_create_t_order_1
>

Re: flink1.11.1 startup problem

2020-07-27 Thread Xintong Song
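Please check whether the Yarn setting "yarn.scheduler.minimum-allocation-mb"
is the same on the Yarn RM machine and the Flink JM machine.

Yarn normalizes container requests. For example, if the TM container you
request is 1728m (taskmanager.memory.process.size) and minimum-allocation-mb
is 1024m, the container you actually get must be an integer multiple of
minimum-allocation-mb, i.e. 2048m. Flink reads the Yarn configuration,
computes how large the container allocated for a request should be, and checks
the containers it receives against that size. Judging from the JM log, the
allocated containers did not pass this check, so Flink treats them as a
container specification mismatch. The most likely cause is that the
minimum-allocation-mb Flink sees differs from the one the Yarn RM actually uses.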
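
The rounding described above can be sketched in a few lines of plain Java. This is an illustration of the arithmetic only, assuming the simple "round up to a multiple of minimum-allocation-mb" rule from this reply; the class YarnContainerSize and its method are hypothetical and are not the code Yarn or Flink actually runs (real schedulers may also apply other settings such as a maximum allocation).

```java
// Hypothetical sketch of the container-size normalization described in this reply.
final class YarnContainerSize {
    /** Rounds a requested size up to the next multiple of the minimum allocation (both in MB). */
    static int normalize(int requestedMb, int minAllocationMb) {
        if (requestedMb <= 0 || minAllocationMb <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        // Ceiling division, then scale back up to megabytes.
        int multiples = (requestedMb + minAllocationMb - 1) / minAllocationMb;
        return multiples * minAllocationMb;
    }

    public static void main(String[] args) {
        int requested = 1728; // taskmanager.memory.process.size from the example above
        // Value the RM uses in the example: the container comes back as 2048 MB.
        System.out.println("RM view (min 1024):    " + normalize(requested, 1024));
        // Made-up differing value on the Flink side: Flink would expect 1792 MB instead.
        System.out.println("Flink view (min 128):  " + normalize(requested, 128));
    }
}
```

With different minimum-allocation-mb values on the two sides, the expected and the actual container sizes disagree (1792 versus 2048 in this made-up case), which is the kind of mismatch the check rejects.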

Thank you~

Xintong Song



On Mon, Jul 27, 2020 at 7:42 PM 酷酷的浑蛋  wrote:

>
> First, submitting flink 1.9 to the yarn cluster works fine; submitting flink 1.11.1 to the yarn cluster with the same configuration reports the following error:
> 2020-07-27 17:08:14,661 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> 2020-07-27 17:08:14,665 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
> 2020-07-27 17:08:14,665 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  OS current user: hadoop
> 2020-07-27 17:08:15,417 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Current Hadoop/Kerberos user: wangty
> 2020-07-27 17:08:15,418 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.191-b12
> 2020-07-27 17:08:15,418 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Maximum heap size: 429 MiBytes
> 2020-07-27 17:08:15,418 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JAVA_HOME: /usr/local/jdk/
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Hadoop version: 2.7.7
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM Options:
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -     -Xmx469762048
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -     -Xms469762048
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -     -XX:MaxMetaspaceSize=268435456
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -     -Dlog.file=/data/emr/yarn/logs/application_1568724479991_18850539/container_e25_1568724479991_18850539_01_01/jobmanager.log
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -     -Dlog4j.configuration=file:log4j.properties
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -     -Dlog4j.configurationFile=file:log4j.properties
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Program Arguments: (none)
> 2020-07-27 17:08:15,419 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Classpath:
> 

Re: flink1.11.0: NullPointerException when executing sqlQuery

2020-07-27 Thread godfrey he
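Are you using TableEnvironment in a multithreaded environment?
TableEnvironment is not thread-safe; using it from multiple threads can cause puzzling problems.

godfrey he  wrote on Tue, Jul 28, 2020 at 9:55 AM:

> hi, could you provide the detailed schema information?
>
> wind.fly@outlook.com  wrote on Mon, Jul 27, 2020 at 7:02 PM:
>
>> One addition: the SQL being executed is as follows: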
>>
>> select order_no, order_time from
>> x.ods.ods_binlog_test_trip_create_t_order_1
>>
>> 
>> 发件人: wind.fly@outlook.com 
>> 发送时间: 2020年7月27日 18:49
>> 收件人: user-zh@flink.apache.org 
>> 主题: flink1.11.0 执行sqlQuery时报NullPointException
>>
>> Hi,all:
>>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
>> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
>> Caused by: java.lang.NullPointerException
>>   at java.util.Objects.requireNonNull(Objects.java:203)
>>   at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
>>   at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
>>   at
>> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>>   at
>> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>>   at
>> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>>   at
>> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
>>   at
>> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
>>   at
>> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
>>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
>>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
>>   at
>> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>   at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
>>   at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
>>   at
>> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
>>   at
>> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>>   at
>> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>>   at
>> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>>   at
>> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>>   at
>> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>>   at
>> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>>   at
>> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>>   at
>> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>>   at
>> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>>   at
>> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
>>   at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>   at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>>   at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>>   at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>>   at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>>   at
>> 

Re: flink1.11.0: NullPointException when executing sqlQuery

2020-07-27 Thread godfrey he
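Hi, could you share the detailed schema?

On Mon, Jul 27, 2020 at 7:02 PM, wind.fly@outlook.com wrote:

> One addition: the SQL being executed is:
>
> select order_no, order_time from
> x.ods.ods_binlog_test_trip_create_t_order_1
>
> 
> From: wind.fly@outlook.com
> Sent: July 27, 2020 18:49
> To: user-zh@flink.apache.org
> Subject: flink1.11.0: NullPointException when executing sqlQuery
>
> Hi all,
> I am upgrading our company's gateway, previously based on Flink 1.10, to Flink 1.11. We use the
> Hive catalog. After creating a table, calling sqlQuery throws a NullPointerException. Any
> troubleshooting suggestions would be appreciated. The full error is: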
> Caused by: java.lang.NullPointerException
>   at java.util.Objects.requireNonNull(Objects.java:203)
>   at
> org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:141)
>   at
> org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:106)
>   at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
>   at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
>   at
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>   at
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>   at
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>   at
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
>   at
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
>   at
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
>   at org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
>   at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
>   at
> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
>   at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>   at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>   at
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>   at
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>   at
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>   at
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>   at
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>   at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>   at
> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>   at
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>   at
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>   at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>   at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
>   at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>   at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>   at
> 

Re:Re: Problem parsing MySQL binlog from Kafka

2020-07-27 Thread air23
Hello, the test code is as follows:


private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
        " `data` VARCHAR , " +
        " `table` VARCHAR " +
        ") WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'source_databases'," +
        " 'properties.bootstrap.servers' = '***'," +
        " 'properties.group.id' = 'real1'," +
        " 'format' = 'json'," +
        " 'scan.startup.mode' = 'earliest-offset'" +
        ")";

public static void main(String[] args) throws Exception {
    // Blink planner table environment in streaming mode
    StreamExecutionEnvironment bsEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    // Register the Kafka source table
    TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
    tableResult.print();

    // Query the table and print every row
    Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
    bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);

    bsEnv.execute("aa");
}


The output is below; data is always empty. The data format is MySQL binlog parsed by canal.
,order_operation_time
,inventory_batch_log
,order_log
,order_address_book
,product_inventory
,order_physical_relation
,bil_business_attach
,picking_detail
,picking_detail
,orders


One more question: in 1.11, can the blink planner not convert a DataStream to a Table?
The examples I have seen all use useOldPlanner for the conversion.
Thanks.

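At 2020-07-27 19:44:10, "Jark Wu" wrote:
>Sorry, I still don't see the attachment.
>If it is text, you can paste it directly into the email.
>
>On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>
>> I'll upload it again.
>>
>> On 2020-07-27 at 18:55, Jark Wu wrote:
>>
>> Hi,
>> Your attachment does not seem to have been uploaded.
>>
>> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>
>> > *Hello. This is my parsing SQL. I want to read the data and table fields of the binlog. Why can I get table but not data?*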
>> >
>> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> > " `data` VARCHAR , " +
>> > " `table` VARCHAR " +
>> > ") WITH (" +
>> > " 'connector' = 'kafka'," +
>> > " 'topic' = 'order_source'," +
>> > " 'properties.bootstrap.servers' = '***'," +
>> > " 'properties.group.id' = 'real1'," +
>> > " 'format' = 'json'," +
>> > " 'scan.startup.mode' = 'earliest-offset'" +
>> > ")";
>> >
>> >
>> > See the attachment for details; it includes the printed output.
>> >
>> >
>> >
>> >
>> >
>>
>>


Unable to submit high parallelism job in cluster

2020-07-27 Thread Annemarie Burger
Hi,

I am running Flink on a cluster with 24 workers, each with 16 cores.
Starting the cluster works fine and the Web interface confirms there are 384
slots working. Executing my code with parallelism 24 works fine, but when I
try a higher parallelism, e.g. 384, the job never succeeds in submitting.
Also submitting from the web interface does not start the job, nor gives any
errors. I also tried starting 4 1-slot taskmanagers on each machine, and
executing with parallelism 96, but same problem. The code is not very
complicated, with the logical graph having only 3 steps. 
Attached is a file with the jstacks of the CliFrontend that is using CPU,
and the StandaloneSessionClusterEntrypoint, as well as the jstack of the
TaskManagerRunner on a remote machine(cloud-12). The jstacks are all from
this last scenario, when executing from command line.
 
My relevant conf is as follows: 

queryable-state.enable: true
jobmanager.rpc.address: cloud-11
jobmanager.rpc.port: 6123
taskmanager.heap.mb: 28672
jobmanager.heap.mb: 14240
taskmanager.memory.fraction: 0.7
taskmanager.network.numberOfBuffers: 16384
taskmanager.network.bufferSizeInBytes: 16384
taskmanager.memory.task.off-heap.size: 4000m
taskmanager.memory.managed.size: 1m
#taskmanager.numberOfTaskSlots: 16 #for normal setup
taskmanager.numberOfTaskSlots: 1 #for when setting multiple taskmanagers per machine.

Am I doing something wrong?
Thanks in advance!

  jstack.jstack

  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-27 Thread Vijay Balakrishnan
Hi Al,
I am looking at the Custom User Metrics to count incoming records by an
incoming attribute, event_name, and aggregate it over 5 secs.
I looked at
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
.
I am trying to figure out which one to use: Counter or Meter.
If using Counter, how do I reset it after 5 secs?
If using Meter, which measures average throughput, how do I specify a
duration like 5 secs? markEvent(long n)?

I am also trying to collect the total count of events across all TaskManagers.
Do I collect at flink_taskmanager_job_task__numrecordsIn
or
flink_taskmanager_job_task_operator__numrecordsIn (so
at task or operator level)?

Or should I use User variables like below:

counter = getRuntimeContext()
  .getMetricGroup()
  // specify my value for each custom event_name here;
  // I might not know all custom event_names in advance
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
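
To make the "count by event_name every 5 secs" idea concrete, here is a minimal sketch in plain Java, independent of Flink's metrics API. It buckets each record into a 5-second tumbling window by timestamp and keeps one count per event_name per window, so event names do not need to be known in advance. EventNameWindowCounter and its methods are hypothetical names for illustration only; in a real job the same bookkeeping might instead be done by a keyed window or by the metrics backend computing a rate over a cumulative counter.

```java
import java.util.HashMap;
import java.util.Map;

public class EventNameWindowCounter {
    private final long windowMillis;
    // window start (ms) -> event_name -> count
    private final Map<Long, Map<String, Long>> counts = new HashMap<>();

    public EventNameWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Start of the tumbling window that contains the timestamp.
    public long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowMillis);
    }

    // Adds one to the count for this event name in the record's window.
    public void record(String eventName, long timestampMillis) {
        counts.computeIfAbsent(windowStart(timestampMillis), k -> new HashMap<>())
              .merge(eventName, 1L, Long::sum);
    }

    // Count for an event name in the window starting at windowStartMillis (0 if none).
    public long count(String eventName, long windowStartMillis) {
        return counts.getOrDefault(windowStartMillis, new HashMap<>())
                     .getOrDefault(eventName, 0L);
    }

    public static void main(String[] args) {
        EventNameWindowCounter c = new EventNameWindowCounter(5_000L);
        c.record("click", 1_000L);
        c.record("click", 4_999L);
        c.record("view", 2_000L);
        c.record("click", 5_000L); // belongs to the window starting at 5000 ms
        System.out.println(c.count("click", 0L));
        System.out.println(c.count("click", 5_000L));
    }
}
```

Because each window has its own map entry, nothing has to be reset after 5 seconds; old windows can simply be dropped once they have been reported.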


Pardon my confusion here.
TIA,

On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan 
wrote:

> Hi David,
> Thanks for your reply.
> I am already using the PrometheusReporter. I am trying to figure out how
> to dig into the application data and count grouped by an attribute called
> event_name in the incoming application data and report to Grafana via
> Prometheus.
>
> I see the following at a high level
> task_numRecordsIn
> task_numRecordsOut
> ..operator_numLateRecordsDropped
>
> Trying to dig in deeper than this numRecordsIn to get counts grouped by the
> event_name attribute of the input record every 5 secs.
> TIA,
>
> On Sat, Jul 25, 2020 at 10:55 AM David Anderson 
> wrote:
>
>> Setting up a Flink metrics dashboard in Grafana requires setting up and
>> configuring one of Flink's metrics reporters [1] that is supported by
>> Grafana as a data source. That means your options for a metrics reporter
>> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>>
>> If you want reporting every 5 seconds, with the push based reporters
>> that's something you would configure in flink-conf.yaml, whereas with
>> Prometheus you'll need to configure the scrape interval in the prometheus
>> config file. For more on using Flink with Prometheus, see the blog post by
>> Maximilian Bode [2].
>>
>> Best,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>> [2]
>> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>>
>> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> I am trying to figure out how many records came into the Flink App from
>>> KDS and how many records got moved to the next step or was dropped by the
>>> watermarks.
>>>
>>> I see on the Ui Table for *Source. Records Sent* with a total and the
>>> next step *Filter->FlatMap operator with a Records Received *total. How
>>> can I get these metric values for me to display In Grafana for eg. as I
>>> want to know a count for each 5 secs, how many records came in and how many
>>> were filtered out by the watermark or my Custom Filter operator etc  ?
>>>
>>> I looked at the breakdown of the Source__Custom_Source in Metrics as
>>> show in the attached pic. It has values like 0.NumRecordsIn and
>>> 0.NumRecordsOut and so on from 0 to 9 for the parallelism 10 I specified.
>>> It also has various breakdowns like 0.Timestamps/Watermarks.numRecordsIn
>>> and 0.Timestamps/Watermarks.numRecordsOut
>>>
>>> Attached are some screenshots of the Flink DashBoard UI.
>>>
>>> TIA,
>>>
>>>


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-27 Thread Vijay Balakrishnan
Hi David,
Thanks for your reply.
I am already using the PrometheusReporter. I am trying to figure out how to
dig into the application data and count grouped by an attribute called
event_name in the incoming application data and report to Grafana via
Prometheus.

I see the following at a high level
task_numRecordsIn
task_numRecordsOut
..operator_numLateRecordsDropped

Trying to dig in deeper than this numRecordsIn to get counts grouped by the
event_name attribute of the input record every 5 secs.
TIA,

On Sat, Jul 25, 2020 at 10:55 AM David Anderson 
wrote:

> Setting up a Flink metrics dashboard in Grafana requires setting up and
> configuring one of Flink's metrics reporters [1] that is supported by
> Grafana as a data source. That means your options for a metrics reporter
> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>
> If you want reporting every 5 seconds, with the push based reporters
> that's something you would configure in flink-conf.yaml, whereas with
> Prometheus you'll need to configure the scrape interval in the prometheus
> config file. For more on using Flink with Prometheus, see the blog post by
> Maximilian Bode [2].
>
> Best,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
> [2]
> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>
> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I am trying to figure out how many records came into the Flink App from
>> KDS and how many records got moved to the next step or was dropped by the
>> watermarks.
>>
>> I see on the Ui Table for *Source. Records Sent* with a total and the
>> next step *Filter->FlatMap operator with a Records Received *total. How
>> can I get these metric values for me to display In Grafana for eg. as I
>> want to know a count for each 5 secs, how many records came in and how many
>> were filtered out by the watermark or my Custom Filter operator etc  ?
>>
>> I looked at the breakdown of the Source__Custom_Source in Metrics as show
>> in the attached pic. It has values like 0.NumRecordsIn and 0.NumRecordsOut
>> and so on from 0 to 9 for the parallelism 10 I specified. It also has
>> various breakdowns like 0.Timestamps/Watermarks.numRecordsIn and
>> 0.Timestamps/Watermarks.numRecordsOut
>>
>> Attached are some screenshots of the Flink DashBoard UI.
>>
>> TIA,
>>
>>


[ANNOUNCE] Weekly Community Update 2020/29-30

2020-07-27 Thread Konstantin Knauf
Dear community,

happy to share an update for the last two weeks with the release of Apache
Flink 1.11.1, planning for Flink 1.12, a proposal for better
interoperability with Microsoft Azure services, a few blog posts and more.

Flink Development
=================

* [releases] Flink 1.11.1 was released as a quick follow up to the Flink
1.11.0 release mostly fixing some critical issues in the Table API/SQL
ecosystem. [1]

* [releases] Robert started a thread to collect the different
topics/features that are planned for Flink 1.12. Robert & Dian will be our
release managers for this one. They propose a feature freeze around the end
of September. [2]

* [connectors] Israel Ekpo started a thread to discuss the contribution of
multiple connectors for Microsoft Azure services including Data Lake Store
Gen 2 (Filesystem), Azure Cosmos DB  (DataStream) and Azure Event Hub
(DataStream). [3]

* [sql] Seth has started a small discussion on how to handle timestamps if
a "datagen" table is created based on an existing table using the LIKE
clause. [4]

* [connectors] Benchao raised the point that the semantics of
InputFormat#nextRecord returning null are inconsistent throughout the code
base and would like to align them. No feedback so far. [5]

* [development process] Andrey reminds everyone to assign the "starter"
label to Jira issues, which are a good pick for new contributors to Apache
Flink. [6]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-11-1-released-tp43335.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Planning-Flink-1-12-tp43348.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-Azure-Platform-Support-in-DataStream-Table-and-SQL-Connectors-tp43342.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Handling-of-Timestamp-in-DataGen-table-created-via-LIKE-td43433.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Align-the-semantic-of-returning-null-from-InputFormat-nextRecord-tp43379.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/REMINDER-Use-starter-labels-for-Jira-issues-where-it-makes-sense-tp43284.html

Notable Bugs
============

* [FLINK-18705] [FLINK-18700] [1.11.1] For those of you who are trying out
the new Debezium format, check out the limitations reported in [7,8].
* [FLINK-18656] [1.11.1] The checkpoint start delay metric is always zero
when unaligned checkpoints are used. [9]

[7] https://issues.apache.org/jira/browse/FLINK-18705
[8] https://issues.apache.org/jira/browse/FLINK-18700
[9] https://issues.apache.org/jira/browse/FLINK-18656

Events, Blog Posts, Misc
========================

* Two new posts on the Flink blog:
    * Dawid gives an overview of (external) catalogs (e.g.
      HiveMetastore, PostgreSQL) in Flink. [10]
    * Kostas introduces the newly added "Application Mode" and contrasts
      it to the two existing modes: "Session Mode" & "Per-Job Mode". [11]

* In this blog post Eric J. Bruno of Dell explains in detail how Apache
Flink can be used for complex event processing and streaming analytics. [12]

* On the 23rd, the Apache Flink meetup group in Seoul hosted a virtual
session with talks by SK Telecom (Korean), HyperConnect (Korean) and
Ververica (English). It is available on YouTube [13].

* We have published the training program for Flink Forward Global taking
place on the 20th & 21st of October. [14] There will be six different
courses offered over these two days:
    * Flink Development (2 days)
    * SQL Development (2 days)
    * Runtime & Operations (1 day)
    * Stateful Functions (1 day)
    * Tuning & Troubleshooting (introduction and advanced, 1 day each).

[10] https://flink.apache.org/2020/07/23/catalogs.html
[11] https://flink.apache.org/news/2020/07/14/application-mode.html
[12]
https://blogs.oracle.com/javamagazine/streaming-analytics-with-java-and-apache-flink
[13] https://www.youtube.com/watch?v=HWTb5kn4LvE
[14] https://www.flink-forward.org/global-2020/training-program

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


JobManager refusing connections when running many jobs in parallel?

2020-07-27 Thread Hailu, Andreas
Hi team,

We've observed that when we submit a decent number of jobs in parallel from a 
single Job Master, we encounter job failures with Connection Refused 
exceptions. We've seen this behavior start at 30 jobs running in parallel. It 
seems to be transient, however, as the job succeeds after several retries. The 
surface-level error varies, but digging deeper into the stack traces, it looks 
to stem from the Job Manager no longer accepting connections.
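
Since the failures are described as transient and the jobs succeed after several retries, one client-side stopgap is to retry the submission with a growing delay when the failure is rooted in a connection refusal. A minimal sketch in plain Java follows. SubmitWithRetry, its method names and its parameters are hypothetical, and the action passed in is a placeholder for whatever submits the job rather than a Flink API call. It does not address why the Job Manager stops accepting connections.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;

public class SubmitWithRetry {

    // True if the throwable or any of its causes is a ConnectException.
    public static boolean causedByConnectException(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof ConnectException) {
                return true;
            }
        }
        return false;
    }

    // Calls the action up to maxAttempts times. Only failures rooted in a
    // ConnectException are retried; the delay doubles after each retry.
    // The final failure is rethrown wrapped in an IllegalStateException.
    public static <T> T retry(Callable<T> action, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts || !causedByConnectException(e)) {
                    throw new IllegalStateException("failed after " + attempt + " attempt(s)", e);
                }
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting to retry", ie);
            }
            delay *= 2;
        }
    }
}
```

Walking the cause chain matters here because, in the traces below, the ConnectException sits several "Caused by" levels beneath the exception the client actually sees.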

I've included a couple of examples below from failed jobs' driver logs, with 
different errors stemming from a connection refused error:

First example: 15 Task Managers/2 cores/4096 Job Manager memory/12288 Task 
Manager memory - 30 jobs submitted in parallel, each with parallelism of 1
Job Manager is running @ d43723-563.dc.gs.com: Using job manager web tracking 
url Job Manager Web Interface (http://d43723-563.dc.gs.com:41268)
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 1dfef6303cf0e888231d4c57b4b4e0e6)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
...
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:273)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:341)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 Connection refused: d43723-563.dc.gs.com/10.47.126.221:41268
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 16 more
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 Connection refused: d43723-563.dc.gs.com/10.47.126.221:41268
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
... 6 more
Caused by: java.net.ConnectException: Connection refused

Second example: 30 Task Managers/2 cores/4096 Job Manager memory/12288 Task 
Manager memory - 60 jobs submitted in parallel, each with parallelism of 1
Job Manager is running @ d43723-484.dc.gs.com: Using job manager web tracking 
url 

Re:  problem with build from source flink 1.11

2020-07-27 Thread Timo Walther

Great to hear. Thanks for letting us know.

Regards,
Timo

On 27.07.20 17:58, Felipe Lolas wrote:

Seems fixed!

I was replacing only flink-dist.jar. Replacing all the compiled 
jars from flink-1.1.0-bin fixed the issue.


Thanks!

On July 27, 2020 at 4:28, Felipe Lolas wrote:


Hi!! Timo and Chesnay:

Thanks for helping!!!

Here is the full stack trace:
2020-07-27 05:27:38,661 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
insert-into_default_catalog.default_database.print_table 
(ca40bd10a729f5cad56a7db6bef17a6f) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_112]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_112]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;
at 
org.apache.flink.core.memory.DataOutputSerializer.wrapAsByteBuffer(DataOutputSerializer.java:65)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 

Re: problem with build from source flink 1.11

2020-07-27 Thread Felipe Lolas

Seems fixed!

I was replacing only flink-dist.jar. Replacing all the compiled jars from 
flink-1.11.0-bin fixed the issue.

Thanks!


Re: problem with build from source flink 1.11

2020-07-27 Thread Felipe Lolas

Hi!! Timo and Chesnay:

Thanks for helping!!!

Here is the full stack trace:
2020-07-27 05:27:38,661 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
insert-into_default_catalog.default_database.print_table 
(ca40bd10a729f5cad56a7db6bef17a6f) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_112]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_112]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;
at 
org.apache.flink.core.memory.DataOutputSerializer.wrapAsByteBuffer(DataOutputSerializer.java:65)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.<init>(SpanningRecordSerializer.java:50)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.<init>(RecordWriter.java:98)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
  

How to stream CSV from S3?

2020-07-27 Thread John Smith
Hi, using Flink 1.10

1- How do we go about reading CSV files that are copied to s3 buckets?
2- Is there a source that can tail S3 and start reading a CSV when it is
copied to S3?
3- Is that part of the table APIs?


Re: Flink 1.11.0 UDAF fails with "No match found for function signature fun()"

2020-07-27 Thread Timo Walther

Hi Dmytro,

Aggregate functions will support the new type system in Flink 1.12. 
Until then, they cannot be used with the new `call()` syntax as 
anonymous functions. In order to use the old type system, you need to 
register the function explicitly using SQL `CREATE FUNCTION a AS 
'myFunc'` and then use it in `call("myFunc", ...)`.


The mentioned "No match found for function signature fun()" was 
a bug that got fixed in 1.11.1:


https://issues.apache.org/jira/browse/FLINK-18520

This bug only exists for catalog functions, not temporary system functions.

Regards,
Timo







Flink 1.11.0 UDAF fails with "No match found for function signature fun()"

2020-07-27 Thread Dmytro Dragan
Hi All,

I see strange behavior of UDAF functions:

Let's say we have a simple table:
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment t = TableEnvironment.create(settings);


Table table = t.fromValues(DataTypes.ROW(
        DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
        DataTypes.FIELD("symbol", DataTypes.STRING().notNull())
    ),
    row(1.0, "S"), row(2.0, "S"));
t.createTemporaryView("A", table);

As an example, we will use a built-in function under a new name:

t.createTemporaryFunction("max_value", new 
MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());

Using Table API we can write:

t.createTemporaryView("B", table
    .groupBy($("symbol"))
    .select($("symbol"), call("max_value", $("price")))
);
and get:
org.apache.flink.table.api.TableException: Aggregate functions are not updated 
to the new type system yet.

Using SQL API we can write:

t.createTemporaryView("B", t.sqlQuery("select max_value(price) from A group by 
symbol"));
and get:
org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 1, column 8 to line 1, column 23: No match found for function signature 
max_value()

Calling the built-in max function instead of the provided alias produces correct 
results.

In addition, the non-retract implementation of the max function 
(MaxAggFunction.DoubleMaxAggFunction) would produce:
org.apache.flink.table.api.ValidationException: Could not register temporary 
catalog function 'default_catalog.default_database.max_value' due to 
implementation errors.
Cause DoubleMaxAggFunction is not serializable.

Am I missing something?
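
For readers less familiar with what a "max with retract" aggregate has to keep track of, here is a minimal plain-Java sketch. It is not Flink's MaxWithRetractAggFunction and does not extend any Flink class; the class name RetractableMaxSketch and its methods are hypothetical and only mirror the accumulate/retract/getValue idea. It assumes the aggregate must be able to undo a previously accumulated value, so it counts occurrences per value instead of storing only the current maximum.

```java
import java.util.TreeMap;

class RetractableMaxSketch {
    // Sorted map from value to how many times it is currently accumulated.
    private final TreeMap<Double, Integer> counts = new TreeMap<>();

    /** Adds one occurrence of the value. */
    void accumulate(double value) {
        counts.merge(value, 1, Integer::sum);
    }

    /** Removes one occurrence of the value, if present. */
    void retract(double value) {
        // Returning null from the remapping function removes the entry.
        counts.computeIfPresent(value, (key, count) -> count > 1 ? count - 1 : null);
    }

    /** Returns the current maximum, or null when nothing is accumulated. */
    Double getValue() {
        return counts.isEmpty() ? null : counts.lastKey();
    }

    public static void main(String[] args) {
        RetractableMaxSketch acc = new RetractableMaxSketch();
        acc.accumulate(1.0);
        acc.accumulate(2.0);
        System.out.println(acc.getValue());
        acc.retract(2.0);
        System.out.println(acc.getValue());
    }
}
```

A non-retract max only needs a single field for the running maximum, which is why the two variants are separate classes in the first place.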






Re: flink row type

2020-07-27 Thread kcz
Wow, that is a neat trick, very clever. Until now I had always fetched values by index. I'll study this approach.



-- Original message --
From: Jark Wu 

Kafka avro format sink fails with NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder

2020-07-27 Thread RS
Hi,
Version: Flink-1.11.1
Job deployment mode: standalone
The Maven build of the Flink job jar includes flink-avro and is built with jar-with-dependencies:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.11.1</version>
</dependency>

The resulting jar also contains this class.


The official site says "Flink has extensive built-in support for Apache Avro", so I assumed Avro is supported by default.
1. Starting the job directly fails with: Caused by: org.apache.flink.table.api.ValidationException: Could 
not find any factory for identifier 'avro' that implements 
'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath.
Available factory identifiers are:
canal-json
csv
debezium-json
json
2. After downloading flink-avro-1.11.1.jar and dropping it into flink/lib, it fails with: Caused by: 
java.lang.ClassNotFoundException: org.apache.avro.generic.GenericRecord
3. After downloading avro-1.10.0.jar and putting it into flink/lib, it fails with: Caused by: java.lang.NoClassDefFoundError: 
Could not initialize class org.apache.avro.SchemaBuilder
What else do I need to do to get Avro support?


Thanks
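
A general JVM note that may help narrow this down: `ClassNotFoundException` means the class itself could not be found, whereas `NoClassDefFoundError: Could not initialize class X` usually means X was found but its static initializer already failed once, often because something X depends on is missing from the classpath. The sketch below is a plain-Java diagnostic, not a Flink or Avro API; `ClasspathProbe` and `describe` are hypothetical names. Run with the same classpath as the job, it reports whether a class loads and from which jar.

```java
import java.security.CodeSource;

class ClasspathProbe {
    /** Tries to load and initialize the class, and reports where it came from. */
    static String describe(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes that belong to the JDK itself usually have no code source.
            String location = (source == null || source.getLocation() == null)
                    ? "JDK runtime"
                    : source.getLocation().toString();
            return className + " loaded from " + location;
        } catch (ClassNotFoundException e) {
            return className + " not found on the classpath";
        } catch (LinkageError e) {
            // Covers NoClassDefFoundError and ExceptionInInitializerError.
            return className + " found but failed to link/initialize: " + e;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.avro.generic.GenericRecord",
            "org.apache.avro.SchemaBuilder"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

If a class reports "failed to link/initialize", the first stack trace in the job log for that class (before the "Could not initialize class" message) normally names the dependency that is actually missing.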



Re: problem with build from source flink 1.11

2020-07-27 Thread Chesnay Schepler

@Timo maven 3.2.5 is the recommended Maven version for building Flink.

@Felipe Can you provide us the full stacktrace? This could be a library 
issue in regards to JDK compatibility.









Re: problem with build from source flink 1.11

2020-07-27 Thread Timo Walther

Hi Felipe,

are you sure that Maven and the TaskManagers are using the JDK version 
that you mentioned?


Usually, a `mvn clean install` in the `.../flink/` directory should 
succeed without any problems. Also your Maven version seems pretty old. 
I'm using Apache Maven 3.6.3 for example.


The NoSuchMethodError indicates that there is some version mismatch. It 
seems that this version mismatch is related to your JDK version. Maybe 
your task managers run a different version?
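
For context on the exact signature in this error: `ByteBuffer.position(int)` with return type `ByteBuffer` only exists since Java 9; on Java 8 the method is inherited from `Buffer` and returns `Buffer`. One way this error can arise, possibly but not necessarily what happened here, is bytecode compiled against a JDK 9+ class library and then run on a Java 8 JVM. The sketch below is illustrative only; `BufferPositionSketch` and `setPosition` are hypothetical names and not Flink code. It shows the common defensive pattern of calling `position` through the `Buffer` supertype so that the call site links the same way on both runtimes.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;

class BufferPositionSketch {
    /**
     * Sets the position through the Buffer supertype, so the compiled call
     * refers to Buffer.position(int), which exists on Java 8 and later.
     */
    static ByteBuffer setPosition(ByteBuffer buffer, int newPosition) {
        ((Buffer) buffer).position(newPosition);
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        setPosition(buffer, 4);
        // Printing the runtime version helps compare build and cluster JVMs.
        System.out.println("java.version=" + System.getProperty("java.version")
                + " position=" + buffer.position());
    }
}
```

When building on a newer JDK for a Java 8 cluster, compiling with `javac --release 8` (rather than only `-source`/`-target`) is the other usual way to avoid linking against signatures that Java 8 does not have.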


Let me know if this helped.

Regards,
Timo


On 27.07.20 12:09, Felipe Lolas wrote:

Hi,

I'm Felipe. I just started learning Flink a few weeks ago (moving Spark 
Streaming workloads).


I am currently testing some changes to flink-yarn, but when using my 
own build of flink-dist.jar, the job fails in the TaskManager with: 
java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;


*env(build)*
flink-1.11.0
maven 3.2.5
jdk 1.8
macox

*command(into flink-parent)*

mvn clean install -DskipTests -Dfast

*env*
yarn application mode
cdh 6.2.1
can anyone help me?

Thank you!
Cheers,
Felipe L




Re:Re: kafka-connect JSON format adaptation question?

2020-07-27 Thread RS
Hi,
Ah, I found this is not quite right: `schema` needs to be a dict, not a STRING. How can that be defined in SQL?


Thanks
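
On `schema` being a nested object rather than a string: with Kafka Connect's JSON converter and schemas enabled, each message is an envelope with a `schema` object and a `payload` object, as in the example quoted in this thread. The plain-Java sketch below only makes that target shape concrete for the total_count/username columns. It is not Flink code; `ConnectEnvelopeSketch`, `envelope` and `escape` are hypothetical names, the schema name "user" is copied from the quoted example, and how (or whether) to produce this from Flink SQL is left open here.

```java
class ConnectEnvelopeSketch {
    /** Escapes backslashes and double quotes for use inside a JSON string. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds one schema/payload envelope for a (total_count, username) record. */
    static String envelope(long totalCount, String username) {
        // The schema part is a nested JSON object, not a quoted string.
        String schema = "{\"type\":\"struct\",\"fields\":["
                + "{\"type\":\"int64\",\"optional\":false,\"field\":\"total_count\"},"
                + "{\"type\":\"string\",\"optional\":true,\"field\":\"username\"}"
                + "],\"optional\":true,\"name\":\"user\"}";
        String payload = "{\"total_count\":" + totalCount
                + ",\"username\":\"" + escape(username) + "\"}";
        return "{\"schema\":" + schema + ",\"payload\":" + payload + "}";
    }

    public static void main(String[] args) {
        System.out.println(envelope(12L, "admin"));
    }
}
```

The point of the sketch is the nesting: if `schema` is written through a STRING column, a JSON serializer will emit it as an escaped string, which is the mismatch described above.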
On 2020-07-27 17:49:18, "Jark Wu"  wrote:
>Hi,
>
>You need to add schema and payload in both the DDL and the query:
>
>CREATE TABLE print_table \
>(`schema` STRING, `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(6)>) \
>WITH (\
>'connector' = 'kafka', \
>'topic' = 'test_out', \
>'properties.bootstrap.servers' = '127.0.0.1:9092', \
>'sink.partitioner' = 'round-robin', \
>'format' = 'json')
>
>-- In the DML you can hard-code schema as a constant and wrap payload with the ROW function
>INSERT INTO output
>SELECT '{ "type": "struct", ...}' as schema, ROW(total_count, username,
>update_time) as payload
>FROM ...
>
>
>Btw, may I ask why you have to use kafka-jdbc-connect to sync to MySQL? Personally I think
>syncing to MySQL directly with Flink SQL would be quite convenient, wouldn't it?
>
>Best,
>Jark
>
>
>On Mon, 27 Jul 2020 at 17:33, RS  wrote:
>
>> hi,
>> kafka->Flink->kafka->mysql
>> After Flink processes the data with SQL, it is written to Kafka in JSON format, and kafka-connect-jdbc then exports it to MySQL.
>> kafka-connect is used so that the data can also be exported to other stores at the same time.
>>
>> Output table definition in Flink:
>>
>> CREATE TABLE print_table \
>> (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
>> WITH (\
>> 'connector' = 'kafka', \
>> 'topic' = 'test_out', \
>> 'properties.bootstrap.servers' = '127.0.0.1:9092', \
>> 'sink.partitioner' = 'round-robin', \
>> 'format' = 'json')
>>
>> Example of the output data:
>>
>> {"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
>>
>> However, the JSON format expected by kafka-connect-jdbc needs schema and payload, for example:
>>
>> {
>>   "schema": {
>>     "type": "struct",
>>     "fields": [
>>       {
>>         "type": "int64",
>>         "optional": false,
>>         "field": "id"
>>       },
>>       {
>>         "type": "string",
>>         "optional": true,
>>         "field": "name"
>>       }
>>     ],
>>     "optional": true,
>>     "name": "user"
>>   },
>>   "payload": {
>>     "id": 1,
>>     "name": "admin"
>>   }
>> }
>>
>> How should this be handled in Flink (adding schema and payload?) to produce the JSON format that kafka connect expects?
>>
>> Current Flink SQL:
>>
>> INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS
>> total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as
>> update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1'
>> MINUTE)
>>


Mail from kandy.wang

2020-07-27 Thread kandy.wang



Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread Xingbo Huang
Hi,
You can use the `set_python_requirements` method to specify your
requirements.txt; see the documentation [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/dependency_management.html#python-dependency

Best,
Xingbo

On Mon, Jul 27, 2020 at 8:29 PM, rookieCOder  wrote:

> Hi,
> And I've got another question.
> If I use a user-defined function in PyFlink that depends only on library A,
> and all Flink does is use the UDF in tables, does that mean I only need to
> install library A on the slaves?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread rookieCOder
Hi,
And I've got another question.
If I use a user-defined function in PyFlink that depends only on library A,
and all Flink does is use the UDF in tables, does that mean I only need to
install library A on the slaves?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Parquet batch table sink in Flink 1.11

2020-07-27 Thread Flavio Pompermaier
I think that's not true when you need to integrate Flink into an existing
data lake. In my opinion it should be very straightforward to read and write
Parquet data with objects serialized with avro/thrift/protobuf, or at least
to reuse Hadoop input/output formats with the Table API. At the moment I have
to go through a lot of custom code that uses the Hadoop formats, and it is a
lot of code just to read and write Thrift or Avro serialized objects in
Parquet folders.

On Wed, Jul 22, 2020 at 3:35 AM Jingsong Li  wrote:

> In table/SQL,
>
> I think we don't need a source/sink for `AvroParquetOutputFormat`, because
> the data structure is always Row or RowData and should not be an Avro object.
>
> Best,
> Jingsong
>
> On Tue, Jul 21, 2020 at 8:09 PM Flavio Pompermaier 
> wrote:
>
>> This is what I actually do but I was hoping to be able to get rid of the
>> HadoopOutputFormat and be able to use a more comfortable Source/Sink
>> implementation.
>>
>> On Tue, Jul 21, 2020 at 12:38 PM Jingsong Li 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> AvroOutputFormat only supports writing Avro files.
>>> I think you can use `AvroParquetOutputFormat` as a hadoop output format,
>>> and wrap it through Flink `HadoopOutputFormat`.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Fri, Jul 17, 2020 at 11:59 PM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> is there a way to write out Parquet-Avro data using
>>>> BatchTableEnvironment with Flink 1.11?
>>>> At the moment I'm using the hadoop ParquetOutputFormat but I hope to be
>>>> able to get rid of it sooner or later. I saw that there's the
>>>> AvroOutputFormat but no support for it using Parquet.
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


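Re: kafka-connect JSON format adaptation question?

2020-07-27 Thread Leonard Xu
> After I changed TIMESTAMP(6) to TIMESTAMP(3), the error went away. So is the time precision of windows in Flink limited to 3 digits?

The time in the window is presumably being used as a time attribute column, and that can only be TIMESTAMP(3). 
For other TIMESTAMP fields, Flink supports up to TIMESTAMP(9).

Best regards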
Leonard 
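
To make the precision numbers concrete: the digit in TIMESTAMP(p) is the number of fractional-second digits, so 3 means milliseconds, 6 microseconds and 9 nanoseconds. The following is a plain `java.time` sketch of what survives at each precision. It is not Flink code, and `TimestampPrecisionSketch` and its methods are hypothetical names used only for illustration.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

class TimestampPrecisionSketch {
    /** Keeps 3 fractional digits (milliseconds), like TIMESTAMP(3). */
    static LocalDateTime toPrecision3(LocalDateTime value) {
        return value.truncatedTo(ChronoUnit.MILLIS);
    }

    /** Keeps 6 fractional digits (microseconds), like TIMESTAMP(6). */
    static LocalDateTime toPrecision6(LocalDateTime value) {
        return value.truncatedTo(ChronoUnit.MICROS);
    }

    public static void main(String[] args) {
        // A value carrying all 9 fractional digits, like TIMESTAMP(9).
        LocalDateTime full = LocalDateTime.of(2020, 7, 27, 17, 23, 0, 123_456_789);
        System.out.println(full);
        System.out.println(toPrecision6(full));
        System.out.println(toPrecision3(full));
    }
}
```

A window start such as the one produced by TUMBLE_START falls on a whole minute here, so nothing is lost at millisecond precision; the restriction discussed above is about the declared type, not about the values.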

> On Jul 27, 2020, at 20:05, RS  wrote:
> 
> Hi,
> I changed the SQL and ran into a new problem:
> Caused by: org.apache.flink.table.planner.codegen.CodeGenException: 
> Unsupported cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, 
> `EXPR$2` TIMESTAMP(3) *ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, 
> `username` STRING, `update_time` TIMESTAMP(6)>'.
> 
> 
> The time in the SELECT is defined like this: TUMBLE_START(update_time,INTERVAL '1' MINUTE) as 
> update_time) as payload
> 
> 
> After I changed TIMESTAMP(6) to TIMESTAMP(3), the error went away. So is the time precision of windows in Flink limited to 3 digits?
> 
> 
> Thanks



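Re: Re: Problem parsing MySQL binlog from Kafka

2020-07-27 Thread RS
Hi,
Attachments probably do not come through on this list, and that includes images.
You can only reply in plain text and paste code. If you really need an image, upload it to another site and post a link to it.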





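On 2020-07-27 19:21:51, "air23"  wrote:

I'll upload it again.


On Jul 27, 2020 at 18:55, Jark Wu wrote:
Hi,
Your attachment does not seem to have been uploaded.

On Mon, 27 Jul 2020 at 18:17, air23  wrote:

> *Hello. This is my parsing SQL. I want to read the `data` field and the `table` field of the binlog. Why can I get `table` but not `data`?*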
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'order_source'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
>
>
> See the attachment for details; it includes the printed output.
>
>
>
>
>


Re:Re: kafka-connect JSON format adaptation question?

2020-07-27 Thread RS
Hi,
I changed the SQL and ran into a new problem:
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported 
cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, `EXPR$2` TIMESTAMP(3) 
*ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, `username` STRING, 
`update_time` TIMESTAMP(6)>'.


The time in the SELECT is defined like this: TUMBLE_START(update_time,INTERVAL '1' MINUTE) as update_time) 
as payload


After I changed TIMESTAMP(6) to TIMESTAMP(3), the error went away. So is the time precision of windows in Flink limited to 3 digits?


Thanks


Re: Problem parsing MySQL binlog from Kafka

2020-07-27 Thread Jark Wu
Sorry, I still cannot see the attachment.
If it is text, you can paste it directly into the email.



Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread Xingbo Huang
Yes. You are right.

Best,
Xingbo

On Mon, Jul 27, 2020 at 6:30 PM, rookieCOder  wrote:

> Hi, Xingbo
> Thanks for your reply.
> So the point is that simply pointing the source or the sink at the master's
> local file system causes the error, because the slaves cannot read the
> source/sink files? And the simplest solution is to make sure that the slaves
> have access to the master's local filesystem (via NFS or HDFS)?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Problem parsing MySQL binlog from Kafka

2020-07-27 Thread air23
I'll upload it again.




Re: Problem parsing MySQL binlog from Kafka

2020-07-27 Thread Jark Wu
Hi,
Your attachment does not seem to have been uploaded.



Re: flink1.11.0 NullPointException when executing sqlQuery

2020-07-27 Thread wind.fly....@outlook.com
A follow-up: the SQL being executed is:

select order_no, order_time from x.ods.ods_binlog_test_trip_create_t_order_1


From: wind.fly@outlook.com 
Sent: July 27, 2020 18:49
To: user-zh@flink.apache.org 
Subject: flink1.11.0 NullPointException when executing sqlQuery

Hi, all:
 I am upgrading our company's gateway, previously based on Flink 1.10, to Flink 1.11. We use the Hive 
catalog. After creating a table, calling the sqlQuery method throws a NullPointException. I would appreciate any troubleshooting suggestions. The full error is:
Caused by: java.lang.NullPointerException
  at java.util.Objects.requireNonNull(Objects.java:203)
  at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:141)
  at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:106)
  at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
  at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
  at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
  at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
  at 
org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
  at 
org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
  at 
org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
  at 
org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
  at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
  at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
  at org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
  at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
  at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
  at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
  at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
  at 
org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
  at 
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
  at 
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
  at 
org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
  at 
org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
  at 
org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
  at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
  at 
org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
  at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
  at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
  at 
org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
  at 
org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
  at 
org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
  at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
  at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
  at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
  at 

Re: How to get CLI parameters when deploy on yarn cluster

2020-07-27 Thread 范超
Thanks, Jake. I tried it out and it worked!

From: Jake [mailto:ft20...@qq.com]
Sent: Monday, July 27, 2020, 18:33
To: 范超 
Cc: user (user@flink.apache.org) 
Subject: Re: How to get CLI parameters when deploy on yarn cluster

Hi fanchao

You can pass parameters after the jar file.

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar param1 param2 param3

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html

Jake


On Jul 27, 2020, at 6:19 PM, 范超 <fanc...@mgtv.com> wrote:

Hi, Flink community

I’m new to Flink and don’t know how to pass parameters to my jar file 
when starting the job in detached mode on the YARN cluster.
Here is my shell command:

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar -runat=test 2>&1

In my jar file, the code selects a different config.properties file by reading 
the “runat” CLI parameter, but I don’t know how to read this CLI parameter. 
Alternatively, if I have two sets of property files, one for the testing 
environment and one for production, how can I choose between them with a CLI 
option?

Thanks a lot. Any help is appreciated.

Chao fan




NullPointerException when executing sqlQuery on Flink 1.11.0

2020-07-27 Thread wind.fly....@outlook.com
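Hi all,
 I am upgrading our company's gateway, previously built on Flink 1.10, to Flink 1.11. We use the Hive 
catalog. After creating a table, calling the sqlQuery method throws a NullPointerException. Any troubleshooting suggestions would be appreciated. The full error is: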
Caused by: java.lang.NullPointerException
  at java.util.Objects.requireNonNull(Objects.java:203)
  at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:141)
  at 
org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:106)
  at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:73)
  at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
  at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
  at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
  at 
org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
  at 
org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
  at 
org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
  at 
org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
  at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
  at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
  at org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
  at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
  at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
  at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
  at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
  at 
org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
  at 
org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
  at 
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
  at 
org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
  at 
org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
  at 
org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
  at 
org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
  at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
  at 
org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
  at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
  at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
  at 
org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
  at 
org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
  at 
org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
  at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
  at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
  at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
  at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
  at 

Re: How to get CLI parameters when deploy on yarn cluster

2020-07-27 Thread Jake
Hi fanchao

You can pass parameters after the jar file.

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar param1 param2 param3

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html
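
Everything after the jar path on the flink run command line is handed to the job's entry point as ordinary program arguments (in a Java job, the args array of main). As a minimal sketch of the selection logic described in the question, the Python snippet below picks a properties file from a -runat=<env> argument. The function name select_config, the file-naming scheme config-<env>.properties and the default environment are assumptions for illustration, not part of Flink.

```python
from typing import List


def select_config(args: List[str], default_env: str = "test") -> str:
    """Return the properties file name for the environment given via -runat=<env>."""
    env = default_env
    for arg in args:
        # Accept both "-runat=prod" and "--runat=prod".
        key, sep, value = arg.lstrip("-").partition("=")
        if key == "runat" and sep and value:
            env = value
    # Hypothetical naming scheme: one properties file per environment.
    return f"config-{env}.properties"
```

For example, select_config(["-runat=prod"]) selects config-prod.properties. Java jobs often use Flink's ParameterTool.fromArgs(args) for the same purpose; as far as I know it expects the form --runat test rather than -runat=test, so check its documentation before relying on it.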
 


Jake

> On Jul 27, 2020, at 6:19 PM, 范超  wrote:
> 
> Hi, Flink community
> 
> I’m new to Flink and don’t know how to pass parameters to my jar 
> file when starting the job in detached mode on the YARN cluster.
> Here is my shell command:
> 
> /usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
> ~/project/test/app/test.jar -runat=test 2>&1
> 
> In my jar file, the code selects a different config.properties file by reading 
> the “runat” CLI parameter, but I don’t know how to read this CLI parameter. 
> Alternatively, if I have two sets of property files, one for the testing 
> environment and one for production, how can I choose between them with a CLI 
> option?
> 
> Thanks a lot. Any help is appreciated.
> 
> Chao fan
> 



Re: Error with JDBC table in sql-client

2020-07-27 Thread op
 







Re: Is outputting from components other than sinks or side outputs a no-no ?

2020-07-27 Thread Tom Fennelly
No, I think David answered the specific question that I asked i.e. is it
okay (or not) for operators other than sinks and side outputs to do I/O.
Purging DLQ entries is something we'll need to be able to do anyway (for
some scenarios - aside from successful checkpoint retries) and I
specifically wasn't asking about sink functions.

On Mon, Jul 27, 2020 at 10:08 AM Stephen Connolly 
wrote:

> I am not 100% certain that David is talking about the same pattern of
> usage that you are Tom.
>
> David, the pattern Tom is talking about is something like this...
>
> try {
>   do something with record
> } catch (SomeException e) {
>   push record to DLQ
> }
>
> My concern is that if we have a different failure, or even a restart from
> checkpoint because, say, the task manager OOM'd or was killed, the
> record is now replayed, and this time the "do something with record"
> succeeds, but the record is still on the DLQ from last time.
>
> If the DLQ is a Flink-native output that pushes to an exactly-once sink,
> then you do not have that issue. When you roll the side output behind
> Flink's back, you have to take all those possibilities into account,
> which significantly complicates the code.
>
> On 2020/07/27 07:45:27, Tom Fennelly  wrote:
> > Thank you David.
> >
> > In the case we have in mind it should only happen on the very rare
> > exception, i.e. in some cases, if somehow an uncaught exception occurs,
> > we want to send the record to a DLQ and handle the retry manually vs.
> > checkpointing and restarting.
> >
> > Regards,
> >
> > Tom.
> >
> >
> > On Sun, Jul 26, 2020 at 1:14 PM David Anderson 
> > wrote:
> >
> > > Every job is required to have a sink, but there's no requirement that
> > > all output be done via sinks. It's not uncommon, and doesn't have to
> > > cause problems, to have other operators that do I/O.
> > >
> > > What can be problematic, however, is doing blocking I/O. While your
> > > user function is blocked, the function will exert back pressure, and
> > > checkpoint barriers will be unable to make any progress. This sometimes
> > > leads to checkpoint timeouts and job failures. So it's recommended to
> > > make any I/O you do asynchronous, using an AsyncFunction [1] or
> > > something similar.
> > >
> > > Note that the asynchronous I/O function stores the records for
> > > in-flight asynchronous requests in checkpoints, and restores/re-triggers
> > > the requests when recovering from a failure. This might lead to
> > > duplicate results if you are using it to do non-idempotent database
> > > writes. If you need transactions, use a sink that offers them.
> > >
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> > > <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html>
> > >
> > > Best,
> > > David
> > >
> > > On Sun, Jul 26, 2020 at 11:08 AM Tom Fennelly wrote:
> > >
> > >> Hi.
> > >>
> > >> What are the negative side effects of (for example) a filter function
> > >> occasionally making a call out to a DB? Is this a big no-no, and
> > >> should all outputs be done through sinks and side outputs, no
> > >> exceptions?
> > >>
> > >> Regards,
> > >>
> > >> Tom.
> > >>
> > >
> >
>
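
The replay concern raised in the quoted discussion can be reproduced in a few lines. The sketch below is plain Python, not Flink code: process, run_attempt and the in-memory dlq list are made-up stand-ins for a user function, one job attempt, and an external dead-letter queue that is written outside of Flink's checkpointing.

```python
from typing import Callable, List


def run_attempt(records: List[str], process: Callable[[str], None], dlq: List[str]) -> None:
    """Process records; on failure push the record to the DLQ as a side effect."""
    for record in records:
        try:
            process(record)
        except ValueError:
            # External side effect: it is not rolled back if the job restarts later.
            dlq.append(record)


def make_flaky_processor() -> Callable[[str], None]:
    """Return a processor that fails on record 'b' the first time only."""
    seen_failures = set()

    def process(record: str) -> None:
        if record == "b" and record not in seen_failures:
            seen_failures.add(record)
            raise ValueError("transient failure")

    return process


def simulate_replay() -> List[str]:
    """Run one attempt, then replay the same records as after a restart."""
    dlq: List[str] = []
    process = make_flaky_processor()
    records = ["a", "b", "c"]
    run_attempt(records, process, dlq)  # first attempt: 'b' fails and lands on the DLQ
    run_attempt(records, process, dlq)  # replay from checkpoint: 'b' now succeeds
    return dlq
```

After simulate_replay() the record 'b' is still on the DLQ even though the replay processed it successfully, which is why a DLQ written behind Flink's back needs some form of deduplication or purging.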


Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread rookieCOder
Hi, Xingbo
Thanks for your reply.
So the point is that simply pointing the source or the sink at the master's
local file system causes the error because the slaves cannot read the
source/sink files? So the simplest solution is to make sure that the slaves
have access to the master's local filesystem (via NFS or HDFS)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to get CLI parameters when deploy on yarn cluster

2020-07-27 Thread 范超
Hi, Flink community

I’m new to Flink and don’t know how to pass parameters to my jar file 
when starting the job in detached mode on the YARN cluster.
Here is my shell command:

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar -runat=test 2>&1

In my jar file, the code selects a different config.properties file by reading 
the “runat” CLI parameter, but I don’t know how to read this CLI parameter. 
Alternatively, if I have two sets of property files, one for the testing 
environment and one for production, how can I choose between them with a CLI 
option?

Thanks a lot. Any help is appreciated.

Chao fan

Problem parsing MySQL binlog from Kafka

2020-07-27 Thread air23
Hello. This is my parsing SQL. I want to read the data field and the table field of the binlog. Why can I get table but not data?

private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +

")";




See the attachment for details; it includes the printed output.
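
One possible explanation, which is not confirmed anywhere in this thread: in Canal-style binlog JSON the table field is a plain string, while data is an array of row objects, so a VARCHAR column may not match what the JSON format finds there. The Python sketch below only inspects a made-up sample message; the sample, its field values and the helper name field_kinds are assumptions for illustration.

```python
import json

# Made-up sample shaped like a Canal binlog message (an assumption, not from the thread).
SAMPLE = (
    '{"table": "t_order", "type": "INSERT", '
    '"data": [{"order_no": "1001", "order_time": "2020-07-27 18:00:00"}]}'
)


def field_kinds(message: str) -> dict:
    """Map each top-level field to the name of its JSON-decoded Python type."""
    return {key: type(value).__name__ for key, value in json.loads(message).items()}
```

If data turns out to be nested in the real messages, declaring it with a matching nested type instead of VARCHAR may be worth trying.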

Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread Xingbo Huang
Hi rookieCOder,
You need to make sure that your files can be read by every slave, so an
alternative solution is to put your files on HDFS.

Best,
Xingbo
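
A quick check before submitting can catch this kind of mistake early. This is a minimal sketch under stated assumptions: the helper name is_shared_path and the set of URI schemes treated as shared are invented for illustration, and a local path that happens to be an NFS mount on every machine would be reported as not shared even though it works.

```python
from urllib.parse import urlparse

# Assumed list of schemes that every worker can reach; adjust for your setup.
SHARED_SCHEMES = {"hdfs", "s3", "s3a"}


def is_shared_path(path: str) -> bool:
    """Return True if the path uses a URI scheme assumed to be visible to all workers."""
    return urlparse(path).scheme in SHARED_SCHEMES
```

With this helper, a path such as /opt/raw_data/input_json_... on the master's local disk is flagged, while an hdfs:// URI passes.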

On Mon, Jul 27, 2020 at 5:49 PM rookieCOder wrote:

> I'm coding with PyFlink 1.10.0 and building a cluster with Flink 1.10.0.
> I define the source and the sink as follows:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2674/%E6%97%A0%E6%A0%87%E9%A2%98.png>
>
> When I run this code only on the master, it's OK. When I run this code on a
> cluster with 1 master and 1 slave, and I submit the task on the master like
> this:
> sudo flink-1.10.0/bin/flink run -py main.py
> an error occurs:
> Caused by: java.io.FileNotFoundException: The provided file path
> /opt/raw_data/input_json_65adbe54-cfdc-11ea-9d47-020012970011 does not
> exist.
> This file is stored on the master's local file system. It seems that the
> slaves read their own file system instead of the master's. Or maybe there
> are other reasons.
> The question is: how can I avoid the error?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Flink SQL question

2020-07-27 Thread air23
Hello



problem with build from source flink 1.11

2020-07-27 Thread Felipe Lolas
Hi,

I'm Felipe, and I just started learning Flink a few weeks ago (moving Spark Streaming 
workloads). 

I'm currently testing some changes to flink-yarn, but when using my 
own build of flink-dist.jar, the job in the TaskManager fails with: 
java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;

env(build)
flink-1.11.0
maven 3.2.5
jdk 1.8
macOS

command (run in flink-parent)

mvn clean install -DskipTests -Dfast

env
yarn application mode
cdh 6.2.1

Can anyone help me?

Thank you!
Cheers,
Felipe L

Re: How to adapt to the kafka-connect JSON format?

2020-07-27 Thread Jark Wu
Hi,

You need to add schema and payload in both the DDL and the query:

CREATE TABLE print_table \
(`schema` STRING, `payload` ROW) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')

-- In the DML you can hard-code the schema as a constant and wrap the payload with the ROW function
INSERT INTO output
SELECT '{ "type": "struct", ...}' as schema, ROW(total_count, username,
update_time) as payload
FROM ...
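
To make the target format concrete, here is a minimal Python sketch that wraps one output row in the schema/payload envelope shown in the original question. The field types chosen for total_count, username and update_time, the schema name print_table, and the helper name to_connect_envelope are assumptions for illustration; check what your kafka-connect converter actually expects.

```python
import json
from typing import Any, Dict

# Assumed Connect-style schema for the three output columns.
SCHEMA: Dict[str, Any] = {
    "type": "struct",
    "fields": [
        {"type": "int64", "optional": False, "field": "total_count"},
        {"type": "string", "optional": True, "field": "username"},
        {"type": "string", "optional": True, "field": "update_time"},
    ],
    "optional": True,
    "name": "print_table",
}


def to_connect_envelope(row: Dict[str, Any]) -> str:
    """Serialize a row as a JSON object with "schema" and "payload" keys."""
    return json.dumps({"schema": SCHEMA, "payload": row})
```

Calling to_connect_envelope with the example row from the question yields one JSON document whose payload carries the row and whose schema describes its fields.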


Btw, may I ask why you have to use kafka-jdbc-connect to sync to MySQL? Personally, I think
syncing directly to MySQL with Flink SQL would be quite convenient.

Best,
Jark


On Mon, 27 Jul 2020 at 17:33, RS  wrote:

> hi,
> kafka->Flink->kafka->mysql
> After Flink processes the data with SQL, it is written to Kafka in JSON
> format, and kafka-connect-jdbc then exports it to MySQL.
> We use kafka-connect because it makes it easy to export the data to other
> stores at the same time.
>
> Flink output table definition:
>
> CREATE TABLE print_table \
> (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
> WITH (\
> 'connector' = 'kafka', \
> 'topic' = 'test_out', \
> 'properties.bootstrap.servers' = '127.0.0.1:9092', \
> 'sink.partitioner' = 'round-robin', \
> 'format' = 'json')
>
> Example of the output data format:
>
> {"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
>
> However, the JSON format for kafka-connect-jdbc requires schema and payload,
> for example:
>
> {
>   "schema": {
>     "type": "struct",
>     "fields": [
>       {
>         "type": "int64",
>         "optional": false,
>         "field": "id"
>       },
>       {
>         "type": "string",
>         "optional": true,
>         "field": "name"
>       }
>     ],
>     "optional": true,
>     "name": "user"
>   },
>   "payload": {
>     "id": 1,
>     "name": "admin"
>   }
> }
>
> How should this be handled in Flink (adding schema and payload?) to produce
> JSON that matches what kafka connect expects?
>
> Current Flink SQL:
>
> INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS
> total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as
> update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1'
> MINUTE)
>


Pyflink 1.10.0 issue on cluster

2020-07-27 Thread rookieCOder
I'm coding with PyFlink 1.10.0 and building a cluster with Flink 1.10.0.
I define the source and the sink as follows:

 
When I run this code only on the master, it's OK. When I run this code on a
cluster with 1 master and 1 slave, and I submit the task on the master like
this:
sudo flink-1.10.0/bin/flink run -py main.py 
an error occurs:
Caused by: java.io.FileNotFoundException: The provided file path
/opt/raw_data/input_json_65adbe54-cfdc-11ea-9d47-020012970011 does not
exist.
This file is stored on the master's local file system. It seems that the
slaves read their own file system instead of the master's. Or maybe there
are other reasons.
The question is: how can I avoid the error?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Error with JDBC table in sql-client

2020-07-27 Thread 杨荣
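Being able to submit a job on YARN does not mean it will work through sql-client; they use
different scripts and configuration. The former involves bin/flink, bin/yarn-session.sh and
conf/flink-conf.yaml; the latter involves bin/sql-client.sh and conf/sql-client-defaults.yaml.

You could work through this logic, or share your relevant configuration files and the full command used to start sql-client.sh.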

On Mon, Jul 27, 2020 at 5:29 PM op <520075...@qq.com> wrote:

> Hello,
>  I am quite sure; I checked several times. Jobs submitted to YARN run without
>  problems; the error only appears in sql-client. Version 1.11.0.
> -- Original message --
> From: "user-zh" <samyang31...@gmail.com>
> Sent: Monday, July 27, 2020, 5:25 PM
> To: "user-zh"
> Subject: Re: Error with JDBC table in sql-client
>
> hi,
>
> Are you sure mysql-connector-java-5.1.38.jar is on your class path? Please confirm this at runtime.
>
> On Mon, Jul 27, 2020 at 2:45 PM op <520075...@qq.com> wrote:
>
> > Hello, I created a JDBC table:
> >
> > CREATE TABLE mvp_dim_anticheat_args_all (
> >   id BIGINT,
> >   dt STRING,
> >   cnt_7d INT,
> >   cnt_30d INT,
> >   PRIMARY KEY (id) NOT ENFORCED
> > ) WITH (
> >   'connector' = 'jdbc',
> >   'driver'='com.mysql.jdbc.Driver',
> >   'url' = 'jdbc:mysql://localhost:3306/huyou_oi',
> >   'table-name' = 'mvp_dim_ll',
> >   'username' = 'huy_oi',
> >   'password' = '420123'
> > );
> >
> > When querying, it reports:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
> >
> > The lib directory of my installation contains both flink-connector-jdbc_2.11-1.11.0.jar
> > and mysql-connector-java-5.1.38.jar. What could be the cause?
> > Thanks


Re: Parallelism in Blink batch mode

2020-07-27 Thread jun su
hi,

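If the underlying source is a FileInputFormat, the parallelism defaults to 1. I tried this
option and it has no effect. From the code, a SingleOutputStreamOperator is created. It looks
like I need to rewrite the OrcInputFormat I am using so that it does not extend
FileInputFormat, like the HiveInputFormat in the source code.

On Mon, Jul 27, 2020 at 5:31 PM Caizhi Weng wrote:

> Hi,
>
> You can set table.exec.resource.default-parallelism to the desired parallelism. See the documentation [1] for details.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>
> On Mon, Jul 27, 2020 at 3:50 PM jun su wrote:
>
> > hi all,
> >
> > Flink's current Blink table planner in batch mode (reading ORC files on
> > HDFS) only supports StreamTableSource and LookupableTableSource, but the
> > parallelism of StreamTableSource appears to default to 1, with
> > ContinuousFileMonitoringFunction underneath. How can I increase the
> > parallelism to improve performance?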
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


How to adapt to the kafka-connect JSON format?

2020-07-27 Thread RS
hi,
kafka->Flink->kafka->mysql
After Flink processes the data with SQL, it is written to Kafka in JSON format, and kafka-connect-jdbc then exports it to MySQL.
We use kafka-connect because it makes it easy to export the data to other stores at the same time.

Flink output table definition:

CREATE TABLE print_table \
(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')

Example of the output data format:

{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}

However, the JSON format for kafka-connect-jdbc requires schema and payload, for example:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "admin"
  }
}

How should this be handled in Flink (adding schema and payload?) to produce JSON that matches what kafka connect expects?

Current Flink SQL:

INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS 
total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as 
update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' 
MINUTE)




Re: Parallelism in Blink batch mode

2020-07-27 Thread Caizhi Weng
Hi,

You can set table.exec.resource.default-parallelism to the desired parallelism. See the documentation [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism

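On Mon, Jul 27, 2020 at 3:50 PM jun su wrote:

> hi all,
>
> Flink's current Blink table planner in batch mode (reading ORC files on
> HDFS) only supports StreamTableSource and LookupableTableSource, but the
> parallelism of StreamTableSource appears to default to 1, with
> ContinuousFileMonitoringFunction underneath. How can I increase the
> parallelism to improve performance?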
>
> --
> Best,
> Jun Su
>


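Re: Error with JDBC table in sql-client

2020-07-27 Thread op
Hello,
 I am quite sure; I checked several times. Jobs submitted to YARN run without problems; the error only appears in sql-client. Version 1.11.0.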



Re: Error with JDBC table in sql-client

2020-07-27 Thread 杨荣
hi,

Are you sure mysql-connector-java-5.1.38.jar is on your class path? Please confirm this at runtime.

On Mon, Jul 27, 2020 at 2:45 PM op <520075...@qq.com> wrote:

> Hello, I created a JDBC table:
>
>
> CREATE TABLE mvp_dim_anticheat_args_all (
>   id BIGINT,
>   dt STRING,
>   cnt_7d INT,
>  cnt_30d INT,
>  PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'jdbc',
>  'driver'='com.mysql.jdbc.Driver',
>  'url' = 'jdbc:mysql://localhost:3306/huyou_oi',
>  'table-name' = 'mvp_dim_ll',
>  'username' = 'huy_oi',
>  'password' = '420123'
> );
>
>
>
> When querying, it reports:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
>
>
>
>
>
> The lib directory of my installation contains both flink-connector-jdbc_2.11-1.11.0.jar
> and mysql-connector-java-5.1.38.jar. What could be the cause?
> Thanks


Re: Error with JDBC table in sql-client

2020-07-27 Thread Caizhi Weng
Hi,

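mysql-connector-java-5.1.38.jar should already contain com.mysql.jdbc.Driver. In which mode
is Flink running? If it is a standalone session, did you restart the session cluster after
adding the jar to Flink's lib directory? Also, has the jar been added on all workers? It
would help if you could post the full error stack.

On Mon, Jul 27, 2020 at 2:45 PM op <520075...@qq.com> wrote:

> Hello, I created a JDBC table: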
>
>
> CREATE TABLE mvp_dim_anticheat_args_all (
>   id BIGINT,
>   dt STRING,
>   cnt_7d INT,
>  cnt_30d INT,
>  PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'jdbc',
>  'driver'='com.mysql.jdbc.Driver',
>  'url' = 'jdbc:mysql://localhost:3306/huyou_oi',
>  'table-name' = 'mvp_dim_ll',
>  'username' = 'huy_oi',
>  'password' = '420123'
> );
>
>
>
> When querying, it reports:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
>
>
>
>
>
> The lib directory of my installation contains both flink-connector-jdbc_2.11-1.11.0.jar
> and mysql-connector-java-5.1.38.jar. What could be the cause?
> Thanks


Re: Kafka connector with PyFlink

2020-07-27 Thread Timo Walther

Hi Dian,

we had this discussion in the past. Yes, it might help in certain cases. 
But on the other hand, omitting it also helps in finding version mismatches when 
people have misconfigured their dependencies. Different JVM versions should 
not result in incompatible classes, as the default serialVersionUID is 
standardized, no?


Regards,
Timo

On 27.07.20 10:53, Dian Fu wrote:
@Wojtek I just found that org.apache.flink.table.types.logical.RowType$RowField 
does not define a serialVersionUID, so you have to 
make sure that the JDK version is the same on the client side and 
the server side. Could you check that?


@Timo I think we should define the serialVersionUID for all classes 
that implement Serializable. What do you think?


Regards,
Dian

On Jul 27, 2020, at 4:38 PM, Timo Walther wrote:


Hi,

the InvalidClassException indicates that you are using different 
versions of the same class. Are you sure you are using the same Flink 
minor version (including the Scala suffix) for all dependencies and 
Kubernetes?


Regards,
Timo


On 27.07.20 09:51, Wojciech Korczyński wrote:

Hi,
when I try it locally it runs well. The problem is when I run it 
using Kubernetes. I don't know how to make Flink and Kubernetes go 
well together in that case.

Best, Wojtek
On Fri, Jul 24, 2020 at 17:51, Xingbo Huang wrote:

   Hi Wojciech,
   In many cases, you can make sure that your code can run correctly in
   local mode, and then submit the job to the cluster for testing. For
   how to add jar packages in local mode, you can refer to the doc[1].
   Besides, you'd better use blink planner which is the default
   planner. For how to use blink planner, you can refer to the doc[2]
   [1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
   [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
   Best,
   Xingbo
   On Fri, Jul 24, 2020 at 9:40 PM Wojciech Korczyński wrote:
   Hi,
   I've done like you recommended:
   from pyflink.datastream import StreamExecutionEnvironment
   from pyflink.dataset import ExecutionEnvironment
   from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
   from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
   from pyflink.table.udf import udf

   # Create a streaming table environment.
   exec_env = StreamExecutionEnvironment.get_execution_environment()
   t_config = TableConfig()
   t_env = StreamTableEnvironment.create(exec_env, t_config)

   INPUT_TABLE = "my_topic"
   INPUT_TOPIC = "my-topic"
   LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
   OUTPUT_TABLE = "my_topic_output"
   OUTPUT_TOPIC = "my-topic-output"

   # Kafka source table.
   ddl_source = f"""
   CREATE TABLE {INPUT_TABLE}(
   message STRING
   ) WITH (
   'connector' = 'kafka',
   'topic' = '{INPUT_TOPIC}',
   'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
   'format' = 'json'
   )
   """

   # Kafka sink table.
   ddl_sink = f"""
   CREATE TABLE {OUTPUT_TABLE}(
   message STRING
   ) WITH (
   'connector' = 'kafka',
   'topic' = '{OUTPUT_TOPIC}',
   'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
   'format' = 'json'
   )
   """

   t_env.execute_sql(ddl_source)
   t_env.execute_sql(ddl_sink)

   # Copy every message from the source topic to the sink topic.
   result = t_env.execute_sql(f"""
   INSERT INTO {OUTPUT_TABLE}
   SELECT message
   FROM {INPUT_TABLE}
   """)

   # Block until the job finishes.
   result.get_job_client().get_job_execution_result().result()
   I think it is correctly written.
   However after deploying that job I'm getting an error:
   wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar

   WARNING: An illegal reflective access operation has occurred
   WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar) to field java.util.Properties.serialVersionUID
   WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
   WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
   WARNING: All illegal access operations will be denied in a future release

   Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
   Traceback (most recent call last):
     File "kafka2flink.py", line 62, in <module>
       result.get_job_client().get_job_execution_result().result()
  File 

Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

2020-07-27 Thread Timo Walther

Hi Dmytro,

one major difference between the legacy and Blink planners is that the Blink 
planner is not built on top of the DataStream API. It uses features of lower 
levels (StreamOperator, Transformation). In the mid-term we want to 
remove the check and make Table API and DataStream API 100% back and 
forth compatible for batch and streaming.


"there is no way to create/retract stream": What are you planning to do 
with the created stream? If you want to sink it into an external system, 
the new FLIP-95 sinks support all changelog semantics now.


Regards,
Timo


On 24.07.20 17:49, Dmytro Dragan wrote:

Hi Timo,
Thank you for the response.

Well, it was working.
We have a number of pipelines in production on Flink 1.10 which reuse DataStream 
and Table API parts, both for stream and batch.
Likewise, the simple case without aggregation works in Flink 1.11.

But let's assume there are some incompatible changes and this approach will 
not work anymore.

In the case of TableEnvironment, there is no way to create/retract stream.
I assume it is possible to wrap the stream in a bounded 
StreamTableSource/StreamTableSink
and use the deprecated TableEnvironment methods to register them, but I wonder if 
there is a better way to do it.

It sounds quite strange that, with the Blink planner optimising 
DataStream pipelines for stream and batch jobs,
it is still necessary to write the same things in both the DataStream and DataSet APIs.


On 24/07/2020, 15:36, "Timo Walther"  wrote:

 Hi Dmytro,
 
 `StreamTableEnvironment` does not support batch mode currently. Only
 `TableEnvironment` supports the unified story. I saw that you disabled
 the check in the `create()` method. This check exists for a reason.
 
 For batch execution, the planner sets specific properties on the stream
 graph that the StreamExecutionEnvironment cannot handle (e.g. blocking
 inputs). My guess would be that this is the reason for your exception.
 
 Have you tried to use the unified `TableEnvironment`?
 
 Regards,
 Timo
 
 
 
 
 On 23.07.20 15:14, Dmytro Dragan wrote:

 > Hi All,
 >
 > We are working on migration existing pipelines from Flink 1.10 to Flink
 > 1.11.
 >
 > We are using Blink planner and have unified pipelines which can be used
 > in stream and batch mode.
 >
 > Stream pipelines works as expected, but batch once fail on Flink 1.11 if
 > they have any table aggregation transformation.
 >
 > Simple example of failed pipeline:
 >
 > StreamExecutionEnvironment env =
 >     StreamExecutionEnvironment.getExecutionEnvironment();
 > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 >
 > TableConfig tableConfig = new TableConfig();
 > tableConfig.setIdleStateRetentionTime(
 >     org.apache.flink.api.common.time.Time.minutes(10),
 >     org.apache.flink.api.common.time.Time.minutes(30)
 > );
 > EnvironmentSettings settings =
 >     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
 >
 > // created using a workaround that ignores the settings.isStreamingMode() check
 > StreamTableEnvironment tEnv = create(env, settings, tableConfig);
 >
 > DataStreamSource streamSource = env.fromCollection(asList(new
 > A("1"), new A("2")));
 >
 > Table table = tEnv.fromDataStream(streamSource);
 > tEnv.createTemporaryView("A", table);
 >
 > String sql = "select s from A group by s";
 >
 > tEnv
 >   .toRetractStream(tEnv.sqlQuery(sql), Row.class)
 >   .flatMap(new RetractFlatMap())
 >   .map(Row::toString)
 >   .addSink(new TestSinkFunction<>());
 >
 > env.execute("");
 >
 > values.forEach(System.out::println);
 >
 > Exception:
 >
 > Caused by: java.lang.IllegalStateException: Trying to consume an input
 > partition whose producer is not ready (result type: BLOCKING, partition
 > consumable: false, producer state: DEPLOYING, partition id:
 > 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
 >
 >  at
 > 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
 >
 >  …
 >
 > Adding StreamTableEnvironment execute does not help.
 >
 > Could you please advise what I`m missing?
 >
 
 
 





Hbase connector????????

2020-07-27 Thread op
 habse??family1 

INSERT INTO hTable SELECT rowkey, ROW(null,f1q1) FROM T;


Re: Is outputting from components other than sinks or side outputs a no-no ?

2020-07-27 Thread Stephen Connolly
I am not 100% certain that David is talking about the same usage pattern 
that you are, Tom.

David, the pattern Tom is talking about is something like this...

try {
  do something with record
} catch (SomeException e) {
  push record to DLQ
}

My concern is that if we have a different failure, or even a restart from 
checkpoint because say the task manager OOM'd or was killed... now the record 
is replayed... and this time the "do something with record" succeeded... but 
it's still on the DLQ from last time

If the DLQ is a Flink-native output that pushes to an exactly-once sink, then 
you do not have that issue. When you roll your own side output behind Flink's 
back, you have to take all of those possibilities into account, which 
significantly complicates the code.
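
To make the replay concern concrete, here is a minimal sketch in plain Python (not Flink code). It assumes a toy `run_attempt` helper, a `TransientError` standing in for `SomeException`, and a made-up "staged" DLQ that is only committed when the attempt completes; all of these names are hypothetical and only illustrate the two behaviours described above.

```python
class TransientError(Exception):
    """Stands in for SomeException in the pseudocode above (hypothetical)."""


def process(record, flaky):
    # Fails only for records that are currently marked as flaky.
    if record in flaky:
        raise TransientError(record)
    return record * 10


def run_attempt(records, flaky, dlq_write, crash_before_checkpoint):
    """Process the records once; return the outputs, or None if the attempt crashed."""
    outputs = []
    for record in records:
        try:
            outputs.append(process(record, flaky))
        except TransientError:
            dlq_write(record)
    if crash_before_checkpoint:
        return None  # everything since the last checkpoint gets replayed
    return outputs


# Variant 1: the DLQ is written directly, behind the framework's back.
direct_dlq = []
run_attempt([1, 2, 3], {2}, direct_dlq.append, crash_before_checkpoint=True)
direct_out = run_attempt([1, 2, 3], set(), direct_dlq.append, crash_before_checkpoint=False)

# Variant 2: DLQ writes are staged and committed only when the attempt completes.
committed_dlq = []
staged = []
if run_attempt([1, 2, 3], {2}, staged.append, crash_before_checkpoint=True) is None:
    staged.clear()  # the crash discards uncommitted writes
staged_out = run_attempt([1, 2, 3], set(), staged.append, crash_before_checkpoint=False)
committed_dlq.extend(staged)

print(direct_out, direct_dlq)      # record 2 succeeded on replay but is still on the DLQ
print(staged_out, committed_dlq)   # the stale DLQ entry was never committed
```

In the first variant record 2 ends up both in the output and on the DLQ, which is the situation described above; the second variant only mimics what an exactly-once sink would give you and is not a substitute for one.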

On 2020/07/27 07:45:27, Tom Fennelly  wrote: 
> Thank you David.
> 
> In the case we have in mind it should only happen literally on the very
> rare Exception i.e. in some cases if somehow an uncaught exception occurs,
> we want to send the record to a DLQ and handle the retry manually Vs
> checkpointing and restarting.
> 
> Regards,
> 
> Tom.
> 
> 
> On Sun, Jul 26, 2020 at 1:14 PM David Anderson 
> wrote:
> 
> > Every job is required to have a sink, but there's no requirement that all
> > output be done via sinks. It's not uncommon, and doesn't have to cause
> > problems, to have other operators that do I/O.
> >
> > What can be problematic, however, is doing blocking I/O. While your user
> > function is blocked, the function will exert back pressure, and checkpoint
> > barriers will be unable to make any progress. This sometimes leads to
> > checkpoint timeouts and job failures. So it's recommended to make any I/O
> > you do asynchronous, using an AsyncFunction [1] or something similar.
> >
> > Note that the asynchronous i/o function stores the records for in-flight
> > asynchronous requests in checkpoints, and restores/re-triggers the requests
> > when recovering from a failure. This might lead to duplicate results if you
> > are using it to do non-idempotent database writes. If you need
> > transactions, use a sink that offers them.
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> > 
> >
> > Best,
> > David
> >
> > On Sun, Jul 26, 2020 at 11:08 AM Tom Fennelly 
> > wrote:
> >
> >> Hi.
> >>
> >> What are the negative side effects of (for example) a filter function
> >> occasionally making a call out to a DB ? Is this a big no-no and should all
> >> outputs be done through sinks and side outputs, no exceptions ?
> >>
> >> Regards,
> >>
> >> Tom.
> >>
> >
> 


Re: Unable to deduce RocksDB api calls in streaming.

2020-07-27 Thread Timo Walther

Hi Aviral,

as far as I know we are not calling RocksDB API to perform snapshots. As 
the Stackoverflow answer also indicates most of the snapshotting is done 
outside of RocksDB by just dealing with the SST files. Have you checked 
the available metrics in the web UI?


https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html

But maybe a RocksDB expert (in CC) knows more about this topic.

Regards,
Timo

On 27.07.20 00:02, Aviral Srivastava wrote:

Hi all!

I want to profile the time taken to make snapshot calls to RocksDB when 
using Flink in streaming mode.


I have forked the flink core repo, added the example of fraud detection, 
configured the state backend and checkpointing. The program is running 
successfully.


I have also been able to import the code base into IntelliJ IDE and 
debug it. However, I could not trace the function calls to the RocksDB. 
Is there any documentation that would help me out in this regard? Or can 
someone help me here?


Corresponding question on SO: 
https://stackoverflow.com/questions/63103295/what-are-the-api-calls-made-to-create-snapshot-while-using-rocksdb-as-the-state


Best,
Aviral Srivastava





Re: Kafka connector with PyFlink

2020-07-27 Thread Timo Walther

Hi,

the InvalidClassException indicates that you are using different 
versions of the same class. Are you sure you are using the same Flink 
minor version (including the Scala suffix) for all dependencies and 
Kubernetes?
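
As a rough first check, the jar names that appear in the quoted command output can be compared mechanically. The sketch below is plain Python and only looks at file names of the form `<artifact>_<scala>-<flink>.jar`; the helper names `parse_jar_name` and `distinct_versions` are made up for illustration, and a name mismatch is only a hint, not proof, that this is the cause of the exception. The version running inside the Kubernetes image would have to be checked separately.

```python
import re

# Matches names such as flink-dist_2.11-1.11-SNAPSHOT.jar:
# <artifact>_<scala version>-<flink version>.jar
JAR_NAME = re.compile(r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<flink>.+)\.jar$")


def parse_jar_name(name):
    """Split a jar file name into (artifact, scala suffix, flink version)."""
    match = JAR_NAME.match(name)
    if match is None:
        raise ValueError(f"unrecognised jar name: {name}")
    return match.group("artifact"), match.group("scala"), match.group("flink")


def distinct_versions(jar_names):
    """Return the distinct Scala suffixes and Flink versions seen across the jars."""
    parsed = [parse_jar_name(name) for name in jar_names]
    scala = sorted({p[1] for p in parsed})
    flink = sorted({p[2] for p in parsed})
    return scala, flink


# Both names are taken from the command output quoted in this thread.
jars = [
    "flink-dist_2.11-1.11-SNAPSHOT.jar",
    "flink-sql-connector-kafka_2.11-1.11.0.jar",
]
scala_versions, flink_versions = distinct_versions(jars)
print("Scala suffixes:", scala_versions)
print("Flink versions:", flink_versions)
if len(scala_versions) > 1 or len(flink_versions) > 1:
    print("Versions differ; worth aligning before looking further.")
```

Here the Scala suffix agrees, but the client distribution is a 1.11-SNAPSHOT build while the connector jar is 1.11.0.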


Regards,
Timo


On 27.07.20 09:51, Wojciech Korczyński wrote:

Hi,

when I try it locally it runs well. The problem is when I run it 
using Kubernetes. I don't know how to make Flink and Kubernetes go well 
together in that case.


Best, Wojtek

On Fri, 24 Jul 2020 at 17:51, Xingbo Huang wrote:


Hi Wojciech,
In many cases, you can make sure that your code can run correctly in
local mode, and then submit the job to the cluster for testing. For
how to add jar packages in local mode, you can refer to the doc[1].
Besides, you'd better use blink planner which is the default
planner. For how to use blink planner, you can refer to the doc[2]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment

Best,
Xingbo

On Fri, Jul 24, 2020 at 9:40 PM, Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote:

Hi,

I've done like you recommended:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
from pyflink.table.udf import udf

exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TABLE = "my_topic"
INPUT_TOPIC = "my-topic"
LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
OUTPUT_TABLE = "my_topic_output"
OUTPUT_TOPIC = "my-topic-output"

ddl_source = f"""
CREATE TABLE {INPUT_TABLE}(
message STRING
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

ddl_sink = f"""
CREATE TABLE {OUTPUT_TABLE}(
message STRING
) WITH (
'connector' = 'kafka',
'topic' = '{OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

result = t_env.execute_sql(f"""
INSERT INTO {OUTPUT_TABLE}
SELECT message
FROM {INPUT_TABLE}
""")

result.get_job_client().get_job_execution_result().result()

I think it is correctly written.

However after deploying that job I'm getting an error:

wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ 
/home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py 
kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by 
org.apache.flink.api.java.ClosureCleaner 
(file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
 to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further 
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future 
release
Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
Traceback (most recent call last):
   File "kafka2flink.py", line 62, in 
     result.get_job_client().get_job_execution_result().result()
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
 line 78, in result
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
: java.util.concurrent.ExecutionException: 

Re:Re: Re: Could not find any factory for identifier 'kafka'

2020-07-27 Thread RS
Hi,
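1. OK, thanks, I learned something.
2. Indeed, after changing some of the Flink dependencies to provided scope, the packaged jar still runs fine in testing. However, I did not see packages such as flink-walkthrough-common_2.11 in Flink's lib directory, so I still bundled those.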




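At 2020-07-27 11:42:50, "Caizhi Weng" wrote:
>Hi,
>
>Flink's TableFactory relies on Java's service discovery mechanism, so these two
>files are required. You need to check whether jar-with-dependencies actually
>packages these resource files.
>
>Also, why do the Flink dependencies need to be bundled into the fat jar? Flink's
>own classpath already contains them, so when the fat jar is used as a Flink user
>jar there is no need to include the Flink dependencies as well.
>
>RS wrote on Fri, Jul 24, 2020 at 8:30 PM: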
>
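>> hi,
>> Thanks for the reply. After many attempts, I found that it is probably not a
>> dependency problem.
>>
>>
>> I added a new directory to my project: resources/META-INF/services
>> and then copied two files from the Flink source:
>> org.apache.flink.table.factories.TableFactory and org.apache.flink.table.factories.Factory
>> With that, the build no longer reports the error. I am not sure why it works,
>> but it did resolve the error.
>>
>>
>> At 2020-07-24 20:16:18, "JasonLee" <17610775...@163.com> wrote: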
>> >hi
>> >You only need the -sql and -json packages.
>> >
>> >
>> >JasonLee
>> >Email: 17610775...@163.com
>> >
>> >On 07/24/2020 17:02, RS wrote:
>> >hi,
>> >With Flink 1.11.1 I am trying to run a SQL DDL that reads data from Kafka, and
>> >executing the CREATE statement fails.
>> >The jar is built as jar-with-dependencies.
>> >
>> >
>> >Code snippet:
>> >   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> >   "  number BIGINT,\n" +
>> >   "  msg STRING,\n" +
>> >   "  username STRING,\n" +
>> >   "  update_time TIMESTAMP(3)\n" +
>> >   ") WITH (\n" +
>> >   " 'connector' = 'kafka',\n" +
>> >   " 'topic' = '%s',\n" +
>> >   " 'properties.bootstrap.servers' = '%s',\n" +
>> >   " 'properties.group.id' = '%s',\n" +
>> >   " 'format' = 'json',\n" +
>> >   " 'json.fail-on-missing-field' = 'false',\n" +
>> >   " 'json.ignore-parse-errors' = 'true'\n" +
>> >   ")\n", tableName, topic, servers, group);
>> >
>> >
>> >   StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >   StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(env);
>> >   tableEnv.executeSql(ddlSql);
>> >
>> >
>> >Error message:
>> >Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>> >Available factory identifiers are:
>> >datagen
>> >at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> >at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> >... 33 more
>> >
>> >
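>> >I followed this thread:
>> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> >and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but I
>> >still get the same error.
>> >
>> >
>> >The pom dependencies are attached:
>> ><dependencies>
>> >   <dependency>
>> >       <groupId>org.apache.flink</groupId>
>> >       <artifactId>flink-java</artifactId>
>> >       <version>${flink.version}</version>
>> >   </dependency>
>> >   <dependency>
>> >       <groupId>org.apache.flink</groupId>
>> >       <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>> >       <version>${flink.version}</version>
>> >   </dependency>
>> >   <dependency>
>> >       <groupId>org.apache.flink</groupId>
>> >       <artifactId>flink-table-api-java</artifactId>
>> >       <version>${flink.version}</version>
>> >   </dependency>
>> >   <dependency>
>> >       <groupId>org.apache.flink</groupId>
>> >       <artifactId>flink-connector-kafka_2.12</artifactId>
>> >       <version>${flink.version}</version>
>> >   </dependency>
>> >   <dependency>
>> >       <groupId>org.apache.flink</groupId>
>> >       <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>> >       <version>${flink.version}</version>
>> >   </dependency>
>> >   <dependency>
>> >       <groupId>org.apache.flink</groupId>
>> >       <artifactId>flink-json</artifactId>
>> >       <version>${flink.version}</version>
>> >   </dependency>
>> ></dependencies>
>> >
>> >
>> >Thanks, everyone.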
>>
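
One way to check Caizhi Weng's point, namely whether the fat jar really contains the service files, is to look inside the jar. The sketch below is plain Python using only `zipfile`; it assumes the file of interest is `META-INF/services/org.apache.flink.table.factories.Factory` (one of the two files named in this thread), and the helper names and the demo class name `com.example.DemoKafkaFactory` are hypothetical.

```python
import io
import zipfile

SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"


def list_registered_factories(jar_bytes, service_file=SERVICE_FILE):
    """Return the class names registered in the jar, or [] if the file is missing."""
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        if service_file not in jar.namelist():
            return []
        text = jar.read(service_file).decode("utf-8")
    # Service files list one class per line; '#' starts a comment.
    lines = (line.split("#", 1)[0].strip() for line in text.splitlines())
    return [line for line in lines if line]


def build_demo_jar(entries):
    """Build an in-memory jar from {path: text}; only used to exercise the check."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as jar:
        for path, text in entries.items():
            jar.writestr(path, text)
    return buffer.getvalue()


# Hypothetical contents, used purely for illustration.
good_jar = build_demo_jar({SERVICE_FILE: "# demo\ncom.example.DemoKafkaFactory\n"})
bad_jar = build_demo_jar({"com/example/Main.class": ""})

print(list_registered_factories(good_jar))  # the factory is discoverable
print(list_registered_factories(bad_jar))   # nothing registered in this jar
```

For a real build, the bytes would come from something like `open("target/your-jar-with-dependencies.jar", "rb").read()` (path hypothetical). If the Kafka factory is missing from the result, the packaging step probably dropped or overwrote the service file.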


Re: Flink Session TM Logs

2020-07-27 Thread Yang Wang
Just sharing another way to access the finished TaskManager logs
on Yarn.

Currently, the logs are aggregated to HDFS only once a Yarn application has
finished, failed, or been killed. That means that
while the Flink application is still running, you can still use the Yarn
NodeManager web UI to access the TM logs. Please
build the URL according to the following schema.

http:///node/containerlogs/container_xx//taskmanager.log



Best,
Yang

Robert Metzger wrote on Mon, Jul 27, 2020 at 2:42 PM:

> Hi Richard,
> thanks for forwarding my answer to the list!
>
> I fear that Flink does not have a built-in solution for serving the logs
> of a finished TaskManager while a YARN session is still running.
>
> I agree with Yangze that you probably have to rely on an external logging
> service, such as ElasticSearch or Splunk to index your log events.
> Maybe there's also some tooling from Cloudera specifically made for YARN?
>
>
>
> On Mon, Jul 27, 2020 at 3:46 AM Yangze Guo  wrote:
>
>> Hi, Richard
>>
>> Before the session has been terminated, you could not fetch the
>> terminated TM logs. One possible solution could be leveraging the
>> log4j2 appenders[1]. Flink uses log4j2 as default in the latest
>> release 1.11.
>>
>> [1] https://logging.apache.org/log4j/2.x/manual/appenders.html
>>
>> Best,
>> Yangze Guo
>>
>> On Sat, Jul 25, 2020 at 2:37 AM Richard Moorhead
>>  wrote:
>> >
>> >
>> >
>> > -- Forwarded message -
>> > From: Robert Metzger 
>> > Date: Fri, Jul 24, 2020 at 1:09 PM
>> > Subject: Re: Flink Session TM Logs
>> > To: Richard Moorhead 
>> >
>> >
>> > I accidentally replied to you directly, not to the user@ mailing list.
>> Is it okay for you to publish the thread on the list again?
>> >
>> >
>> >
>> > On Fri, Jul 24, 2020 at 8:01 PM Richard Moorhead <
>> richard.moorh...@gmail.com> wrote:
>> >>
>> >> It is enabled. The issue is that for a long running flink session
>> -which may execute many jobs- the task managers, after a job is completed,
>> are gone, and their logs arent available.
>> >>
>> >> What I have noticed is that when the session is terminated I am able
>> to find the logs in the job history server under the associated yarn
>> application id.
>> >>
>> >> On Fri, Jul 24, 2020 at 12:51 PM Robert Metzger 
>> wrote:
>> >>>
>> >>> Hi Richard,
>> >>>
>> >>> you need to enable YARN log aggregation to access logs of finished
>> YARN applications.
>> >>>
>> >>> On Fri, Jul 24, 2020 at 5:58 PM Richard Moorhead <
>> richard.moorh...@gmail.com> wrote:
>> 
>>  When running a flink session on YARN, task manager logs for a job
>> are not available after completion. How do we locate these logs?
>> 
>>
>


Re: Kafka connector with PyFlink

2020-07-27 Thread Wojciech Korczyński
Hi,

when I try it locally it runs well. The problem is when I run it
using Kubernetes. I don't know how to make Flink and Kubernetes go well
together in that case.

Best, Wojtek

On Fri, 24 Jul 2020 at 17:51, Xingbo Huang wrote:

> Hi Wojciech,
> In many cases, you can make sure that your code can run correctly in local
> mode, and then submit the job to the cluster for testing. For how to add
> jar packages in local mode, you can refer to the doc[1].
> Besides, you'd better use blink planner which is the default planner. For
> how to use blink planner, you can refer to the doc[2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
>
> Best,
> Xingbo
>
> Wojciech Korczyński wrote on Fri, Jul 24, 2020 at 9:40 PM:
>
>> Hi,
>>
>> I've done like you recommended:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, 
>> StreamTableEnvironment, ScalarFunction
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, 
>> Json, Csv
>> from pyflink.table.udf import udf
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> t_config = TableConfig()
>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>
>> INPUT_TABLE = "my_topic"
>> INPUT_TOPIC = "my-topic"
>> LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
>> OUTPUT_TABLE = "my_topic_output"
>> OUTPUT_TOPIC = "my-topic-output"
>>
>> ddl_source = f"""
>>CREATE TABLE {INPUT_TABLE} (
>>message STRING
>>) WITH (
>>'connector' = 'kafka',
>>'topic' = '{INPUT_TOPIC}',
>>'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>'format' = 'json'
>>)
>>"""
>>
>> ddl_sink = f"""
>>CREATE TABLE {OUTPUT_TABLE} (
>>message STRING
>>) WITH (
>>'connector' = 'kafka',
>>'topic' = '{OUTPUT_TOPIC}',
>>'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>'format' = 'json'
>>)
>>"""
>>
>> t_env.execute_sql(ddl_source)
>> t_env.execute_sql(ddl_sink)
>>
>> result = t_env.execute_sql(f"""
>> INSERT INTO {OUTPUT_TABLE}
>> SELECT message
>> FROM {INPUT_TABLE}
>> """)
>>
>> result.get_job_client().get_job_execution_result().result()
>>
>> I think it is correctly written.
>>
>> However after deploying that job I'm getting an error:
>>
>> wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ 
>> /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py 
>> kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by 
>> org.apache.flink.api.java.ClosureCleaner 
>> (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
>>  to field java.util.Properties.serialVersionUID
>> WARNING: Please consider reporting this to the maintainers of 
>> org.apache.flink.api.java.ClosureCleaner
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
>> reflective access operations
>> WARNING: All illegal access operations will be denied in a future release
>> Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
>> Traceback (most recent call last):
>>   File "kafka2flink.py", line 62, in 
>> result.get_job_client().get_job_execution_result().result()
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
>>  line 78, in result
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>>  line 1286, in __call__
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>>  line 147, in deco
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>>  line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
>> : java.util.concurrent.ExecutionException: 
>> org.apache.flink.client.program.ProgramInvocationException: Job failed 
>> (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>>  at 
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>> Method)
>>  at 
>> 

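Re: Flink aggregation job restart issue

2020-07-27 Thread RS
Could you post some pseudocode? Also check the JDBC sink configuration: does it support deleting records, so that the old records are deleted when an update happens?


At 2020-07-27 11:33:31, "郑斌斌" wrote:
>hi all:
>
> I have a question. My program pulls messages from Kafka and registers them as a
> Flink streaming table. I then execute the SQL "select user_id, count(*) cnt
> from <stream table>" and write the result to a MySQL aggregation table (the sink
> is the Flink 1.11 JdbcUpsertTableSink).
>The problem is that every time the job restarts, the previous results in the MySQL
>aggregation table are cleared. I have configured checkpointing and the RocksDB
>state backend.
>
>Thanks
>
>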
>
>


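Parallelism in Blink's batch mode

2020-07-27 Thread jun su
hi all,

Flink's Blink table planner in batch mode (reading ORC files on HDFS) currently
only supports StreamTableSource and LookupableTableSource. However, the default
parallelism of a StreamTableSource should be 1, since it is backed by
ContinuousFileMonitoringFunction. How can I increase the parallelism to improve
performance?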

-- 
Best,
Jun Su


Re: Is outputting from components other than sinks or side outputs a no-no ?

2020-07-27 Thread Tom Fennelly
Thank you David.

In the case we have in mind it should only happen literally on the very
rare Exception i.e. in some cases if somehow an uncaught exception occurs,
we want to send the record to a DLQ and handle the retry manually Vs
checkpointing and restarting.

Regards,

Tom.


On Sun, Jul 26, 2020 at 1:14 PM David Anderson 
wrote:

> Every job is required to have a sink, but there's no requirement that all
> output be done via sinks. It's not uncommon, and doesn't have to cause
> problems, to have other operators that do I/O.
>
> What can be problematic, however, is doing blocking I/O. While your user
> function is blocked, the function will exert back pressure, and checkpoint
> barriers will be unable to make any progress. This sometimes leads to
> checkpoint timeouts and job failures. So it's recommended to make any I/O
> you do asynchronous, using an AsyncFunction [1] or something similar.
>
> Note that the asynchronous i/o function stores the records for in-flight
> asynchronous requests in checkpoints, and restores/re-triggers the requests
> when recovering from a failure. This might lead to duplicate results if you
> are using it to do non-idempotent database writes. If you need
> transactions, use a sink that offers them.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> 
>
> Best,
> David
>
> On Sun, Jul 26, 2020 at 11:08 AM Tom Fennelly 
> wrote:
>
>> Hi.
>>
>> What are the negative side effects of (for example) a filter function
>> occasionally making a call out to a DB ? Is this a big no-no and should all
>> outputs be done through sinks and side outputs, no exceptions ?
>>
>> Regards,
>>
>> Tom.
>>
>
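
David's advice in the quoted text above (keep I/O non-blocking and bound the number of in-flight requests) can be illustrated outside Flink. The sketch below uses Python's `asyncio` as a stand-in; it is not Flink's `AsyncFunction` API, and `lookup`, `async_filter`, and the `capacity` parameter are hypothetical names chosen for this example.

```python
import asyncio


async def lookup(record):
    """Stand-in for a non-blocking external call, e.g. a DB lookup (hypothetical)."""
    await asyncio.sleep(0.01)
    return record, record % 2 == 0


async def async_filter(records, capacity=4):
    """Keep records whose lookup returns True, with at most `capacity` calls in flight."""
    limiter = asyncio.Semaphore(capacity)

    async def guarded(record):
        async with limiter:
            return await lookup(record)

    # gather preserves the input order even though the calls overlap in time.
    results = await asyncio.gather(*(guarded(r) for r in records))
    return [record for record, keep in results if keep]


kept = asyncio.run(async_filter(range(10)))
print(kept)
```

The calls overlap instead of running one after another, and the semaphore keeps the number of concurrent requests bounded. As noted above, requests that are re-triggered after a failure may still run twice, so the external call should be idempotent.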


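Re: Constructing a remote env with the Blink planner

2020-07-27 Thread jun su
It was a dependency problem; it is solved now.

jun su wrote on Mon, Jul 27, 2020 at 2:29 PM:

> hi Jark,
>
> Sorry for the late reply. I tried your approach on Flink 1.11.0. I created the
> StreamTableEnvironmentImpl following StreamTableEnvironmentImpl#create, only
> removing the mode check settings.isStreamingMode() from that method, and got
> the following error: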
>
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot instantiate user function.
> at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:291)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
> OperatorChain.java:126)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:453)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:522)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2125)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2245)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2169)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2027)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:576)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:562)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:550)
> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:511)
> at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:276)
> ... 6 more
>
>
> Jark Wu wrote on Wed, May 20, 2020 at 2:30 PM:
>
>> Hi,
>>
>> Because the Blink planner does not support
>> org.apache.flink.table.api.java.BatchTableEnvironment, it cannot be hooked up
>> to an ExecutionEnvironment.
>> The Blink planner's batch mode currently only supports TableEnvironment, but
>> you can also use StreamTableEnvironment via a hack, by constructing a
>> StreamTableEnvironmentImpl directly:
>>
>> StreamExecutionEnvironment execEnv =
>> StreamExecutionEnvironment.createRemoteEnvironment(...);
>> StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(..
>> execEnv, .., false); // for the constructor arguments, see the implementation of StreamTableEnvironmentImpl#create
>>
>> Best,
>> Jark
>>
>> On Tue, 19 May 2020 at 15:27, jun su  wrote:
>>
>> > hi all,
>> >
>> > Previously, to connect to a remote Flink cluster from the IDE, one could use
>> > ExecutionEnvironment.createRemoteEnvironment()
>> >
>> > The official docs construct a Blink environment like this:
>> >
>> > val bbSettings =
>> >   EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
>> > val bbTableEnv = TableEnvironment.create(bbSettings)
>> >
>> >
>> > How do I connect to a remote cluster in this setup?
>> >
>> > --
>> > Best,
>> > Jun Su
>> >
>>
>
>
> --
> Best,
> Jun Su
>


-- 
Best,
Jun Su


Re: Is there a way to use stream API with this program?

2020-07-27 Thread Flavio Pompermaier
Yes, it could. Where should I emit the MAX_WATERMARK, and how do I detect
that the input has reached its end?
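
For orientation, the end-of-input idea that Piotr outlines in the quoted message below (manually trigger, then delete, timers that are still pending when a bounded input ends) can be modelled with a toy timer service. This is plain Python, not Flink code; `ToyTimerService`, `advance_to`, and `fire_all_pending` are hypothetical names and do not correspond to Flink's internal timer service API.

```python
import heapq


class ToyTimerService:
    """Minimal stand-in for a processing-time timer service (not Flink's class)."""

    def __init__(self):
        self._timers = []  # min-heap of (timestamp, key)

    def register(self, timestamp, key):
        heapq.heappush(self._timers, (timestamp, key))

    def advance_to(self, now, on_timer):
        # Fire every timer whose timestamp has been reached.
        while self._timers and self._timers[0][0] <= now:
            timestamp, key = heapq.heappop(self._timers)
            on_timer(timestamp, key)

    def fire_all_pending(self, on_timer):
        # End-of-input hook: trigger and delete every remaining timer.
        while self._timers:
            timestamp, key = heapq.heappop(self._timers)
            on_timer(timestamp, key)


fired = []
timers = ToyTimerService()
timers.register(5_000, "window-a")
timers.register(9_000, "window-b")

# The bounded input ends at t=6000, so only the first timer is due.
timers.advance_to(6_000, lambda ts, key: fired.append(key))
fired_without_flush = list(fired)

# At end of input, trigger and then drop whatever is still registered.
timers.fire_all_pending(lambda ts, key: fired.append(key))
print(fired_without_flush, fired)
```

Without the final flush, `window-b` would never fire, which mirrors the behaviour reported in this thread where the job exits before the process function is reached.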

On Sat, Jul 25, 2020 at 8:08 PM David Anderson 
wrote:

> In this use case, couldn't the custom trigger register an event time timer
> for MAX_WATERMARK, which would be triggered when the bounded input reaches
> its end?
>
> David
>
> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I'm afraid that there is no out-of-the-box way of doing this. I've
>> created a ticket [1] to write down and document a discussion that we had
>> about this issue in the past.
>>
>> The issue is that currently, untriggered processing time timers are
>> ignored on end of input and it seems like there might be no one single
>> perfect way to handle it for all of the cases, but it probably needs to be
>> customized.
>>
>> Maybe you could:
>> 1. extend `WindowOperator`  (`MyWindowOperator`)
>> 2. implement
>> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
>> `MyWindowOperator`
>> 3. Inside `MyWindowOperator#endInput`  invoke
>> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>>   a) manually trigger timers `WindowOperator#onProcessingTime`
>>   b) delete manually triggered timer
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>
>> On Fri, 17 Jul 2020 at 10:30, Flavio Pompermaier wrote:
>>
>>> Hi to all,
>>> I was trying to port another job we have that use dataset API to
>>> datastream.
>>> The legacy program was doing basically a dataset.mapPartition().reduce()
>>> so I tried to replicate this thing with a
>>>
>>>  final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>  final DataStream<Row> input = env.fromElements(//
>>> Row.of(1.0), //
>>> Row.of(2.0), //
>>> Row.of(3.0), //
>>> Row.of(5.0), //
>>> Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>  input.map(new SubtaskIndexAssigner(columnType))
>>> .keyBy(t -> t.f0)
>>> .window(GlobalWindows.create())
>>> .trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5),
>>> 100L)))
>>> .process(..)
>>>
>>> Unfortunately the program exits before reaching the process function
>>> (moreover, I need to add another window + trigger after it before adding the
>>> reduce function).
>>> Is there a way to do this with the DataStream API, or should I keep using the
>>> DataSet API for the moment (until batch is fully supported)? I have
>>> appended all the code required to test the job below.
>>>
>>> Best,
>>> Flavio
>>>
>>> -
>>>
>>> package org.apache.flink.stats.sketches;
>>>
>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>> import org.apache.flink.api.common.state.ReducingState;
>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.util.Collector;
>>>
>>> public class Test {
>>>   public static void main(String[] args) throws Exception {
>>>     StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>     env.setParallelism(1);
>>>
>>>     final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>     final DataStream<Row> input = env.fromElements(//
>>>         Row.of(1.0), //
>>>         Row.of(2.0), //
>>>         Row.of(3.0), //
>>>         Row.of(5.0), //
>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>> final DataStream out = 

Re: Flink 1.11 submit job timed out

2020-07-27 Thread Yang Wang
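I suggest first setting heartbeat.timeout to a larger value and then enabling GC logging
to see whether full GCs happen frequently and how long each one lasts. Judging from the logs you have provided so far, even the in-process JM->RM heartbeat times out,
so I still suspect this is GC-related.

env.java.opts.jobmanager: -Xloggc:/jobmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M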


Best,
Yang

On Mon, Jul 27, 2020 at 1:50 PM SmileSmile  wrote:

> Hi, Yang Wang
>
> The log was too long, so I removed some of the repeated content.
> I did suspect JM GC at first, but the behavior is the same after raising the JM memory to 10g.
>
> Best
>
> a511955993
> Email: a511955...@163.com
>
> On 07/27/2020 11:36, Yang Wang wrote:
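> Looking at this job, the root cause of the failure is not "No hostname could be resolved".
> The cause of that WARNING can be discussed separately (if it does not occur in 1.10).
> If you start a Standalone cluster locally you will see the same WARNING, and it does not affect normal use.
>
> The job failed because the slot request timed out after 5 minutes. In the log you provided, the period from 2020-07-23 13:55:45,519 to 2020-07-23
> 13:58:18,037 is blank; you did not omit anything there, did you?
> Tasks should have started deploying during that period. The log shows the JM->RM heartbeat timing out, which means communication within the same process in the same Pod timed out as well,
> so I suspect the JM was in full GC the whole time. You will need to confirm this.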
>
>
> Best,
> Yang
>
> On Thu, Jul 23, 2020 at 2:43 PM SmileSmile  wrote:
>
> > Hi Yang Wang
> >
> > First, my environment versions:
> >
> > kubernetes: 1.17.4, CNI: weave
> >
> > Items 1, 2 and 3 are questions I have; item 4 is the JM log.
> >
> > 1. After removing taskmanager-query-state-service.yaml, nslookup indeed fails:
> >
> > kubectl exec -it busybox2 -- /bin/sh
> > / # nslookup 10.47.96.2
> > Server:  10.96.0.10
> > Address: 10.96.0.10:53
> >
> > ** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN
> >
> > 2. Flink 1.11 vs. Flink 1.10
> >
> > In the "detail subtasks taskmanagers xxx x" row, 1.11 now shows 172-20-0-50, whereas 1.10 shows flink-taskmanager-7b5d6958b6-sfzlk:36459. What changed here? (This cluster currently runs both 1.10 and 1.11, and 1.10 works fine. If coredns were at fault, Flink 1.10 should show the same problem, shouldn't it?)
> >
> > 3. Does coredns need any special configuration?
> >
> > Resolving domain names inside the container works; only reverse resolution fails when there is no service. Is there anything that needs to be configured in coredns?
> >
> > 4. The JM log at the time of the timeout:
> >
> > 2020-07-23 13:53:00,228 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - ResourceManager akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 was granted leadership with fencing token
> > 2020-07-23 13:53:00,232 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
> > 2020-07-23 13:53:00,233 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Starting the SlotManager.
> > 2020-07-23 13:53:03,472 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID 1f9ae0cd95a28943a73be26323588696 (akka.tcp://flink@10.34.128.9:6122/user/rpc/taskmanager_0) at ResourceManager
> > 2020-07-23 13:53:03,777 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID cac09e751264e61615329c20713a84b4 (akka.tcp://flink@10.32.160.6:6122/user/rpc/taskmanager_0) at ResourceManager
> > 2020-07-23 13:53:03,787 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID 93c72d01d09f9ae427c5fc980ed4c1e4 (akka.tcp://flink@10.39.0.8:6122/user/rpc/taskmanager_0) at ResourceManager
> > 2020-07-23 13:53:04,044 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID 8adf2f8e81b77a16d5418a9e252c61e2 (akka.tcp://flink@10.38.64.7:6122/user/rpc/taskmanager_0) at ResourceManager
> > 2020-07-23 13:53:04,099 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID 23e9d2358f6eb76b9ae718d879d4f330 (akka.tcp://flink@10.42.160.6:6122/user/rpc/taskmanager_0) at ResourceManager
> > 2020-07-23 13:53:04,146 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID 092f8dee299e32df13db3111662b61f8 (akka.tcp://flink@10.33.192.14:6122/user/rpc/taskmanager_0) at ResourceManager
> >
> > 2020-07-23 13:55:44,220 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 99a030d0e3f428490a501c0132f27a56 (JobTest).
> > 2020-07-23 13:55:44,222 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 99a030d0e3f428490a501c0132f27a56 (JobTest).
> > 2020-07-23 13:55:44,251 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
> > 2020-07-23 13:55:44,260 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job JobTest (99a030d0e3f428490a501c0132f27a56).
> > 2020-07-23 13:55:44,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for JobTest (99a030d0e3f428490a501c0132f27a56).
> > 2020-07-23 13:55:44,319 INFO  org.apache.flink.runtime.jobmaster.JobMaster
> >

sql-client ??jdbc??????

2020-07-27 Thread op
??jdbc


CREATE TABLE mvp_dim_anticheat_args_all (
  id BIGINT,
  dt STRING,
  cnt_7d INT,
  cnt_30d INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.jdbc.Driver',
  'url' = 'jdbc:mysql://localhost:3306/huyou_oi',
  'table-name' = 'mvp_dim_ll',
  'username' = 'huy_oi',
  'password' = '420123'
);

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

lib: flink-connector-jdbc_2.11-1.11.0.jar, mysql-connector-java-5.1.38.jar
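
A ClassNotFoundException like the one above means that the class loader executing the statement cannot see the driver class. One way to narrow this down, sketched below, is to check whether the class is loadable from a given classpath (for example, by running the check with the same jars the SQL client is started with). DriverCheck and isLoadable are hypothetical names for this sketch, and the check does not reproduce Flink's own class loading, so a positive result is only a hint.

```java
/** Checks whether a class can be found by the class loader that loaded this class. */
final class DriverCheck {
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only look the class up, do not run its static initializers
            Class.forName(className, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the driver class named in the table definition above.
        String driver = args.length > 0 ? args[0] : "com.mysql.jdbc.Driver";
        System.out.println(driver + " loadable: " + isLoadable(driver));
    }
}
```

If the class is not loadable even with the MySQL connector jar on the classpath, the jar itself is suspect; if it is loadable, the likelier cause is that the running client or cluster was started before the jar was added to lib.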


Re: Flink Session TM Logs

2020-07-27 Thread Robert Metzger
Hi Richard,
thanks for forwarding my answer to the list!

I fear that Flink does not have a built-in solution for serving the logs of
a finished TaskManager while a YARN session is still running.

I agree with Yangze that you probably have to rely on an external logging
service, such as Elasticsearch or Splunk, to index your log events.
Maybe there's also some tooling from Cloudera specifically made for YARN?



On Mon, Jul 27, 2020 at 3:46 AM Yangze Guo  wrote:

> Hi, Richard
>
> Before the session has been terminated, you cannot fetch the logs of
> terminated TMs. One possible solution could be leveraging the
> log4j2 appenders[1]. Flink uses log4j2 by default in the latest
> release 1.11.
>
> [1] https://logging.apache.org/log4j/2.x/manual/appenders.html
>
> Best,
> Yangze Guo
>
> On Sat, Jul 25, 2020 at 2:37 AM Richard Moorhead
>  wrote:
> >
> >
> >
> > ---------- Forwarded message ---------
> > From: Robert Metzger 
> > Date: Fri, Jul 24, 2020 at 1:09 PM
> > Subject: Re: Flink Session TM Logs
> > To: Richard Moorhead 
> >
> > I accidentally replied to you directly, not to the user@ mailing list.
> > Is it okay for you to publish the thread on the list again?
> >
> > On Fri, Jul 24, 2020 at 8:01 PM Richard Moorhead <richard.moorh...@gmail.com> wrote:
> >>
> >> It is enabled. The issue is that for a long-running Flink session,
> >> which may execute many jobs, the task managers are gone after a job is
> >> completed, and their logs aren't available.
> >>
> >> What I have noticed is that when the session is terminated I am able to
> >> find the logs in the job history server under the associated YARN
> >> application id.
> >>
> >> On Fri, Jul 24, 2020 at 12:51 PM Robert Metzger  wrote:
> >>>
> >>> Hi Richard,
> >>>
> >>> you need to enable YARN log aggregation to access logs of finished
> >>> YARN applications.
> >>>
> >>> On Fri, Jul 24, 2020 at 5:58 PM Richard Moorhead <richard.moorh...@gmail.com> wrote:
> >>>>
> >>>> When running a Flink session on YARN, task manager logs for a job are
> >>>> not available after completion. How do we locate these logs?
> >>>>
>


Re: Blink Planner构造Remote Env

2020-07-27 Thread jun su
hi Jark,

Sorry for the late reply. I tried your approach on Flink 1.11.0.
I created the StreamTableEnvironmentImpl following StreamTableEnvironmentImpl#create,
only removing the mode check settings.isStreamingMode() from that method, and got the following error:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1563)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2125)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
    ... 6 more


On Wed, May 20, 2020 at 2:30 PM Jark Wu  wrote:

> Hi,
>
> The Blink planner does not support
> org.apache.flink.table.api.java.BatchTableEnvironment, so it cannot work with
> ExecutionEnvironment.
> The Blink planner's batch mode currently supports only TableEnvironment, but as a hack you can also use
> StreamTableEnvironment,
> which requires constructing StreamTableEnvironmentImpl directly:
>
> StreamExecutionEnvironment execEnv =
> StreamExecutionEnvironment.createRemoteEnvironment(...);
> StreamTableEnvironmentImpl tEnv = new StreamTableEnvironmentImpl(..
> execEnv, .., false); // for the constructor arguments, see the implementation of StreamTableEnvironmentImpl#create
>
> Best,
> Jark
>
> On Tue, 19 May 2020 at 15:27, jun su  wrote:
>
> > hi all,
> >
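> > In the past, to connect to a remote Flink cluster from the IDE, I could use ExecutionEnvironment.createRemoteEnvironment().
> >
> > The official docs construct the Blink planner environment like this:
> >
> > val bbSettings =
> > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
> > val bbTableEnv = TableEnvironment.create(bbSettings)
> >
> >
> > How do I connect to a remote cluster with it?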
> >
> > --
> > Best,
> > Jun Su
> >
>


-- 
Best,
Jun Su


Re: [flink sql] Problem with the Flink SQL insert into statement

2020-07-27 Thread Caizhi Weng
Hi,

Flink indeed does not support this syntax at the moment... I have created an issue [1] where you can track the progress of this feature.

[1] https://issues.apache.org/jira/browse/FLINK-18726
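
The behavior described in the report quoted below, where query fields are matched against sink fields purely by position, can be modeled with a small sketch. This is a simplified illustration and not Flink's actual validation code: it compares type names as plain strings and ignores the implicit casts that the real check may apply. PositionalSchemaCheck and firstMismatch are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of position-based schema matching; hypothetical, not Flink code. */
final class PositionalSchemaCheck {
    /** Returns the index of the first position whose types differ, or -1 if all match. */
    static int firstMismatch(List<String> queryTypes, List<String> sinkTypes) {
        if (queryTypes.size() != sinkTypes.size()) {
            // A missing or extra field counts as a mismatch at the first absent position.
            return Math.min(queryTypes.size(), sinkTypes.size());
        }
        for (int i = 0; i < queryTypes.size(); i++) {
            if (!queryTypes.get(i).equals(sinkTypes.get(i))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Sink t2(a int, b string, c int)
        List<String> sink = Arrays.asList("INT", "VARCHAR(2147483647)", "INT");
        // select t1.a, t1.c, t1.b
        List<String> reordered = Arrays.asList("INT", "INT", "VARCHAR(2147483647)");
        // select t1.a, t1.b, t1.c
        List<String> declaredOrder = Arrays.asList("INT", "VARCHAR(2147483647)", "INT");

        System.out.println("reordered select, first mismatch at: " + firstMismatch(reordered, sink));
        System.out.println("declared-order select, first mismatch at: " + firstMismatch(declaredOrder, sink));
    }
}
```

Under this model, until the column-list syntax is supported, the select list has to produce every sink column in the order the sink table declares them.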

On Mon, Jul 27, 2020 at 11:36 AM  wrote:

> Flink version tested: 1.11.0
>
> Does Flink SQL support this insert syntax, which specifies the field order and the columns to insert into?
>
> Insert into tableName(col1[,col2]) select col1[,col2]
>
> So far my tests have turned up the following problems.
>
> Table definitions:
>
> create table t1(a int,b string,c int) with ();
> create table t2(a int,b string,c int) with ();
>
> Problem 1: insert into matches the query schema against the sink schema by declaration order
>
> Test statement:
>
> insert into t2 select t1.a,t1.c, t1.b from  t1;
>
> Error:
>
> org.apache.flink.table.api.ValidationException: Field types of query result
> and registered TableSink default_catalog.default_database.t2 do not match.
> Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
> Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
>
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)
>
> Problem 2: the syntax Insert into tableName(col1[,col2]) select col1[,col2] is accepted, but it
> has no actual effect; fields are still matched by declaration order
>
> Test statement:
>
> insert into t2(a,c,b) select t1.a,t1.c, t1.b from  t1;
>
> Error:
>
> org.apache.flink.table.api.ValidationException: Field types of query result
> and registered TableSink default_catalog.default_database.t2 do not match.
> Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
> Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
>
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)
>
> Problem 3: the same happens when insert into lists fewer fields than the sink schema has
>
> Test statement:
>
> insert into t2(a,b)
> select t1.a, t1.b from t1;
>
> Error:
>
>