We started using several flinksql jobs in kubernetes cluster and would like to
understand how to safely pass passwords and other sensitive data in the
description of DDLs of tables.
Is there any way to use pointers to environment variables?
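As far as I know, Flink SQL has no built-in placeholder syntax for environment variables in DDL. A common workaround is to substitute secrets into the DDL string before calling `executeSql`. Below is a minimal sketch — the class name, `renderDdl` helper, and `${...}` placeholder convention are all illustrative, not a Flink API:

```java
import java.util.Map;

// Hypothetical helper: substitute secrets from environment variables into a
// DDL template before handing the final string to tableEnv.executeSql(...).
public class DdlTemplating {
    public static String renderDdl(String template, Map<String, String> env) {
        String rendered = template;
        for (Map.Entry<String, String> e : env.entrySet()) {
            // Replace every ${NAME} placeholder with that variable's value.
            rendered = rendered.replace("${" + e.getKey() + "}", e.getValue());
        }
        return rendered;
    }

    public static void main(String[] args) {
        String ddl = renderDdl(
                "CREATE TABLE t (id BIGINT) WITH ('connector' = 'jdbc', "
                        + "'password' = '${DB_PASSWORD}')",
                Map.of("DB_PASSWORD",
                        System.getenv().getOrDefault("DB_PASSWORD", "changeme")));
        System.out.println(ddl);
    }
}
```

This keeps the secret out of the committed DDL text; in Kubernetes the variable would typically come from a Secret mounted as an env var.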
Hi Oscar,
> but we don't understand why this incremental checkpoint keeps increasing
AFAIK, when performing incremental checkpoint, the RocksDBStateBackend will
upload the new created SST files to remote storage. The total size of these
files is the incremental checkpoint size. However, the new
Hi team,
I am using the JdbcSink from flink-connector-jdbc artifact, version
3.1.0-1.17. I am trying to write a Sink wrapper that will internally call
the invoke method and open method of jdbc sink. While implementing, I see
that JdbcSink.sink() returns a SinkFunction which only exposes the
Hi, Prashant.
I think Yu Chen has given some professional troubleshooting ideas. Another thing I
want to ask is whether you use a
user-defined function to store some objects? You could first dump the memory
and get more details to check for memory leaks.
--
Best!
Xuyang
At
Hi, Dan.
Can you provide more details?
> I'm seeing unexpected behavior where it appears like the sql is executed
> locally.
Did you find a minicluster started locally running your program?
> In my case the remote environment is inside AWS and it doesn't appear to pick
> up the region and
Hi, Tauseef,
AFAIK, the most common way to get a list of tasks that a particular
job executes is through Flink's Web UI or REST API.
Using the Flink Web UI:
When you run a Flink cluster, a Web UI is launched by default on port
8081 of the JobManager. By accessing this Web UI through a browser,
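For reference, a sketch of the REST endpoints involved (the job and vertex ids are placeholders; the port assumes the default 8081):

```
GET /jobs                                # list all job ids
GET /jobs/<job-id>                       # job details, including its vertices (tasks)
GET /jobs/<job-id>/vertices/<vertex-id>  # per-task details and metrics
```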
Hi Dulce,
Thanks for your clarification. From my experience it is usually a
dependency problem, and you may verify this by checking whether you
have correctly included the dependencies in your jar (like `jar -tf
xxx.jar | grep ExecutionConfig`), and whether the dependencies in your
local
Hi Prashant,
An OOMKill is mostly caused by the working-set memory exceeding the pod limit.
First, try increasing the JVM overhead memory with the following parameters and
observe whether that solves the problem.
```
taskmanager.memory.jvm-overhead.max=1536m
taskmanager.memory.jvm-overhead.min=1536m
```
And if
Thanks! Yeah, I am not sure why it's handled so differently from non-native
k8s mode.
If it's possible I think this would be a huge improvement.
On Mon, Nov 20, 2023, 12:55 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:
> Hi Trystan, I'm actually not very familiar with the operator's
Hi,
If I use StreamExecutionEnvironment.createRemoteEnvironment and then
var tEnv = StreamTableEnvironment.create(env) from the resulting remote
StreamExecutionEvironment will any sql executed using tEnv.executeSql be
executed remotely inside the flink cluster?
I'm seeing unexpected behavior
As per the docs, the `inputQueueLength` metric refers to the number of
queued input buffers, and cannot be used on its own in order to
determine buffered records.
For instance, if I know that there are 5 queued input buffers, I cannot
conclude anything regarding buffered records if the size
Hello,
We have been facing this oomkill issue, where task managers are getting
restarted with this error.
I am seeing memory consumption increasing in a linear manner, i have given
memory and CPU as high as possible but still facing the same issue.
We are using rocksdb for the state backend, is
Hi Dimitris
Maybe you can use the `inputQueueLength` metric.
Best,
Feng
On Tue, Nov 28, 2023 at 12:07 AM Dimitris Mpanelas via user <
user@flink.apache.org> wrote:
> Hello,
>
> I am trying to determine the buffered records in the input buffers of a
> task. I found the inputQueueSize metric.
Hi Oscar
Did you set
state.backend.latency-track.keyed-state-enabled=true;
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#state-backend-latency-track-keyed-state-enabled
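For reference, a minimal flink-conf.yaml sketch enabling latency tracking (option names per the linked 1.16 docs; the tuning values shown are illustrative defaults):

```
state.backend.latency-track.keyed-state-enabled: true
# optional tuning, values shown are the documented defaults
state.backend.latency-track.sample-interval: 100
state.backend.latency-track.history-size: 128
```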
Best,
Feng
On Mon, Nov 27, 2023 at 8:42 PM Oscar Perez via user
wrote:
> Hi,
>
> We
Hello,
I am trying to determine the buffered records in the input buffers of a task. I
found the inputQueueSize metric. According to the docs it is "The real size of
queued input buffers in bytes". The docs also state that "The size for local
input channels is always 0 since the local channel
Hello, Tauseef,
Can you give more details? Are your task manager and job manager on the
same VM?
How did you configure the job manager address in the task manager conf
file?
Did you modify the binding in the configuration files?
Benoit
On Mon, Nov 27, 2023 at 14:29, Tauseef Janvekar
wrote:
Hi Lasse,
The default flink-conf.yaml file bundled in the distribution should already
have a preset env.java.opts.all config for Java 17. Have you tried that?
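For illustration, the preset in the bundled flink-conf.yaml looks roughly like this (heavily abbreviated; consult the shipped file for the authoritative module list):

```
env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED
```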
Best,
Zhanghao Chen
From: Lasse Nedergaard
Sent: Monday, November 27, 2023 21:20
To: user
Subject:
Hi!
I am currently working with Flink's Table API (Flink version 1.17, Java 11). I
am pulling streaming data from a Kafka topic, processing it and I want to write
the processed data to Azure Blob Storage. I am using the Filesystem SQL
connector (following this page:
Hi Hang,
Thanks for the link. I will wait for the 3.1 connector release and hope it will be included.
Med venlig hilsen / Best regards
Lasse Nedergaard
On Nov 27, 2023 at 12:00, Hang Ruan wrote:
Hi, Lasse.
There is already a discussion about the connector releases for 1.18[1].
Best,
Hang
[1]
Hi
I need some help to figure out how to get Flink 1.18 running on Java 17
According to the documentation for java compatibility I have to set
env.java.opts.all. As I use data types and generic list and maps from JDK.
I need to configure it so it works for both tests using a mini cluster and
Hi,
We are using flink 1.16 and we would like to monitor the state metrics of a
certain job. Looking at the documentation I see that there are some state
access latencies:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/metrics/
Namely, I would like to access the following:
Hi,
We have a long running job in production and we are trying to understand
the metrics for this job, see attached screenshot.
We have enabled incremental checkpoint for this job and we use RocksDB as a
state backend.
When deployed from fresh state, the initial checkpoint size is about
Dear Team,
How do we get the list of tasks that a particular job executes?
If I go to Task Manager, I do not see any tasks. I am also facing an
issue where the job manager is not able to access the task manager, but my jobs
are completing with no issues.
Any help is appreciated.
Thanks,
Tauseef
Hi, Lasse.
There is already a discussion about the connector releases for 1.18[1].
Best,
Hang
[1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2
Lasse Nedergaard wrote on Fri, Nov 24, 2023 at 22:57:
> Hi
>
> From the documentation I can see there isn’t any ES support in Flink 1.18
>
Dear Team,
We are getting below error messages in our logs.
Any help on how to resolve would be greatly appreciated.
2023-11-27 08:14:29,712 INFO org.apache.pekko.remote.transport.
ProtocolStateActor [] - No response from remote for outbound
association. Associate timed out after [2
Hi Jintao,
Please send an email to user-unsubscr...@flink.apache.org to unsubscribe
the user mailing list.
Jintao Ma wrote on Mon, Nov 27, 2023 at 09:24:
> Unsubscribe
>
Unsubscribe
Hello,
I'm using Flink 1.18 and have this error, any idea? Works in IDE.
2023-11-24 16:04:47.062 [DEBUG] [jobmanager-io-thread-1]
DefaultJobMasterServiceProcess - Initialization of the JobMasterService for job
abbad350a6a161919104f8e2fe2670b8 under leader id
Hello again!
I found this metric in the docs, inputQueueSize, and this is the
description:
"The real size of queued input buffers in bytes. The size for local input
channels is always `0` since the local channel takes records directly from
the output queue". I did a prom query for this metric.
Hi rania,
Through the following REST APIs, you can get the vertex metrics(chained
operator).
GET http://localhost:8081/jobs//vertices/
Note that vertex_id can be accessed from GET http://localhost:8081/jobs/
However, there is no interface for getting operator-level metrics.
But I was planning
Hello!
I want to get the pending records of a task. What is the best approach to get
the unprocessed records of a task?
Hi Oscar,
The operator IDs of the SQL job were generated by `StreamingJobGraphGenerator`;
they are related to the topology of the stream graph.
If you would like to confirm whether the problem was caused by changes of
operator ids, please remove --allowNonRestoredState, and you will get
Hi rania,
If you means the Job Vertex ID of the JobGraph, you can try this:
http://localhost:8081/jobs/
Best,
Yu Chen
From: Zhanghao Chen
Sent: November 26, 2023 11:02
To: rania duni ; user@flink.apache.org
Subject: Re: Operator ids
It is not supported yet. Curious why
Hello,
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the mailing list.
Best,
Zhanghao Chen
From: 唐凯
Sent: Saturday, November 25, 2023 9:23
To: user-zh
Subject: Unsubscribe
Unsubscribe
It is not supported yet. Curious why do you need to get the operator IDs? They
are usually only used internally.
Best,
Zhanghao Chen
From: rania duni
Sent: Saturday, November 25, 2023 20:44
To: user@flink.apache.org
Subject: Operator ids
Hello!
I would like
No problem, glad to hear that it's working now!
With release candidates, we always publish the url for staged artifacts in
the release candidate vote threads so that you can point your code to
compile against those for testing purposes.
Would be great to have your +1 on the vote thread for 3.0.2
Thanks Gordon!
I didn't know the name of the repository
https://repository.apache.org/content/repositories/orgapacheflink-1675/
Additionally something learned.
Yes, with the new version I can add the dependency
"org.apache.flink" % "flink-connector-kafka" % "3.0.2-1.18",
and compile it
Hi Günter,
With Maven you'd list the staged repository holding the RC artifacts as a
repository:
```
<repository>
    <id>test_kafka_rc</id>
    <name>Apache Flink Kafka Connector v3.0.2</name>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1675/</url>
</repository>
```
With SBT, I think the equivalent is
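A sketch of the sbt resolver equivalent (standard sbt `resolvers` syntax; the name string is arbitrary):

```scala
resolvers += "Apache Flink Kafka RC" at
  "https://repository.apache.org/content/repositories/orgapacheflink-1675/"
```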
You, unfortunately, just can't, AFAIK.
On Sat, 25 Nov 2023 at 14:45, rania duni wrote:
> Hello!
>
> I would like to know how can I get the operator ids of a running job. I
> know how can I get the task ids but I want the operator ids! I couldn’t
> find something to the REST API docs.
> Thank you.
Hello!
I would like to know how can I get the operator ids of a running job. I know
how can I get the task ids but I want the operator ids! I couldn’t find
something to the REST API docs.
Thank you.
Hi Gordon,
thanks for working on it.
How can I reference the repository for the new artifact? Referencing
3.0.2-18 I get an unresolved dependency error.
Thanks for a hint.
Günter
sbt:flink_essential_swrapper> compile
[info] Updating
[info] Resolved dependencies
[warn]
[warn] Note:
Hi Dan
I think using Flink SQL should be able to meet your needs.
You can write a Flink jar program that accepts different directories, schemas,
mappings, and sink tables, and generates the DDL and DML.
Assuming you have two directories:
directory1 -> f1, f2, f3, f4 -> iceberg1
directory2 -> f1, f2, f3 ->
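A hedged sketch of what such generated statements might look like for directory1 — the connector options, paths, and table names below are assumptions for illustration, not from the thread:

```sql
-- Generated source DDL for directory1 (options are illustrative)
CREATE TEMPORARY TABLE src_directory1 (
  f1 STRING, f2 STRING, f3 STRING, f4 STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://bucket/directory1',
  'format' = 'parquet'
);

-- Generated DML mapping the source fields onto the Iceberg sink
INSERT INTO iceberg1 SELECT f1, f2, f3, f4 FROM src_directory1;
```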
Hi all,
I've cherry-picked FLINK-30400 onto v3.0 branch of flink-connector-kafka.
Treating this thread as justification to start a vote for 3.0.2 RC #1
immediately so we can get out a new release ASAP. Please see the vote
thread here [1].
@guenterh.lists Would you be able to test this
RC and
Hi,
We are having a job in production where we use table API to join multiple
topics. The query looks like this:
SELECT *
FROM topic1 AS t1
JOIN topic2 AS t2 ON t1.userId = t2.userId
JOIN topic3 AS t3 ON t1.userId = t3.accountUserId
This works and produces an EnrichedActivity any time any of
Hi
From the documentation I can see there isn’t any ES support in Flink 1.18 right
now and Flink-26088 (ES 8 support) is still open.
Does anyone have an idea when ES connector support will be available in 1.18?
Please let me know.
Med venlig hilsen / Best regards
Lasse Nedergaard
> built a fat uber jar from quickstart with Flink 1.18.0 for
> flink-streaming-java and flink-clients, and flink-connector-kafka version
> 3.0.1-1.18
> then submitted to local Flink cluster 1.18.0. Things worked as expected and
> the job ran fine.
Hey, @Gordan
I guess things may work as
Thanks, I found it.
So the SQL built-in functions actually use Calcite's capabilities, while Flink's own built-in functions are used in the Table API.
> On Nov 24, 2023 at 17:07, Xuyang wrote:
>
> Hi,
> Regarding your example: if you have compiled the source code, you can find the PostfixRowOperator method in FlinkSqlParserImpl, the dynamically generated parser class. Roughly, it recognizes the three keywords IS
> NOT NULL and converts them into Calcite's built-in function SqlStdOperatorTable.IS_NOT_NULL.
>
>
>
>
> --
>
>
Hi Hang,
Few more points regarding this issue
1. The issue does not reproduce in my local installation and occurs only on
kubernetes server installation.
2. We have used kubernetes operator to install flink on the server
Please let me know if any other info is required here.
Thanks,
Tauseef
Hi Hang,
I cross checked this issue multiple times. I also upgraded to flink 1.18
but the issue persists.
Can you please give me a few guidelines on how to investigate this and
fix it for good?
Thanks,
Tauseef
On Thu, 23 Nov 2023 at 18:08, Tauseef Janvekar
wrote:
> Thanks Hang.
>
> I
Hi Gordon,
I'm wondering if this might be a difference between how Maven and
Gradle build their projects, since you've done your validations with
Maven, but Günter uses Gradle.
In the end, the quickest fix would be to backport FLINK-30400 to the
Flink Kafka 3.0 release branch.
Best regards,
Hi,
Regarding your example: if you have compiled the source code, you can find the PostfixRowOperator method in FlinkSqlParserImpl, the dynamically generated parser class. Roughly, it recognizes the three keywords IS
NOT NULL and converts them into Calcite's built-in function SqlStdOperatorTable.IS_NOT_NULL.
--
Best!
Xuyang
At 2023-11-24 15:15:04, "jinzhuguang" wrote:
>flink 1.18.0
>
>
>For example, I write the following SQL:
> select * from KafkaTable where
flink 1.18.0
For example, I write the following SQL:
select * from KafkaTable where id is not null;
IS NOT NULL should be a built-in system function, so I found the relevant code:
public static final BuiltInFunctionDefinition IS_NOT_NULL =
BuiltInFunctionDefinition.newBuilder()
.name("isNotNull")
.kind(SCALAR)
Hi,
Need to fix my previous comment in the last reply - it should be totally
fine that the POM files for flink-connector-kafka 3.0.1-1.18 point to an
older version.
For example, in the ongoing flink-connector-opensearch release 1.1.0-1.18,
the POM files also still point to Flink 1.17.1 [1].
If
Hi all,
There seems to be an issue with the connector release scripts used in the
release process that doesn't correctly overwrite the flink.version property
in POMs.
I'll kick off a new release for 3.0.2 shortly to address this. Sorry for
overlooking this during the previous release.
Best,
Thanks Feng,
I think my challenge (and why I expected I’d need to use Java) is that there
will be parquet files with different schemas landing in the s3 bucket - so I
don’t want to hard-code the schema in a sql table definition.
I’m not sure if this is even possible? Maybe I would have to
Hi Oxlade
I think using Flink SQL can conveniently fulfill your requirements.
For S3 Parquet files, you can create a temporary table using a filesystem
connector[1] .
For Iceberg tables, FlinkSQL can easily integrate with the Iceberg
catalog[2].
Therefore, you can use Flink SQL to export S3
Hi all,
I'm attempting to create a POC in flink to create a pipeline to stream parquet
to a data warehouse in iceberg format.
Ideally - I'd like to watch a directory in s3 (minio locally) and stream those
to iceberg, doing the appropriate schema mapping/translation.
I guess first: does this
Hi Danny
thanks for taking a look into it and for the hint.
Your assumption is correct - It compiles when the base connector is
excluded.
In sbt:
"org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18"
exclude("org.apache.flink", "flink-connector-base"),
Günter
On 23.11.23 14:24,
It looks similar, but this error should be the one reported after the job failed; check whether there are any other exception logs.
Best,
Feng
On Sat, Nov 4, 2023 at 3:26 PM Ray wrote:
> Experts: I am currently running into the following problem. 1. Scenario: submitting a Flink task on Yarn. 2. Version: Flink 1.15.0. 3. Logs: the Yarn logs show the following. 2023-11-04
> 15:04:42,313 ERROR org.apache.flink.util.FatalExitExceptionHandler
> [] - FATAL: Thread
Hey all,
I believe this is because of FLINK-30400. Looking at the pom I cannot see
any other dependencies that would cause a problem. To workaround this, can
you try to remove that dependency from your build?
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.0.1-1.18</version>
</dependency>
Hi, Guenterh,
It seems a bug to me that the 3.0.1-1.18 Flink Kafka connector uses a Flink 1.17
dependency, which leads to your issue.
I guess we need to propose a new Kafka connector release to fix this issue.
CC: Gordan, Danny, Martijn
Best,
Leonard
> On Nov 14, 2023 at 6:53 PM, Alexey Novakov via user
Thanks Hang.
I got it now. I will check on this and get back to you.
Thanks,
Tauseef.
On Thu, 23 Nov 2023 at 17:29, Hang Ruan wrote:
> Hi, Tauseef.
>
> This error is not that you can not access the Kafka cluster. Actually,
> this error means that the JM cannot access its TM.
> Have you ever
Hi, Tauseef.
This error does not mean that you cannot access the Kafka cluster. Actually, this
error means that the JM cannot access its TM.
Have you ever checked whether the JM is able to access the TM?
Best,
Hang
Tauseef Janvekar wrote on Thu, Nov 23, 2023 at 16:04:
> Dear Team,
>
> We are facing the below
Hi, patricia.
Can you attach the full stack trace of the exception? It seems the thread
reading the source is stuck.
--
Best!
Xuyang
At 2023-11-23 16:18:21, "patricia lee" wrote:
Hi,
Flink 1.18.0
Kafka Connector 3.0.1-1.18
Kafka v 3.2.4
JDK 17
I get error on class
Hi,
Flink 1.18.0
Kafka Connector 3.0.1-1.18
Kafka v 3.2.4
JDK 17
I get error on class
org.apache.flink.streaming.runtime.tasks.SourceStreamTask on
LegacySourceFunctionThread.run()
"java.util.concurrent.CompletableFuture@12d0b74 [Not completed, 1
dependents]
I am using the FlinkKafkaConsumer.
Flink SQL is better suited to processing structured data; I don't know whether the number of fields in your body_data is fixed. If it is fixed, you can write the source and sink formats as tables.
For example:
SourceT: (
uuid String,
body_data ARRAY>
)
SinkT (
result ARRAY>
)
Insert into SinkT (result) select Array[ROW(uuid, null, body_data[1].field1 as
body_data.fild1, body_data[1].Field2
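A possible complete form of that statement, sketched under assumptions — the table and field names follow the thread, but the element types and the two-element expansion are illustrative only:

```sql
-- Sketch: project each element of body_data into the sink's output array.
INSERT INTO SinkT (result)
SELECT ARRAY[
  ROW(uuid, body_data[1].fild1, body_data[1].fild2),
  ROW(uuid, body_data[2].fild1, body_data[2].fild2)
]
FROM SourceT;
```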
Dear Team,
We are facing the below issue while connecting to confluent kafka
Can someone please help here.
2023-11-23 06:09:36,989 INFO org.apache.flink.runtime.executiongraph.
ExecutionGraph [] - Source: src_source -> Sink: Print to Std. Out (1/1)
Hi David,
Thanks for the confirmation. Let's fix the docs:
https://github.com/apache/flink/pull/23776
Thanks,
Alex
On Sun, 19 Nov 2023 at 01:55, David Anderson wrote:
> Hi, Alex!
>
> Yes, in PyFlink the various flatmap and process functions are implemented
> as generator functions, so they
Don't use the RestClusterClient; you can generate one from the openapi
spec (see the docs).
On 16/11/2023 20:36, Adrian Alexandru Vasiliu wrote:
Hello,
For a programmatic use in Java of the Flink REST API, which of these
options would be the best choice?
1. Direct use via a REST client
Unsubscribe
--
Sent from my NetEase Mail mobile client
- Original Message -
From: "Junrui Lee"
To: user-zh@flink.apache.org
Sent: Wed, 22 Nov 2023 10:19:32 +0800
Subject: Re: Unsubscribe
Hi,
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the mailing list.
Best,
Junrui
万建国 <1097498...@qq.com.invalid> wrote on Wed, Nov 22, 2023 at 10:10:
>
Hello!
I am trying to run Beam pipeline in local docker-compose environment on top of
Flink. I wrote my own Dockerfile for Flink jobmanager and taskmanager.
I need to connect to secure Kafka cluster through kerberos.
Dockerfile for my-image-apache-beam/flink:1.16-java11:
FROM flink:1.16-java11
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.7.0.
The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.
Release highlights:
- Standalone
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.7.0.
The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.
Release highlights:
- Standalone
Hi, casel.
This doesn't seem to be supported at the moment, and there doesn't seem to be an issue proposing it; you could file a feature request in JIRA and see what the community thinks.
Ways to achieve the same thing today:
1. Three lookup joins + union + udaf.
2. A udf that queries row by row, with a cache to improve performance.
3. Fork the community connector, modify it, and repackage it.
4. ..
--
Best!
Xuyang
At 2023-11-22 20:44:47, "casel.chen" wrote:
>There is a dimension table `user` with id and name fields
>id | name
Input:
{
"uuid":"",
"body_data":
"[{\"fild1\":\"1231\",\"fild2\":\"2341\"},{\"fild1\":\"abc\",\"fild2\":\"cdf\"}]"
}
Output:
[
{
"uuid": "",
"body_data: null,
"body_data.fild1": "123",
"body_data.fild2": "234"
},
{
"uuid": "",
"body_data": null,
"body_data.fild1":
There is a dimension table `user` with id and name fields
id | name
-
1 | zhangsan
2 | lisi
3 | wangwu
Now a transaction record arrives in real time:
id | creator_id | approver_id | deployer_id
-
1 | 1| 2 | 3
We want the lookup against the user dimension table to return each user's name
id |
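The three lookups above can be expressed as three lookup joins against the same dimension table. A sketch — the stream table name and the `proc_time` processing-time attribute are assumptions, not from the thread:

```sql
-- Join the transaction stream against the user dimension table three times,
-- once per id column. proc_time is an assumed processing-time attribute.
SELECT t.id,
       c.name AS creator_name,
       a.name AS approver_name,
       d.name AS deployer_name
FROM transactions AS t
JOIN user FOR SYSTEM_TIME AS OF t.proc_time AS c ON t.creator_id = c.id
JOIN user FOR SYSTEM_TIME AS OF t.proc_time AS a ON t.approver_id = a.id
JOIN user FOR SYSTEM_TIME AS OF t.proc_time AS d ON t.deployer_id = d.id;
```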
Unsubscribe
Unsubscribe
At 2023-10-04 10:06:45, "1" wrote:
>
Hi, Tauseef.
This error happens after the job is running, so I think at least it proves that
there is no problem with the modified code.
Now let's focus on the new error.
The exception suggests that Flink can't read the metadata from Kafka to get all of
the partitions for this topic.
Can you
Hi, casel.
Could you describe the "batch lookup join" in more detail? It looks like it matches the usage of a single lookup join that directly carries k1=v1 and k2=v2 and
k3=v3.
--
Best!
Xuyang
At 2023-11-22 11:55:11, "casel.chen" wrote:
>A row carries three keys to look up: key1, key2 and key3
>
>
>id key1 key2 key3
>I want a batch lookup query that returns one row: id value1 value2 value3
>
>
>I checked, and currently lookups including the jdbc
A row carries three keys to look up: key1, key2 and key3
id key1 key2 key3
I want a batch lookup query that returns one row: id value1 value2 value3
I checked, and currently no lookup, including the jdbc connector, supports batch queries, so the only way is to first turn the multiple columns into multiple rows, look them up separately, and then turn the rows back into columns, as shown below
id key1 key2 key3
First turn the columns into rows:
id key1
id key2
id key3
After separate lookup joins we get:
id value1
id value2
id value3
Finally turn the rows back into columns to return one row:
id
Hi, Dale.
Thanks for your professional explanation ;)
--
Best!
Xuyang
At 2023-11-22 00:39:47, "Dale Lane" wrote:
FYI in case it’s relevant for this discussion
> I'm not sure what is the ` Avro JSON` means
Avro supports two encoding mechanisms – binary encoding, and JSON
Hi, Tauseef.
I modify you code and the following can work. Can you try that?
```
static class C1 {
    Metrics metrics;

    public C1(Metrics metrics) {
        this.metrics = metrics;
    }

    public Metrics getMetrics() {
        return metrics;
    }
}

static class Metrics {
Hi,
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the mailing list.
Best,
Junrui
万建国 <1097498...@qq.com.invalid> wrote on Wed, Nov 22, 2023 at 10:10:
> Unsubscribe
The recipe we used to get this working was increasing
kubernetes.operator.reconcile.interval and
kubernetes.operator.observer.progress-check.interval, which essentially made
reconciliation slower but smoother for applies across a large number of
bundled FlinkDeployments. We also bumped
FYI in case it’s relevant for this discussion
> I'm not sure what is the ` Avro JSON` means
Avro supports two encoding mechanisms – binary encoding, and JSON encoding. [1]
In other words, an Avro record, although normally represented as binary data,
can instead be represented as a JSON
Thanks Yubin and Jane for the discussion!
+1 to fix this bug; although it's usually used as a test source, it's
important to provide the correct behavior for users.
for the invalid field length configured by users, I think it's better to
raise an error instead of silently using the default value.
Hi, Tauseef.
This is an example to use custom POJO with flatmap[1]. If possible, can you
post your code and tag the flink version?
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/etl/#flatmap
--
Best!
Xuyang
At 2023-11-21 22:48:41, "Tauseef
Hi, Praveen.
`OpenSearch supports writing data in the JSON format, but in Flink its default
data format is Avro JSON.`
I'm not sure what ` Avro JSON` means. In the Opensearch connector, there are
multiple formats it supports, such as "avro", "format", etc., by adding the
corresponding jars, and
Dear Team,
I am getting the following error while using flatMap.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: The return type of function
'defineWorkflow(OtelTransformerJob.java:75)' could not be determined
automatically, due to type
Hi Yubin,
Thanks for driving this discussion. Perhaps a specific example can better
illustrate the current issue.
Considering the following DDL, f0 will always be generated with a default
char length of 100, regardless of char(5), because the connector option
'fields.f0.length' is not specified
Hello Team
Please help answer the query below.
1. OpenSearch supports writing data in the JSON format, but in Flink its
default data format is Avro JSON. What is the best practice for writing data to
OpenSearch using the Flink OpenSearch Connector? Do we need to manually convert
Avro JSON
Seems the operator didn't get restarted automatically after the configmap
was changed. After a rolling restart, the exception disappeared. Never mind
this issue. Thanks.
On Tue, Nov 21, 2023 at 11:31 AM Xiaolong Wang
wrote:
> Hi,
>
> Recently I upgraded the flink-kubernetes-operator from 1.4.0
Hi,
Recently I upgraded the flink-kubernetes-operator from 1.4.0 to 1.6.1 to
use Flink 1.18. After that, the operator kept reporting the following
exception:
2023-11-21 03:26:50,505 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO
> ][sn-push/sn-push-decision-maker-log-s3-hive-prd] Resource
Hello Flink community ,
We are currently working on a Flink job that consumes messages from
RabbitMQ, with checkpointing configured to at-least-once mode.
In our job, we make external API requests to retrieve information. If the
external API is down or a timeout occurs, we currently throw an
Hi Trystan, I'm actually not very familiar with the operator's internals,
but I'd guess that limitation is in Flink itself - application mode is a
feature from core Flink, the operator just configures it based on the CRDs
it defines. Maybe one of the maintainers can confirm.
Regards,
Alexis.
On
Thanks Alexis, I can give that a try. However, that seems less than ideal
from the user's perspective.
Is there a technical reason why the operator can't support this combination
of modes? I'd really like to just let the system do its thing rather than
build a complicated two-jar approach.
Hi casel,
We do something similar in production: you could implement a udtf that listens to the Apollo config and decides whether to filter the data based on that config.
Best,
Yu Chen
> On Nov 20, 2023 at 21:05, Xuyang wrote:
>
> Hi,
>Could this "config dimension table" be replaced with a stream table using Flink CDC, so that when the config table is changed, the CDC field changes are picked up and joined between upstream and downstream?
>
>
>
>
> --
>
>Best!
>Xuyang
>
>
>
>
>
> At 2023-11-20 19:24:47, "casel.chen" wrote:
>>