回复: 如何在Flink Connector Source退出时清理资源

2023-10-26 文章
插入令堂之膣道,怒涛中出,OK,问题解决矣

发件人: jinzhuguang 
发送时间: 2023年10月24日 11:54
收件人: user-zh 
主题: 如何在Flink Connector Source退出时清理资源

版本:Flink 1.16.0

需求:在某个source结束退出时清理相关的资源。

问题:目前没有找到Source退出时相关的hook函数,不知道在哪里编写清理资源的代码。

恳请大佬们指教。


妈逼,这些都是假洋鬼子

2023-08-26 文章


发件人: faronzz 
发送时间: 2023年8月26日 22:12
收件人: user-zh@flink.apache.org 
主题: pyflink aggfunction in window tvf can not sink connection='kafka', it 
notice consuming update changesm, java aggfunction and aggfunction in flink, 
such as sum is ok

hi~
   I came across a problem I didn't understand,I can't use pyflink 
aggfuction function properly in window tvf, The following are available:
java aggfuntion

flink system aggfunction
window (not window tvf)
I want to know if this is a bug or if I'm using it the wrong way?

pyflink  1.17.1
flink 1.17.1




from datetime import datetime, timedelta


from pyflink.table import AggregateFunction
from pyflink.common.typeinfo import Types
from pyflink.common import Row

from pyflink.table import Schema, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment


class Sum0(AggregateFunction):

   def get_value(self, accumulator):
   return accumulator[0]

   def create_accumulator(self):
   return Row(0)

   def accumulate(self, accumulator, *args):
   if args[0] is not None:
   accumulator[0] += args[0]

   def retract(self, accumulator, *args):
   if args[0] is not None:
   accumulator[0] -= args[0]

   def merge(self, accumulator, accumulators):
   for acc in accumulators:
   accumulator[0] += acc[0]

   def get_result_type(self):
   return "BIGINT"

   def get_accumulator_type(self):
   return 'ROW'


def test_py_udf_kafka():
   # 流模式
   env = StreamExecutionEnvironment.get_execution_environment()
   
env.add_jars("file:///Users/faron/Downloads/flink-sql-connector-kafka-1.17.1.jar")
   env.set_parallelism(1)
   table_env = StreamTableEnvironment.create(stream_execution_environment=env)
   ds = env.from_collection(
   collection=[(1, 2, "Lee", datetime.now() - timedelta(hours=4)),
   (2, 3, "Lee", datetime.now() - timedelta(hours=4)),
   (3, 4, "Jay", datetime.now() - timedelta(hours=4)),
   (5, 6, "Jay", datetime.now() - timedelta(hours=2)),
   (7, 8, "Lee", datetime.now())],
   type_info=Types.ROW([Types.INT(),
   Types.INT(),
   Types.STRING(),
   Types.SQL_TIMESTAMP()]))

   table_schema = Schema.new_builder() \
   .column("f0", "INT") \
   .column("f1", "INT") \
   .column("f2", "STRING") \
   .column_by_expression("rowtime", "CAST(f3 AS TIMESTAMP(3))") \
   .watermark("rowtime", "rowtime - INTERVAL '1' SECOND") \
   .build()

   ts = table_env.from_data_stream(ds, table_schema) \
   .alias("value", "count", "name", "rowtime")

   print("打印源表结构")
   ts.print_schema()

   sql_sink_dll_1 = """CREATE TABLE kafka_test(
   `name` string, `agg_data` bigint)
   with (
   'connector' = 'kafka',
   'topic'='test_java2',
   'properties.bootstrap.servers'='agent3:9092',
   'value.format' = 'json'
   );"""

   table_env.execute_sql(sql_sink_dll_1)
   table_env.create_temporary_view("source", ts)
   table_env.create_temporary_function(
   "sum_udf",
   Sum0())
   sql_query_system = """
   select name,sum(`value`) as agg_data from
   TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS))
   group by window_start, window_end, name
   """
   sql_query = """
   select name,sum_udf(`value`) as agg_data from
   TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime),INTERVAL '1' HOURS))
   group by window_start, window_end, name
   """
   print(table_env.explain_sql(sql_query))
   table_env.sql_query(sql_query).execute().print()
   table_env.sql_query(sql_query).execute_insert("kafka_test").wait()


if __name__ == "__main__":
   test_py_udf_kafka()



| |
faronzz
|
|
faro...@163.com


|


| |
faronzz
|
|
faro...@163.com
|



中国各地居民担心盐被污染,掀起抢购食盐的浪潮

2023-08-26 文章
日本福岛第一核电站的核处理水排入大海,中国外交部除了向日本政府提出严重交涉外,并开动宣传机器宣传"核污水排海可致癌"等负面消息,登上微博热搜。中国各地居民担心盐被污染,掀起抢购食盐的浪潮。

日本政府于本周四(24日)中午启动福岛第一核电站处理过的核废水排入太平洋的相关计划。中国各大媒体通过社交平台发出有关日本核污水可致癌、致畸,甚至造成人体DNA断裂的消息。封面新闻引述专家称,日本核污水排放可能造成基因损害。人民网指美国一边支持日本将污水排放到海里一边减少进口日本农林水产;新华社则报道指多名福岛居民接受新华社记者采访时,谴责日本政府和东京电力公司违反当初不擅自处理核污染水的承诺.


答复: 朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”

2023-08-26 文章
资料显示,渐冻症一般指肌萎缩侧索硬化,它是上运动神经元和下运动神经元损伤之后,导致包括球部(指延髓支配的这部分肌肉)、四肢、躯干、胸部腹部的肌肉逐渐无力和萎缩。

