Reply: (no subject)

2020-07-20 Posted by 罗显宴


hi,
I think you are right; I had not understood the trigger. I took the trigger to be a small 20-minute window inside the 1-hour window. What I actually want is the number of distinct window types per 20 minutes: say the current 20 minutes contain three window types A, B and C, then the output is 3; if the following 20 minutes contain types A, B and D, then A and B are repeats and only D is new, so the result is 4.
On 2020-07-21 13:58, shizk233 wrote:
Hi,

First, to align terminology: there is only the 1-hour tumbling window; there is no 20-minute window. The trigger merely emits results, and the trigger interval has nothing to do with the window size.

With the current design, in the 11:00-12:00 window, x input records of type A are all counted as 1 by the agg, but the 1-hour window fires the 20s trigger 3 times,
so at most 3 records of (type A, 12:00) are emitted (same 1-hour window, so WindowEnd is always 12:00).
When these 3 records reach the MapState, a single entry ((type A, 12:00), 1) is written and each registers a timer for 12:00+1,
so what the timer outputs at 12:00 is (12:00, 1).

If type A gets the same input during 12:00-13:00, MapState adds one entry ((type A, 13:00), 1), and at 13:00 the final result is (13:00, 2).

That result differs from my understanding of your requirement: I would expect (12:00, 1) at 12:00 and (13:00, 1) at 13:00, since so far there is only type A data.
The expected output should be a distinct-type count over unbounded time, emitting (hour, total number of distinct types so far) every hour.

So the current design may not match the requirement as described? Please check whether that is what is happening.


Best,
shizk233


罗显宴 <15927482...@163.com> wrote on Tue, 2020-07-21 at 11:53:

OK.
Input:
心功能不全和心律失常用药,1,timestamp
心功能不全和心律失常用药,1,timestamp
抗利尿剂,1,timestamp
血管收缩剂,1,timestamp
血管紧张素II受体拮抗剂,1,timestamp


The timestamp here is the event time.
For example, the first three records fall into one 20-second window, so they split into two windows: 心功能不全和心律失常用药 and 抗利尿剂. Since I am counting drug categories, I set the count to 1 no matter how many elements a window holds, so the output is the number of windows, i.e. 2.
In the next 20 seconds two more windows appear, with categories different from before, so 2 more are added on top.
Output: 4


That is, the output:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




On 2020-07-21 11:37, shizk233 wrote:
Hi,

I don't quite follow. Could you give an example for the result part: the input you expect (a few sample records) and the desired output?

Best,
shizk233

罗显宴 <15927482...@163.com> wrote on Tue, 2020-07-21 at 11:29:

hi,


CountAgg aggregates a single window, and the elements in a window all come from partitioning by the drug category, so they are all identical; that is why I simply set the value to 1 when accumulating. Do you mean I should do the accumulation inside CountAgg?


On 2020-07-21 11:10, shizk233 wrote:
Hi,


My guess: because a 1-hour tumbling window is set, the WindowEnd obtained in the WindowFunction is the END of that hour,
while acc never changes, so every 20s firing produces the same result, and MapState then ignores the duplicate values.


Try making acc accumulate, and include acc's value in the output to see.


Best,
shizk233


罗显宴 <15927482...@163.com> wrote on Mon, 2020-07-20 at 20:44:



Sorry, the image probably didn't come through, so let me send the code again. I've only been learning Flink for half a month and there is a lot I don't understand; I hope you don't mind.
On 2020-07-20 20:38, 罗显宴 <15927482...@163.com> wrote:
Sorry, I sent that too quickly and didn't explain.


Here the aggregate operator does a pre-aggregation that sets each window's count to one; windowResult then emits the result, the stream is keyed by window, and finally mapState handles the running increase.


On 2020-07-20 14:09, 罗显宴 <15927482...@163.com> wrote:


No, it is a continuous running total. For example, I crawl drug data from some pharma website and count the newly seen drug categories every hour, and keep going like that indefinitely; when the site is exhausted I can switch to another one.
On 2020-07-20 11:47, shizk233 wrote:
Hi,

Is the accumulation only within a single day? If so, you can open a one-day Tumbling Window and use ContinuousEventTimeTrigger to fire an output every hour.

Best,
shizk233

罗显宴 <15927482...@163.com> wrote on Mon, 2020-07-20 at 01:18:




Hi all, how do I implement a continuously increasing element count within a tumbling window? For example, cumulative user logins per hour: 500 logins by 1 a.m., 1200 accumulated by 2 a.m., more again by 3 a.m., producing an increasing curve. I saw SQL for this written by 云邪 online, but I cannot write the DataStream
API version; hoping someone can clear this up for me, thanks!





Re: (no subject)

2020-07-20 Posted by shizk233
Hi,

First, to align terminology: there is only the 1-hour tumbling window; there is no 20-minute window. The trigger merely emits results, and the trigger interval has nothing to do with the window size.

With the current design, in the 11:00-12:00 window, x input records of type A are all counted as 1 by the agg, but the 1-hour window fires the 20s trigger 3 times,
so at most 3 records of (type A, 12:00) are emitted (same 1-hour window, so WindowEnd is always 12:00).
When these 3 records reach the MapState, a single entry ((type A, 12:00), 1) is written and each registers a timer for 12:00+1,
so what the timer outputs at 12:00 is (12:00, 1).

If type A gets the same input during 12:00-13:00, MapState adds one entry ((type A, 13:00), 1), and at 13:00 the final result is (13:00, 2).

That result differs from my understanding of your requirement: I would expect (12:00, 1) at 12:00 and (13:00, 1) at 13:00, since so far there is only type A data.
The expected output should be a distinct-type count over unbounded time, emitting (hour, total number of distinct types so far) every hour.

So the current design may not match the requirement as described? Please check whether that is what is happening.


Best,
shizk233
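
A minimal sketch of the unbounded distinct-count approach described above, in the thread's DataStream Scala style. CategoryEvent, the state name, and the single constant key are illustrative assumptions, not the original code:

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class CategoryEvent(category: String, timestamp: Long)

// All events share one key; MapState remembers every category ever seen,
// and an event-time timer at each hour boundary emits the running total.
class DistinctCategoryCounter
    extends KeyedProcessFunction[Int, CategoryEvent, (Long, Long)] {

  private lazy val seen = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, java.lang.Boolean](
      "seen-categories", classOf[String], classOf[java.lang.Boolean]))

  override def processElement(
      e: CategoryEvent,
      ctx: KeyedProcessFunction[Int, CategoryEvent, (Long, Long)]#Context,
      out: Collector[(Long, Long)]): Unit = {
    seen.put(e.category, true)
    // Fire at the end of the event-time hour this record belongs to.
    val hourEnd = e.timestamp - e.timestamp % 3600000L + 3600000L
    ctx.timerService().registerEventTimeTimer(hourEnd)
  }

  override def onTimer(
      ts: Long,
      ctx: KeyedProcessFunction[Int, CategoryEvent, (Long, Long)]#OnTimerContext,
      out: Collector[(Long, Long)]): Unit = {
    var n = 0L
    val it = seen.keys().iterator()
    while (it.hasNext) { it.next(); n += 1 }
    out.collect((ts, n)) // (hour boundary, distinct categories seen so far)
  }
}

Usage would be roughly events.keyBy(_ => 0).process(new DistinctCategoryCounter); keying everything to one constant funnels all categories into a single state, which is fine at this cardinality but gives up parallelism.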




Re: Re: Execution problem with the official pyflink example

2020-07-20 Posted by chenxuying
Hi,
got it now, thanks. I hadn't read the docs carefully.

flink 1.11 cdc: Kafka holds canal-json data for multiple tables that needs to be parsed and handled per table and sunk to different downstreams. How can this be supported?

2020-07-20 Posted by jindy_liu
For example:

MySQL tables:
CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `time` datetime NOT NULL,
  `status` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `status` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Data in Kafka:
// insert event on table test
{"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}

// event on table status
{"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}

Since the JSON in Kafka changes dynamically (for example when a new table is added), how can it be turned into the corresponding RowData?
It seems this cannot be handled directly with JsonRowDeserializationSchema or CanalJsonDeserializationSchema.
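
One possible workaround, sketched under assumptions (the tag names and the jackson parsing are illustrative; this is not an existing Flink connector): consume the topic as raw strings, look only at the top-level "table" field, and fan the records out with side outputs, so each per-table stream can then be deserialized with its own schema or handed to its own sink.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// One side-output tag per table we want to route.
object TableTags {
  val test: OutputTag[String]   = OutputTag[String]("table-test")
  val status: OutputTag[String] = OutputTag[String]("table-status")
}

// Routes raw canal-json records by their top-level "table" field.
class CanalTableRouter extends ProcessFunction[String, String] {
  @transient private lazy val mapper = new ObjectMapper()

  override def processElement(
      json: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit = {
    val table = mapper.readTree(json).path("table").asText()
    table match {
      case "test"   => ctx.output(TableTags.test, json)
      case "status" => ctx.output(TableTags.status, json)
      case _        => out.collect(json) // unknown or newly added tables
    }
  }
}

// val routed = rawKafkaStream.process(new CanalTableRouter)
// val testStream   = routed.getSideOutput(TableTags.test)
// val statusStream = routed.getSideOutput(TableTags.status)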






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Want to know whether state was written to the checkpoint file

2020-07-20 Posted by sun
How can I decompile (inspect) a checkpoint file? I want to know whether my state was actually written into the checkpoint.




[Binary checkpoint metadata was pasted here. The readable fragments show two operator states: _default_ (OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE, value serializer org.apache.flink.runtime.state.JavaSerializer) and topic-partition-offset-states (mode UNION, a TupleSerializer/KryoSerializer over KafkaTopicPartition).]

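There is no tool that decompiles the file directly, but one option (a sketch, not from this thread; it needs the flink-state-processor-api dependency, and the path, uid and state name below are placeholders) is the State Processor API available since Flink 1.9, which loads a checkpoint's _metadata and reads operator state back as a DataSet:

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

object InspectCheckpoint {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Point at the checkpoint directory containing _metadata;
    // the state backend must match the one the job used.
    val checkpoint = Savepoint.load(env, "hdfs:///checkpoints/job-id/chk-42", new MemoryStateBackend())
    // "my-uid" and "my-state" stand in for the operator uid and state name of the job.
    checkpoint.readListState("my-uid", "my-state", Types.STRING).print()
  }
}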

Reply: (no subject)

2020-07-20 Posted by 罗显宴
OK.
Input:
心功能不全和心律失常用药,1,timestamp
心功能不全和心律失常用药,1,timestamp
抗利尿剂,1,timestamp
血管收缩剂,1,timestamp
血管紧张素II受体拮抗剂,1,timestamp


The timestamp here is the event time.
For example, the first three records fall into one 20-second window, so they split into two windows: 心功能不全和心律失常用药 and 抗利尿剂. Since I am counting drug categories, I set the count to 1 no matter how many elements a window holds, so the output is the number of windows, i.e. 2.
In the next 20 seconds two more windows appear, with categories different from before, so 2 more are added on top.
Output: 4


That is, the output:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4







Re: Execution problem with the official pyflink example

2020-07-20 Posted by Xingbo Huang
Hi,
You need to add some configuration. If you are not using RocksDB as the state backend, it is enough to set
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True).
If you are using RocksDB, you need to configure off-heap memory instead:
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m').
You can refer to the example in the docs and the corresponding note [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions

Best,
Xingbo



Flink consuming Kafka with EXACTLY_ONCE checkpointing: transaction error

2020-07-20 Posted by ??????
Hi,
With Flink consuming Kafka and checkpointing set to EXACTLY_ONCE, I hit the following error:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has been
expired by the broker
[…]

Re: (no subject)

2020-07-20 Posted by shizk233
Hi,

I don't quite follow. Could you give an example for the result part: the input you expect (a few sample records) and the desired output?

Best,
shizk233



Execution problem with the official pyflink example

2020-07-20 Posted by chenxuying
Official example:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
I wrote a program following the example and also installed pyflink
|
python -m pip install apache-flink
|
Code:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
          DataTypes.BIGINT())

t_env.register_function("add", add)

t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")

执行:

|
python test_pyflink.py
|

报错:


|
Traceback (most recent call last):
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.TableException: The configured Task Off-Heap 
Memory 0 bytes is less than the least required Python worker Memory 79 mb. The 
Task Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'.
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at 
org.apache.flink

Re: After integrating Flink with Hive, a table created through Flink is visible in hive beeline, but its columns are not?

2020-07-20 Posted by Jingsong Li
What gets created by default is a Flink table, which is not visible on the Hive side.
If you want to create a Hive table, use the Hive dialect.

Best,
Jingsong
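
For example, a sketch (the table name is made up, and tableEnv is assumed to be a TableEnvironment with the HiveCatalog registered and in use): switching to the Hive dialect before the DDL makes the created table a native Hive table, so beeline sees the columns as well.

import org.apache.flink.table.api.SqlDialect

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("CREATE TABLE orders_hive (product STRING, amount INT)")
// Switch back for Flink-dialect statements afterwards if needed.
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)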

On Tue, Jul 21, 2020 at 11:31 AM felixzh  wrote:

> Following the docs
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
> I created a table through Flink: CREATE TABLE Orders (product STRING, amount INT)
> In beeline the table is visible, but desc shows no columns, and select * from orders is not usable either
>
>

-- 
Best, Jingsong Lee


After integrating Flink with Hive, a table created through Flink is visible in hive beeline, but its columns are not?

2020-07-20 Posted by felixzh
Following the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
I created a table through Flink: CREATE TABLE Orders (product STRING, amount INT)
In beeline the table is visible, but desc shows no columns, and select * from orders is not usable either



Reply: (no subject)

2020-07-20 Posted by 罗显宴
hi,
CountAgg aggregates a single window, and the elements in a window all come from partitioning by the drug category, so they are all identical; that is why I simply set the value to 1 when accumulating. Do you mean I should do the accumulation inside CountAgg?




Reply: pyflink1.11.0window

2020-07-20 Posted by 奇怪的不朽琴师
[…] the pyflink demo […]




------ Original ------
From: "user-zh"



Re: (no subject)

2020-07-20 Posted by shizk233
Hi,

My guess: because a 1-hour tumbling window is set, the WindowEnd obtained in the WindowFunction is the END of that hour,
while acc never changes, so every 20s firing produces the same result, and MapState then ignores the duplicate values.

Try making acc accumulate, and include acc's value in the output to see.

Best,
shizk233



Re: Flink CLI deployment problem

2020-07-20 Posted by Congxian Qiu
Hi  Z-Z
That is rather odd. Is it stably reproducible? Could you share job code that reproduces it stably, plus the steps involved? I will try to reproduce it locally.


Best,
Congxian


Z-Z wrote on Mon, 2020-07-20 at 16:17:

> Via the CLI: in the jobmanager directory I run bin/flink run -d -p 1 -s {savepointuri} /data/test.jar, and that fails with baffling errors, as in the earlier mail.
> Via the web UI: at http://jobmanager:8081, under submit new job, I add the jar, set the same savepoint path and parallelism, and submit; that way everything works.
>
>
>
>
> ------ Original message ------
> From: "user-zh" <qcx978132...@gmail.com>
> Sent: Mon, 2020-07-20 14:30
> To: "user-zh"
> Subject: Re: Flink CLI deployment problem
>
>
>
> Hi
>
> This kind of debugging can be done in IDEA.
>
> Also, you said submitting via the web UI works. To confirm: the same savepoint fails when submitted with flink run but succeeds through the web UI? If so, could you share your exact steps and commands?
>
> Best,
> Congxian
>
>
> Z-Z wrote:
> > This is a new error reported by the taskmanager; otherwise it is the same as before: submitting via the CLI fails, submitting via the web UI is fine:
> > 2020-07-20 03:29:25,959 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'value.serializer' was supplied but isn't a known config.
> > 2020-07-20 03:29:25,959 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
> > 2020-07-20 03:29:25,959 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
> > 2020-07-20 03:29:25,974 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
> > java.lang.ArrayIndexOutOfBoundsException: 0
> > at
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
> > at
> >
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
> > at
> >
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
> > at
> >
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> > at
> >
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > at
> >
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> > at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > at
> java.lang.Thread.run(Thread.java:748)
> > 2020-07-20 03:29:25,974 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for StreamMap_caf773fe289bfdb867e0b4bd0c431c5f_(1/1) from alternative (1/1), will retry while more alternatives are available.
> > org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> > at
> >
> org.apache.flink.str

Re: liststate

2020-07-20 Posted by Congxian Qiu
Hi

The value in ListState is a single unit, so it is fetched back in one go. RocksDBMapState, on the other hand, has operations (iterator/entries/values etc.) that do load entries lazily via next.

Best,
Congxian
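
If lazy iteration is needed today, one workaround (my own sketch, not from this thread) is to keep the elements under an index key in MapState instead, since the RocksDB map state iterates entries incrementally:

import org.apache.flink.api.common.state.{MapStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// An append-only "list" kept as MapState[index -> element]; with the RocksDB
// backend, values() is backed by an iterator that loads entries one by one.
class LazyListFunction extends KeyedProcessFunction[String, String, String] {

  private lazy val size = getRuntimeContext.getState(
    new ValueStateDescriptor[java.lang.Long]("size", classOf[java.lang.Long]))
  private lazy val elems = getRuntimeContext.getMapState(
    new MapStateDescriptor[java.lang.Long, String](
      "elems", classOf[java.lang.Long], classOf[String]))

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    val n: Long = Option(size.value()).map(_.longValue()).getOrElse(0L)
    elems.put(n, value)
    size.update(n + 1)

    // Lazy scan instead of materializing the whole list at once.
    val it = elems.values().iterator()
    while (it.hasNext) out.collect(it.next())
  }
}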


蒋佳成(Jiacheng Jiang) <920334...@qq.com> wrote on Tue, 2020-07-21 at 09:28:

> Hi all:
> I noticed that RocksDBListState#get loads all the data into memory at once. With a very large list, could that cause an out-of-memory error? Could the data instead be loaded only in the next() method?


Re: flink1.11 run

2020-07-20 Posted by Rui Li
You can write to an existing table; the relevant options [1] need to be added to the table's properties.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing
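
A sketch of what that looks like (tableEnv is assumed to be the StreamTableEnvironment from the program quoted below; the table name and values are illustrative): under the Hive dialect, the streaming-sink options from [1] can be attached to an existing Hive table as table properties.

import org.apache.flink.table.api.SqlDialect

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql(
  """ALTER TABLE tmp.user_behavior_sink SET TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 h',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
    |)""".stripMargin)

File rolling for such writes is governed by the sink.rolling-policy.* options documented on the same page.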

On Mon, Jul 20, 2020 at 5:14 PM Dream-底限  wrote:

>  hi
> OK. Some questions about streaming writes to a Hive table:
> 1. Does the Hive table have to be created from within Flink first?
> 2. Can I write directly to a Hive table that was already created in Hive (table built via Hue), and will the files get a rolling policy?
>
Rui Li wrote on Mon, 2020-07-20 at 16:44:
>
> > tableEnv.executeSql already submits the job; there is no need to call execute afterwards
> >
> > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> >
> > > hi, I have a Kafka-to-Hive program here, but it will not run. What could be the reason?
> > >
> > > Exception:
> > > The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The main
> > method
> > > caused an error: No operators defined in streaming topology. Cannot
> > > generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > at
> > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > streaming topology. Cannot generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > at
> > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at
> > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at
> > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > ... 11 more
> > > Code:
> > >
> > >  StreamExecutionEnvironment environment =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > EnvironmentSettings settings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > StreamTableEnvironment tableEnv =
> > > StreamTableEnvironment.create(environment, settings);
> > >
> > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > environment.setStateBackend(new MemoryStateBackend());
> > > environment.getCheckpointConfig().setCheckpointInterval(5000);
> > >
> > > String name = "myhive";
> > > String defaultDatabase = "tmp";
> > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > String version = "1.1.0";
> > >
> > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > > tableEnv.registerCatalog("myhive", hive);
> > > tableEnv.useCatalog("myhive");
> > >
> > > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > > "  user_id BIGINT,\n" +
> > > "  item_id STRING,\n" +
> > > "  behavior STRING,\n" +
> > > "  ts AS PROCTIME()\n" +
> > > ") WITH (\n" +
> > > " 'connector' = 'kafka-0.11',\n" +
> > > " 'topic' = 'user_behavior',\n" +
> > > " 'properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> > > " 'properties.group.id' = 'testGroup',\n" +
> > > " 'scan.startup.mode' = 'earliest-offset',\n" +
> > > " 'format' = 'json',\n" +
> > > " 'json.fail-on-missing-field' = 'false',\n" +
> > > " 'json.ignore-parse-errors' = 'true'\n" +
> > > ")");
> > >
> > > //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> > > //

Reply: Reply: pyflink1.11.0window

2020-07-20 Posted by chengyanan1...@foxmail.com
Hi, judging by the error, your sink accepts only insert changes, while the query now also produces updates; hence
"AppendStreamTableSink requires that Table has only insert changes."



chengyanan1...@foxmail.com

From: 奇怪的不朽琴师
Date: 2020-07-20 16:23
To: user-zh
Subject: Reply: pyflink1.11.0window
HI:
I now have a new problem: on top of this I added a join, and writing to Kafka then fails:
py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.

Reply: flink1.11 startup problem

2020-07-20 Posted by Evan
Hi, […] the jars added in steps 1 […] 4 and 5 […]




------ Original ------
From: "user-zh"



liststate

2020-07-20 Posted by 蒋佳成(Jiacheng Jiang)
Hi all:
I noticed that RocksDBListState#get loads all the data into memory at once. With a very large list, could that cause an out-of-memory error? Could the data instead be loaded only in the next() method?

Re: flink1.11 startup problem

2020-07-20 Posted by Leonard Xu
Hi,

> On 2020-07-20 20:15, 酷酷的浑蛋 wrote:
>
> 1. After unpacking flink1.11, startup reports:
> java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties

It already fails at step one, so this should be a problem with your local environment; the jars you added afterwards should not be needed at all. Which JDK version are you running locally?

Best,
Leonard Xu

Reply: (no subject)

2020-07-20 Posted by 罗显宴


Sorry, the image probably didn't come through, so here is the code again. I've only been learning Flink for half a month and there is a lot I don't understand; I hope you don't mind:
package com.bupt.main

import java.sql.Timestamp
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.{AggregateFunction, 
FlatMapFunction, MapFunction, RuntimeContext}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import 
org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, 
RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests



case class Drug(id:String,ATCCode:String,MIMSType:String,
name:String,other:String, producer:String,
retailPrice:String,composition:String,
medicineRank:String, timestamp:Long)

case class IncreaseNumPerHour(category:String, num:Long,timestamp:Long)

case class ItemViewCount(category:String,windowEnd:Long)

case class IncreasePerHour(time:String,num:Long)

object KafkaToElasticSearch {
  def main(args: Array[String]): Unit = {
// 1. Create the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val props = new Properties
props.put("bootstrap.servers", "master:9092")
props.put("zookeeper.connect", "master:2181")
props.put("group.id", "drug-group")
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")

    // The Kafka topic must match the topic used by the producer utility.
    val drugs = env.addSource(new FlinkKafkaConsumer011[String]("drugs",
      new SimpleStringSchema, props)).setParallelism(1)
      .filter(string => {
        val words = string.split(",")
        words.length == 10 && words(2).length != 0
      })
      .map(new MapFunction[String, Drug] {
        override def map(value: String): Drug = {
          val words = value.split(",")
          if (words.length != 10)
            println(words)
          Drug(words(0), words(1), words(2), words(3), words(4),
            words(5), words(6), words(7), words(8), words(9).trim.toLong)
        }
      }).assignAscendingTimestamps(_.timestamp)
//drugs.print()
val num = drugs.map(drug => {
  var temp: StringBuilder = new StringBuilder(drug.MIMSType)

  if (temp.length != 0 && temp.charAt(0) == '-')
temp.deleteCharAt(0)
  if (temp.length != 0 && temp.charAt(temp.length - 1) == ';')
temp.deleteCharAt(temp.length - 1)
      var validateResult: String = null
      if (temp.length != 0) validateResult = temp.substring(0, temp.indexOf(' '))
      IncreaseNumPerHour(validateResult, 1l, drug.timestamp)
})

//num.print()
//
val result = num.keyBy(_.category)
  .timeWindow(Time.hours(1))
  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
  .aggregate(new CountAgg,new WindowResult)
  .keyBy(_.windowEnd)
  .process(new IncreaseItems)
result.print()

env.execute("compute num")
  }

}

// Custom pre-aggregation function

// type parameters: in, acc, out
// here the accumulator effectively serves as the state
class CountAgg() extends AggregateFunct
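
The archived message is cut off above. Judging from the descriptions earlier in the thread (pre-aggregate every window to 1, emit (category, windowEnd), key by windowEnd, dedupe in MapState, fire a timer at windowEnd + 1), the missing classes plausibly looked roughly like the following; this is a reconstruction, not the author's original code:

// Pre-aggregation: any number of elements in a category window collapses to 1.
class CountAgg extends AggregateFunction[IncreaseNumPerHour, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: IncreaseNumPerHour, acc: Long): Long = 1L
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = math.max(a, b)
}

// Wraps the pre-aggregated count into (category, windowEnd).
class WindowResult extends WindowFunction[Long, ItemViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[ItemViewCount]): Unit =
    out.collect(ItemViewCount(key, window.getEnd))
}

// Keyed by windowEnd: MapState drops duplicate categories; a timer at
// windowEnd + 1 emits (window end time, number of distinct categories).
class IncreaseItems extends KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour] {

  private lazy val seen = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, java.lang.Boolean](
      "seen", classOf[String], classOf[java.lang.Boolean]))

  override def processElement(
      value: ItemViewCount,
      ctx: KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour]#Context,
      out: Collector[IncreasePerHour]): Unit = {
    seen.put(value.category, true)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour]#OnTimerContext,
      out: Collector[IncreasePerHour]): Unit = {
    var n = 0L
    val it = seen.keys().iterator()
    while (it.hasNext) { it.next(); n += 1 }
    out.collect(IncreasePerHour(new Timestamp(timestamp - 1).toString, n))
  }
}

Note that keying by windowEnd gives every hour its own seen set, so the counts do not carry over between hours; as shizk233's analysis earlier in the thread suggests, how this state is keyed is exactly where the design and the requirement diverge.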

Re: pyflink1.11.0window

2020-07-20 Posted by Shuiqiang Chen
Look at the exception message: your insert mode is likely misconfigured. The sink is append-only and accepts only insert changes, while the aggregated join produces updates.
BTW, the text you pasted contains many stray non-breaking spaces, which hurts readability.

Best,
Shuiqiang

奇怪的不朽琴师 <1129656...@qq.com> wrote on Mon, 2020-07-20 at 16:23:

> HI:
>     I now have a new problem: on top of this I added a join, and writing to Kafka then fails, as follows
> Traceback (most recent call last):
>   File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 147, in deco
>     return f(*a, **kw)
>   File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
> line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
> : org.apache.flink.table.api.TableException: AppendStreamTableSink
> requires that Table has only insert changes.
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>         at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>         at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>         at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>         at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>         at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at
> java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>         at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> During handling of the above exception, another exception occurred:
>
>
> Traceback (most recent call last):
>   File "tou.py", line 99, in      from_kafka_to_kafka_demo()
>   File "tou.py", line 33, in from_kafka_to_kafka_demo
>     st_env.sql_update("insert into flink_result select
> id,type,rowtime from final_result2")
>   File
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py",
> line 547, in sql_update
>     self._j_tenv.sqlUpdate(stmt)
>   File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 154, in deco
>     raise exception_mapping[exception](s.split(': ', 1)[1],
> stack_trace)
> pyflink.util.exceptions.TableException: 'AppendStreamTableSink requires
> that Table has only insert changes.'
>
>
>
>
>
> How should this be implemented? The requirement is roughly: a stream table (which needs grouped aggregation) joined with a dimension table.
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, DataTypes,
> EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
> from pyflink.table.window import Tumble 
>
>
>
>
> def from_kafka_to_kafka_demo():
>
>
>     # use blink table planner
>     env = StreamExecutionEnviro

Reply: (no subject)

2020-07-20 Posted by 罗显宴
Sorry, I sent that too quickly and didn't explain.
Here the aggregate operator does a pre-aggregation that sets each window's count to one; windowResult then emits the result, the stream is keyed by window, and finally mapState handles the running increase.




Reply: (no subject)

2020-07-20 Posted by 罗显宴
When I ran it, it simply output once per 1-hour window; it did not keep emitting the increasing count every 20 seconds.


Reply: flink1.11 startup problem

2020-07-20 Posted by 酷酷的浑蛋
1. After unpacking flink1.11, startup reports:
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
So I copied jersey-client-1.9.jar and jersey-core-1.9.jar into lib.
2. Started again, it reports:
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.ext.MessageBodyReader
So I copied javax.ws.rs-api-2.1.1.jar into lib.
3. Started again, it reports:
Caused by: java.lang.ClassNotFoundException: org.glassfish.jersey.internal.RuntimeDelegateImpl
Copied jersey-common-3.0.0-M6.jar into lib.
4. Started again, it reports:
Caused by: java.lang.ClassNotFoundException: jakarta.ws.rs.ext.RuntimeDelegate
Copied jakarta.ws.rs-api-3.0.0-M1.jar into lib.
5. Started again, it reports:
java.lang.LinkageError: ClassCastException: attempting to cast jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
6. I give up.




On 2020-07-20 20:06, Leonard Xu wrote:
Hi,
This looks like a dependency conflict: the error message shows the classpath loaded two identical jars. Is javax.ws.rs-api-2.1.1.jar
something your cluster actually needs?
Could you describe your scenario in more detail, for example how to reproduce the problem, so that everyone can help track it down.

Best,
Leonard Xu

On 2020-07-20 15:36, 酷酷的浑蛋 wrote:


Flink 1.11 reports the following error at startup:
java.lang.LinkageError: ClassCastException: attempting to 
castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class
 to 
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125)
at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97)
at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172)
at com.sun.jersey.core.header.MediaTypes.(MediaTypes.java:65)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
at com.sun.jersey.api.client.Client.init(Client.java:342)
at com.sun.jersey.api.client.Client.access$000(Client.java:118)
at com.sun.jersey.api.client.Client$1.f(Client.java:191)
at com.sun.jersey.api.client.Client$1.f(Client.java:187)
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
at com.sun.jersey.api.client.Client.(Client.java:187)
at com.sun.jersey.api.client.Client.(Client.java:170)
at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76)
at 
org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61)
at 
org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.missfresh.Main.main(Main.java:142)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)


I have already added javax.ws.rs-api-2.1.1.jar under lib.



Re: (no subject)

2020-07-20 by 罗显宴
OK, thanks a lot, I'll try this approach.



On 2020-07-20 15:11, shizk233 wrote:
Hi,

For the example you gave, ContinuousEventTimeTrigger can still be used to keep triggering result updates; only the window itself could be chosen differently based on the end condition, e.g. a Global window.

Best,
shizk233

罗显宴 <15927482...@163.com> wrote on Mon, Jul 20, 2020 at 2:09 PM:

>
>
> No, it's a continuous running total. For example, I crawl drug data from a pharmaceutical website and count the newly seen drug categories every hour, going on indefinitely; once that site is fully crawled, I can switch to another one.
> On 2020-07-20 11:47, shizk233 wrote:
> Hi,
>
> Is the accumulation only within a single day? If so, you could open a one-day tumbling window and use ContinuousEventTimeTrigger to fire the current result every hour.
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> wrote on Mon, Jul 20, 2020 at 1:18 AM:
>
>
>
> Hi all, how can I implement a continuously increasing element count within rolling windows? For example, cumulative user logins per hour: 500 logins by 1 AM, 1200 cumulative by 2 AM, and further by 3 AM, plotting an increasing curve. I found SQL written by 云邪 online for this, but I can't write the equivalent DataStream
> API. Hope someone can clear this up for me, thanks!
>


Re: Flink 1.11 startup issue

2020-07-20 by Leonard Xu
Hi,
This looks like a dependency conflict: the error message suggests the classpath loaded two identical jars, javax.ws.rs-api-2.1.1.jar.
Is that jar actually needed by your cluster?
Could you describe your scenario in more detail, e.g. how to reproduce the problem, so everyone can help investigate.

Best,
Leonard Xu

> On Jul 20, 2020, at 15:36, 酷酷的浑蛋 wrote:
> 
> 
> 
> Flink 1.11 errors at startup:
> java.lang.LinkageError: ClassCastException: attempting to 
> castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class
>  to 
> jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
>at 
> javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125)
>at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97)
>at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172)
>at com.sun.jersey.core.header.MediaTypes.(MediaTypes.java:65)
>at 
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
>at 
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
>at 
> com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
>at com.sun.jersey.api.client.Client.init(Client.java:342)
>at com.sun.jersey.api.client.Client.access$000(Client.java:118)
>at com.sun.jersey.api.client.Client$1.f(Client.java:191)
>at com.sun.jersey.api.client.Client$1.f(Client.java:187)
>at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
>at com.sun.jersey.api.client.Client.(Client.java:187)
>at com.sun.jersey.api.client.Client.(Client.java:170)
>at 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280)
>at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169)
>at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>at 
> org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76)
>at 
> org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61)
>at 
> org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43)
>at 
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64)
>at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
>at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
>at com.missfresh.Main.main(Main.java:142)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> 
> 
> I have already added javax.ws.rs-api-2.1.1.jar under lib.
> 



Re: flink job failover restart error

2020-07-20 by sjlsumait...@163.com



sjlsumait...@163.com

From: jiafu
Date: 2020-07-20 19:31
To: user-zh
Subject: flink job failover restart error
flink version: flink-1.8.1
org.apache.flink.runtime.executiongraph.ExecutionGraphException: Trying to 
eagerly schedule a task whose inputs are not ready (result type: 
PIPELINED_BOUNDED, partition consumable: false, producer state: SCHEDULED, 
producer slot: null). at 
org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:145)
 at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:840)
 at 
org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:621) at 
org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
 at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) 
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) 
at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) 
at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:436)
 at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:637)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:229)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:186)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:96)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:146)
 at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
at 
org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:633)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$releaseAssignedResource$11(Execution.java:1350)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
 at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1345)
 at 
org.apache.flink.runtime.executiongraph.Execution.finishCancellation(Execution.java:1115)
 at 
org.apache.flink.runtime.executiongraph.Execution.completeCancelling(Execution.java:1094)
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1628)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:517)
 at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) 
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
 at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) 
at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at 
akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at 
akka.actor.ActorCell.invoke(ActorCell.scala:495) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at 
akka.dispatch.Mailbox.run(Mailbox.scala:224) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:234) at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


flink job failover restart error

2020-07-20 by jiafu
flink version: flink-1.8.1
org.apache.flink.runtime.executiongraph.ExecutionGraphException: Trying to 
eagerly schedule a task whose inputs are not ready (result type: 
PIPELINED_BOUNDED, partition consumable: false, producer state: SCHEDULED, 
producer slot: null).at 
org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:145)
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:840)
 at 
org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:621)
 at 
org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
 at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)  
 at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)  at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:436)
   at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:637)
   at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:229)
 at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:186)
   at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:96)
   at 
org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:146)
 at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
 at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
 at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at 
org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:633)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
 at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at 
org.apache.flink.runtime.executiongraph.Execution.lambda$releaseAssignedResource$11(Execution.java:1350)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
   at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
  at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1345)
   at 
org.apache.flink.runtime.executiongraph.Execution.finishCancellation(Execution.java:1115)
at 
org.apache.flink.runtime.executiongraph.Execution.completeCancelling(Execution.java:1094)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1628)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:517)
at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) 
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
   at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)  
 at akka.actor.Actor$class.aroundReceive(Actor.scala:502)at 
akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at 
akka.actor.ActorCell.invoke(ActorCell.scala:495) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)  at 
akka.dispatch.Mailbox.run(Mailbox.scala:224) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:234)at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Testing the community mailing list

2020-07-20 by sjlsumait...@163.com
Please ignore.



sjlsumait...@163.com
 


Re: flink1.11 run

2020-07-20 by Jingsong Li
Yes.

But however the file rolls, it only becomes visible after the checkpoint completes.
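For reference, the knobs involved, pulled together from the full examples later in this digest; values are illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: size-based and checkpoint-driven rolling both apply; whichever
// condition is hit first rolls the file, but visibility still waits for the checkpoint.
// Assumes a HiveCatalog is registered and in use, as in the full examples below.
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
        "CREATE TABLE tmp.streamhivetest (" +
        "  user_id BIGINT," +
        "  item_id STRING" +
        ") STORED AS parquet TBLPROPERTIES (" +
        "  'sink.rolling-policy.file-size' = '128MB'," +
        "  'sink.rolling-policy.rollover-interval' = '30 min'," +
        "  'sink.rolling-policy.check-interval' = '1 min'" +
        ")");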

On Mon, Jul 20, 2020 at 7:10 PM Dream-底限  wrote:

> Hi,
> For these two rolling conditions, does whichever is reached first win? That is, with a 1 min checkpoint interval and a 128 MB file size, whichever comes first rolls a new file?
>
> 》Yes; by default it rolls at 128 MB, and on every checkpoint.
>
> Jingsong Li wrote on Mon, Jul 20, 2020 at 6:12 PM:
>
> > Hi Dream,
> >
> > > 1. Must the Hive table be created from within Flink?
> >
> > No, either side is fine.
> >
> > > 2. Can I write directly into a Hive table that already exists (created via Hue)? Will the files still roll?
> >
> > Yes; by default it rolls at 128 MB, and on every checkpoint.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:
> >
> > > Hi,
> > > OK. When streaming into a Hive table:
> > > 1. Must the Hive table be created from within Flink?
> > > 2. Can I write directly into a Hive table that already exists (created via Hue)? Will the files still roll?
> > >
> > > Rui Li wrote on Mon, Jul 20, 2020 at 4:44 PM:
> > >
> > > > tableEnv.executeSql already submits the job; there is no need to call execute again.
> > > >
> > > > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限 
> wrote:
> > > >
> > > > > Hi, I have a Kafka-to-Hive program here, but it fails to run. What could be the reason?
> > > > >
> > > > > Exception:
> > > > > The program finished with the following exception:
> > > > >
> > > > > org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > > method
> > > > > caused an error: No operators defined in streaming topology. Cannot
> > > > > generate StreamGraph.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > > > at
> > > >
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > > > at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > > at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > > > streaming topology. Cannot generate StreamGraph.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > > > ... 11 more
> > > > > Code:
> > > > >
> > > > >  StreamExecutionEnvironment environment =
> > > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > EnvironmentSettings settings =
> > > > >
> > > > >
> > > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > > > StreamTableEnvironment tableEnv =
> > > > > StreamTableEnvironment.create(environment, settings);
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > > > environment.setStateBackend(new MemoryStateBackend());
> > > > >
> >  environment.getCheckpointConfig().setCheckpointInterval(5000);
> > > > >
> > > > > String name = "myhive";
> > > > > String defaultDatabase = "tmp";
> > > > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > > > String version = "1.1.0";
> > > > >
> > > > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > > > hiveConfDir, version);
> > > > > tableEnv.registerCatalog("myhive", hive);
> > > > > tableEnv.useCatalog("myhive");
> > > > >
> > > > >

Re: flink1.11 run

2020-07-20 by Dream-底限
Hi,
For these two rolling conditions, does whichever is reached first win? That is, with a 1 min checkpoint interval and a 128 MB file size, whichever comes first rolls a new file?

》Yes; by default it rolls at 128 MB, and on every checkpoint.

Jingsong Li wrote on Mon, Jul 20, 2020 at 6:12 PM:

> Hi Dream,
>
> > 1. Must the Hive table be created from within Flink?
>
> No, either side is fine.
>
> > 2. Can I write directly into a Hive table that already exists (created via Hue)? Will the files still roll?
>
> Yes; by default it rolls at 128 MB, and on every checkpoint.
>
> Best,
> Jingsong
>
> On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:
>
> > Hi,
> > OK. When streaming into a Hive table:
> > 1. Must the Hive table be created from within Flink?
> > 2. Can I write directly into a Hive table that already exists (created via Hue)? Will the files still roll?
> >
> > Rui Li wrote on Mon, Jul 20, 2020 at 4:44 PM:
> >
> > > tableEnv.executeSql already submits the job; there is no need to call execute again.
> > >
> > > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> > >
> > > > Hi, I have a Kafka-to-Hive program here, but it fails to run. What could be the reason?
> > > >
> > > > Exception:
> > > > The program finished with the following exception:
> > > >
> > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > method
> > > > caused an error: No operators defined in streaming topology. Cannot
> > > > generate StreamGraph.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > > at
> > >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > > streaming topology. Cannot generate StreamGraph.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > > at
> > > >
> > > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > at
> > > >
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > at
> > > >
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > > ... 11 more
> > > > Code:
> > > >
> > > >  StreamExecutionEnvironment environment =
> > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > EnvironmentSettings settings =
> > > >
> > > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > > StreamTableEnvironment tableEnv =
> > > > StreamTableEnvironment.create(environment, settings);
> > > >
> > > >
> > > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > > environment.setStateBackend(new MemoryStateBackend());
> > > >
>  environment.getCheckpointConfig().setCheckpointInterval(5000);
> > > >
> > > > String name = "myhive";
> > > > String defaultDatabase = "tmp";
> > > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > > String version = "1.1.0";
> > > >
> > > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > > hiveConfDir, version);
> > > > tableEnv.registerCatalog("myhive", hive);
> > > > tableEnv.useCatalog("myhive");
> > > >
> > > > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > > > "  user_id BIGINT,\n" +
> > > > "  item_id STRING,\n" +
> > > > "  behavior STRING,\n" +
> > > > "  ts AS PROCTIME()\n" +
> > > > ") WITH (\n" +
> > > > " 'connector' = 'kafka-0.11',\n" +
> > > > " 'topic' = 'user_behavior',\n" +
> > > > " 'properties.bootstrap.servers' 

Flink 1.11 startup issue

2020-07-20 by 酷酷的浑蛋


What is going on with Flink 1.11? The moment it starts, it reports:
java.lang.LinkageError: ClassCastException: attempting to 
castjar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
 to 
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class


The jersey-*.jar files and javax.ws.rs-*.jar are all in lib already. Why does it still not work? What on earth is this error?



Re: state and checkpoint issue

2020-07-20 by sun
JM checkpoint log:


18:08:07.615 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 116 @ 1595239687615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:07.628 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 116 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 13 ms).
18:08:08.615 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering 
checkpoint 117 @ 1595239688615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:08.626 [flink-akka.actor.default-dispatcher-420] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 117 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 11 ms).
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink 
(acd456ff6f2f9f59ee89b126503c20f0) switched from state RUNNING to CANCELLING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from RUNNING to 
CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) 
(87d4c7af7d5fb5f81bae48aae77de473) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) 
(7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) 
(9035e059e465b8c520edf37ec734b43e) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) 
(e6ff47b0da505b2aa4d775d7821b8356) switched from RUNNING to CANCELING.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (4/4) 
(e6ff47b0da505b2aa4d775d7821b8356) switched from CANCELING to CANCELED.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (3/4) 
(9035e059e465b8c520edf37ec734b43e) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (2/4) 
(7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - 
map_sub_order_detail -> Sink: Print to Std. Out (1/4) 
(87d4c7af7d5fb5f81bae48aae77de473) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from CANCELING to 
CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from CANCELING to 
CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from CANCELING to 
CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-416] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom 
Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from CANCELING to 
CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink 
(acd456ff6f2f9f59ee89b126503c20f0) switched fr

Re: flink1.11 run

2020-07-20 by Jingsong Li
Hi Dream,

> 1. Must the Hive table be created from within Flink?

No, either side is fine.

> 2. Can I write directly into a Hive table that already exists (created via Hue)? Will the files still roll?

Yes; by default it rolls at 128 MB, and on every checkpoint.

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:

> Hi,
> OK. When streaming into a Hive table:
> 1. Must the Hive table be created from within Flink?
> 2. Can I write directly into a Hive table that already exists (created via Hue)? Will the files still roll?
>
> Rui Li wrote on Mon, Jul 20, 2020 at 4:44 PM:
>
> > tableEnv.executeSql already submits the job; there is no need to call execute again.
> >
> > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> >
> > > Hi, I have a Kafka-to-Hive program here, but it fails to run. What could be the reason?
> > >
> > > Exception:
> > > The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The main
> > method
> > > caused an error: No operators defined in streaming topology. Cannot
> > > generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > at
> > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > streaming topology. Cannot generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > at
> > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at
> > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at
> > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > ... 11 more
> > > Code:
> > >
> > >  StreamExecutionEnvironment environment =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > EnvironmentSettings settings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > StreamTableEnvironment tableEnv =
> > > StreamTableEnvironment.create(environment, settings);
> > >
> > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > environment.setStateBackend(new MemoryStateBackend());
> > > environment.getCheckpointConfig().setCheckpointInterval(5000);
> > >
> > > String name = "myhive";
> > > String defaultDatabase = "tmp";
> > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > String version = "1.1.0";
> > >
> > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > > tableEnv.registerCatalog("myhive", hive);
> > > tableEnv.useCatalog("myhive");
> > >
> > > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > > "  user_id BIGINT,\n" +
> > > "  item_id STRING,\n" +
> > > "  behavior STRING,\n" +
> > > "  ts AS PROCTIME()\n" +
> > > ") WITH (\n" +
> > > " 'connector' = 'kafka-0.11',\n" +
> > > " 'topic' = 'user_behavior',\n" +
> > > " 'properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> > > " 'properties.group.id' = 'testGroup',\n" +
> > > " 'scan.startup.mode' = 'earliest-offset',\n" +
> > > " 'format' = 'json',\n" +
> > > " 'json.fail-on-missing-field' = 'false',\n" +
> > > " 'json.ignore-parse-errors' = 'true'\n" +
> > > ")");
> > >
> > > //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> > > //" user

Re: Flink 1.11 Hive streaming write issue

2020-07-20 by Jingsong Li
Hi Dream,

Could you describe your test scenario in more detail?

Best,
Jingsong
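As JasonLee points out in the quoted thread below, the file commit in hive streaming writes is checkpoint-driven; a minimal sketch of what has to be switched on (the interval is illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without checkpointing, in-progress files are never committed, so Hive sees no data.
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

For partitioned tables it is also worth checking 'sink.partition-commit.policy.kind'; without a commit policy the partition may never become visible in the metastore.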

On Mon, Jul 20, 2020 at 5:40 PM Dream-底限  wrote:

> Hi,
> How was this resolved in the end? Can the data roll into Hive now? On my side, even after enabling checkpointing, Hive still shows no data.
>
> 李佳宸 wrote on Thu, Jul 16, 2020 at 10:39 PM:
>
> > OK, thanks~~~
> >
> > JasonLee <17610775...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:
> >
> > > Hi,
> > > You need to enable checkpointing.
> > >
> > > On 2020-07-16 18:03, 李佳宸 wrote:
> > > I'd like to ask what configuration hive streaming write needs. I don't know why my job runs fine yet no data is written to Hive.
> > > Batch writes to Hive work, and reads in the streaming environment are normal.
> > >
> > > Code attached; it is quite short:
> > >
> > > public class KafkaToHiveStreaming {
> > >public static void main(String[] arg) throws Exception{
> > >StreamExecutionEnvironment bsEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >EnvironmentSettings bsSettings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >StreamTableEnvironment bsTableEnv =
> > > StreamTableEnvironment.create(bsEnv, bsSettings);
> > >String name= "myhive";
> > >String defaultDatabase = "default";
> > >String hiveConfDir =
> > > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > > path
> > >String version = "3.1.2";
> > >
> > >HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > >bsTableEnv.registerCatalog("myhive", hive);
> > >bsTableEnv.useCatalog("myhive");
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> > >"  id BIGINT ," +
> > >"  order_id STRING," +
> > >"  amount DECIMAL(10, 2)," +
> > >"  create_time TIMESTAMP " +
> > >") WITH (" +
> > >" 'connector' = 'kafka'," +
> > >" 'topic' = 'order.test'," +
> > >" 'properties.bootstrap.servers' = 'localhost:9092'," +
> > >" 'properties.group.id' = 'testGroup'," +
> > >" 'scan.startup.mode' = 'earliest-offset', " +
> > >" 'format' = 'json'  " +
> > >")");
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >
> > >bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming
> (" +
> > >"  id BIGINT ," +
> > >"  order_id STRING," +
> > >"  amount DECIMAL(10, 2)" +
> > >"  )");
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > > ('connector' = 'print')" +
> > >"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > > ALL)");
> > >
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> > SELECT
> > > " +
> > >"id, " +
> > >"order_id, " +
> > >"amount " +
> > >"FROM topic_products");
> > >
> > >Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> > >table1.executeInsert("print_table");
> > >}
> > > }
> > >
> >
>


-- 
Best, Jingsong Lee


Re: Flink 1.11 Hive streaming write issue

2020-07-20 by Dream-底限
Hi,
How was this resolved in the end? Can the data roll into Hive now? On my side, even after enabling checkpointing, Hive still shows no data.

李佳宸 wrote on Thu, Jul 16, 2020 at 10:39 PM:

> OK, thanks~~~
>
> JasonLee <17610775...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:
>
> > Hi,
> > You need to enable checkpointing.
> >
> > On 2020-07-16 18:03, 李佳宸 wrote:
> > I'd like to ask what configuration hive streaming write needs. I don't know why my job runs fine yet no data is written to Hive.
> > Batch writes to Hive work, and reads in the streaming environment are normal.
> >
> > Code attached; it is quite short:
> >
> > public class KafkaToHiveStreaming {
> >public static void main(String[] arg) throws Exception{
> >StreamExecutionEnvironment bsEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >EnvironmentSettings bsSettings =
> >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >StreamTableEnvironment bsTableEnv =
> > StreamTableEnvironment.create(bsEnv, bsSettings);
> >String name= "myhive";
> >String defaultDatabase = "default";
> >String hiveConfDir =
> > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > path
> >String version = "3.1.2";
> >
> >HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> >bsTableEnv.registerCatalog("myhive", hive);
> >bsTableEnv.useCatalog("myhive");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> >"  id BIGINT ," +
> >"  order_id STRING," +
> >"  amount DECIMAL(10, 2)," +
> >"  create_time TIMESTAMP " +
> >") WITH (" +
> >" 'connector' = 'kafka'," +
> >" 'topic' = 'order.test'," +
> >" 'properties.bootstrap.servers' = 'localhost:9092'," +
> >" 'properties.group.id' = 'testGroup'," +
> >" 'scan.startup.mode' = 'earliest-offset', " +
> >" 'format' = 'json'  " +
> >")");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >
> >bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
> >"  id BIGINT ," +
> >"  order_id STRING," +
> >"  amount DECIMAL(10, 2)" +
> >"  )");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > ('connector' = 'print')" +
> >"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > ALL)");
> >
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> SELECT
> > " +
> >"id, " +
> >"order_id, " +
> >"amount " +
> >"FROM topic_products");
> >
> >Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> >table1.executeInsert("print_table");
> >}
> > }
> >
>


Re: flink1.11 run

2020-07-20 by Dream-底限
Hi,
OK. When streaming into a Hive table:
1. Must the Hive table be created from within Flink?
2. Can I write directly into a Hive table that already exists (created via Hue)? Will the files still roll?

Rui Li wrote on Mon, Jul 20, 2020 at 4:44 PM:

> tableEnv.executeSql already submits the job; there is no need to call execute again.
>
> On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
>
> > Hi, I have a Kafka-to-Hive program here, but it fails to run. What could be the reason?
> >
> > Exception:
> > The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: No operators defined in streaming topology. Cannot
> > generate StreamGraph.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > at
> >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: java.lang.IllegalStateException: No operators defined in
> > streaming topology. Cannot generate StreamGraph.
> > at
> >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > at
> >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > at
> >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > ... 11 more
> > Code:
> >
> >  StreamExecutionEnvironment environment =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings settings =
> >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > StreamTableEnvironment tableEnv =
> > StreamTableEnvironment.create(environment, settings);
> >
> >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > environment.setStateBackend(new MemoryStateBackend());
> > environment.getCheckpointConfig().setCheckpointInterval(5000);
> >
> > String name = "myhive";
> > String defaultDatabase = "tmp";
> > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > String version = "1.1.0";
> >
> > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> > tableEnv.registerCatalog("myhive", hive);
> > tableEnv.useCatalog("myhive");
> >
> > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > "  user_id BIGINT,\n" +
> > "  item_id STRING,\n" +
> > "  behavior STRING,\n" +
> > "  ts AS PROCTIME()\n" +
> > ") WITH (\n" +
> > " 'connector' = 'kafka-0.11',\n" +
> > " 'topic' = 'user_behavior',\n" +
> > " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> > " 'properties.group.id' = 'testGroup',\n" +
> > " 'scan.startup.mode' = 'earliest-offset',\n" +
> > " 'format' = 'json',\n" +
> > " 'json.fail-on-missing-field' = 'false',\n" +
> > " 'json.ignore-parse-errors' = 'true'\n" +
> > ")");
> >
> > //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> > //" user_id BIGINT,\n" +
> > //" item_id STRING,\n" +
> > //" behavior STRING,\n" +
> > //" tsdata STRING\n" +
> > //") WITH (\n" +
> > //" 'connector' = 'print'\n" +
> > //")");
> > tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
> > " user_id BIGINT,\n" +
> > " item_id STRING,\n" +

Re: flink1.11 run

2020-07-20 by Rui Li
tableEnv.executeSql already submits the job; there is no need to call execute again.
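Concretely, for the program quoted below, the last two statements collapse into the executeSql call alone; a sketch, with table and column names taken from the quoted code:

import org.apache.flink.table.api.TableResult;

// executeSql on an INSERT submits the job by itself; a trailing
// tableEnv.execute(...) then finds no operators and fails with
// "No operators defined in streaming topology".
TableResult result = tableEnv.executeSql(
        "INSERT INTO streamhivetest " +
        "SELECT user_id, item_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd') FROM user_behavior");

TableResult#getJobClient() is available if the caller needs to track or block on the submitted job.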

On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:

> Hi, I have a Kafka-to-Hive program here, but it fails to run. What could be the reason?
>
> Exception:
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: No operators defined in streaming topology. Cannot
> generate StreamGraph.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.lang.IllegalStateException: No operators defined in
> streaming topology. Cannot generate StreamGraph.
> at
>
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> at
>
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> at
>
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
> Code:
>
>  StreamExecutionEnvironment environment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(environment, settings);
>
>
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> environment.setStateBackend(new MemoryStateBackend());
> environment.getCheckpointConfig().setCheckpointInterval(5000);
>
> String name = "myhive";
> String defaultDatabase = "tmp";
> String hiveConfDir = "/etc/alternatives/hive-conf/";
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
> tableEnv.registerCatalog("myhive", hive);
> tableEnv.useCatalog("myhive");
>
> tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> "  user_id BIGINT,\n" +
> "  item_id STRING,\n" +
> "  behavior STRING,\n" +
> "  ts AS PROCTIME()\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka-0.11',\n" +
> " 'topic' = 'user_behavior',\n" +
> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'properties.group.id' = 'testGroup',\n" +
> " 'scan.startup.mode' = 'earliest-offset',\n" +
> " 'format' = 'json',\n" +
> " 'json.fail-on-missing-field' = 'false',\n" +
> " 'json.ignore-parse-errors' = 'true'\n" +
> ")");
>
> //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> //" user_id BIGINT,\n" +
> //" item_id STRING,\n" +
> //" behavior STRING,\n" +
> //" tsdata STRING\n" +
> //") WITH (\n" +
> //" 'connector' = 'print'\n" +
> //")");
> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
> " user_id BIGINT,\n" +
> " item_id STRING,\n" +
> " behavior STRING,\n" +
> " tsdata STRING\n" +
> ") STORED AS parquet TBLPROPERTIES (\n" +
> " 'sink.rolling-policy.file-size' = '12MB',\n" +
> " 'sink.rolling-policy.rollover-interval' = '1 min',\n" +
> " 'sink.rolling-policy.check-interval' = '1 min',\n" +
> " 'execution.checkpoi

flink1.11 run

2020-07-20 by Dream-底限
Hi, I have a Kafka-to-Hive program here, but it fails to run. What could be the reason?

Exception:
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: No operators defined in streaming topology. Cannot
generate StreamGraph.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in
streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at
com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Code:

 StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(environment, settings);


environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.setStateBackend(new MemoryStateBackend());
environment.getCheckpointConfig().setCheckpointInterval(5000);

String name = "myhive";
String defaultDatabase = "tmp";
String hiveConfDir = "/etc/alternatives/hive-conf/";
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
"  user_id BIGINT,\n" +
"  item_id STRING,\n" +
"  behavior STRING,\n" +
"  ts AS PROCTIME()\n" +
") WITH (\n" +
" 'connector' = 'kafka-0.11',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");

//tableEnv.executeSql("CREATE TABLE print_table (\n" +
//" user_id BIGINT,\n" +
//" item_id STRING,\n" +
//" behavior STRING,\n" +
//" tsdata STRING\n" +
//") WITH (\n" +
//" 'connector' = 'print'\n" +
//")");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
" user_id BIGINT,\n" +
" item_id STRING,\n" +
" behavior STRING,\n" +
" tsdata STRING\n" +
") STORED AS parquet TBLPROPERTIES (\n" +
" 'sink.rolling-policy.file-size' = '12MB',\n" +
" 'sink.rolling-policy.rollover-interval' = '1 min',\n" +
" 'sink.rolling-policy.check-interval' = '1 min',\n" +
" 'execution.checkpointing.interval' = 'true'\n" +
")");

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("insert into streamhivetest select
user_id,item_id,behavior,DATE_FORMAT(ts, 'yyyy-MM-dd') as tsdata from
user_behavior");

tableEnv.execute("stream-write-hive");


Re: flink job watermark stops advancing after running for a while

2020-07-20 by shizk233
Hi,

Flink's metrics include a task-level metric, currentWatermark, which carries subtask_index, task_name, and the watermark value; it should help you trace how the watermark is (or isn't) advancing.

Best,
shizk233
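If the metric alone doesn't pinpoint it, a throwaway pass-through operator placed just before the window can log the watermark each subtask sees; a sketch, with an illustrative class name:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Pass-through probe: prints the watermark each subtask currently sees.
public class WatermarkProbe<T> extends ProcessFunction<T, T> {
    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        System.out.printf("subtask=%d watermark=%d%n",
                getRuntimeContext().getIndexOfThisSubtask(),
                ctx.timerService().currentWatermark());
        out.collect(value);
    }
}

A subtask stuck at Long.MIN_VALUE usually points at an idle Kafka partition, or at source parallelism higher than the number of partitions carrying data, since the downstream watermark is the minimum across inputs.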

snack white wrote on Mon, Jul 20, 2020 at 3:51 PM:

> Hi:
>   After running for a while my flink job's watermark stops advancing; the job itself doesn't die. The source is Kafka, and every Kafka partition
> has data; the job's state backend is memory. Any recommended way to debug this? I've checked CPU, GC
> and other metrics and nothing looks abnormal.
>
> Best regards!
> white
>
>


Re: pyflink 1.11.0 window

2020-07-20
Hi,

When writing the result to Kafka, I now get the following error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)




During handling of the above exception, another exception occurred:


Traceback (most recent call last):
  File "tou.py", line 99, in 
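
The exception above means the query produces update (retract) changes, typically from an unbounded GROUP BY, while the chosen Kafka sink only accepts appends. One way out, sketched below with assumed names (a source table src with event-time attribute ts, and an append-only sink kafka_sink), is to aggregate inside a tumbling window so every emitted row is final. The snippet uses the Java Table API, but the SQL string is what matters; the same statement works from PyFlink via execute_sql (sql_update is deprecated in 1.11):

    tableEnv.executeSql(
            "INSERT INTO kafka_sink " +
            "SELECT TUMBLE_END(ts, INTERVAL '20' SECOND) AS window_end, " +
            "       category, COUNT(*) AS cnt " +
            "FROM src " +
            "GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), category");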

Re: Flink Cli

2020-07-20 Post by Z-Z
Via the CLI, run on the jobmanager host: bin/flink run -d -p 1 -s {savepointuri} /data/test.jar
Or via the web UI at http://jobmanager:8081, use "Submit New Job", upload the jar, and fill in the savepoint path.








Re: flink job watermark stops advancing after running for a while

2020-07-20 Post by Cayden chen
You could run the job locally in IDEA to debug it.







flink job watermark stops advancing after running for a while

2020-07-20 Post by snack white
HI:
  After the flink job runs for a while the watermark stops advancing, though the job itself stays up. The source is Kafka and every partition has data; the job's state backend is memory. Any recommended way to debug this? I have checked CPU, GC and other metrics and see nothing abnormal.

Best regards!
white



Flink 1.11 startup problem

2020-07-20 Post by 酷酷的浑蛋


Flink 1.11 fails at startup with:
java.lang.LinkageError: ClassCastException: attempting to cast jar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125)
at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97)
at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172)
at com.sun.jersey.core.header.MediaTypes.(MediaTypes.java:65)
at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
at com.sun.jersey.api.client.Client.init(Client.java:342)
at com.sun.jersey.api.client.Client.access$000(Client.java:118)
at com.sun.jersey.api.client.Client$1.f(Client.java:191)
at com.sun.jersey.api.client.Client$1.f(Client.java:187)
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
at com.sun.jersey.api.client.Client.(Client.java:187)
at com.sun.jersey.api.client.Client.(Client.java:170)
at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76)
at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61)
at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.missfresh.Main.main(Main.java:142)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)


I have already added javax.ws.rs-api-2.1.1.jar under lib.
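
This reads like a classloading conflict: the user jar (/data/rt/jar_version/sql/6.jar) bundles its own copy of the javax.ws.rs classes, which collides with the javax.ws.rs-api-2.1.1.jar in lib. Two commonly suggested remedies, offered here as assumptions to verify rather than a confirmed fix: exclude the javax.ws.rs dependency from the fat jar, or resolve the package parent-first by adding the following line to flink-conf.yaml:

    classloader.parent-first-patterns.additional: javax.ws.rs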



Re: SQL job fails with an NPE from the Flink runtime only

2020-07-20 Post by godfrey he
The image did not come through; try a different image-hosting tool and re-upload it.

Luan Cooper wrote on Fri, Jul 17, 2020 at 4:11 PM:

> Attaching the Job Graph; it fails at the Calc node.
> [image: image.png]
>
> On Fri, Jul 17, 2020 at 4:01 PM Luan Cooper  wrote:
>
>> There are actually about 20 fields; the UDFs involved are COALESCE / CAST / JSON_PATH / TIMESTAMP-related ones.
>> *Do you mean a UDF returning NULL could cause this?*
>>
>>
>> On Fri, Jul 17, 2020 at 2:54 PM godfrey he  wrote:
>>
>>> What is the logic inside udf_xxx?
>>>
>>>
>>> Luan Cooper wrote on Fri, Jul 17, 2020 at 2:40 PM:
>>>
>>> > Hi
>>> >
>>> > I have a SQL statement like this:
>>> > INSERT INTO es
>>> > SELECT
>>> > a,
>>> > udf_xxx(b)
>>> > FROM mongo_oplog -- custom TableFactory
>>> >
>>> > The job fails after submission. From submission to failure the only exception is the NPE below, which comes from non-business code; no business-code
>>> exception appears at all, and it reproduces reliably:
>>> >
>>> > LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
>>> > (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
>>> >
>>> > java.lang.NullPointerException
>>> >
>>> > at StreamExecCalc$8016.split$7938$(Unknown Source)
>>> > at StreamExecCalc$8016.processElement(Unknown Source)
>>> > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>>> > at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>> > at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>> > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>> > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>> > at java.lang.Thread.run(Thread.java:748)
>>> >
>>> > How should I go about debugging this kind of failure?
>>> > Any clue would help.
>>> >
>>> > Thanks
>>> >
>>>
>>
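
If the NULL suspicion holds, one defensive pattern is to make every UDF null-tolerant, so that generated code such as StreamExecCalc never dereferences an unexpected null. A minimal sketch with a hypothetical UDF (not the poster's udf_xxx, whose logic is unknown):

    import org.apache.flink.table.functions.ScalarFunction;

    // Hypothetical null-tolerant UDF: returns SQL NULL for null input
    // instead of throwing, keeping downstream generated code safe.
    public class SafeJsonPath extends ScalarFunction {
        public String eval(String json, String path) {
            if (json == null || path == null) {
                return null;  // propagate NULL rather than risk an NPE
            }
            // real JSON-path extraction would go here; placeholder logic for the sketch
            return json.contains(path) ? path : null;
        }
    }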


Re: (no subject)

2020-07-20 Post by shizk233
Hi,

Considering the example you gave, you can still use ContinuousEventTimeTrigger to keep firing updated results; only the enclosing window should be chosen to match your end condition, for example a Global window.

Best,
shizk233
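
A minimal DataStream sketch of that suggestion, under these assumptions: Flink 1.11 APIs, and an input DataStream<String> named categories carrying one category name per record, with event-time timestamps and watermarks already assigned upstream. The global window is never purged, which matches the running-total semantics asked for:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

    DataStream<Long> cumulativeDistinct = categories
            .keyBy(category -> "all")        // one logical key: a global distinct count
            .window(GlobalWindows.create())  // a window that never closes
            .trigger(ContinuousEventTimeTrigger.of(Time.hours(1)))  // fire every event-time hour
            .aggregate(new AggregateFunction<String, Set<String>, Long>() {
                @Override
                public Set<String> createAccumulator() {
                    return new HashSet<>();
                }

                @Override
                public Set<String> add(String category, Set<String> acc) {
                    acc.add(category);  // duplicates do not grow the set
                    return acc;
                }

                @Override
                public Long getResult(Set<String> acc) {
                    return (long) acc.size();  // the running distinct-category total
                }

                @Override
                public Set<String> merge(Set<String> a, Set<String> b) {
                    a.addAll(b);
                    return a;
                }
            });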

罗显宴 <15927482...@163.com> wrote on Mon, Jul 20, 2020 at 2:09 PM:

>
>
> No, it accumulates continuously. For example, I crawl drug data from a medicine website and count, every hour, the cumulative number of new drug categories crawled so far. This keeps running indefinitely; once one site is exhausted I can switch to another.
> 罗显宴 <15927482...@163.com>
> On Jul 20, 2020 at 11:47, shizk233 wrote:
> Hi,
>
> Is the accumulation confined to a single day? If so, you can open a one-day Tumbling
> Window and use ContinuousEventTimeTrigger to emit the result once per hour.
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> wrote on Mon, Jul 20, 2020 at 1:18 AM:
>
>
>
> Hi all, how can I implement a continuously increasing element count across rolling windows, e.g. cumulative user logins per hour: 500 by 1 am, 1,200 by 2 am, and so on, producing an ever-increasing curve? I found a SQL version written by 云邪 online, but I don't know how to express it with the DataStream
> API. I'd appreciate it if someone could clear this up for me, thanks!
>
> 罗显宴 <15927482...@163.com>
>