Time zone issue with timestamp-type fields in debezium-json data

2022-11-22 Thread Kyle Zhang
Hi all,
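We have a scenario in which Oracle data is extracted into Kafka through the debezium-oracle-cdc plugin and then analyzed with Flink SQL. We have run into a time zone problem. For example, a timestamp column in the database holds the value '2022-11-17 16:16:44', but Debezium stores it as an int64 without any time zone information, so it becomes 1668701804000. After processing with FROM_UNIXTIME in Flink SQL, the value becomes '2022-11-18 00:16:44', which is 8 hours off, and we have to subtract 8 hours manually. Is there a uniform way to handle this situation?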

Best
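The 8-hour shift described above can be reproduced outside Flink. The following is only an illustrative sketch in Python, not a Flink or Debezium API: the helper name epoch_millis_to_wall_clock is hypothetical, and UTC+8 is an assumption about the time zone in which the value is being rendered. It shows that the epoch value matches the original wall-clock time when rendered in UTC, and the shifted time when rendered in UTC+8.

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_wall_clock(epoch_millis: int, tz: timezone) -> str:
    """Render an epoch-millisecond value as a wall-clock string in the given zone.

    Hypothetical helper for illustration only.
    """
    moment = datetime.fromtimestamp(epoch_millis // 1000, tz=tz)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


value = 1668701804000  # the int64 value quoted in the message above

# Rendered in UTC, the value matches the original database wall-clock time.
print(epoch_millis_to_wall_clock(value, timezone.utc))
# -> 2022-11-17 16:16:44

# Rendered in UTC+8 (assumed rendering zone), it is shifted by 8 hours.
print(epoch_millis_to_wall_clock(value, timezone(timedelta(hours=8))))
# -> 2022-11-18 00:16:44
```

This suggests the offset comes from the time zone used when formatting the epoch value rather than from the data itself, so the formatting time zone on the Flink side is probably the place to look; that is an inference from the numbers above and should be checked against the Flink documentation.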


How to properly extend the JDBC connector to support more database dialects?

2022-11-22 Thread casel.chen
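How can we properly extend the JDBC connector to support more database dialects? Our current approach is to pull the Flink source code and modify it directly to add dialect support. Is there a more elegant way to do this?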

How to extend Flink SQL to support CTAS/CDAS statements?

2022-11-22 Thread casel.chen
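The community edition of Flink SQL does not seem to support CTAS/CDAS statements yet. How can Flink SQL be extended to support them? Could you share some ideas or sample code? Thanks!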

How to listen for job status changes after a Flink job is submitted and running?

2022-11-22 Thread casel.chen
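After a Flink job is submitted and running, how can we listen for job status changes so that the console can display them in real time? We currently poll, which is inefficient. Is there a listener that can be registered?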
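
For reference, the polling approach mentioned above can at least be wrapped so that the console only receives events when the status actually changes. This is a minimal sketch, not a Flink API: the class JobStatusWatcher and the fetch_state callable are hypothetical, and how the status is fetched (for example from a REST endpoint or a metrics backend) is deliberately left open. The status names in the demo are sample values.

```python
from typing import Callable, Optional


class JobStatusWatcher:
    """Turns repeated polling into change notifications (hypothetical helper)."""

    def __init__(
        self,
        fetch_state: Callable[[], str],
        on_change: Callable[[Optional[str], str], None],
    ) -> None:
        self._fetch_state = fetch_state  # placeholder for the real status lookup
        self._on_change = on_change      # invoked only when the status differs
        self._last_state: Optional[str] = None

    def poll_once(self) -> bool:
        """Fetch the status once; notify and return True only if it changed."""
        state = self._fetch_state()
        if state == self._last_state:
            return False
        previous, self._last_state = self._last_state, state
        self._on_change(previous, state)
        return True


# Demo with canned sample statuses instead of a real cluster.
observed = iter(["CREATED", "RUNNING", "RUNNING", "FINISHED"])
watcher = JobStatusWatcher(
    lambda: next(observed),
    lambda old, new: print(f"{old} -> {new}"),
)
for _ in range(4):
    watcher.poll_once()
```

The caller still decides how often poll_once runs; the wrapper only removes duplicate notifications, so it does not address the polling cost itself.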

Re:Re: Flink fails to start a Python job: undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread RS
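Hi,
I built it with the python command and did not use conda. That should not make a difference, right?
python3 -m venv jxy_venv


I started a single-node Flink test locally on a machine that has a Python environment, and the test ran successfully.


Thanks


At 2022-11-22 15:39:48, "Xingbo Huang" wrote: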
>Hi RS,
>
>Did you build the venv with conda? You can refer to the PyFlink documentation on preparing the environment [1]
>
>Best,
>Xingbo
>
>[1]
>https://pyflink.readthedocs.io/en/latest/getting_started/installation/prepare.html#create-a-virtual-environment-using-conda
>
>RS wrote on Tue, Nov 22, 2022 at 15:14:
>
>> Hi,
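>> Flink version: 1.15.1
>>
>>
>> Machine A: a Python virtual environment (version 3.6.8) was created on machine A, flink and other Python packages were installed into it, and it was then packaged as a ZIP, jxy_venv.zip, and uploaded to HDFS
>> Machine B: the host has no Python environment; Flink runs in Docker on K8S, and there is no Python environment inside the Docker container either
>> Machine C: has the Flink client and a Python environment, and is responsible for launching the job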
>>
>>
>> Launch command:
>> ./bin/flink run -Dexecution.runtime-mode=BATCH -d -m 192.168.1.2:8081 -n
>> -py /xxx/main.py -pyfs hdfs://xxx/config.py -pyarch
>> hdfs://xxx/jxy_venv.zip#venv -pyclientexec venv/jxy_venv/bin/python
>>
>>
>> Error message:
>> ...
>> venv/jxy_venv/bin/python: symbol lookup error: venv/jxy_venv/bin/python: undefined symbol: _Py_LegacyLocaleDetected
>> org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 127
>>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
>>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
>> Caused by: java.lang.RuntimeException: Python process exits with code: 127
>>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>>     ... 16 more
>>
>>
>> How should this situation be handled?
>> Does the Flink environment have to have the python command installed?
>>
>>
>> Thanks
>>
>>


Re: How to listen for job status changes after a Flink job is submitted and running?

2022-11-22 Thread RS
Hi,


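Take a look at Flink's metrics; the job status should be exposed there.
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#reporter


Metrics can be configured with different reporters; some use a pull mechanism and others push.
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/


Thanks


At 2022-11-23 08:32:11, "casel.chen" wrote:
>After a Flink job is submitted and running, how can we listen for job status changes so that the console can display them in real time? We currently poll, which is inefficient. Is there a listener that can be registered?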


Re: How to extend Flink SQL to support CTAS/CDAS statements?

2022-11-22 Thread Shengkai Fang
May I ask what functionality you are trying to implement? Is it the kind offered on Alibaba Cloud?

Best,
Shengkai

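casel.chen wrote on Wed, Nov 23, 2022 at 08:29:

> The community edition of Flink SQL does not seem to support CTAS/CDAS statements yet. How can Flink SQL be extended to support them? Could you share some ideas or sample code? Thanks!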


Re:Re:Re: Flink fails to start a Python job: undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread RS
Hi,
I tested the Python virtual environment on its own and found that the target machine must already have a Python environment; otherwise the virtual environment cannot run even after it is copied over.
I had assumed a virtual environment would just work once copied, but it turns out not to be what I expected, haha~


Error with version 3.6
# ./python
./python: error while loading shared libraries: libpython3.6m.so.1.0: cannot open shared object file: No such file or directory


Error with version 3.9
# ./venv/bin/python
Could not find platform independent libraries <prefix>
Could not find platform dependent libraries <exec_prefix>
Consider setting $PYTHONHOME to <prefix>[:<exec_prefix>]
Python path configuration:
  PYTHONHOME = (not set)
  PYTHONPATH = (not set)
  program name = './venv/bin/python'
  isolated = 0
  environment = 1
  user site = 1
  import site = 1
  sys._base_executable = '/root/tmp/venv/bin/python'
  sys.base_prefix = '/usr/local'
  sys.base_exec_prefix = '/usr/local'
  sys.platlibdir = 'lib'
  sys.executable = '/root/tmp/venv/bin/python'
  sys.prefix = '/usr/local'
  sys.exec_prefix = '/usr/local'
  sys.path = [
    '/usr/local/lib/python39.zip',
    '/usr/local/lib/python3.9',
    '/usr/local/lib/lib-dynload',
  ]
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
Python runtime state: core initialized
ModuleNotFoundError: No module named 'encodings'

Current thread 0x7ff010275740 (most recent call first):






Thanks
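
One way to see why the copied environment depends on the target machine: a virtual environment created with python3 -m venv contains a pyvenv.cfg file whose home key records the directory of the interpreter it was created from, and the 3.9 output above shows the interpreter looking for its standard library under /usr/local. The sketch below is only an illustration; the function names parse_pyvenv_cfg and base_interpreter_available are hypothetical. It reads that key and checks whether the directory exists on the current machine.

```python
import os
from pathlib import Path
from typing import Dict


def parse_pyvenv_cfg(text: str) -> Dict[str, str]:
    """Parse the simple 'key = value' lines of a pyvenv.cfg file."""
    settings: Dict[str, str] = {}
    for line in text.splitlines():
        key, separator, value = line.partition("=")
        if separator:  # skip lines that have no '='
            settings[key.strip().lower()] = value.strip()
    return settings


def base_interpreter_available(venv_dir: str) -> bool:
    """Return True if the directory named by 'home' exists on this machine.

    A False result suggests the venv was copied from a machine whose base
    Python installation is not present here.
    """
    cfg_path = Path(venv_dir) / "pyvenv.cfg"
    if not cfg_path.is_file():
        return False
    home = parse_pyvenv_cfg(cfg_path.read_text()).get("home")
    return bool(home) and os.path.isdir(home)
```

Running base_interpreter_available on the unpacked environment on the target machine gives a quick first check before starting a job. It only confirms that the directory exists, not that the Python installation there is compatible.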







Re: Re: Flink fails to start a Python job: undefined symbol: _Py_LegacyLocaleDetected

2022-11-22 Thread Xingbo Huang
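A venv built with conda bundles some of the machine's underlying C libraries, so it is more complete. A virtual environment built with Python's venv package may run into problems when moved across machines.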

Best,
Xingbo