发件人: tison 
发送时间: 2023年8月26日 10:15
收件人: user-zh@flink.apache.org ; 
priv...@flink.apache.org 
主题: Re: 朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”

I suggest we should ban this spamming source..

Best,
tison.


北野 �悦�  于2023年8月26日周六 08:11写道:

> 朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”
>
> 一位联合国高级政治事务官员今天表示,朝鲜最近发射的军事侦察卫星对国际民航和海上交通构成了重大威胁,强调需要采取切实可行的措施来缓解朝鲜半岛的紧张局势,为对话创造空间。
>
>
> 和平与安全
> 考克斯巴扎尔的一个罗兴亚难民营受到气旋“摩卡”冲击。
> 缅甸罗兴亚危机爆发六周年,联合国呼吁寻求全面、持久和包容解决方案
>
> 在8月25日缅甸西部若开邦罗兴亚人开始大规模出逃六年整之际,联合国秘书长古特雷斯通过发言人发表声明,呼吁国际社会继续针对缅甸危机寻求全面、包容和持久的解决方案。
>
>
> 人道主义援助
> 联合国继续向刚果(金)东部北基伍省因武装冲突而流离失所的人们提供人道主义援助。
> 刚果民主共和国670万人面临严重粮食不安全
>
> 世界粮食计划署今天表示,在刚果民主共和国东部的北基伍省、南基伍省和伊图里省,670万人面临严重的粮食不安全问题。然而,援助资源严重不足,无法满足当地的高位人道主义需求。
>
>
> 人道主义援助
> 一户苏丹家庭在位于乍得边境的难民入境点避难。
> 紧急救济协调员:战争和饥饿恐将摧毁苏丹
>
> 主管人道主义事务副秘书长兼紧急救济协调员格里菲思今天表示,苏丹战乱或将使该国儿童成为“迷失的一代”,他们的未来岌岌可危,如果冲突和饥饿蔓延,可能会摧毁这个国家。
>
>
> 人道主义援助
> 女孩们坐在泥屋受损的墙上,这座房子在2022年巴基斯坦的洪水中几乎被摧毁。
> 巴基斯坦洪灾一周年:儿童的灾难仍在继续
> 在前所未有的洪灾侵袭巴基斯坦一周年之际,联合国儿童基金会驻巴基斯坦代表法迪尔(Abdullah
> Fadil)今天发表讲话称,由于缺少恢复和重建的资金,巴基斯坦数百万儿童仍需依赖人道主义援助,400万儿童无法获得安全的饮用水。
>
>


朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”

2023-08-25 文章
朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”
一位联合国高级政治事务官员今天表示,朝鲜最近发射的军事侦察卫星对国际民航和海上交通构成了重大威胁,强调需要采取切实可行的措施来缓解朝鲜半岛的紧张局势,为对话创造空间。


和平与安全
考克斯巴扎尔的一个罗兴亚难民营受到气旋“摩卡”冲击。
缅甸罗兴亚危机爆发六周年,联合国呼吁寻求全面、持久和包容解决方案
在8月25日缅甸西部若开邦罗兴亚人开始大规模出逃六年整之际,联合国秘书长古特雷斯通过发言人发表声明,呼吁国际社会继续针对缅甸危机寻求全面、包容和持久的解决方案。


人道主义援助
联合国继续向刚果(金)东部北基伍省因武装冲突而流离失所的人们提供人道主义援助。
刚果民主共和国670万人面临严重粮食不安全
世界粮食计划署今天表示,在刚果民主共和国东部的北基伍省、南基伍省和伊图里省,670万人面临严重的粮食不安全问题。然而,援助资源严重不足,无法满足当地的高位人道主义需求。


人道主义援助
一户苏丹家庭在位于乍得边境的难民入境点避难。
紧急救济协调员:战争和饥饿恐将摧毁苏丹
主管人道主义事务副秘书长兼紧急救济协调员格里菲思今天表示,苏丹战乱或将使该国儿童成为“迷失的一代”,他们的未来岌岌可危,如果冲突和饥饿蔓延,可能会摧毁这个国家。


人道主义援助
女孩们坐在泥屋受损的墙上,这座房子在2022年巴基斯坦的洪水中几乎被摧毁。
巴基斯坦洪灾一周年:儿童的灾难仍在继续
在前所未有的洪灾侵袭巴基斯坦一周年之际,联合国儿童基金会驻巴基斯坦代表法迪尔(Abdullah 
Fadil)今天发表讲话称,由于缺少恢复和重建的资金,巴基斯坦数百万儿童仍需依赖人道主义援助,400万儿童无法获得安全的饮用水。



普里戈任专机,是被导弹打下来的?

2023-08-24 文章


俄罗斯私营军事实体瓦格纳组织的领导人普里戈任当地时间8月23日乘坐的专机坠毁后,各种关于专机失事原因的推测快速发酵。

俄罗斯社交媒体上,虽然不乏“他还活着”的传闻,但普里戈任的好友、俄罗斯右翼思想家杜金等人已公开悼念这位“坚强,自信,大胆的人”。圣彼得堡的原瓦格纳组织总部门口被普里戈任支持者摆放了鲜花,俄罗斯总统普京则已紧急回到莫斯科。俄罗斯总统新闻秘书佩斯科夫对媒体表示,普京已经得知了此事,并已经采取了必要的措施。

与瓦格纳有关联的社交媒体频道“灰色地带”宣称,普里戈任的专机是“被俄罗斯联邦国防部的防空火力击落”。俄罗斯联邦航空运输局的一位“消息人士”则对网络媒体Tsargrad称,这架飞机是被“炸毁”的。