Re: Flink write ADLS show error: No FileSystem for scheme "file"

2024-06-26 Thread Xiao Xu
Hi Gabor,

Thanks for the reply. I made sure there is no conflict in Maven: all the Hadoop
dependencies are in provided scope, and I checked that my resulting jar does not
contain any service files
(src/main/resources/META-INF/services).

This is my pom:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.flink</groupId>
    <artifactId>flink-sync</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.18.1</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- 3.2.0 (property name not preserved) -->
        <hadoop.version>3.3.4</hadoop.version>
        <log4j.version>2.16.0</log4j.version>
        <!-- 3.2.0 (property name not preserved) -->
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-azure-fs-hadoop</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

And as in my reply on Stack Overflow, I found that this code in hadoop-common:
https://github.com/apache/hadoop/blob/release-3.3.4-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3374
does not load any filesystem. Digging into the ServiceLoader.load(FileSystem.class)
source code, it looks like a different class loader is used, which makes it load
no filesystems at all.
I changed ServiceLoader.load(FileSystem.class) to
ServiceLoader.load(FileSystem.class, FileSystem.class.getClassLoader()),
replaced the flink-azure-fs-hadoop plugin with the rebuilt jar, and it works now.
So I'm still not sure why it works.
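
For illustration only, here is a minimal, self-contained sketch of the two
ServiceLoader calls I am referring to (the class name is made up and this is not
the actual hadoop-common code):

import java.util.ServiceLoader;
import org.apache.hadoop.fs.FileSystem;

public class FileSystemServiceLoaderSketch {
    public static void main(String[] args) {
        // Single-argument form: uses the thread context class loader, which inside
        // a Flink plugin class loader may not see hadoop-common's
        // META-INF/services/org.apache.hadoop.fs.FileSystem entries.
        ServiceLoader<FileSystem> viaContextLoader = ServiceLoader.load(FileSystem.class);
        viaContextLoader.forEach(fs -> System.out.println("context loader sees: " + fs.getClass()));

        // Two-argument form (the change described above): use the class loader that
        // loaded FileSystem itself, so its service entries are found.
        ServiceLoader<FileSystem> viaOwnLoader =
                ServiceLoader.load(FileSystem.class, FileSystem.class.getClassLoader());
        viaOwnLoader.forEach(fs -> System.out.println("own loader sees: " + fs.getClass()));
    }
}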

Gabor Somogyi wrote on Wednesday, June 26, 2024 at 16:52:

> Hi Xiao,
>
> I'm not quite convinced that the Azure plugin ruined your workload; I
> would take a look at the dependency graph you have in the pom.
> Adding multiple deps can conflict in terms of class loader services
> (src/main/resources/META-INF/services).
>
> As an example, you have 2 such dependencies where
> org.apache.flink.core.fs.FileSystemFactory is in the jar.
> Hadoop core contains "file" and the other one something different. Let's
> say you don't use a service-merging plugin in your
> Maven project. In that case the Hadoop core `file` entry will simply be
> overwritten by the second one.
>
> Solution: Either avoid deps with conflicting services or add 
> ServicesResourceTransformer
> to your maven project.
>
> G
>
>
> On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu  wrote:
>
>> Hi, all
>>
>> I am trying to use Flink to write to Azure Blob Storage (ADLS). I put
>> the flink-azure-fs-hadoop jar in the plugins directory, and when I start my
>> write job it shows:
>>
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "file"
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>> at
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>

Flink k8s operator starts wrong job config from FlinkSessionJob

2024-06-26 Thread Peter Klauke
Hi all,

we're running a session cluster and I submit around 20 jobs to it at the
same time by creating FlinkSessionJob Kubernetes resources. After
sufficient time there are 20 jobs created and running healthy. However, it
appears that some jobs are started with the wrong config. As a result, some
jobs are created multiple times while others are missing completely. Each job
runs the same logic, differing only in one argument that specifies the
country code, which determines the Kafka topic to read from and the sink
name.

The job code looks essentially like this:


> parser = argparse.ArgumentParser(description="Process some input arguments.")
> parser.add_argument("--country", required=True, help="Country code to process")
> parser.add_argument("--pyFiles", required=False, help="Python files")
> args = parser.parse_args()
> country = args.country
> if country is None:
>     raise ValueError("Country argument (--country) not provided.")
>
> t_env.execute_sql(f"""
>     CREATE OR REPLACE TABLE source_kafka (
>         raw_payload BYTES,
>         data AS parse(raw_payload),
>         `timestamp` AS parse(raw_payload).`timestamp`,
>         WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'events.{country}',
>         ...
>     )
> """)
>
> table_result = t_env.sql_query("""
>     ...
> """)
>
> topic_name = "courier_states"
> sink_name = f"{topic_name}_{country}"
> env.create_temporary_table(sink_name,
>     TableDescriptor.for_connector("kafka").schema(...).option("topic", topic_name).option(...))
> table_result.execute_insert(sink_name).wait()


The created FlinkSessionJob resources look mostly as I expect them to:

 Spec
>Deployment Name:  pyflink-streaming-job-courier-states
>Job:
>  Args:
>-py
>/opt/flink/entrypoint.py
>--country
>hu
>--pyFiles
>/opt/flink
>  Entry Class:   org.apache.flink.client.python.PythonDriver
>  Parallelism:   1
>  State: running
>  Upgrade Mode:  savepoint
>  Status:
>Error:  
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.util.FlinkRuntimeException:
> java.util.concurrent.Time │
>  
> outException","throwableList":[{"type":"org.apache.flink.util.FlinkRuntimeException","message":"java.util.concurrent.TimeoutException"},{"type":"java.util.concurrent.Timeou
> │
>  tException"}]}
>  Job Status:
>  Job Id:0504be940001
>  Job Name:
> insert-into_default_catalog.default_database.courier_states_eg


Only the generated "Job Name" is incorrect for some of them, because the
corresponding job is started with the wrong configuration; the Job Name
should have the suffix "_hu". As you can see, there are some
TimeoutExceptions, some ReconciliationExceptions and occasionally also
"RestHandlerException: File e37b911b-eec9-4e75-a6db-befc132a9c2b_empty.jar
does not exist", but none of them would explain why a job is started with the
wrong configuration.

We're using Flink 1.17.1 and Flink Kubernetes Operator 1.8.0 (we saw the same
issue with 1.5.0).

For context:
Our use case is that we need to run the same Flink job for many
countries. For every country there is a separate Kafka topic to read from.
Every computation is separated per country, e.g. all group-by operations
group by the country code among other grouping columns. Having one Kafka
source subscribed to multiple Kafka topics (e.g. via the topic-pattern parameter)
has issues regarding watermarks: the topics of some countries (with
fewer messages) are consumed much faster than those of other countries (with more
messages). That makes all messages from the countries with more messages be
considered late, yielding wrong window aggregation results.
What we'd need is per-key watermarking, which is not supported. Initially
I also assumed that watermark alignment would help here, but I
didn't get it working. Hence running a session cluster with the same
Flink code and one job per country sounded like a convenient idea to me.
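
For reference, the sketch below shows roughly how watermark alignment is
configured in the DataStream API (Flink 1.15+); it is only an illustration with
made-up topic, group and drift values, not the Table API setup our job actually
uses:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkAlignmentSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical source reading one country's topic.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events.hu")
                .setGroupId("courier-states-hu")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Watermark alignment: sources sharing the alignment group ("countries")
        // pause consuming when their watermark runs more than the allowed drift
        // ahead of the group's minimum watermark.
        WatermarkStrategy<String> strategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withWatermarkAlignment("countries", Duration.ofSeconds(20), Duration.ofSeconds(1));

        env.fromSource(source, strategy, "events-hu").print();
        env.execute("watermark-alignment-sketch");
    }
}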

As far as I can see, this looks like a bug in the Flink Kubernetes operator.
The only workaround I see would be to submit the jobs one by one,
but that's not really feasible for 20+ different jobs.
Does anyone have a good idea how to fix this?


Re: Flink write ADLS show error: No FileSystem for scheme "file"

2024-06-26 Thread Gabor Somogyi
Hi Xiao,

I'm not quite convinced that the Azure plugin ruined your workload; I would
take a look at the dependency graph you have in the pom.
Adding multiple deps can conflict in terms of class loader services
(src/main/resources/META-INF/services).

As an example, you have 2 such dependencies where
org.apache.flink.core.fs.FileSystemFactory is in the jar.
Hadoop core contains "file" and the other one something different. Let's
say you don't use a service-merging plugin in your
Maven project. In that case the Hadoop core `file` entry will simply be
overwritten by the second one.

Solution: Either avoid deps with conflicting services or add
ServicesResourceTransformer
to your maven project.
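
For illustration, a minimal sketch of what that could look like, assuming you
build the fat jar with the maven-shade-plugin (which is where
ServicesResourceTransformer lives) instead of the maven-assembly-plugin; the
plugin version below is only an example:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge all META-INF/services files instead of letting one
               dependency's entries overwrite another's. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>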

G


On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu  wrote:

> Hi, all
>
> I am trying to use Flink to write to Azure Blob Storage (ADLS). I put
> the flink-azure-fs-hadoop jar in the plugins directory, and when I start my
> write job it shows:
>
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "file"
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.<init>(AbfsOutputStream.java:173)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>
> I searched and the issue looks like this one:
> https://stackoverflow.com/questions/77238642/apache-flink-azure-abfs-file-sink-error-streaming-unsupportedfilesystemexcep
>
> my env:
> Flink: 1.18.1
> JDK: 1.8
>
> Does anyone else have the same problem?
>


Re: Re: How to parse Oracle data read via CDC

2024-06-26 Thread Yanquan Lv
Yes, you can. Setting Debezium's decimal.handling.mode [1] parameter achieves
what you need: convert the values to double or string for processing.

[1]
https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-numeric-types
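
As an illustration only, here is a sketch of how that property could be passed
in through the debeziumProperties object used in the code quoted below (the
class and method names in this sketch are made up; "string" is just one of the
documented option values):

import java.util.Properties;

public class DecimalHandlingModeSketch {
    // Builds the Properties object that would be handed to
    // .debeziumProperties(...) on the OracleSourceBuilder shown below.
    public static Properties debeziumProps() {
        Properties debeziumProperties = new Properties();
        // "string" (or "double") avoids the Base64-encoded struct that the
        // default "precise" mode produces for Oracle NUMBER/DECIMAL columns.
        debeziumProperties.setProperty("decimal.handling.mode", "string");
        return debeziumProperties;
    }
}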

ha.fen...@aisino.com wrote on Wednesday, June 26, 2024 at 16:35:

> Map<String, Object> customConverterConfigs = new HashMap<>();
> customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
> "numeric");
> PRICE is a DECIMAL column and with the setting above it now comes out correctly; ID is a NUMBER and is still shown as "ID":{"scale":0,"value":"AQ=="}.
> I tried it: my ID is 1, which is MQ== after Base64 encoding, so this ID value is not even the Base64-encoded result.
> Printing the schema via JsonDebeziumDeserializationSchema(true,
> customConverterConfigs) gives:
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":false,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable scaled decimal","field":"ID"},
> Is there some way to set the expected field types up front and pass them in?
>
> From: Yanquan Lv
> Sent: 2024-06-26 14:46
> To: user-zh
> Subject: Re: Reply: How to parse Oracle data read via CDC
> Hi, are your ID and PRICE columns of type DECIMAL? DECIMAL values are rendered using Base64 by default.
>
> You can add the code below to make the output easier to read.
>
> Map<String, Object> customConverterConfigs = new HashMap<>();
> customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
> "numeric");
> JsonDebeziumDeserializationSchema schema = new
> JsonDebeziumDeserializationSchema(includeSchema,
> customConverterConfigs);
>
>
>
>
ha.fen...@aisino.com wrote on Tuesday, June 25, 2024 at 17:26:

> > The data itself is fine:
> > "ID"  "NAME"  "ADDTIME"             "PRICE"
> > 1     "aa"    2024-6-25 14:21:33    12.22
> >
> > From: 15868861416
> > Sent: 2024-06-25 17:19
> > To: user-zh@flink.apache.org
> > Subject: Reply: How to parse Oracle data read via CDC
> > Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.
> >
> >
> >
> >
> > | |
> > 博星
> > |
> > |
> > 15868861...@163.com
> > |
> >
> >
> >  Original message 
> > | From | ha.fen...@aisino.com |
> > | Date | 2024-06-25 15:54 |
> > | To | user-zh |
> > | Subject | How to parse Oracle data read via CDC |
> > Following the code from the documentation:
> > JdbcIncrementalSource<String> oracleChangeEventSource =
> > new OracleSourceBuilder()
> > .hostname("host")
> > .port(1521)
> > .databaseList("ORCLCDB")
> > .schemaList("DEBEZIUM")
> > .tableList("DEBEZIUM.PRODUCTS")
> > .username("username")
> > .password("password")
> > .deserializer(new JsonDebeziumDeserializationSchema())
> > .includeSchemaChanges(true) // output the schema changes as well
> > .startupOptions(StartupOptions.initial())
> > .debeziumProperties(debeziumProperties)
> > .splitSize(2)
> > .build();
> > The returned result:
> >
> >
> {"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}
> >
> > How should this be parsed? PRICE should be a number and ID is also a number, but what is shown here is different for both.
> >
>


??????????

2024-06-26 Thread wjw_bigd...@163.com




  
 Original message 
| From | <402987...@qq.com.INVALID> |
| Date | 2024-06-26 16:38 |
| To | user-zh |

 Original message 
From: "user-zh" <15171440...@163.com>
Date: 2024-06-26 4:36
To: "user-zh"

Reply: Unsubscribe

2024-06-26 Thread 费文杰



















On 2024-06-26 15:07:45, "15868861416" <15868861...@163.com> wrote:
>Hi, you could try changing the types of ID and PRICE to NUMBER; in my flink-sql setup, NUMBER columns mapped to Iceberg decimal data come out fine.
>
>
>| |
>博星
>|
>|
>15868861...@163.com
>|
>
>
> Original message 
>| From | Yanquan Lv |
>| Date | 2024-06-26 14:46 |
>| To |  |
>| Subject | Re: Reply: How to parse Oracle data read via CDC |
>Hi, are your ID and PRICE columns of type DECIMAL? DECIMAL values are rendered using Base64 by default.
>
>You can add the code below to make the output easier to read.
>
>Map<String, Object> customConverterConfigs = new HashMap<>();
>customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
>"numeric");
>JsonDebeziumDeserializationSchema schema = new
>JsonDebeziumDeserializationSchema(includeSchema,
>customConverterConfigs);
>
>
>
>
>ha.fen...@aisino.com wrote on Tuesday, June 25, 2024 at 17:26:
>
>The data itself is fine:
>"ID"  "NAME"  "ADDTIME"             "PRICE"
>1     "aa"    2024-6-25 14:21:33    12.22
>
>From: 15868861416
>Sent: 2024-06-25 17:19
>To: user-zh@flink.apache.org
>Subject: Reply: How to parse Oracle data read via CDC
>Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.
>
>
>
>
>| |
>博星
>|
>|
>15868861...@163.com
>|
>
>
> Original message 
>| From | ha.fen...@aisino.com |
>| Date | 2024-06-25 15:54 |
>| To | user-zh |
>| Subject | How to parse Oracle data read via CDC |
>Following the code from the documentation:
>JdbcIncrementalSource<String> oracleChangeEventSource =
>new OracleSourceBuilder()
>.hostname("host")
>.port(1521)
>.databaseList("ORCLCDB")
>.schemaList("DEBEZIUM")
>.tableList("DEBEZIUM.PRODUCTS")
>.username("username")
>.password("password")
>.deserializer(new JsonDebeziumDeserializationSchema())
>.includeSchemaChanges(true) // output the schema changes as well
>.startupOptions(StartupOptions.initial())
>.debeziumProperties(debeziumProperties)
>.splitSize(2)
>.build();
>The returned result:
>
>{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}
>
>How should this be parsed? PRICE should be a number and ID is also a number, but what is shown here is different for both.
>


Reply: How to parse Oracle data read via CDC

2024-06-26 Thread 15868861416
Hi, you could try changing the types of ID and PRICE to NUMBER; in my flink-sql setup, NUMBER columns mapped to Iceberg decimal data come out fine.


| |
博星
|
|
15868861...@163.com
|


 Original message 
| From | Yanquan Lv |
| Date | 2024-06-26 14:46 |
| To |  |
| Subject | Re: Reply: How to parse Oracle data read via CDC |
Hi, are your ID and PRICE columns of type DECIMAL? DECIMAL values are rendered using Base64 by default.

You can add the code below to make the output easier to read.

Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
JsonDebeziumDeserializationSchema schema = new
JsonDebeziumDeserializationSchema(includeSchema,
customConverterConfigs);




ha.fen...@aisino.com wrote on Tuesday, June 25, 2024 at 17:26:

The data itself is fine:
"ID"  "NAME"  "ADDTIME"             "PRICE"
1     "aa"    2024-6-25 14:21:33    12.22

From: 15868861416
Sent: 2024-06-25 17:19
To: user-zh@flink.apache.org
Subject: Reply: How to parse Oracle data read via CDC
Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.




| |
博星
|
|
15868861...@163.com
|


 Original message 
| From | ha.fen...@aisino.com |
| Date | 2024-06-25 15:54 |
| To | user-zh |
| Subject | How to parse Oracle data read via CDC |
Following the code from the documentation:
JdbcIncrementalSource<String> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
The returned result:

{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

How should this be parsed? PRICE should be a number and ID is also a number, but what is shown here is different for both.



Flink write ADLS show error: No FileSystem for scheme "file"

2024-06-26 Thread Xiao Xu
Hi, all

I am trying to use Flink to write to Azure Blob Storage (ADLS). I put the
flink-azure-fs-hadoop jar in the plugins directory, and when I start my write
job it shows:

Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
FileSystem for scheme "file"
at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.<init>(AbfsOutputStream.java:173)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
~[hadoop-common-3.3.4.5.1.5.3.jar:?]

I searched and the issue looks like this one:
https://stackoverflow.com/questions/77238642/apache-flink-azure-abfs-file-sink-error-streaming-unsupportedfilesystemexcep

my env:
Flink: 1.18.1
JDK: 1.8

Does anyone else have the same problem?


Re: Reply: How to parse Oracle data read via CDC

2024-06-26 Thread Yanquan Lv
Hi, are your ID and PRICE columns of type DECIMAL? DECIMAL values are rendered using Base64 by default.

You can add the code below to make the output easier to read.

Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
JsonDebeziumDeserializationSchema schema = new
JsonDebeziumDeserializationSchema(includeSchema,
customConverterConfigs);




ha.fen...@aisino.com wrote on Tuesday, June 25, 2024 at 17:26:

> The data itself is fine:
> "ID"  "NAME"  "ADDTIME"             "PRICE"
> 1     "aa"    2024-6-25 14:21:33    12.22
>
> From: 15868861416
> Sent: 2024-06-25 17:19
> To: user-zh@flink.apache.org
> Subject: Reply: How to parse Oracle data read via CDC
> Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.
>
>
>
>
> | |
> 博星
> |
> |
> 15868861...@163.com
> |
>
>
>  Original message 
> | From | ha.fen...@aisino.com |
> | Date | 2024-06-25 15:54 |
> | To | user-zh |
> | Subject | How to parse Oracle data read via CDC |
> Following the code from the documentation:
> JdbcIncrementalSource<String> oracleChangeEventSource =
> new OracleSourceBuilder()
> .hostname("host")
> .port(1521)
> .databaseList("ORCLCDB")
> .schemaList("DEBEZIUM")
> .tableList("DEBEZIUM.PRODUCTS")
> .username("username")
> .password("password")
> .deserializer(new JsonDebeziumDeserializationSchema())
> .includeSchemaChanges(true) // output the schema changes as well
> .startupOptions(StartupOptions.initial())
> .debeziumProperties(debeziumProperties)
> .splitSize(2)
> .build();
> The returned result:
>
> {"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}
>
> How should this be parsed? PRICE should be a number and ID is also a number, but what is shown here is different for both.
>