Re: (无主题)

2020-07-20 Thread shizk233
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

> 好的,
> 输入:
> 心功能不全和心律失常用药,1,时间戳
> 心功能不全和心律失常用药,1,时间戳
> 抗利尿剂,1,时间戳
> 血管收缩剂,1,时间戳
> 血管紧张素II受体拮抗剂,1,时间戳
>
>
> 这里的时间戳就是eventtime了
> 比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
> 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
> 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
> 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
> 输出4
>
>
> 即输出:
> 2020-7-20 19:00:00,2
> 2020-7-20 19:00:20,4
>
>
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:37,shizk233 写道:
> Hi,
>
> 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:
>
> hi,
>
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
>


Re:Re: 官方pyflink 例子的执行问题

2020-07-20 Thread chenxuying
你好
明白了,感谢 , 我文档没看清楚哈

















在 2020-07-21 11:44:23,"Xingbo Huang"  写道:
>Hi,
>你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>True)就行,如果你用了的话,就需要配置off-heap
>memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>'80m')。你可以参考文档上的例子,以及对应的note说明[1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions
>
>Best,
>Xingbo
>
>
>chenxuying  于2020年7月21日周二 上午11:36写道:
>
>> 官方例子:
>> https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
>> 按照例子写了程序,也安装了pyflink
>> |
>> python -m pip install apache-flink
>> |
>> 代码:
>> |
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>> from pyflink.table.udf import udf
>>
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> t_env = StreamTableEnvironment.create(env)
>>
>>
>> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
>> DataTypes.BIGINT())
>>
>>
>> t_env.register_function("add", add)
>>
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
>> \
>> .with_format(OldCsv()
>> .field('a', DataTypes.BIGINT())
>> .field('b', DataTypes.BIGINT())) \
>> .with_schema(Schema()
>> .field('a', DataTypes.BIGINT())
>> .field('b', DataTypes.BIGINT())) \
>> .create_temporary_table('mySource')
>>
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
>> \
>> .with_format(OldCsv()
>> .field('sum', DataTypes.BIGINT())) \
>> .with_schema(Schema()
>> .field('sum', DataTypes.BIGINT())) \
>> .create_temporary_table('mySink')
>>
>>
>> t_env.from_path('mySource')\
>> .select("add(a, b)") \
>> .insert_into('mySink')
>>
>>
>> t_env.execute("tutorial_job")
>> |
>>
>> 执行:
>>
>> |
>> python test_pyflink.py
>> |
>>
>> 报错:
>>
>>
>> |
>> Traceback (most recent call last):
>>   File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>> line 147, in deco
>> return f(*a, **kw)
>>   File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>> line 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.TableException: The configured Task Off-Heap
>> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
>> The Task Off-Heap Memory can be configured using the configuration key
>> 'taskmanager.memory.task.off-heap.size'.
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> 

flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-20 Thread jindy_liu
例如:

mysql表:
CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `time` datetime NOT NULL,
  `status` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `status` (
  `id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

kafka中数据:
// 表test 中insert事件
{"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}

//表status 中的事件
{"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}

如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。






--
Sent from: http://apache-flink.147419.n8.nabble.com/


想知道state写到checkpoint文件没有

2020-07-20 Thread sun
 请问怎么反编译checkpoint文件啊,我想知道state写到checkpoint文件没有




    _default_   
OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE  
VALUE_SERIALIZER  
Gorg.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfigzS酿
  脂?sr -org.apache.flink.runtime.state.JavaSerializerFSX韦4
? xr 
Borg.apache.flink.api.common.typeutils.base.TypeSerializerSingletony﹪.wE
 xr 4org.apache.flink.api.common.typeutils.TypeSerializer   
 xp  -org.apache.flink.runtime.state.JavaSerializer 
topic-partition-offset-states   
OPERATOR_STATE_DISTRIBUTION_MODE UNION  VALUE_SERIALIZER 
 
Iorg.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshotzS酿
  矛?sr 
;org.apache.flink.api.java.typeutils.runtime.TupleSerializer  
  xr 
?org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase  
  I arityI length[ fieldSerializerst 
7[Lorg/apache/flink/api/common/typeutils/TypeSerializer;L
tupleClasst Ljava/lang/Class;xr 
4org.apache.flink.api.common.typeutils.TypeSerializer   
 xp r 
7[Lorg.apache.flink.api.common.typeutils.TypeSerializer;9?Ч麡 xp 
sr 
?org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer  
  L defaultSerializerClassest Ljava/util/LinkedHashMap;L 
defaultSerializersq ~  L kryoRegistrationsq ~ L registeredTypest 
Ljava/util/LinkedHashSet;L $registeredTypesWithSerializerClassesq ~L 
registeredTypesWithSerializersq ~L typeq ~ xq ~ sr 
java.util.LinkedHashMap4繬\l利 Z accessOrderxr java.util.HashMap诹?`? 
F
loadFactorI thresholdxp?@   w   
x sq ~ ?@  w   x sq ~ ?@ 
 w  t 
Iorg.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionsr 


Re: How to get flink JobId in runtime

2020-07-20 Thread Congxian Qiu
Hi Sili

I'm not sure if there are other ways to get this value properly. Maybe
you can try
`RuntimeContext.getMetricGroup().getAllVariables().get("")`.

Best,
Congxian


Si-li Liu  于2020年7月20日周一 下午7:38写道:

> Hi
>
> I want to retrieve flink JobId in runtime, for example, during
> RichFunction's open method. Is there anyway to do it?
>
> I checked the methods in RuntimeContext and ExecutionConfig, seems I can't
> get this information from them.
>
> Thanks!
>
> --
> Best regards
>
> Sili Liu
>


回复: (无主题)

2020-07-20 Thread 罗显宴
好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,

CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,

这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:



大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制



Re: 官方pyflink 例子的执行问题

2020-07-20 Thread Xingbo Huang
Hi,
你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
True)就行,如果你用了的话,就需要配置off-heap
memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')。你可以参考文档上的例子,以及对应的note说明[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions

Best,
Xingbo


chenxuying  于2020年7月21日周二 上午11:36写道:

> 官方例子:
> https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
> 按照例子写了程序,也安装了pyflink
> |
> python -m pip install apache-flink
> |
> 代码:
> |
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.udf import udf
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
>
>
> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
> DataTypes.BIGINT())
>
>
> t_env.register_function("add", add)
>
>
> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
> \
> .with_format(OldCsv()
> .field('a', DataTypes.BIGINT())
> .field('b', DataTypes.BIGINT())) \
> .with_schema(Schema()
> .field('a', DataTypes.BIGINT())
> .field('b', DataTypes.BIGINT())) \
> .create_temporary_table('mySource')
>
>
> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
> \
> .with_format(OldCsv()
> .field('sum', DataTypes.BIGINT())) \
> .with_schema(Schema()
> .field('sum', DataTypes.BIGINT())) \
> .create_temporary_table('mySink')
>
>
> t_env.from_path('mySource')\
> .select("add(a, b)") \
> .insert_into('mySink')
>
>
> t_env.execute("tutorial_job")
> |
>
> 执行:
>
> |
> python test_pyflink.py
> |
>
> 报错:
>
>
> |
> Traceback (most recent call last):
>   File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 147, in deco
> return f(*a, **kw)
>   File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
> line 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.TableException: The configured Task Off-Heap
> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
> The Task Off-Heap Memory can be configured using the configuration key
> 'taskmanager.memory.task.off-heap.size'.
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at 

Re: How does TaskManager announce JobManager about available ResultPartitions?

2020-07-20 Thread Zhu Zhu
Hi Joseph,
The availability of pipelined result partition is notified to JM
via scheduleOrUpdateConsumers RPC.

Just want to mention that it's better to send such questions to the user
mail list.

Thanks,
Zhu Zhu

Fork Joseph  于2020年7月21日周二 上午3:30写道:

> Hi,
>
> According to description in
>
> https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
> ,
> TaskManager announces (tells) JobManager about available
> ResultSubpartitions  (RSs) to let another TM to initiate the transfer.
> However, I can’t find where in the codebase TM actually announces JM about
> RSs for streaming (Pipelined) mode.
>
> Thanks!
> Joseph
>


Flink ??kafka??????????????????checkpoint??????????

2020-07-20 Thread ??????
??
??Flink 
??kafka??checkpointEXACTLY_ONCE
??
Producer attempted an operation with an old epoch.Either there is a newer 
producer with the same transactionalId, or the producer's transaction has been 
expired by the broker


??
??

Re: (无主题)

2020-07-20 Thread shizk233
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

> hi,
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>


官方pyflink 例子的执行问题

2020-07-20 Thread chenxuying
官方例子:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
按照例子写了程序,也安装了pyflink
|
python -m pip install apache-flink
|
代码:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)


add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], 
DataTypes.BIGINT())


t_env.register_function("add", add)


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
 \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
 \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')


t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')


t_env.execute("tutorial_job")
|

执行:

|
python test_pyflink.py
|

报错:


|
Traceback (most recent call last):
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.TableException: The configured Task Off-Heap 
Memory 0 bytes is less than the least required Python worker Memory 79 mb. The 
Task Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'.
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at 

Re: Flink整合hive之后,通过flink创建的表,hive beeline可见表,不可见字段?

2020-07-20 Thread Jingsong Li
默认创建的是Flink表,Hive端不可见。
你想创建Hive表的话,用Hive dialect。

Best,
Jingsong

On Tue, Jul 21, 2020 at 11:31 AM felixzh  wrote:

> 参照文档
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
> 通过flink创建表:CREATE TABLE Orders (product STRING, amount INT)
> 在beeline端可见表,但是desc看不到字段,select * from orders也不可用
>
>

-- 
Best, Jingsong Lee


Flink整合hive之后,通过flink创建的表,hive beeline可见表,不可见字段?

2020-07-20 Thread felixzh
参照文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
通过flink创建表:CREATE TABLE Orders (product STRING, amount INT)
在beeline端可见表,但是desc看不到字段,select * from orders也不可用



回复: (无主题)

2020-07-20 Thread 罗显宴
hi,
CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制


?????? pyflink1.11.0window

2020-07-20 Thread ??????????????
??
  
??pyflinkdemo??





----
??: 
   "user-zh"



Re: (无主题)

2020-07-20 Thread shizk233
Hi,

我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。

你可以让acc做个累加,然后结果输出里把acc的值带上看看。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:

>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> 罗显宴
> 邮箱:15927482...@163.com
>
> 
> 签名由 网易邮箱大师  定制
> 在2020年7月20日 20:38,罗显宴<15927482...@163.com> <15927482...@163.com> 写道:
>
> 不好意思,刚才发的快,没来得及解释,
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> 罗显宴
> 邮箱:15927482...@163.com
>
> 
> 签名由 网易邮箱大师  定制
> 在2020年7月20日 14:09,罗显宴<15927482...@163.com> <15927482...@163.com> 写道:
>
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
>


Re: Flink Cli 部署问题

2020-07-20 Thread Congxian Qiu
Hi  Z-Z
这种情况比较奇怪的。你这个是稳定复现的吗?能否分享一个稳定复现的作业代码,以及相关步骤呢?我尝试本地复现一下


Best,
Congxian


Z-Z  于2020年7月20日周一 下午4:17写道:

> 通过 cli 命令是 在jobmanager目录 执行 bin/flink run -d -p 1 -s {savepointuri}
> /data/test.jar 这种会报莫名其妙的错误,如之前的邮件
> 通过webui就是在http://jobmanager:8081; submit new
> job里添加jar包,指定相同的savepoint path和并行度提交任务  这样操作就没问题
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com;
> 发送时间:2020年7月20日(星期一) 下午2:30
> 收件人:"user-zh"
> 主题:Re: Flink Cli 部署问题
>
>
>
> Hi
>
> 这个调试可以在 IDEA 进行的。
>
> 另外你说的通过 web ui 提交没有问题。请问下,是同一个 savepoint 通过 flink run 提交有问题,通过 web ui
> 提交没有问题吗?如果是的,能否分享下你的操作过程和命令呢?
>
> Best,
> Congxian
>
>
> Z-Z 
>  这是taskmanager新报的一个错,还是跟之前一样,用cli提交报错,用webui提交就没问题:
>  2020-07-20 03:29:25,959 WARNnbsp;
>  org.apache.kafka.clients.consumer.ConsumerConfignbsp; nbsp;
> nbsp; nbsp;
>  nbsp; nbsp; nbsp; - The configuration
> 'value.serializer' was supplied
>  but isn't a known config.
>  2020-07-20 03:29:25,959 INFOnbsp;
>  org.apache.kafka.common.utils.AppInfoParsernbsp; nbsp;
> nbsp; nbsp;
>  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;-
> Kafka version : 0.11.0.2
>  2020-07-20 03:29:25,959 INFOnbsp;
>  org.apache.kafka.common.utils.AppInfoParsernbsp; nbsp;
> nbsp; nbsp;
>  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;-
> Kafka commitId : 73be1e1168f91ee2
>  2020-07-20 03:29:25,974 ERROR
> 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuildernbsp;
>  - Caught unexpected exception.
>  java.lang.ArrayIndexOutOfBoundsException: 0
>  at
> 
> org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
>  at
> 
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
>  at
> 
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
>  at
> 
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
>  at
> 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>  at
> 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>  at
> 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>  at
> 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>  at
> 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>  at
> 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>  at
> 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>  at
> 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at
> java.lang.Thread.run(Thread.java:748)
>  2020-07-20 03:29:25,974 WARNnbsp;
> 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedurenbsp;
> -
>  Exception while restoring keyed state backend for
>  StreamMap_caf773fe289bfdb867e0b4bd0c431c5f_(1/1) from alternative
> (1/1),
>  will retry while more alternatives are available.
>  org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected
>  exception.
>  at
> 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>  at
> 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>  at
> 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>  at
> 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>  at
> 
> 

Re: liststate

2020-07-20 Thread Congxian Qiu
Hi

ListState 中的 value 是一个整体,所以一次性会取回来,现在 RocksDBMapStat 中有一些操作
(iterator/entries/values 等)是使用 next 方法加载的。

Best,
Congxian


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年7月21日周二 上午9:28写道:

> 大家好:
>  
> 我发现RocksDBListState#get方法是一次性把数据全加载到内存中,如果list太大这样会不会造成内存溢出。可不可以在next方法中才加载数据?


Re: flink1.11 run

2020-07-20 Thread Rui Li
可以写已有的表,相关的配置 [1] 需要添加到表的property当中。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing

On Mon, Jul 20, 2020 at 5:14 PM Dream-底限  wrote:

>  hi
> 好的,想问一下stream写hive表的时候:
> 1、一定要在flink内部先建立hive表吗?
> 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
>
> Rui Li  于2020年7月20日周一 下午4:44写道:
>
> > tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
> >
> > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> >
> > > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> > >
> > > 异常:
> > > The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The main
> > method
> > > caused an error: No operators defined in streaming topology. Cannot
> > > generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > at
> > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > streaming topology. Cannot generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > at
> > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at
> > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at
> > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > ... 11 more
> > > 代码:
> > >
> > >  StreamExecutionEnvironment environment =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > EnvironmentSettings settings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > StreamTableEnvironment tableEnv =
> > > StreamTableEnvironment.create(environment, settings);
> > >
> > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > environment.setStateBackend(new MemoryStateBackend());
> > > environment.getCheckpointConfig().setCheckpointInterval(5000);
> > >
> > > String name = "myhive";
> > > String defaultDatabase = "tmp";
> > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > String version = "1.1.0";
> > >
> > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > > tableEnv.registerCatalog("myhive", hive);
> > > tableEnv.useCatalog("myhive");
> > >
> > > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > > "  user_id BIGINT,\n" +
> > > "  item_id STRING,\n" +
> > > "  behavior STRING,\n" +
> > > "  ts AS PROCTIME()\n" +
> > > ") WITH (\n" +
> > > " 'connector' = 'kafka-0.11',\n" +
> > > " 'topic' = 'user_behavior',\n" +
> > > " 'properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> > > " 'properties.group.id' = 'testGroup',\n" +
> > > " 'scan.startup.mode' = 'earliest-offset',\n" +
> > > " 'format' = 'json',\n" +
> > > " 'json.fail-on-missing-field' = 'false',\n" +
> > > " 'json.ignore-parse-errors' = 'true'\n" +
> > > ")");
> > >
> > > //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> > > //

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Leonard Xu
Hi, kelly

Looks like you want to use fact table(from Kafka) to join a dimension 
table(From filesystem),  dimension table is one kind of Temporal Table, 
temporal table join syntax you could refer Danny's post[1].

But `FileSystemTableSource` did not implement `LookupTableSource` interface yet 
which means you can not use it as a dimension table, the connector that 
supported `LookupTableSource` includes JDBC、HBase、Hive,
you can created an issue to support `lookupTableSource` for filesystem 
connector.

Another approach is using Temporal Table Function[1] which can define a 
Temporal table from a dataStream, you can convert your Table(filesystem table) 
to stream and then create a temporal table and then join the temporal table.


Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#defining-temporal-table-function
 



> 在 2020年7月21日,10:07,Danny Chan  写道:
> 
> Seems you want a temporal table join instead of a two stream join, if that is 
> your request, you should use syntax
> 
> Join LookupTable FOR SYSTEM_TIME AS OF …
> 
> See [1] for details.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>  
> 
> 
> Best,
> Danny Chan
> 在 2020年7月21日 +0800 AM6:32,Kelly Smith ,写道:
>> Hi folks,
>> 
>>  
>> I have a question Flink SQL. What I want to do is this:
>> 
>>  
>> Join a simple lookup table (a few rows) to a stream of data to enrich the 
>> stream by adding a column from the lookup table.
>>  
>>  
>> For example, a simple lookup table:
>> 
>>  
>> CREATE TABLE LookupTable (
>> `computeClass`  STRING,
>> `multiplier`FLOAT
>> ) WITH (
>> 'connector' = 'filesystem',
>> 'path' = 'fpu-multipliers.csv',
>> 'format' = 'csv'
>> )
>> 
>>  
>>  
>> And I’ve got a Kafka connector table with rowtime semantics that has a 
>> `computeClass` field. I simply want to join (in a streaming fashion) the 
>> `multiplier` field above.
>> 
>>  
>> SELECT
>> `timestamp`,
>> // ...
>> ks.computeClass,
>> lt.`multiplier`
>> FROM KafkaStream ks
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>  
>> Doing a simple join like that gives me this error:
>> 
>>  
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not be 
>> in the input rows of a regular join. As a workaround you can cast the time 
>> attributes of input tables to TIMESTAMP before.”
>> 
>>  
>> Which leads me to believe that I should use an Interval Join instead, but 
>> that doesn’t seem to be appropriate since my table is static and has no 
>> concept of time. Basically, I want to hold the entire lookup table in 
>> memory, and simply enrich the Kafka stream (which need not be held in 
>> memory).
>> 
>>  
>> Any ideas on how to accomplish what I’m trying to do?
>> 
>>  
>> Thanks!
>> 
>> Kelly
>> 



Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Danny Chan
Hi, Dongwon ~

> Caused by: org.apache.calcite.sql.parser.SqlParseException: Lexical error at 
> line 1, column 96.  Encountered

The error did report the position, you can take a reference to see which syntax 
context caused the problem.

Best,
Danny Chan
在 2020年7月20日 +0800 PM11:10,Dongwon Kim ,写道:
> Hi Leonard,
>
> You're right; I was missing a single quotation mark before the LIKE.
>
> There's no encoding problem at all!
> Sorry for the confusion.
>
> Thanks,
>
> Dongwon
>
>
> > On Tue, Jul 21, 2020 at 12:00 AM Leonard Xu  wrote:
> > > Hi, Kim
> > >
> > > The clause  ` LIKE '%양현마을%’ ` should work well, could you post the the 
> > > entire query(or select clause) ?
> > >
> > > Best
> > > Leonard Xu
> > >
> > > > 在 2020年7月20日,21:49,Dongwon Kim  写道:
> > > >
> > > > When I execute the following query in .sqlQuery(),
> > > > > SELECT ...
> > > > > FROM ...
> > > > > WHERE location.goalName LIKE '%양현마을%'
> > >


????: ?????? pyflink1.11.0window

2020-07-20 Thread chengyanan1...@foxmail.com
Hi??Sinkinsertinsert 
??
??AppendStreamTableSink requires that Table has only insert changes.??



chengyanan1...@foxmail.com
 
 ??
?? 2020-07-20 16:23
 user-zh
?? ?? pyflink1.11.0window
HI ??
  
??kafka
Traceback (most recent call last):
 File 
"/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, 
in deco
  return f(*a, **kw)
 File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 
328, in get_return_value
  format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
: org.apache.flink.table.api.TableException: AppendStreamTableSink requires 
that Table has only insert changes.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.Iterator$class.foreach(Iterator.scala:891)
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
 
 
 
 
During handling of the above exception, another exception occurred:
 
 
Traceback (most recent call last):
 File "tou.py", line 99, in 

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Danny Chan
Seems you want a temporal table join instead of a two stream join, if that is 
your request, you should use syntax

Join LookupTable FOR SYSTEM_TIME AS OF …

See [1] for details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Danny Chan
在 2020年7月21日 +0800 AM6:32,Kelly Smith ,写道:
> Hi folks,
>
> I have a question Flink SQL. What I want to do is this:
>
>
> • Join a simple lookup table (a few rows) to a stream of data to enrich the 
> stream by adding a column from the lookup table.
>
>
>
> For example, a simple lookup table:
>
> CREATE TABLE LookupTable (
>     `computeClass`  STRING,
>     `multiplier`    FLOAT
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'fpu-multipliers.csv',
>     'format' = 'csv'
> )
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a 
> `computeClass` field. I simply want to join (in a streaming fashion) the 
> `multiplier` field above.
>
> SELECT
>`timestamp`,
>// ...
>ks.computeClass,
>lt.`multiplier`
> FROM KafkaStream ks
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
> Doing a simple join like that gives me this error:
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
> the input rows of a regular join. As a workaround you can cast the time 
> attributes of input tables to TIMESTAMP before.”
>
> Which leads me to believe that I should use an Interval Join instead, but 
> that doesn’t seem to be appropriate since my table is static and has no 
> concept of time. Basically, I want to hold the entire lookup table in memory, 
> and simply enrich the Kafka stream (which need not be held in memory).
>
> Any ideas on how to accomplish what I’m trying to do?
>
> Thanks!
> Kelly


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
JIRA: https://issues.apache.org/jira/browse/FLINK-18651

godfrey he  于2020年7月21日周二 上午9:46写道:

> hi  Kelly,
> As the exception message mentioned: currently, we must cast the time
> attribute to regular TIMESTAMP type,
> then we can do regular join. Because time attribute will be out-of-order
> after regular join,
> and then we can't do window aggregate based on the time attribute.
>
> We can improve it that the planner implicitly casts the time attribute to
> regular TIMESTAMP type,
> and throws exception there is an operator (after join) depended on time
> attribute, like window aggregate.
>
> I will create a JIRA to trace this.
>
> Best,
> Godfrey
>
> Kelly Smith  于2020年7月21日周二 上午6:38写道:
>
>> Hi folks,
>>
>>
>>
>> I have a question Flink SQL. What I want to do is this:
>>
>>
>>
>>- Join a simple lookup table (a few rows) to a stream of data to
>>enrich the stream by adding a column from the lookup table.
>>
>>
>>
>>
>>
>> For example, a simple lookup table:
>>
>>
>>
>> *CREATE TABLE *LookupTable (
>> *`computeClass`  *STRING,
>> *`multiplier`*
>> *FLOAT *) *WITH *(
>> *'connector' *= *'filesystem'*,
>> *'path' *= *'fpu-multipliers.csv'*,
>> *'format' *=
>> *'csv' *)
>>
>>
>>
>>
>>
>> And I’ve got a Kafka connector table with rowtime semantics that has a
>> `computeClass` field. I simply want to join (in a streaming fashion) the
>> `multiplier` field above.
>>
>>
>>
>>
>> *SELECT*`timestamp`,
>>
>> // ...
>> ks.computeClass,
>> lt.`multiplier`
>> *FROM *KafkaStream ks
>>
>> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>>
>>
>>
>> Doing a simple join like that gives me this error:
>>
>>
>>
>> “org.apache.flink.table.api.TableException: Rowtime attributes must not
>> be in the input rows of a regular join. As a workaround you can cast the
>> time attributes of input tables to TIMESTAMP before.”
>>
>>
>>
>> Which leads me to believe that I should use an Interval Join instead, but
>> that doesn’t seem to be appropriate since my table is static and has no
>> concept of time. Basically, I want to hold the entire lookup table in
>> memory, and simply enrich the Kafka stream (which need not be held in
>> memory).
>>
>>
>>
>> Any ideas on how to accomplish what I’m trying to do?
>>
>>
>>
>> Thanks!
>>
>> Kelly
>>
>


Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
hi  Kelly,
As the exception message mentioned: currently, we must cast the time
attribute to regular TIMESTAMP type,
then we can do regular join. Because time attribute will be out-of-order
after regular join,
and then we can't do window aggregate based on the time attribute.

We can improve it that the planner implicitly casts the time attribute to
regular TIMESTAMP type,
and throws exception there is an operator (after join) depended on time
attribute, like window aggregate.

I will create a JIRA to trace this.

Best,
Godfrey

Kelly Smith  于2020年7月21日周二 上午6:38写道:

> Hi folks,
>
>
>
> I have a question Flink SQL. What I want to do is this:
>
>
>
>- Join a simple lookup table (a few rows) to a stream of data to
>enrich the stream by adding a column from the lookup table.
>
>
>
>
>
> For example, a simple lookup table:
>
>
>
> *CREATE TABLE *LookupTable (
> *`computeClass`  *STRING,
> *`multiplier`*
> *FLOAT *) *WITH *(
> *'connector' *= *'filesystem'*,
> *'path' *= *'fpu-multipliers.csv'*,
> *'format' *=
> *'csv' *)
>
>
>
>
>
> And I’ve got a Kafka connector table with rowtime semantics that has a
> `computeClass` field. I simply want to join (in a streaming fashion) the
> `multiplier` field above.
>
>
>
>
> *SELECT*`timestamp`,
>
> // ...
> ks.computeClass,
> lt.`multiplier`
> *FROM *KafkaStream ks
>
> JOIN LookupTable lt ON ks.computeClass = lt.computeClass
>
>
>
> Doing a simple join like that gives me this error:
>
>
>
> “org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.”
>
>
>
> Which leads me to believe that I should use an Interval Join instead, but
> that doesn’t seem to be appropriate since my table is static and has no
> concept of time. Basically, I want to hold the entire lookup table in
> memory, and simply enrich the Kafka stream (which need not be held in
> memory).
>
>
>
> Any ideas on how to accomplish what I’m trying to do?
>
>
>
> Thanks!
>
> Kelly
>


?????? flink1.11????????

2020-07-20 Thread Evan
Hi,??jar??1??45jar




----
??: 
   "user-zh"



liststate

2020-07-20 Thread ??????(Jiacheng Jiang)

  
??RocksDBListState#get??list??next??

Flink SQL - Join Lookup Table

2020-07-20 Thread Kelly Smith
Hi folks,

I have a question Flink SQL. What I want to do is this:


  *   Join a simple lookup table (a few rows) to a stream of data to enrich the 
stream by adding a column from the lookup table.


For example, a simple lookup table:

CREATE TABLE LookupTable (
`computeClass`  STRING,
`multiplier`FLOAT
) WITH (
'connector' = 'filesystem',
'path' = 'fpu-multipliers.csv',
'format' = 'csv'
)


And I’ve got a Kafka connector table with rowtime semantics that has a 
`computeClass` field. I simply want to join (in a streaming fashion) the 
`multiplier` field above.


SELECT
`timestamp`,

// ...
ks.computeClass,
lt.`multiplier`
FROM KafkaStream ks

JOIN LookupTable lt ON ks.computeClass = lt.computeClass


Doing a simple join like that gives me this error:

“org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.”

Which leads me to believe that I should use an Interval Join instead, but that 
doesn’t seem to be appropriate since my table is static and has no concept of 
time. Basically, I want to hold the entire lookup table in memory, and simply 
enrich the Kafka stream (which need not be held in memory).

Any ideas on how to accomplish what I’m trying to do?

Thanks!
Kelly


Re: records-lag-max

2020-07-20 Thread Chen, Mason
I removed an unnecessary `.keyBy()` and I’m getting the metrics again. Is this 
a potential bug?

From: "Chen, Mason" 
Date: Monday, July 20, 2020 at 12:41 PM
To: "user@flink.apache.org" 
Subject: records-lag-max

Hi all,

I am having some trouble with the lag metric from the kafka connector. The 
gauge value is always reported as NaN although I’m producing events first and 
then starting the flink job. Anyone know how to fix this?

```
# TYPE flink_taskmanager_job_task_operator_records_lag_max gauge
flink_taskmanager_job_task_operator_records_lag_max{ … } NaN
```

Thanks,
Mason


Re: Are files in savepoint still needed after restoring if turning on incremental checkpointing

2020-07-20 Thread Lu Niu
Thanks Yun! That's what I thought :)

Best
Lu

On Sun, Jul 19, 2020 at 7:57 PM Yun Tang  wrote:

> Hi Lu
>
> Once a new checkpoint is completed when restoring from a savepoint, the
> previous savepoint would be useless if you decide to restore from new
> checkpoint.
> In other words, new incremental checkpoint has no relationship with older
> savepoint from which restored.
>
> Best
> Yun Tang
> --
> *From:* Lu Niu 
> *Sent:* Saturday, July 18, 2020 5:48
> *To:* user 
> *Subject:* Are files in savepoint still needed after restoring if turning
> on incremental checkpointing
>
> Hi, Flink Users
>
> Assuming one flink job turns incremental checkpointing and restores from a
> savepoint. It runs fine for a while and commits one checkpoint and then it
> fully restarts because of one error. At this time, is it possible that the
> job still needs files in the original savepoint for recovery? If so, how to
> determine whether a savepoint is safe to delete?
>
> Best
> Lu
>


records-lag-max

2020-07-20 Thread Chen, Mason
Hi all,

I am having some trouble with the lag metric from the kafka connector. The 
gauge value is always reported as NaN although I’m producing events first and 
then starting the flink job. Anyone know how to fix this?

```
# TYPE flink_taskmanager_job_task_operator_records_lag_max gauge
flink_taskmanager_job_task_operator_records_lag_max{ … } NaN
```

Thanks,
Mason


Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek
What are you trying to do in the ReduceFunction? Without knowing the 
code, maybe an aggregate(AggregateFunction) is the solution.


Best,
Aljoscha

On 20.07.20 18:03, Flavio Pompermaier wrote:

Thanks Aljosha for the reply. So what can I do in my reduce function that
contains transient variables (i.e. not serializable)?

On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek 
wrote:


Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as
the ReduceFunction of a ReducingState. And those cannot be rich
functions because we cannot provide all the required context "inside"
the state backend.

You can see how the ReduceFunction is used to create a
ReducingStateDescriptor here:

https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300

Best,
Aljoscha

On 16.07.20 16:28, Flavio Pompermaier wrote:

Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll but Flink
says
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio









Re: Accumulators in Table API

2020-07-20 Thread Dawid Wysakowicz
Hi Flavio.

You don't have access to accumulators in Table API.

A few other ways that come to my mind are:

1. Use existing metrics e.g. operator input/output records.

2. Use metrics in a UDF

3. Have a regular count (you can have multiple queries optimized into a
single graph via TableEnvironment#createStatementSet)

Best,

Dawid
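
For option 2, a small sketch of what such a UDF could look like (the function and metric names are invented):

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

/** Pass-through UDF that counts, via a metric, how many rows it has seen. */
public class CountingPassThrough extends ScalarFunction {

    private transient Counter rowsSeen;

    @Override
    public void open(FunctionContext context) throws Exception {
        rowsSeen = context.getMetricGroup().counter("rowsSeen");
    }

    public String eval(String value) {
        rowsSeen.inc();
        return value;
    }
}
```

Registered with the table environment and wrapped around any column right after the source, it yields a per-parallel-instance counter without changing the query result.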

On 16/07/2020 09:36, Flavio Pompermaier wrote:
> Hi to all,
> in my legacy code (using Dataset api) I used to add a map function
> just after the Source read and keep the count of the rows. In this way
> I had a very light and unobtrusive way of counting the rows of a
> dataset. Can I do something similar in table API? Is there a way to
> use accumulators?
>
> Thanks in advance,
> Flavio





Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Flavio Pompermaier
Thanks Aljoscha for the reply. So what can I do in my reduce function that
contains transient variables (i.e. not serializable)?

On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek 
wrote:

> Hi Flavio,
>
> the reason is that under the covers the ReduceFunction will be used as
> the ReduceFunction of a ReducingState. And those cannot be rich
> functions because we cannot provide all the required context "inside"
> the state backend.
>
> You can see how the ReduceFunction is used to create a
> ReducingStateDescriptor here:
>
> https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300
>
> Best,
> Aljoscha
>
> On 16.07.20 16:28, Flavio Pompermaier wrote:
> > Hi to all,
> > I'm trying to apply a rich reduce function after a countWindowAll but
> Flink
> > says
> > "ReduceFunction of reduce can not be a RichFunction. Please use
> > reduce(ReduceFunction, WindowFunction) instead."
> >
> > Is there any good reason for this? Or am I doing something wrong?
> >
> > Best,
> > Flavio
> >
>


Re: Global Hashmap & global static variable.

2020-07-20 Thread Piotr Nowojski
Hi Annemarie,

You are missing some basic concepts in Flink, please take a look at [1].

> Weirdly enough it worked fine in my Intellij.

It's completely normal. If you access a static variable in your code and
execute your Flink application in a local testing environment (IntelliJ),
there is just a single JVM running all of the code, so everything (any
code/operator/function) accessing that static variable sees the same instance.
But if you execute the same code on a cluster with multiple machines (Flink is
a distributed system!), there is no way for different JVM processes to
communicate via static variables.

Probably the same problem applies to your first issue with the HashMap.

Please, carefully follow [1], how Flink executes/distributes your code to
understand this problem.

Can you describe what are you trying to do/achieve/solve?

There is currently no way for different operators/functions to share access
to the same state. Note, different parallel instances of the same
operator/function can share the same state. It's called broadcast state
[2], but it doesn't allow for the pattern you are looking for (aggregate
results in one stage, and then use this aggregated state in another later
stage/operator). To do this, you would have to store the state in some
external system (an external key/value store, DB, Kafka, a file on a
distributed file system, ...) to make it visible and accessible across your
whole cluster.

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/flink-architecture.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
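
For completeness, a rough sketch of the broadcast state pattern from [2], with invented stream and type names: the (vertex, partition) assignments are broadcast to every parallel instance of the downstream function, which looks them up while processing edges.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastAssignmentExample {

    // (vertexId -> partition) entries that every parallel instance should see.
    static final MapStateDescriptor<String, Integer> ASSIGNMENTS =
        new MapStateDescriptor<>("assignments", Types.STRING, Types.INT);

    public static DataStream<String> wire(DataStream<Tuple2<String, String>> edges,
                                          DataStream<Tuple2<String, Integer>> assignments) {
        BroadcastStream<Tuple2<String, Integer>> broadcast = assignments.broadcast(ASSIGNMENTS);

        return edges.connect(broadcast).process(
            new BroadcastProcessFunction<Tuple2<String, String>, Tuple2<String, Integer>, String>() {

                @Override
                public void processElement(Tuple2<String, String> edge, ReadOnlyContext ctx,
                                           Collector<String> out) throws Exception {
                    Integer partition = ctx.getBroadcastState(ASSIGNMENTS).get(edge.f0);
                    out.collect(edge + " -> partition " + partition);
                }

                @Override
                public void processBroadcastElement(Tuple2<String, Integer> assignment, Context ctx,
                                                    Collector<String> out) throws Exception {
                    ctx.getBroadcastState(ASSIGNMENTS).put(assignment.f0, assignment.f1);
                }
            });
    }
}
```

As noted above, this only helps when the assignments arrive as a stream alongside the edges; it does not allow feeding an aggregate from a later stage back into an earlier one.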

pt., 17 lip 2020 o 21:32 Annemarie Burger <
annemarie.bur...@campus.tu-berlin.de> napisał(a):

> Hi,
>
> I have two questions:
>
> 1. In the first part of my pipeline using Flink DataStreams processing
> graph
> edges, I'm filling up Hashmap. In it goes a vertex id and the partition
> this
> vertex is assigned to. Later in my pipeline I want to query this Hashmap
> again, to see in which partition exactly I can find a specific edge based
> on
> which partitions its two vertices are assigned to. What is the best way to
> do this? I keep getting Nullpointer exceptions.
>
> 2. Is there a good way to retrieve a single value at some point in my
> pipeline, and then make it globally available? I was using static, but
> found
> that this led to a Nullpointer exception when executing Flink standalone.
> Weirdly enough it worked fine in my Intellij.
>
> All help very appreciated!
>
> Best,
> Annemarie
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Is there a way to use stream API with this program?

2020-07-20 Thread Piotr Nowojski
Hi,

I'm afraid that there is not out of the box way of doing this. I've created
a ticket [1] to write down and document a discussion that we had about this
issue in the past.

The issue is that currently, untriggered processing time timers are ignored
at the end of input, and it seems there is no single perfect way to handle
this for all cases, so it probably needs to be customized.

Maybe you could:
1. extend `WindowOperator`  (`MyWindowOperator`)
2. implement
`org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
`MyWindowOperator`
3. Inside `MyWindowOperator#endInput`  invoke
`internalTimerService.forEachProcessingTimeTimer(...)` and:
  a) manually trigger timers `WindowOperator#onProcessingTime`
  b) delete manually triggered timer

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18647

pt., 17 lip 2020 o 10:30 Flavio Pompermaier 
napisał(a):

> Hi to all,
> I was trying to port another job we have that use dataset API to
> datastream.
> The legacy program was doing basically a dataset.mapPartition().reduce()
> so I tried to replicate this thing with a
>
>  final BasicTypeInfo columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>   final DataStream input = env.fromElements(//
> Row.of(1.0), //
> Row.of(2.0), //
> Row.of(3.0), //
> Row.of(5.0), //
> Row.of(6.0)).returns(new RowTypeInfo(columnType));
>  inputStream.map(new SubtaskIndexAssigner(columnType))
> .keyBy(t -> t.f0)
> .window(GlobalWindows.create())
>
> .trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5),
> 100L))).
> .process(..)
>
> Unfortunately the program exits before reaching the Process function
> (moreover I need to add another window + trigger after it before adding the
> reduce function).
> Is there a way to do this with the DataStream API or should I still use
> DataSet API for the moment (when the batch will be fully supported)? I
> append to the footer all the code required to test the job.
>
> Best,
> Flavio
>
> -
>
> package org.apache.flink.stats.sketches;
>
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ReducingState;
> import org.apache.flink.api.common.state.ReducingStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeutils.base.LongSerializer;
> import org.apache.flink.api.java.io.PrintingOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
>
> public class Test {
>   public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> env.setParallelism(1);
>
> final BasicTypeInfo columnType =
> BasicTypeInfo.DOUBLE_TYPE_INFO;
> final DataStream input = env.fromElements(//
> Row.of(1.0), //
> Row.of(2.0), //
> Row.of(3.0), //
> Row.of(5.0), //
> Row.of(6.0)).returns(new RowTypeInfo(columnType));
> final DataStream out = input.map(new
> SubtaskIndexAssigner(columnType))//
> .keyBy(t -> t.f0)//
> .window(GlobalWindows.create())
> .trigger(PurgingTrigger.of(new
> CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
> .process(new ProcessWindowFunction, Row,
> Integer, GlobalWindow>() {
>
>   @Override
>   public void process(Integer key,
>   ProcessWindowFunction, Row, Integer,
> GlobalWindow>.Context context,
>   Iterable> it, Collector out)
> throws Exception {
> for (Tuple2 tuple : it) {
>   out.collect(Row.of(tuple.f1.getField(0).toString()));
>   

Re: Status of a job when a kafka source dies

2020-07-20 Thread Aljoscha Krettek

Hi,

Flink doesn't do any special failure-handling or retry logic, so it’s up 
to how the KafkaConsumer is configured via properties. In general Flink 
doesn’t try to be smart: when something fails, an exception will bubble 
up that will fail this execution of the job. If checkpoints are enabled 
this will trigger a restore; this is controlled by the restart strategy. 
If that eventually gives up, the job will go to “FAILED” and stop.


This is the relevant section of the docs: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
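
For illustration, a minimal sketch of pinning this down explicitly (the values are arbitrary):

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is what enables automatic restarts in the first place.
        env.enableCheckpointing(60_000);

        // Retry 3 times, 10 seconds apart, before the job goes to FAILED for good.
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}
```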


Best,
Aljoscha

On 15.07.20 17:42, Nick Bendtner wrote:

Hi guys,
I want to know what is the default behavior of Kafka source when a kafka
cluster goes down during streaming. Will the job status go to failing or is
the exception caught and there is a back off before the source tries to
poll for more events ?


Best,
Nick.





Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Dongwon Kim
Hi Leonard,

You're right; I was missing a single quotation mark before the LIKE.

There's no encoding problem at all!
Sorry for the confusion.

Thanks,

Dongwon
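
For reference, once both quotation marks are in place the predicate parses fine; a fragment (assuming tableEnv is an existing table environment and t a registered table):

```java
Table result = tableEnv.sqlQuery(
    "SELECT * FROM t WHERE location.goalName LIKE '%양현마을%'");
```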


On Tue, Jul 21, 2020 at 12:00 AM Leonard Xu  wrote:

> Hi, Kim
>
> The clause ` LIKE '%양현마을%' ` should work well. Could you post the
> entire query (or SELECT clause)?
>
> Best
> Leonard Xu
>
> 在 2020年7月20日,21:49,Dongwon Kim  写道:
>
> When I execute the following query in .sqlQuery(),
>
>> SELECT ...
>> FROM ...
>> WHERE location.goalName LIKE '%양현마을%'
>
>
>


Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Leonard Xu
Hi, Kim

The clause ` LIKE '%양현마을%' ` should work well. Could you post the entire 
query (or SELECT clause)?

Best
Leonard Xu

> 在 2020年7月20日,21:49,Dongwon Kim  写道:
> 
> When I execute the following query in .sqlQuery(),
> SELECT ...
> FROM ...
> WHERE location.goalName LIKE '%양현마을%'



Re: Beam flink runner job not keeping up with input rate after downscaling

2020-07-20 Thread Piotr Nowojski
Hi,

maxParallelism = -1, the default value, is interpreted as described in the
documentation you linked:

> The default setting for the maximum parallelism is roughly
operatorParallelism + (operatorParallelism / 2) with a lower bound of 128
and an upper bound of 32768.

So maxParallelism should be 128 in your case, if you haven't changed this
value. But aren't you confusing maxParallelism with parallelism? It doesn't
seem to have anything to do with the problem you described:

> With initial parallelism of 50, our application is able to process up to
50,000 records per second. After a week, we took a savepoint and restarted
from savepoint with the parallelism of 18. We are seeing that our
application is only able to process 7000 records per second.

Doesn't this answer your question? Initially you were running with
parallelism 50 and you achieved 50k r/s. After decreasing the parallelism
and scaling down the cluster the throughput went down to 7k r/s. It makes
sense to me.

Piotrek
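
If the goal is to keep the number of key groups fixed regardless of the parallelism used at restore time, it can also be set explicitly. In a plain Flink job that looks like the sketch below (the value 128 just mirrors the default discussed above); with the Beam Flink runner the corresponding pipeline option would need to be set instead.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Pin the number of key groups explicitly so that later rescaling cannot change it.
        env.setMaxParallelism(128);
    }
}
```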


czw., 16 lip 2020 o 22:08 Kathula, Sandeep 
napisał(a):

> Hi,
>
>We started a Beam application with Flink runner with parallelism as 50.
> It is a *stateful application* which uses *RocksDB* as state store. We
> are using *timers* and Beam’s *value state and bag state (which is same
> as List state of Flink).* We are doing *incremental* *checkpointing*.
> With initial parallelism of 50, our application is able to process up to 
> *50,000
> records* per second. After a week, we took a savepoint and restarted from
> savepoint with the parallelism of *18.* We are seeing that our
> application is only able to process *7000* records per second. Records
> processed per task manager was almost *half* of what is used to process
> previously with 50 task managers.
>
>
>
> We didn’t give any maxParallelism in our Beam application but found from
> logs that maxParallelism has been set to -1. Also Beam’s doc for the Flink
> runner mentions that by default maxParallelism is -1:
> https://beam.apache.org/documentation/runners/flink/
>
>
>
> But this Flink doc
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html 
> mentions
> that by default maxParallelism is set to operatorParallelism +
> (operatorParallelism / 2) which would be 75 in our case.
>
>
>
> I didn’t get how maxParallelism is set (when giving maxParallelism as -1
> to Beam’s Flink runner). I highly doubt *more key groups is causing this
> performance degradation*?
>
>
>
> Beam version - 2.19
>
> Flink version- 1.9
>
>
>
> Any suggestions/help would be appreciated.
>
>
>
>
>
> Thanks
>
> Sandeep Kathula
>
>
>
>
>


Re: Flink Jobs are failing for running testcases when trying to build in Jenkins server

2020-07-20 Thread Piotr Nowojski
Hi,

It looks like the Flink mini (test) cluster has trouble starting up on your
Jenkins machine. Frankly, it's hard for me to guess what the issue could be
here.

1. Are you following this guideline? [1]
2. Maybe there are some other error messages somewhere else in the
logs/stderr/stdout? This 100s timeout is probably not the root cause of the
problem.
3. Have you tried logging in on the Jenkins worker and trying to reproduce
the problem there? And bisecting it by simplifying everything as much as
possible until the problems go away?

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource
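
For reference, the JUnit 4 rule described in [1] looks roughly like the sketch below (class and builder names follow that guide and may differ between Flink versions; the stack trace shows JUnit 5, where the same resource would have to be managed manually or via an extension):

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class PatternJobITCase {

    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(2)
                .build());

    // Tests then run their pipelines against this shared mini cluster
    // instead of spinning up a fresh local environment per test method.
}
```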

czw., 16 lip 2020 o 17:41 bujjirahul45  napisał(a):

> Hi,
>
> I am trying to build a Flink job on a Jenkins server, and when it runs the
> test cases it fails with the error below. I am doing a simple pattern
> validation, where I test data against a set of patterns. It builds fine
> locally with Gradle 6.3, but when building on the Jenkins server it fails
> with the stack trace below. Please suggest what I am doing wrong.
>
> SignalPatternDefinitionMatchingTest >
> testCompareInputAndOutputDataForInValidSignal() FAILED
> java.lang.Exception: Could not create actor system
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:278)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:164)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRpcService(AkkaRpcServiceUtils.java:126)
> at
> org.apache.flink.runtime.metrics.util.MetricUtils.startMetricsRpcService(MetricUtils.java:139)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:286)
> at
> org.apache.flink.client.deployment.executors.LocalExecutor.startMiniCluster(LocalExecutor.java:117)
> at
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:63)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> at
> com.myorg.pattern.service.TestPatternProcessingService.getInValidSignalDataStreamOutput(TestPatternProcessingService.java:140)
> at
> com.myorg.pattern.pattern.SignalPatternDefinitionMatchingTest.testCompareInputAndOutputDataForInValidSignal(SignalPatternDefinitionMatchingTest.java:24)
> at
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
> at
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> at
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> at
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
> at
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
> at
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
>  

Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek

Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as 
the ReduceFunction of a ReducingState. And those cannot be rich 
functions because we cannot provide all the required context "inside" 
the state backend.


You can see how the ReduceFunction is used to create a 
ReducingStateDescriptor here: 
https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300


Best,
Aljoscha

On 16.07.20 16:28, Flavio Pompermaier wrote:

Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll but Flink
says
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio





Encoding problem in WHERE LIKE

2020-07-20 Thread Dongwon Kim
Hi,

When I execute the following query in .sqlQuery(),

> SELECT ...
> FROM ...
> WHERE location.goalName LIKE '%양현마을%'
>
I got the following error message

> Caused by: org.apache.calcite.sql.parser.SqlParseException: Lexical error
> at line 1, column 96.  Encountered: "\uc591" (50577), after : ""
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> at
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> ... 3 more
> Caused by: org.apache.flink.sql.parser.impl.TokenMgrError: Lexical error
> at line 1, column 96.  Encountered: "\uc591" (50577), after : ""
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImplTokenManager.getNextToken(FlinkSqlParserImplTokenManager.java:16104)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_scan_token(FlinkSqlParserImpl.java:35909)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_3R_430(FlinkSqlParserImpl.java:34086)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_3R_349(FlinkSqlParserImpl.java:34072)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_3R_206(FlinkSqlParserImpl.java:35321)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_3R_111(FlinkSqlParserImpl.java:35344)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_3_69(FlinkSqlParserImpl.java:34666)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_3_70(FlinkSqlParserImpl.java:35107)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_3_71(FlinkSqlParserImpl.java:35188)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_2_71(FlinkSqlParserImpl.java:29567)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16828)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:16992)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16758)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.WhereOpt(FlinkSqlParserImpl.java:12927)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6914)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16741)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16204)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3761)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> ... 5 more


Can anyone help me with this problem?

Thanks,

Dongwon


Simple MDC logs don't show up

2020-07-20 Thread Manish G
Hi All,

I have some very simple MDC logs in my flink job:

MDC.put("methodName", new Object()
{}.getClass().getEnclosingMethod().getName());
MDC.put("className", this.getClass().getSimpleName());

When I run flink job locally, I can see them in the application logs.

But when I run the same job on a Kubernetes cluster, these don't show up.

Any input here?

With regards


Re: flink1.11 startup problem

2020-07-20 Thread Leonard Xu
Hi,

> On Jul 20, 2020, at 20:15, 酷酷的浑蛋  wrote:
> 
> 1. After unpacking flink 1.11, starting it reports:
> java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties

It already fails at the very first step, so this should be a problem with your local environment; the jars you added afterwards should not be needed at all. Which JDK version are you using locally?

Best,
Leonard Xu

回复: (无主题)

2020-07-20 Thread 罗显宴


大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制
package com.bupt.main

import java.sql.Timestamp
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.{AggregateFunction, 
FlatMapFunction, MapFunction, RuntimeContext}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import 
org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, 
RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests



case class Drug(id:String,ATCCode:String,MIMSType:String,
name:String,other:String, producer:String,
retailPrice:String,composition:String,
medicineRank:String, timestamp:Long)

case class IncreaseNumPerHour(category:String, num:Long,timestamp:Long)

case class ItemViewCount(category:String,windowEnd:Long)

case class IncreasePerHour(time:String,num:Long)

object KafkaToElasticSearch {
  def main(args: Array[String]): Unit = {
// 1. 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val props = new Properties
props.put("bootstrap.servers", "master:9092")
props.put("zookeeper.connect", "master:2181")
props.put("group.id", "drug-group")
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")

val drugs = env.addSource(new FlinkKafkaConsumer011[String]("drugs", 
//这个 kafka topic 需要和上面的工具类的 topic 一致
  new SimpleStringSchema, props)).setParallelism(1)
  .filter(string =>{
val words = string.split(",")
words.length == 10 && words(2).length!=0
  })
  .map(new MapFunction[String,Drug] {
  override def map(value: String): Drug = {
  val words = value.split(",")
if(words.length!=10)
println(words)
  
Drug(words(0),words(1),words(2),words(3),words(4),words(5),words(6),words(7),words(8),words(9).trim.toLong)
  }
} ).assignAscendingTimestamps( _.timestamp )
//drugs.print()
val num = drugs.map(drug => {
  var temp: StringBuilder = new StringBuilder(drug.MIMSType)

  if (temp.length != 0 && temp.charAt(0) == '-')
temp.deleteCharAt(0)
  if (temp.length != 0 && temp.charAt(temp.length - 1) == ';')
temp.deleteCharAt(temp.length - 1)
  var validateResult: String = null
  if (temp.length != 0) validateResult = temp.substring(0, temp.indexOf(' 
'))
  IncreaseNumPerHour(validateResult, 1l, drug.timestamp)
})

//num.print()
//
val result = num.keyBy(_.category)
  .timeWindow(Time.hours(1))
  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
  .aggregate(new CountAgg,new WindowResult)
  .keyBy(_.windowEnd)
  .process(new IncreaseItems)
result.print()

env.execute("compute num")
  }

}

// Custom pre-aggregation function

// in, acc, out
// Here the accumulator effectively plays the role of the state
class CountAgg() extends 

Re: pyflink1.11.0window

2020-07-20 Thread Shuiqiang Chen
Take a look at the exception: it looks like your insert mode is not configured correctly.
BTW, the text you pasted contains a lot of "" characters, which hurts readability a bit.

Best,
Shuiqiang

奇怪的不朽琴师 <1129656...@qq.com> wrote on Mon, Jul 20, 2020 at 4:23 PM:

> HI:
>   I now have a new problem: on top of this I added a join, and writing to kafka then fails with the error below.
> Traceback (most recent call last):
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 147, in deco
>   return f(*a, **kw)
>  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
> line 328, in get_return_value
>   format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
> : org.apache.flink.table.api.TableException: AppendStreamTableSink
> requires that Table has only insert changes.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at
> java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> During handling of the above exception, another exception occurred:
>
>
> Traceback (most recent call last):
>  File "tou.py", line 99, infrom_kafka_to_kafka_demo()
>  File "tou.py", line 33, in from_kafka_to_kafka_demo
>   st_env.sql_update("insert into flink_result select
> id,type,rowtime from final_result2")
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py",
> line 547, in sql_update
>   self._j_tenv.sqlUpdate(stmt)
>  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
>   answer, self.gateway_client, self.target_id, self.name)
>  File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line
> 154, in deco
>   raise exception_mapping[exception](s.split(': ', 1)[1],
> stack_trace)
> pyflink.util.exceptions.TableException: 'AppendStreamTableSink requires
> that Table has only insert changes.'
>
>
>
>
>
> How should this be implemented? The requirement is roughly a streaming table (which needs group-by aggregation) joined with a dimension table.
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, DataTypes,
> EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
> from pyflink.table.window import Tumble
>
>
>
>
> def from_kafka_to_kafka_demo():
>
>
>   # use blink table planner
>   env = StreamExecutionEnvironment.get_execution_environment()
>  
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>   env_settings =
> 

回复: (无主题)

2020-07-20 Thread 罗显宴
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制


回复: (无主题)

2020-07-20 Thread 罗显宴
 When I run it, it just emits the result per one-hour window; it does not continuously emit the increasing count every 20 seconds.
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制


Reply: flink1.11 startup problem

2020-07-20 Thread 酷酷的浑蛋
1. After unpacking flink 1.11, starting it reports:
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
So I copied jersey-client-1.9.jar and jersey-core-1.9.jar into lib.
2. Starting again, it reports:
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.ext.MessageBodyReader
So I copied javax.ws.rs-api-2.1.1.jar into lib.
3. Starting again, it reports:
Caused by: java.lang.ClassNotFoundException: org.glassfish.jersey.internal.RuntimeDelegateImpl
So I copied jersey-common-3.0.0-M6.jar into lib.
4. Starting again, it reports:
Caused by: java.lang.ClassNotFoundException: jakarta.ws.rs.ext.RuntimeDelegate
So I copied jakarta.ws.rs-api-3.0.0-M1.jar into lib.
5. Starting again, it reports:
java.lang.LinkageError: ClassCastException: attempting to cast jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
6. I'm at my wit's end.




On Jul 20, 2020 at 20:06, Leonard Xu wrote:
Hi,
This looks like a dependency conflict; the error message suggests the classpath loaded two identical jars. Is the javax.ws.rs-api-2.1.1.jar actually needed by your cluster?
Could you describe your scenario in a bit more detail, e.g. how to reproduce the problem, so that everyone can help investigate.

Best,
Leonard Xu

On Jul 20, 2020 at 15:36, 酷酷的浑蛋  wrote:



Flink 1.11 reports the following error on startup:
java.lang.LinkageError: ClassCastException: attempting to 
castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class
 to 
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125)
at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97)
at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172)
at com.sun.jersey.core.header.MediaTypes.(MediaTypes.java:65)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
at com.sun.jersey.api.client.Client.init(Client.java:342)
at com.sun.jersey.api.client.Client.access$000(Client.java:118)
at com.sun.jersey.api.client.Client$1.f(Client.java:191)
at com.sun.jersey.api.client.Client$1.f(Client.java:187)
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
at com.sun.jersey.api.client.Client.(Client.java:187)
at com.sun.jersey.api.client.Client.(Client.java:170)
at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76)
at 
org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61)
at 
org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.missfresh.Main.main(Main.java:142)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)


I have already added javax.ws.rs-api-2.1.1.jar under lib.



Reply: (no subject)

2020-07-20 Thread 罗显宴
OK, thanks a lot, I'll give this a try.


| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制

On Jul 20, 2020 at 15:11, shizk233 wrote:
Hi,

Considering the example you gave, you can still use ContinuousEventTimeTrigger to keep triggering result updates; it's just that the overall window could be chosen differently depending on the end condition, e.g. a Global window.

Best,
shizk233
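
A minimal, self-contained sketch of that idea in the DataStream API (invented input; a daily tumbling window is used here, and a GlobalWindow could replace it if the total should never reset):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

public class HourlyCumulativeCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(1_000L, 2_000L, 3_600_000L)               // fake event timestamps (millis)
           .assignTimestampsAndWatermarks(
               WatermarkStrategy.<Long>forMonotonousTimestamps()
                   .withTimestampAssigner((ts, previous) -> ts))
           .windowAll(TumblingEventTimeWindows.of(Time.days(1)))   // one long window per day
           .trigger(ContinuousEventTimeTrigger.of(Time.hours(1)))  // re-emit the total every hour
           .aggregate(new RunningCount())
           .print();

        env.execute("hourly cumulative count");
    }

    /** Counts elements; the running total is re-emitted at every trigger firing. */
    public static class RunningCount implements AggregateFunction<Long, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}
```

Counting distinct categories instead of raw events would additionally need a de-duplicating accumulator (e.g. a Set) inside the AggregateFunction.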

罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道:

>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>


Re: flink1.11 startup problem

2020-07-20 Thread Leonard Xu
Hi,
This looks like a dependency conflict; the error message suggests the classpath loaded two identical jars. Is the javax.ws.rs-api-2.1.1.jar actually needed by your cluster?
Could you describe your scenario in a bit more detail, e.g. how to reproduce the problem, so that everyone can help investigate.

Best,
Leonard Xu

> On Jul 20, 2020 at 15:36, 酷酷的浑蛋  wrote:
> 
> 
> 
> Flink 1.11 reports the following error on startup:
> java.lang.LinkageError: ClassCastException: attempting to 
> castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class
>  to 
> jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
>at 
> javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125)
>at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97)
>at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172)
>at com.sun.jersey.core.header.MediaTypes.(MediaTypes.java:65)
>at 
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
>at 
> com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
>at 
> com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
>at com.sun.jersey.api.client.Client.init(Client.java:342)
>at com.sun.jersey.api.client.Client.access$000(Client.java:118)
>at com.sun.jersey.api.client.Client$1.f(Client.java:191)
>at com.sun.jersey.api.client.Client$1.f(Client.java:187)
>at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
>at com.sun.jersey.api.client.Client.(Client.java:187)
>at com.sun.jersey.api.client.Client.(Client.java:170)
>at 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280)
>at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>at 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169)
>at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>at 
> org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76)
>at 
> org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61)
>at 
> org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43)
>at 
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64)
>at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
>at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
>at com.missfresh.Main.main(Main.java:142)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> 
> 
> I have already added javax.ws.rs-api-2.1.1.jar under lib.
> 



Re: Flink Cluster Java 11 support

2020-07-20 Thread Yangze Guo
Hi,

AFAIK, there is no official image with Java 11. However, I think you
could simply build a custom image by changing the base layer[1] to
openjdk:11-jre.

[1] 
https://github.com/apache/flink-docker/blob/949e445006c4fc288813900c264847d23d3e33d4/1.11/scala_2.12-debian/Dockerfile

Best,
Yangze Guo


On Mon, Jul 20, 2020 at 7:24 PM Pedro Cardoso  wrote:
>
> Hello,
>
> Are there docker images available for Flink Clusters in Kubernetes that run 
> on Java 11?
>
> Thank you.
> Regards
>
> Pedro Cardoso
>
> Research Data Engineer
>
> pedro.card...@feedzai.com
>


How to get flink JobId in runtime

2020-07-20 Thread Si-li Liu
Hi

I want to retrieve flink JobId in runtime, for example, during
RichFunction's open method. Is there anyway to do it?

I checked the methods in RuntimeContext and ExecutionConfig, seems I can't
get this information from them.

Thanks!
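
One approach that may work (unverified; it assumes the runtime context can be cast to the internal StreamingRuntimeContext class and that this class exposes getJobId(), so treat it as an assumption rather than a stable API):

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

public class JobIdAwareMapper extends RichMapFunction<String, String> {

    private transient JobID jobId;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumption: the runtime context is a StreamingRuntimeContext that exposes the JobID.
        jobId = ((StreamingRuntimeContext) getRuntimeContext()).getJobId();
    }

    @Override
    public String map(String value) {
        return jobId + ": " + value;
    }
}
```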

-- 
Best regards

Sili Liu


????????????flink????????????????

2020-07-20 Thread jiafu
Flink version: flink-1.8.1

org.apache.flink.runtime.executiongraph.ExecutionGraphException: Trying to eagerly schedule a task whose inputs are not ready (result type: PIPELINED_BOUNDED, partition consumable: false, producer state: SCHEDULED, producer slot: null).
    at org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:145)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:840)
    at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:621)
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
    at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
    at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:436)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:637)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:229)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:186)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:96)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:146)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:633)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$releaseAssignedResource$11(Execution.java:1350)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
    at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1345)
    at org.apache.flink.runtime.executiongraph.Execution.finishCancellation(Execution.java:1115)
    at org.apache.flink.runtime.executiongraph.Execution.completeCancelling(Execution.java:1094)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1628)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:517)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Test email to the community mailing list

2020-07-20 Thread sjlsumait...@163.com
Ignore.



sjlsumait...@163.com
 


Flink Cluster Java 11 support

2020-07-20 Thread Pedro Cardoso
Hello,

Are there docker images available for Flink Clusters in Kubernetes that run
on Java 11?

Thank you.
Regards

Pedro Cardoso

Research Data Engineer

pedro.card...@feedzai.com






Re: flink1.11 run

2020-07-20 Thread Jingsong Li
Yes.

But no matter how the files are rolled, in the end they only become visible after the checkpoint completes.

On Mon, Jul 20, 2020 at 7:10 PM Dream-底限  wrote:

> hi,
> For these two rolling conditions, is it whichever is reached first, i.e. with a 1 min checkpoint and a 128 MB file size, does a new file get rolled as soon as either one is hit?
>
> 》Yes, by default it rolls at 128 MB and on checkpoints
>
> Jingsong Li  wrote on Mon, Jul 20, 2020 at 6:12 PM:
>
> > Hi Dream,
> >
> > > 1. Does the hive table have to be created from within Flink first?
> >
> > No, it doesn't matter where it is created.
> >
> > > 2. Can I write directly into a hive table that already exists (created via Hue), and will the files have a rolling policy?
> >
> > Yes, by default it rolls at 128 MB and on checkpoints.
> >
> > Best,
> > Jingsong
> >
> > On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:
> >
> > >  hi
> > > OK. I'd like to ask, when writing to a hive table from a stream:
> > > 1. Does the hive table have to be created from within Flink first?
> > > 2. Can I write directly into a hive table that already exists (created via Hue), and will the files have a rolling policy?
> > >
> > > Rui Li  wrote on Mon, Jul 20, 2020 at 4:44 PM:
> > >
> > > > tableEnv.executeSql already submits the job, so there is no need to call execute again.
> > > >
> > > > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> > > >
> > > > > hi, I have a kafka-to-hive job here, but the program fails to run. What could be the reason?
> > > > >
> > > > > Exception:
> > > > > The program finished with the following exception:
> > > > >
> > > > > org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > > method
> > > > > caused an error: No operators defined in streaming topology. Cannot
> > > > > generate StreamGraph.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > > > at
> > > >
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > > > at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > > at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > > > streaming topology. Cannot generate StreamGraph.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > > > ... 11 more
> > > > > Code:
> > > > >
> > > > >  StreamExecutionEnvironment environment =
> > > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > EnvironmentSettings settings =
> > > > >
> > > > >
> > > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > > > StreamTableEnvironment tableEnv =
> > > > > StreamTableEnvironment.create(environment, settings);
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > > > environment.setStateBackend(new MemoryStateBackend());
> > > > >
> >  environment.getCheckpointConfig().setCheckpointInterval(5000);
> > > > >
> > > > > String name = "myhive";
> > > > > String defaultDatabase = "tmp";
> > > > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > > > String version = "1.1.0";
> > > > >
> > > > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > > > hiveConfDir, version);
> > > > > tableEnv.registerCatalog("myhive", hive);
> > > > > tableEnv.useCatalog("myhive");
> > > > >
> > > > >

Re: flink1.11 run

2020-07-20 Thread Dream-底限
hi,
For the two rolling conditions below, is it whichever is reached first? That is, with a 1 min checkpoint interval and a 128 MB file size, is a new file rolled as soon as either of the two is hit?

》Yes. By default it rolls at 128 MB, and it also rolls on checkpoint.

Jingsong Li  于2020年7月20日周一 下午6:12写道:

> Hi Dream,
>
> > 1.一定要在flink内部先建立hive表吗?
>
> 不用,哪边建无所谓
>
> > 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
>
> 可以,默认下 128MB 滚动,Checkpoint 滚动。
>
> Best,
> Jingsong
>
> On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:
>
> >  hi
> > 好的,想问一下stream写hive表的时候:
> > 1、一定要在flink内部先建立hive表吗?
> > 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
> >
> > Rui Li  于2020年7月20日周一 下午4:44写道:
> >
> > > tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
> > >
> > > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> > >
> > > > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> > > >
> > > > 异常:
> > > > The program finished with the following exception:
> > > >
> > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > method
> > > > caused an error: No operators defined in streaming topology. Cannot
> > > > generate StreamGraph.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > > at
> > >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > > streaming topology. Cannot generate StreamGraph.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > > at
> > > >
> > > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > at
> > > >
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > at
> > > >
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > > ... 11 more
> > > > 代码:
> > > >
> > > >  StreamExecutionEnvironment environment =
> > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > EnvironmentSettings settings =
> > > >
> > > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > > StreamTableEnvironment tableEnv =
> > > > StreamTableEnvironment.create(environment, settings);
> > > >
> > > >
> > > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > > environment.setStateBackend(new MemoryStateBackend());
> > > >
>  environment.getCheckpointConfig().setCheckpointInterval(5000);
> > > >
> > > > String name = "myhive";
> > > > String defaultDatabase = "tmp";
> > > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > > String version = "1.1.0";
> > > >
> > > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > > hiveConfDir, version);
> > > > tableEnv.registerCatalog("myhive", hive);
> > > > tableEnv.useCatalog("myhive");
> > > >
> > > > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > > > "  user_id BIGINT,\n" +
> > > > "  item_id STRING,\n" +
> > > > "  behavior STRING,\n" +
> > > > "  ts AS PROCTIME()\n" +
> > > > ") WITH (\n" +
> > > > " 'connector' = 'kafka-0.11',\n" +
> > > > " 'topic' = 'user_behavior',\n" +
> > > > " 'properties.bootstrap.servers' 

flink1.11 startup issue

2020-07-20 Thread 酷酷的浑蛋


What's going on with Flink 1.11? As soon as I start a job it throws:
java.lang.LinkageError: ClassCastException: attempting to 
castjar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
 to 
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class


I've already put jersey..jar and javax.ws.xxx.jar into lib, so why does it still fail? What on earth is this error?



Re: Kafka Consumer consuming rate suddenly dropped

2020-07-20 Thread Jake

We'd need some Flink Kafka consumer logs and the Kafka server logs to dig into this.


> On Jul 20, 2020, at 5:45 PM, Mu Kong  wrote:
> 
> Hi, community
> 
> I have a flink application consuming from a kafka topic with 60 partitions.
> The parallelism of the source is set to 60, same with the topic partition 
> number.
> The cluster.evenly-spread-out-slots config is set to true in flink cluster.
> However, several hours later, the consuming rate of some subtasks of the 
> source suddenly dropped and caused delay.
> There is no back pressure in the application as shown in the flink UI.
> The consuming rate is like follows:
> 
> 
> Is anyone also encountering the same problem?
> Is there any way to further pinpoint the issue?
> 
> 
> Thanks in advance!
> Mu



Re: state ... checkpoint ...

2020-07-20 Thread sun
JM checkpoint logs:


18:08:07.615 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 116 @ 1595239687615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:07.628 [flink-akka.actor.default-dispatcher-420] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 116 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 13 ms).
18:08:08.615 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 117 @ 1595239688615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:08.626 [flink-akka.actor.default-dispatcher-420] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 117 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 11 ms).
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job ty-bi-flink 
(acd456ff6f2f9f59ee89b126503c20f0) switched from state RUNNING to CANCELLING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from RUNNING to 
CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from RUNNING to 
CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (1/4) 
(87d4c7af7d5fb5f81bae48aae77de473) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (2/4) 
(7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (3/4) 
(9035e059e465b8c520edf37ec734b43e) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (4/4) 
(e6ff47b0da505b2aa4d775d7821b8356) switched from RUNNING to CANCELING.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (4/4) 
(e6ff47b0da505b2aa4d775d7821b8356) switched from CANCELING to CANCELED.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (3/4) 
(9035e059e465b8c520edf37ec734b43e) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (2/4) 
(7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - 
map_sub_order_detail - Sink: Print to Std. Out (1/4) 
(87d4c7af7d5fb5f81bae48aae77de473) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from CANCELING to 
CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from CANCELING to 
CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from CANCELING to 
CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-416] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom 
Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from CANCELING to 
CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job ty-bi-flink 
(acd456ff6f2f9f59ee89b126503c20f0) switched from state CANCELLING to CANCELED.
18:08:09.381 

Re: flink1.11 run

2020-07-20 Thread Jingsong Li
Hi Dream,

> 1. Do I have to create the Hive table from inside Flink first?

No, it doesn't matter on which side the table is created.

> 2. Can I write directly into an existing Hive table (created via Hue), and will the files still follow a rolling policy?

Yes. By default files roll at 128 MB, and they also roll on each checkpoint.
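If you want different thresholds on an existing table, something like the following should work
(property values are only examples, and I haven't verified this exact ALTER statement on 1.11;
setting the same properties from the Hive side works too, since the sink just reads them from
the table properties stored in the metastore):

// sketch: override the default 128MB / per-checkpoint rolling on an existing Hive table
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
        "ALTER TABLE tmp.streamhivetest SET TBLPROPERTIES (" +
        " 'sink.rolling-policy.file-size' = '128MB'," +
        " 'sink.rolling-policy.rollover-interval' = '30 min'," +
        " 'sink.rolling-policy.check-interval' = '1 min'" +
        ")");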

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:15 PM Dream-底限  wrote:

>  hi
> 好的,想问一下stream写hive表的时候:
> 1、一定要在flink内部先建立hive表吗?
> 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
>
> Rui Li  于2020年7月20日周一 下午4:44写道:
>
> > tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
> >
> > On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
> >
> > > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> > >
> > > 异常:
> > > The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The main
> > method
> > > caused an error: No operators defined in streaming topology. Cannot
> > > generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > at
> > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > > Caused by: java.lang.IllegalStateException: No operators defined in
> > > streaming topology. Cannot generate StreamGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > > at
> > >
> > >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at
> > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at
> > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > > ... 11 more
> > > 代码:
> > >
> > >  StreamExecutionEnvironment environment =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > EnvironmentSettings settings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > > StreamTableEnvironment tableEnv =
> > > StreamTableEnvironment.create(environment, settings);
> > >
> > >
> > >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > > environment.setStateBackend(new MemoryStateBackend());
> > > environment.getCheckpointConfig().setCheckpointInterval(5000);
> > >
> > > String name = "myhive";
> > > String defaultDatabase = "tmp";
> > > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > > String version = "1.1.0";
> > >
> > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > > tableEnv.registerCatalog("myhive", hive);
> > > tableEnv.useCatalog("myhive");
> > >
> > > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > > "  user_id BIGINT,\n" +
> > > "  item_id STRING,\n" +
> > > "  behavior STRING,\n" +
> > > "  ts AS PROCTIME()\n" +
> > > ") WITH (\n" +
> > > " 'connector' = 'kafka-0.11',\n" +
> > > " 'topic' = 'user_behavior',\n" +
> > > " 'properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> > > " 'properties.group.id' = 'testGroup',\n" +
> > > " 'scan.startup.mode' = 'earliest-offset',\n" +
> > > " 'format' = 'json',\n" +
> > > " 'json.fail-on-missing-field' = 'false',\n" +
> > > " 'json.ignore-parse-errors' = 'true'\n" +
> > > ")");
> > >
> > > //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> > > //" 

Re: Flink 1.11 Hive Streaming Write issue

2020-07-20 Thread Jingsong Li
Hi Dream,

Could you describe your test scenario in more detail?

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:40 PM Dream-底限  wrote:

> hi、
> 请问这个问题最后怎么解决了,数据能滚动写入hive了嘛,我这面开启了checkpoint之后hive也是没数据
>
> 李佳宸  于2020年7月16日周四 下午10:39写道:
>
> > 好的,谢谢~~~
> >
> > JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:
> >
> > > hi
> > > 需要开启checkpoint
> > >
> > >
> > > | |
> > > JasonLee
> > > |
> > > |
> > > 邮箱:17610775...@163.com
> > > |
> > >
> > > Signature is customized by Netease Mail Master
> > >
> > > 在2020年07月16日 18:03,李佳宸 写道:
> > > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> > > 批量的hive写入,流环境的读取是正常的。
> > >
> > > 附代码,很简短:
> > >
> > > public class KafkaToHiveStreaming {
> > >public static void main(String[] arg) throws Exception{
> > >StreamExecutionEnvironment bsEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >EnvironmentSettings bsSettings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >StreamTableEnvironment bsTableEnv =
> > > StreamTableEnvironment.create(bsEnv, bsSettings);
> > >String name= "myhive";
> > >String defaultDatabase = "default";
> > >String hiveConfDir =
> > > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > > path
> > >String version = "3.1.2";
> > >
> > >HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > > hiveConfDir, version);
> > >bsTableEnv.registerCatalog("myhive", hive);
> > >bsTableEnv.useCatalog("myhive");
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> > >"  id BIGINT ," +
> > >"  order_id STRING," +
> > >"  amount DECIMAL(10, 2)," +
> > >"  create_time TIMESTAMP " +
> > >") WITH (" +
> > >" 'connector' = 'kafka'," +
> > >" 'topic' = 'order.test'," +
> > >" 'properties.bootstrap.servers' = 'localhost:9092'," +
> > >" 'properties.group.id' = 'testGroup'," +
> > >" 'scan.startup.mode' = 'earliest-offset', " +
> > >" 'format' = 'json'  " +
> > >")");
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >
> > >bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming
> (" +
> > >"  id BIGINT ," +
> > >"  order_id STRING," +
> > >"  amount DECIMAL(10, 2)" +
> > >"  )");
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > > ('connector' = 'print')" +
> > >"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > > ALL)");
> > >
> > >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> > SELECT
> > > " +
> > >"id, " +
> > >"order_id, " +
> > >"amount " +
> > >"FROM topic_products");
> > >
> > >Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> > >table1.executeInsert("print_table");
> > >}
> > > }
> > >
> >
>


-- 
Best, Jingsong Lee


Re: Flink 1.11 Hive Streaming Write issue

2020-07-20 Thread Dream-底限
hi,
How was this resolved in the end? Did the data end up rolling into Hive? On my side, even after enabling checkpointing there is still no data in Hive.
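For reference, this is roughly how I enable checkpointing before submitting the INSERT
(variable names and the interval are just from my test code):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// files are only committed and become visible in Hive after a successful checkpoint
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// ... then register the HiveCatalog, switch to the Hive dialect and run the INSERT via executeSql()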

李佳宸  于2020年7月16日周四 下午10:39写道:

> 好的,谢谢~~~
>
> JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:
>
> > hi
> > 需要开启checkpoint
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:17610775...@163.com
> > |
> >
> > Signature is customized by Netease Mail Master
> >
> > 在2020年07月16日 18:03,李佳宸 写道:
> > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> > 批量的hive写入,流环境的读取是正常的。
> >
> > 附代码,很简短:
> >
> > public class KafkaToHiveStreaming {
> >public static void main(String[] arg) throws Exception{
> >StreamExecutionEnvironment bsEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >EnvironmentSettings bsSettings =
> >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >StreamTableEnvironment bsTableEnv =
> > StreamTableEnvironment.create(bsEnv, bsSettings);
> >String name= "myhive";
> >String defaultDatabase = "default";
> >String hiveConfDir =
> > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> > path
> >String version = "3.1.2";
> >
> >HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> >bsTableEnv.registerCatalog("myhive", hive);
> >bsTableEnv.useCatalog("myhive");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >bsTableEnv.executeSql("CREATE TABLE topic_products (" +
> >"  id BIGINT ," +
> >"  order_id STRING," +
> >"  amount DECIMAL(10, 2)," +
> >"  create_time TIMESTAMP " +
> >") WITH (" +
> >" 'connector' = 'kafka'," +
> >" 'topic' = 'order.test'," +
> >" 'properties.bootstrap.servers' = 'localhost:9092'," +
> >" 'properties.group.id' = 'testGroup'," +
> >" 'scan.startup.mode' = 'earliest-offset', " +
> >" 'format' = 'json'  " +
> >")");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >
> >bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
> >"  id BIGINT ," +
> >"  order_id STRING," +
> >"  amount DECIMAL(10, 2)" +
> >"  )");
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >bsTableEnv.executeSql("CREATE TABLE print_table WITH
> > ('connector' = 'print')" +
> >"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> > ALL)");
> >
> >bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming
> SELECT
> > " +
> >"id, " +
> >"order_id, " +
> >"amount " +
> >"FROM topic_products");
> >
> >Table table1 = bsTableEnv.from("hive_sink_table_streaming");
> >table1.executeInsert("print_table");
> >}
> > }
> >
>


Re: flink1.11 run

2020-07-20 Thread Dream-底限
 hi
Got it. Two questions about writing to a Hive table in streaming mode:
1. Do I have to create the Hive table from inside Flink first?
2. Can I write directly into an existing Hive table (created via Hue), and will the files still follow a rolling policy?

Rui Li  于2020年7月20日周一 下午4:44写道:

> tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
>
> On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:
>
> > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> >
> > 异常:
> > The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: No operators defined in streaming topology. Cannot
> > generate StreamGraph.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> > at
> >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: java.lang.IllegalStateException: No operators defined in
> > streaming topology. Cannot generate StreamGraph.
> > at
> >
> >
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> > at
> >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> > at
> >
> >
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > ... 11 more
> > 代码:
> >
> >  StreamExecutionEnvironment environment =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings settings =
> >
> >
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> > StreamTableEnvironment tableEnv =
> > StreamTableEnvironment.create(environment, settings);
> >
> >
> >
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > environment.setStateBackend(new MemoryStateBackend());
> > environment.getCheckpointConfig().setCheckpointInterval(5000);
> >
> > String name = "myhive";
> > String defaultDatabase = "tmp";
> > String hiveConfDir = "/etc/alternatives/hive-conf/";
> > String version = "1.1.0";
> >
> > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> > tableEnv.registerCatalog("myhive", hive);
> > tableEnv.useCatalog("myhive");
> >
> > tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> > "  user_id BIGINT,\n" +
> > "  item_id STRING,\n" +
> > "  behavior STRING,\n" +
> > "  ts AS PROCTIME()\n" +
> > ") WITH (\n" +
> > " 'connector' = 'kafka-0.11',\n" +
> > " 'topic' = 'user_behavior',\n" +
> > " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> > " 'properties.group.id' = 'testGroup',\n" +
> > " 'scan.startup.mode' = 'earliest-offset',\n" +
> > " 'format' = 'json',\n" +
> > " 'json.fail-on-missing-field' = 'false',\n" +
> > " 'json.ignore-parse-errors' = 'true'\n" +
> > ")");
> >
> > //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> > //" user_id BIGINT,\n" +
> > //" item_id STRING,\n" +
> > //" behavior STRING,\n" +
> > //" tsdata STRING\n" +
> > //") WITH (\n" +
> > //" 'connector' = 'print'\n" +
> > //")");
> > tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
> > " user_id BIGINT,\n" +
> > " item_id STRING,\n" 

Fwd: Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Danny Chan

Best,
Danny Chan
-- Forwarded message --
From: Danny Chan 
Date: Jul 20, 2020, 4:51 PM +0800
To: Dongwon Kim 
Subject: Re: [Table API] how to configure a nested timestamp field

> Or is it possible to pre-define a catalog there and register it through the SQL
> CLI YAML?
>
> Best,
> Danny Chan
> 在 2020年7月20日 +0800 PM3:23,Dongwon Kim ,写道:
> > Hi Leonard,
> >
> > > Unfortunately the answer is no, the YAML you defined will parse by Table 
> > > API and then execute, the root cause of your post error is Table API does 
> > > not support computed column now,
> > > there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW, 
> > > I think DDL is recommended way since FLINK 1.11.0.
> > Okay, thanks a lot for your input.
> >
> > I just tried out Flink SQL client and wanted to store pre-defined YAML 
> > files each declaring a source table from a Kafka topic.
> > As you advised, I have to manually enter DDL in the SQL client on FLINK 
> > 1.11.x
> >
> > Best,
> >
> > Dongwon
> >
> >
> > > On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu  wrote:
> > > > Hi, Kim
> > > >
> > > > > Hi Leonard,
> > > > >
> > > > > Can I have a YAML definition corresponding to the DDL you suggested?
> > > >
> > > > Unfortunately the answer is no, the YAML you defined will parse by 
> > > > Table API and then execute, the root cause of your post error is Table 
> > > > API does not support computed column now,
> > > >
> > > > there is a FLIP under discussion[1], this should be ready in 1.12.0. 
> > > > BTW, I think DDL is recommended way since FLINK 1.11.0.
> > > >
> > > > Best,
> > > > Leonard Xu
> > > > [1] 
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
> > > >
> > > > > 在 2020年7月20日,14:30,Dongwon Kim  写道:
> > > > >
> > > > >
> > > > > I tried below (Flink 1.11.0) but got some error:
> > > > > > tables:
> > > > > >   - name: test
> > > > > >     type: source-table
> > > > > >     update-mode: append
> > > > > >     connector:
> > > > > >       property-version: 1
> > > > > >       type: kafka
> > > > > >       version: universal
> > > > > >       topic: ...
> > > > > >       properties:
> > > > > >         bootstrap.servers: ...
> > > > > >         group.id: ...
> > > > > >     format:
> > > > > >       property-version: 1
> > > > > >       type: json
> > > > > >     schema:
> > > > > >       - name: type
> > > > > >         data-type: STRING
> > > > > >       - name: location
> > > > > >         data-type: >
> > > > > >           ROW<
> > > > > >             id STRING,
> > > > > >             lastUpdateTime BIGINT
> > > > > >           >
> > > > > >       - name: timestampCol
> > > > > >         data-type: TIMESTAMP(3)
> > > > > >         rowtime:
> > > > > >           timestamps:
> > > > > >             type: from-field
> > > > > >             from: 
> > > > > > TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 
> > > > > > '-MM-dd HH:mm:ss'))
> > > > > >           watermarks:
> > > > > >             type: periodic-bounded
> > > > > >             delay: 5000
> > > > >
> > > > > SQL client doesn't complain about the file but, when I execute 
> > > > > "SELECT timestampCol from test", the job fails with the following 
> > > > > error message:
> > > > > > Caused by: java.lang.NullPointerException
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> > > > > > at SourceConversion$4.processElement(Unknown Source)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> > > > > > at 
> > > > > 

Re: flink1.11 run

2020-07-20 Thread Rui Li
tableEnv.executeSql() already submits the job; there is no need to call execute() after it.
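Based on the code you posted, something like this is enough (sketch only, not tested;
I'm assuming the intended DATE_FORMAT pattern is 'yyyy-MM-dd'):

// executeSql() on an INSERT statement submits the job by itself and returns a TableResult
TableResult result = tableEnv.executeSql(
        "INSERT INTO streamhivetest " +
        "SELECT user_id, item_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd') AS tsdata " +
        "FROM user_behavior");

// drop the trailing tableEnv.execute("stream-write-hive"):
// execute() builds a pipeline from operations buffered in the environment, and since
// executeSql() has already submitted the INSERT there is nothing left, which is exactly
// the "No operators defined in streaming topology" error you are seeing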

On Mon, Jul 20, 2020 at 4:29 PM Dream-底限  wrote:

> hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
>
> 异常:
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: No operators defined in streaming topology. Cannot
> generate StreamGraph.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.lang.IllegalStateException: No operators defined in
> streaming topology. Cannot generate StreamGraph.
> at
>
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> at
>
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> at
>
> com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
> 代码:
>
>  StreamExecutionEnvironment environment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>
> EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(environment, settings);
>
>
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> environment.setStateBackend(new MemoryStateBackend());
> environment.getCheckpointConfig().setCheckpointInterval(5000);
>
> String name = "myhive";
> String defaultDatabase = "tmp";
> String hiveConfDir = "/etc/alternatives/hive-conf/";
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
> tableEnv.registerCatalog("myhive", hive);
> tableEnv.useCatalog("myhive");
>
> tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
> "  user_id BIGINT,\n" +
> "  item_id STRING,\n" +
> "  behavior STRING,\n" +
> "  ts AS PROCTIME()\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka-0.11',\n" +
> " 'topic' = 'user_behavior',\n" +
> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'properties.group.id' = 'testGroup',\n" +
> " 'scan.startup.mode' = 'earliest-offset',\n" +
> " 'format' = 'json',\n" +
> " 'json.fail-on-missing-field' = 'false',\n" +
> " 'json.ignore-parse-errors' = 'true'\n" +
> ")");
>
> //tableEnv.executeSql("CREATE TABLE print_table (\n" +
> //" user_id BIGINT,\n" +
> //" item_id STRING,\n" +
> //" behavior STRING,\n" +
> //" tsdata STRING\n" +
> //") WITH (\n" +
> //" 'connector' = 'print'\n" +
> //")");
> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
> " user_id BIGINT,\n" +
> " item_id STRING,\n" +
> " behavior STRING,\n" +
> " tsdata STRING\n" +
> ") STORED AS parquet TBLPROPERTIES (\n" +
> " 'sink.rolling-policy.file-size' = '12MB',\n" +
> " 'sink.rolling-policy.rollover-interval' = '1 min',\n" +
> " 'sink.rolling-policy.check-interval' = '1 min',\n" +
> " 

flink1.11 run

2020-07-20 Thread Dream-底限
hi, I have a Kafka-to-Hive job here, but it won't run. What could be the reason?

Exception:
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: No operators defined in streaming topology. Cannot
generate StreamGraph.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in
streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at
com.akulaku.data.flink.StreamingWriteToHive.main(StreamingWriteToHive.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Code:

 StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(environment, settings);


environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.setStateBackend(new MemoryStateBackend());
environment.getCheckpointConfig().setCheckpointInterval(5000);

String name = "myhive";
String defaultDatabase = "tmp";
String hiveConfDir = "/etc/alternatives/hive-conf/";
String version = "1.1.0";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");

tableEnv.executeSql("CREATE TABLE tmp.user_behavior (\n" +
"  user_id BIGINT,\n" +
"  item_id STRING,\n" +
"  behavior STRING,\n" +
"  ts AS PROCTIME()\n" +
") WITH (\n" +
" 'connector' = 'kafka-0.11',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");

//tableEnv.executeSql("CREATE TABLE print_table (\n" +
//" user_id BIGINT,\n" +
//" item_id STRING,\n" +
//" behavior STRING,\n" +
//" tsdata STRING\n" +
//") WITH (\n" +
//" 'connector' = 'print'\n" +
//")");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE tmp.streamhivetest (\n" +
" user_id BIGINT,\n" +
" item_id STRING,\n" +
" behavior STRING,\n" +
" tsdata STRING\n" +
") STORED AS parquet TBLPROPERTIES (\n" +
" 'sink.rolling-policy.file-size' = '12MB',\n" +
" 'sink.rolling-policy.rollover-interval' = '1 min',\n" +
" 'sink.rolling-policy.check-interval' = '1 min',\n" +
" 'execution.checkpointing.interval' = 'true'\n" +
")");

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("insert into streamhivetest select
user_id,item_id,behavior,DATE_FORMAT(ts, 'yyyy-MM-dd') as tsdata from
user_behavior");

tableEnv.execute("stream-write-hive");


Re: watermark stops advancing after the flink job runs for a while

2020-07-20 Thread shizk233
Hi,

Flink exposes a task-level metric named currentWatermark; it comes with subtask_index,
task_name and the watermark value, which should help you track where the watermark stops advancing.

Best,
shizk233

snack white  于2020年7月20日周一 下午3:51写道:

> HI:
>   flink job 跑一段时间 watermark 不推进,任务没挂,source 是 kafka ,kafka 各个partition
> 均有数据, flink job statue backend 为 memory 。有debug 的姿势推荐吗?  看过 CPU GC
> 等指标,看不出来有异常。
>
> Best regards!
> white
>
>


Re: pyflink1.11.0window

2020-07-20 Thread ??????????????
HI,
when I write the result out to Kafka I get the following error:
Traceback (most recent call last):
 File 
"/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, 
in deco
  return f(*a, **kw)
 File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 
328, in get_return_value
  format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.sqlUpdate.
: org.apache.flink.table.api.TableException: AppendStreamTableSink requires 
that Table has only insert changes.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.Iterator$class.foreach(Iterator.scala:891)
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)




During handling of the above exception, another exception occurred:


Traceback (most recent call last):
 File "tou.py", line 99, in 

Re: Flink Cli ...

2020-07-20 Thread Z-Z
From the CLI, run it against the jobmanager with: bin/flink run -d -p 1 -s {savepointuri} /data/test.jar
From the web UI (http://jobmanager:8081), use "Submit New Job", upload the jar and fill in the savepoint path.







Re: watermark stops advancing after the flink job runs for a while

2020-07-20 Thread Cayden chen
You could reproduce it locally in IDEA and debug it there.







watermark stops advancing after the flink job runs for a while

2020-07-20 Thread snack white
HI:
  After the flink job runs for a while the watermark stops advancing, but the job itself doesn't fail. The source is Kafka and every Kafka partition has data; the job's state backend is memory. Any recommended way to debug this? I've looked at CPU, GC and other metrics and nothing looks abnormal.

Best regards!
white



flink1.11 startup issue

2020-07-20 Thread 酷酷的浑蛋


Flink 1.11 reports the following error at startup:
java.lang.LinkageError: ClassCastException: attempting to 
castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class
 to 
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
at 
javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125)
at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97)
at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172)
at com.sun.jersey.core.header.MediaTypes.(MediaTypes.java:65)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
at com.sun.jersey.api.client.Client.init(Client.java:342)
at com.sun.jersey.api.client.Client.access$000(Client.java:118)
at com.sun.jersey.api.client.Client$1.f(Client.java:191)
at com.sun.jersey.api.client.Client$1.f(Client.java:187)
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
at com.sun.jersey.api.client.Client.(Client.java:187)
at com.sun.jersey.api.client.Client.(Client.java:170)
at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280)
at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169)
at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76)
at 
org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61)
at 
org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at com.missfresh.Main.main(Main.java:142)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)


I have already added javax.ws.rs-api-2.1.1.jar under lib.



Re: SQL job fails with only a Flink runtime NPE

2020-07-20 Thread godfrey he
The image didn't come through; please upload it with a different image hosting tool.

Luan Cooper  于2020年7月17日周五 下午4:11写道:

> 附一个 Job Graph 信息,在 Cal 处挂了
> [image: image.png]
>
> On Fri, Jul 17, 2020 at 4:01 PM Luan Cooper  wrote:
>
>> 实际有 20 左右个字段,用到的 UDF 有 COALESCE / CAST / JSON_PATH / TIMESTAMP 类
>> *是指 UDF 返回了 NULL 导致的吗?*
>>
>>
>> On Fri, Jul 17, 2020 at 2:54 PM godfrey he  wrote:
>>
>>> udf_xxx的逻辑是啥?
>>>
>>>
>>> Luan Cooper  于2020年7月17日周五 下午2:40写道:
>>>
>>> > Hi
>>> >
>>> > 我有这么一个 SQL
>>> > INSERT INTO es
>>> > SELECT
>>> > a,
>>> > udf_xxx(b)
>>> > FROM mongo_oplog -- 自定义 TableFactory
>>> >
>>> > Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码
>>> Exception,可以稳定重现
>>> >
>>> > LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
>>> > (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
>>> >
>>> > java.lang.NullPointerException
>>> >
>>> > at StreamExecCalc$8016.split$7938$(Unknown Source)
>>> >
>>> > at StreamExecCalc$8016.processElement(Unknown Source)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>> >
>>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>> >
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>> >
>>> > at java.lang.Thread.run(Thread.java:748)
>>> >
>>> > 请问这种怎样情况排查问题?
>>> > 有任何线索都可以
>>> >
>>> > 感谢
>>> >
>>>
>>


Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Dongwon Kim
Hi Leonard,

Unfortunately the answer is no, the YAML you defined will parse by Table
> API and then execute, the root cause of your post error is Table API does
> not support computed column now,
> there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW,
> I think DDL is recommended way since FLINK 1.11.0.

Okay, thanks a lot for your input.

I just tried out the Flink SQL client and wanted to store pre-defined YAML
files, each declaring a source table backed by a Kafka topic.
As you advised, I'll have to enter the DDL manually in the SQL client on Flink
1.11.x.

Best,

Dongwon


On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu  wrote:

> Hi, Kim
>
> Hi Leonard,
>
> Can I have a YAML definition corresponding to the DDL you suggested?
>
>
> Unfortunately the answer is no, the YAML you defined will parse by Table
> API and then execute, the root cause of your post error is Table API does
> not support computed column now,
>
> there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW,
> I think DDL is recommended way since FLINK 1.11.0.
>
> Best,
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
> 
>
> 在 2020年7月20日,14:30,Dongwon Kim  写道:
>
>
> I tried below (Flink 1.11.0) but got some error:
>
>> tables:
>>   - name: test
>> type: source-table
>> update-mode: append
>> connector:
>>   property-version: 1
>>   type: kafka
>>   version: universal
>>   topic: ...
>>   properties:
>> bootstrap.servers: ...
>> group.id: ...
>> format:
>>   property-version: 1
>>   type: json
>> schema:
>>   - name: type
>> data-type: STRING
>>   - name: location
>> data-type: >
>>   ROW<
>> id STRING,
>> lastUpdateTime BIGINT
>>   >
>>   - name: timestampCol
>> data-type: TIMESTAMP(3)
>> rowtime:
>>   timestamps:
>> type: from-field
>> from:
>> TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, '-MM-dd
>> HH:mm:ss'))
>>   watermarks:
>> type: periodic-bounded
>> delay: 5000
>>
>
> SQL client doesn't complain about the file but, when I execute "SELECT
> timestampCol from test", the job fails with the following error message:
>
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
>> at
>> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at SourceConversion$4.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> 

Re: (no subject)

2020-07-20 Thread shizk233
Hi,

For the example you gave, you can still use ContinuousEventTimeTrigger to keep emitting updated results; only the window itself would be chosen differently based on your end condition, for example a Global window.
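A rough sketch of what I mean (class and field names are made up, it assumes event-time
timestamps/watermarks are already assigned upstream, and windowAll runs with parallelism 1,
which is fine for a single global count):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

import java.util.HashSet;
import java.util.Set;

public class CumulativeDistinctCategories {

    // illustrative event type: a category name plus an event-time field
    public static class Drug {
        public String category;
        public long eventTime;
    }

    // keeps every category ever seen; the global window is never purged, so the count only grows
    public static class DistinctCountAgg implements AggregateFunction<Drug, Set<String>, Long> {
        @Override public Set<String> createAccumulator() { return new HashSet<>(); }
        @Override public Set<String> add(Drug value, Set<String> acc) { acc.add(value.category); return acc; }
        @Override public Long getResult(Set<String> acc) { return (long) acc.size(); }
        @Override public Set<String> merge(Set<String> a, Set<String> b) { a.addAll(b); return a; }
    }

    public static DataStream<Long> cumulativeCounts(DataStream<Drug> events) {
        return events
                .windowAll(GlobalWindows.create())                      // one open-ended window
                .trigger(ContinuousEventTimeTrigger.of(Time.hours(1)))  // emit the running total every hour of event time
                .aggregate(new DistinctCountAgg());
    }
}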

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道:

>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>


Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Leonard Xu
Hi, Kim

> Hi Leonard,
> 
> Can I have a YAML definition corresponding to the DDL you suggested?

Unfortunately the answer is no. The YAML you defined is parsed by the Table API
and then executed, and the root cause of the error you posted is that the Table API
does not support computed columns yet.

There is a FLIP under discussion[1]; this should be ready in 1.12.0. BTW, I
think DDL is the recommended way since Flink 1.11.0.
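For reference, the DDL counterpart of your YAML would look roughly like the statement below
(topic/servers are placeholders, I'm assuming the intended format pattern is 'yyyy-MM-dd HH:mm:ss',
and I haven't run this exact statement). The same DDL can also be typed directly into the SQL client:

tableEnv.executeSql(
        "CREATE TABLE test (\n" +
        "  `type` STRING,\n" +
        "  location ROW<id STRING, lastUpdateTime BIGINT>,\n" +
        "  timestampCol AS TO_TIMESTAMP(\n" +
        "      FROM_UNIXTIME(location.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
        "  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = '...',\n" +
        "  'properties.bootstrap.servers' = '...',\n" +
        "  'properties.group.id' = '...',\n" +
        "  'format' = 'json'\n" +
        ")");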

Best,
Leonard Xu
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
 


> On Jul 20, 2020, at 14:30, Dongwon Kim  wrote:
> 
> 
> I tried below (Flink 1.11.0) but got some error:
> tables:
>   - name: test
> type: source-table
> update-mode: append
> connector:
>   property-version: 1
>   type: kafka
>   version: universal
>   topic: ...
>   properties:
> bootstrap.servers: ...
> group.id : ...
> format:
>   property-version: 1
>   type: json
> schema:
>   - name: type
> data-type: STRING
>   - name: location
> data-type: >
>   ROW<
> id STRING,
> lastUpdateTime BIGINT
>   >
>   - name: timestampCol
> data-type: TIMESTAMP(3)
> rowtime:
>   timestamps:
> type: from-field
> from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 
> '-MM-dd HH:mm:ss'))
>   watermarks:
> type: periodic-bounded
> delay: 5000
> 
> SQL client doesn't complain about the file but, when I execute "SELECT 
> timestampCol from test", the job fails with the following error message:
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> 
> On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim  > wrote:
> Hi Leonard,
> 
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
> 
> Best,
> Dongwon
> 
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu  > wrote:
> Hi, Kim
> 
> The reason your attempts (2) and (3) failed is that the json format does not 
> support converting a BIGINT to TIMESTAMP, you can

Flink rest api cancel job

2020-07-20 Thread snack white
Hi, 
  When I use the REST API to cancel my job, nine of the TaskManagers are canceled 
quickly, but the remaining TaskManager stays in cancelling status. Can someone show 
me how to resolve this? 
Thanks,
White 
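For context, the REST call itself is a PATCH on /jobs/:jobid with mode=cancel; below is a minimal sketch with the JDK 11 HttpClient (JobManager address and job id are placeholders). A TaskManager that stays in cancelling status usually means one of its tasks is slow to react to cancellation, so that instance's log and a thread dump are worth checking.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CancelJob {
    public static void main(String[] args) throws Exception {
        // Placeholders: point these at your JobManager REST endpoint and job id.
        String jobManager = "http://localhost:8081";
        String jobId = "<job-id>";

        HttpClient client = HttpClient.newHttpClient();
        // Flink's REST API terminates a job via PATCH /jobs/:jobid?mode=cancel
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(jobManager + "/jobs/" + jobId + "?mode=cancel"))
            .method("PATCH", HttpRequest.BodyPublishers.noBody())
            .build();

        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}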

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Dongwon Kim
Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?

I tried below (Flink 1.11.0) but got some error:

> tables:
>   - name: test
> type: source-table
> update-mode: append
> connector:
>   property-version: 1
>   type: kafka
>   version: universal
>   topic: ...
>   properties:
> bootstrap.servers: ...
> group.id: ...
> format:
>   property-version: 1
>   type: json
> schema:
>   - name: type
> data-type: STRING
>   - name: location
> data-type: >
>   ROW<
> id STRING,
> lastUpdateTime BIGINT
>   >
>   - name: timestampCol
> data-type: TIMESTAMP(3)
> rowtime:
>   timestamps:
> type: from-field
> from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000,
> '-MM-dd HH:mm:ss'))
>   watermarks:
> type: periodic-bounded
> delay: 5000
>

SQL client doesn't complain about the file but, when I execute "SELECT
timestampCol from test", the job fails with the following error message:

> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim  wrote:

> Hi Leonard,
>
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
>
> Best,
> Dongwon
>
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu  wrote:
>
>> Hi, Kim
>>
>> The reason your attempts (2) and (3) failed is that the json format does
>> not support converting a BIGINT to TIMESTAMP. You can first define the BIGINT
>> field and then use a computed column to extract a TIMESTAMP field; you can
>> also define the time attribute on the TIMESTAMP field for using time-based
>> operations in Flink 1.10.1. But computed columns are only supported in pure
>> DDL; the Table API lacks this support and should be aligned in 1.12 as far
>> as I know.
>> The DDL syntax is as follows:
>>
>> create table test (
>>   `type` STRING,
>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>timestampCol as
>> TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd
>> HH:mm:ss')), -- computed column
>>WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>> )   with (
>>   'connector' = '...',
>>   'format' = 'json',
>>   ...
>> );
>>

Re: Flink Cli 部署问题

2020-07-20 Thread Congxian Qiu
Hi

This kind of debugging can be done in IDEA.

Also, you said that submitting through the web UI works fine. Just to confirm: is it the
same savepoint that fails when submitted via flink run but works when submitted via the
web UI? If so, could you share your steps and the exact command?

Best,
Congxian


Z-Z  wrote on Mon, Jul 20, 2020 at 11:33 AM:

> This is a new error reported by the TaskManager; the situation is the same as before: submitting with the CLI fails, while submitting through the web UI works fine:
> 2020-07-20 03:29:25,959 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig   
>- The configuration 'value.serializer' was supplied
> but isn't a known config.
> 2020-07-20 03:29:25,959 INFO
> org.apache.kafka.common.utils.AppInfoParser   
>  - Kafka version : 0.11.0.2
> 2020-07-20 03:29:25,959 INFO
> org.apache.kafka.common.utils.AppInfoParser   
>  - Kafka commitId : 73be1e1168f91ee2
> 2020-07-20 03:29:25,974 ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder
> - Caught unexpected exception.
> java.lang.ArrayIndexOutOfBoundsException: 0
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> 2020-07-20 03:29:25,974 WARN
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
> Exception while restoring keyed state backend for
> StreamMap_caf773fe289bfdb867e0b4bd0c431c5f_(1/1) from alternative (1/1),
> will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected
> exception.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> at
> 

Re: How to debug window states

2020-07-20 Thread Congxian Qiu
Hi

There is an issue [1] that aims to provide an easy way to read/bootstrap window
state using the State Processor API, but its PR is currently closed.

[1] https://issues.apache.org/jira/browse/FLINK-13095
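For the keyed (non-window) state that the State Processor API can already read, a minimal sketch looks like the one below; the savepoint path, the operator uid and the "counter" ValueState are placeholders for whatever the job actually declares, and window operator state itself still cannot be read this way.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class InspectKeyedState {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; use the same state backend type that wrote the savepoint.
        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///savepoints/savepoint-xxxx", new MemoryStateBackend());

        savepoint
            .readKeyedState("my-operator-uid", new CounterReader()) // uid of the stateful operator
            .print();
    }

    /** Reads a hypothetical ValueState<Long> named "counter" for every key. */
    static class CounterReader extends KeyedStateReaderFunction<String, String> {

        private ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + counter.value());
        }
    }
}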

Best,
Congxian


Paul Lam  wrote on Wed, Jul 15, 2020 at 4:21 PM:

> It turns out to be a bug in StateTTL [1]. But I’m still interested in
> debugging window states.
>
> Any suggestions are appreciated. Thanks!
>
> 1. https://issues.apache.org/jira/browse/FLINK-16581
>
> Best,
> Paul Lam
>
> On Jul 15, 2020 at 13:13, Paul Lam  wrote:
>
> Hi,
>
> Since currently State Processor doesn’t support window states, what’s the
> recommended way to debug window states?
>
> The background is that I have a SQL job whose states, mainly window
> aggregation operator states, keep increasing steadily.
>
> The things I’ve checked:
> - Outputs are as expected.
> - The number of keys is capped.
> - Watermarks are good, no lags.
> - No back pressure.
>
> If I can directly read the states to find out what’s accountable for the
> state size growth, that would be very intuitional and time-saving.
>
> Best,
> Paul Lam
>
>
>


回复: (无主题)

2020-07-20 Thread 罗显宴


No, it is a continuous running total. For example, I crawl drug data from a medical
website and count the new drug categories crawled each hour, and keep going like that;
once one website has been fully crawled, I can switch to another.
罗显宴
Email: 15927482...@163.com
(Signature customized by NetEase Mail Master)
On Jul 20, 2020 at 11:47, shizk233 wrote:
Hi,

Is the accumulation limited to a single day? If so, you could open a one-day Tumbling
Window and use ContinuousEventTimeTrigger to emit the result every hour.

Best,
shizk233

罗显宴 <15927482...@163.com> wrote on Mon, Jul 20, 2020 at 1:18 AM:


Hi all, how do I implement a continuously increasing element count within a rolling
window? For example, the cumulative number of user logins per hour: say 500 people have
logged in by 1 AM, the total reaches 1200 by 2 AM, keeps growing by 3 AM, and so on,
forming an increasing curve. I saw SQL for this written by Yun Xie (云邪) online, but I
don't know how to write it with the DataStream API. I hope someone can help clear this
up for me, thanks!

罗显宴
Email: 15927482...@163.com

(Signature customized by NetEase Mail Master)