Re:Re: flink on yarn session模式与yarn通信失败的问题 (job模式可以成功)

2021-03-22 文章
多谢大佬呀~尝试了一下没有解决。这两个参数有配置上,启动的时候也显示的与配置中一致。看上面的注释说好像仅Standalone 
模式下有效,而且奇怪的是pre-job可以很顺利  session却连不上。对啦我的版本是1.11.2,大佬有空再帮忙看一眼呀











在 2021-03-23 09:28:20,"wxpcc"  写道:
>第一个问题可以尝试在flink.conf 中配上jobmanager.rpc.address 和jobmanager.rpc.port
>第二个问题不是很清楚
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


flink on yarn session模式与yarn通信失败的问题 (job模式可以成功)

2021-03-22 文章
大佬们请教一下:
之前一直使用job模式来提交任务,可以顺利提交计算任务。最近有需求比较适合session模式来提交,按照论坛里的教程进行提交的时候,一直报错连接不上resource
  manage。观察启动log发现两种任务连接的resource manage不同,一个是正确的端口,一个一直请求本机端口。

session  模式启动log:


job  模式启动log:


想请教一下:
 1.如何配置session 模式下的  resource  manage 端口?

 2.job 模式下假如我有一个8核taskmanage服务器A配置了16个slot。job 
模式提交了一个并行度为5的任务分配到了服务器A,再通过job模式提交任务的话,就不能有任务分配到服务器A了。我有办法将剩下空闲的slot利用起来吗?或者是设计上就是对小规模的任务,减少信息传递来提升计算完成的时间?






 

flink on yarn session模式与yarn通信失败 (job模式可以成功)的问题

2021-03-22 文章
大佬们请教一下:
之前一直使用job模式来提交任务,可以顺利提交计算任务。最近有需求比较适合session模式来提交,按照论坛里的教程进行提交的时候,一直报错连接不上resource
  manage。观察启动log发现两种任务连接的resource manage不同,一个是正确的端口,一个一直请求本机端口。

session  模式启动log:


job  模式启动log:


想请教一下:
 1.如何配置session 模式下的  resource  manage 端口?

 2.job 模式下假如我有一个8核taskmanage服务器A配置了16个slot。job 
模式提交了一个并行度为5的任务分配到了服务器A,再通过job模式提交任务的话,就不能有任务分配到服务器A了。我有办法将剩下空闲的slot利用起来吗?或者是设计上就是对小规模的任务,减少信息传递来提升计算完成的时间?



pyflink1.11 window groupby出错

2020-09-29 文章
各位大佬,我想尝试下pyflink 进行时间窗口下的指标统计,写了一个demo发现table APi 的group 
方法报错,网上搜索了一下相关内容也没有解决问题, 想请各位大佬帮帮忙看一下是哪里写错了?

错误信息:
py4j.protocol.Py4JJavaError: An error occurred while calling o95.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time 
attribute for grouping in a stream environment.
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at 
org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)




demo程序:
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.descriptors import *
from pyflink.table.descriptors import Json
from pyflink.table.window import *

test_out_put_data_path = r'D:\test_doc\test_result_data.csv'

s_nev = StreamExecutionEnvironment.get_execution_environment()
s_nev.set_parallelism(3)
st_nev = StreamTableEnvironment.create(s_nev, 
environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())

st_nev.connect(Kafka().version('0.11').topic('gyhWebLog').start_from_earliest().property("zookeeper.connect","cdh3:2181,
 cdh4:2181, cdh5:2181").property("bootstrap.servers", "cdh3:9092, cdh4:9092, 
cdh5:9092")) \
.with_format(Json()
 .fail_on_missing_field(False)
 .schema(DataTypes.ROW([DataTypes.FIELD('time', 
DataTypes.TIMESTAMP(3)),

DataTypes.FIELD('prev_page',DataTypes.STRING()),
DataTypes.FIELD('page', 
DataTypes.STRING()),
DataTypes.FIELD("app", 
DataTypes.STRING()),

DataTypes.FIELD("nextApp",DataTypes.STRING()),

DataTypes.FIELD("service",DataTypes.STRING()),

DataTypes.FIELD("userId",DataTypes.BIGINT())])))\
.with_schema(Schema().
field('prev_page', DataTypes.STRING())
 .field('page', DataTypes.STRING())
 .field('app', DataTypes.STRING())
 .field('nextApp', DataTypes.STRING())
 .field('service', DataTypes.STRING())
 .field('userId', DataTypes.BIGINT())
 .field('time', DataTypes.TIMESTAMP(3))
 .rowtime(Rowtime()
  .timestamps_from_field('time')
  .watermarks_periodic_bounded(6)))\
.in_append_mode()\
.create_temporary_table('raw_web_log_data')


st_nev.connect(FileSystem().path(test_out_put_data_path))\
.with_format(OldCsv()
 .field_delimiter(',')
 .field("userId", DataTypes.BIGINT())
 .field('dataCount', DataTypes.BIGINT())
 .field('count_time', DataTypes.TIMESTAMP(3))
 )\
.with_schema(Schema()
 .field('userId', DataTypes.BIGINT())
 .field('dataCount', DataTypes.BIGINT())
 .field('count_time', DataTypes.TIMESTAMP(3))
 )\
.create_temporary_table('test_out_put')


if __name__ == '__main__':
st_nev.from_path('raw_web_log_data').window(Tumble.over('1.hours').on('time').alias('w')).group_by('userId,
 w').select('userId, page.count as d, w.end').execute_insert('test_out_put')