Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-10 Thread amenreet sodhi
Hey Hang,

I am deploying my Flink job in HA application mode. Whenever I redeploy my
job, or deploy an updated version of it, it uses the same job_id. I haven't
configured a fixed job id anywhere; I think it does this by default. Can you
share where I can configure this? I looked for it once before,
but couldn't find anything.
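
For reference, one workaround discussed in the replies below is to give each
deployment its own checkpoint directory. A minimal sketch, where the
APP_VERSION environment variable is a hypothetical deploy-time value (for
example, the image tag):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VersionedCheckpointDir {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // APP_VERSION is a hypothetical deploy-time value; a new value per
        // redeploy gives every job version a fresh checkpoint path.
        String version = System.getenv().getOrDefault("APP_VERSION", "v1");
        env.getCheckpointConfig()
                .setCheckpointStorage("file:/opt/flink/pm/checkpoint/" + version);
        // ... pipeline definition and env.execute() follow
    }
}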

Thanks
Regards
Amenreet Singh Sodhi

On Wed, May 10, 2023 at 8:36 AM Hang Ruan  wrote:

> Hi, amenreet,
>
> As Hangxiang said, you should use a new checkpoint dir if the new job has
> the same jobId as the old one. Otherwise, do not use a fixed jobId, and
> the checkpoint dirs will not conflict.
>
> Best,
> Hang
>
> Hangxiang Yu wrote on Wed, May 10, 2023 at 10:35:
>
>> Hi,
>> I guess you used a fixed JOB_ID and configured the same checkpoint dir
>> as before? And you may also have started the job without the previous
>> state? The new job knows nothing about the previous checkpoints, which is
>> why it fails when it tries to generate a new checkpoint.
>> I'd suggest using a different JOB_ID for each job, or setting a different
>> checkpoint dir for a new job.
>>
>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi 
>> wrote:
>>
>>> Hi all,
>>>
>>> Is there any way to prevent a restart of the Flink job, or to override
>>> the checkpoint metadata, if for some reason a checkpoint with the same
>>> name already exists? I get the following exception and my job restarts.
>>> I have been trying to find a solution for a very long time but haven't
>>> found anything useful yet, other than manual cleanup.
>>>
>>> 2023-02-27 10:00:50,360 WARN  
>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>>> [] - Failed to trigger or complete checkpoint 1 for job
>>> 6e6b1332. (0 consecutive failed attempts so far)
>>>
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>>> finalize checkpoint.
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> [?:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> [?:?]
>>>
>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>
>>> Caused by: java.io.IOException: Target file
>>> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
>>> already exists.
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> ... 7 more
>>>
>>> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>>> [] - Error while processing AcknowledgeCheckpoint
>>> message
>>>
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>> finalize the pending checkpoint 1. Failure reason: Failure to finalize
>>> checkpoint.
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> 

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-10 Thread amenreet sodhi
Hi Weihua,

I am deploying my Flink job in HA application mode on a Kubernetes cluster.
I am using an external NFS mount for storing checkpoints. For some reason,
whenever I deploy an updated version of my application, it uses the same
job_id for the new job as for the previous job. The Flink job therefore
creates checkpoints in the same directory, and whenever it encounters the
same checkpoint path (an already existing checkpoint from a previous version
of my job) it throws the above error and the job restarts. I have set my job
restart count to 3, so if this happens three times in a row, the jobmanager
pod restarts and then starts the job again from checkpoint-0 or from the
last saved savepoint. Then the same story repeats.

Thanks
Regards
Amenreet Singh Sodhi

On Wed, May 10, 2023 at 9:10 AM Weihua Hu  wrote:

> Hi,
>
> if for some reason a checkpoint with the same name already exists.
>>
> Could you give more details about your scenarios here?
> From your description, I guess this problem occurred when a job restarted.
> Was this restart triggered manually?
>
> During normal restart processing, the job retrieves the latest checkpoint
> from a high-availability service (ZooKeeper or Kubernetes), then restores
> from it and makes a new checkpoint with a new checkpoint id.
> In this case, the job does not recover from the old checkpoint, yet the
> old checkpoint path already exists.
>
> Best,
> Weihua
>
>
> On Wed, May 10, 2023 at 11:07 AM Hang Ruan  wrote:
>
>> Hi, amenreet,
>>
>> As Hangxiang said, you should use a new checkpoint dir if the new job has
>> the same jobId as the old one. Otherwise, do not use a fixed jobId, and
>> the checkpoint dirs will not conflict.
>>
>> Best,
>> Hang
>>
>> Hangxiang Yu wrote on Wed, May 10, 2023 at 10:35:
>>
>>> Hi,
>>> I guess you used a fixed JOB_ID and configured the same checkpoint dir
>>> as before? And you may also have started the job without the previous
>>> state? The new job knows nothing about the previous checkpoints, which
>>> is why it fails when it tries to generate a new checkpoint.
>>> I'd suggest using a different JOB_ID for each job, or setting a
>>> different checkpoint dir for a new job.
>>>
>>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi 
>>> wrote:
>>>
 Hi all,

 Is there any way to prevent a restart of the Flink job, or to override the
 checkpoint metadata, if for some reason a checkpoint with the same name
 already exists? I get the following exception and my job restarts. I have
 been trying to find a solution for a very long time but haven't found
 anything useful yet, other than manual cleanup.

 2023-02-27 10:00:50,360 WARN  
 org.apache.flink.runtime.checkpoint.CheckpointFailureManager
 [] - Failed to trigger or complete checkpoint 1 for job
 6e6b1332. (0 consecutive failed attempts so far)

 org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
 finalize checkpoint.

 at
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 [?:?]

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 [?:?]

 at java.lang.Thread.run(Thread.java:834) [?:?]

 Caused by: java.io.IOException: Target file
 file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
 already exists.

 at
 org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
 ~[event_executor-1.0-SNAPSHOT.jar:?]

 at
 

Flink Job Failure for version 1.16

2023-05-10 Thread neha goyal
I have recently migrated from 1.13.6 to 1.16.1 and can see a performance
degradation in Flink pipelines that use Flink's managed state (ListState,
MapState, etc.). Pipelines are frequently failing with the exception:

06:59:42.021 [Checkpoint Timer] WARN  o.a.f.r.c.CheckpointFailureManager -
Failed to trigger or complete checkpoint 36755 for job
d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException:
Checkpoint expired before completing.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN
 a.remote.ReliableDeliverySupervisor - Association with remote system
[akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367]
has failed, address is now gated for [50] ms. Reason: [Disassociated]
07:18:15.257 [flink-metrics-23] WARN
 a.remote.ReliableDeliverySupervisor - Association with remote system
[akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639]
has failed, address is now gated for [50] ms. Reason: [Disassociated]
07:18:15.331 [flink-akka.actor.default-dispatcher-31] WARN
 o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete
checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive
failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException:
Checkpoint Coordinator is suspending.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)

Is there any issue with this Flink version or the new RocksDB version? What
should be the action item for this exception?
The maximum savepoint size is 80.2 GB, and we take a savepoint for the job
periodically (every 20 minutes).
Checkpoint Type: aligned checkpoint
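
Two settings that are often tuned for "Checkpoint expired before completing"
are the checkpoint timeout and unaligned checkpoints. A minimal sketch (the
values are illustrative only; whether they help depends on the actual
bottleneck):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(20 * 60 * 1000L); // checkpoint every 20 minutes
        CheckpointConfig config = env.getCheckpointConfig();
        // give slow checkpoints more time than the 10-minute default
        config.setCheckpointTimeout(30 * 60 * 1000L);
        // let barriers overtake buffered records under backpressure
        config.enableUnalignedCheckpoints();
    }
}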


Re: Flink Kubernetes Operator: Table Not Inserted When Using Parallel Job

2023-05-10 Thread Sharil Shafie
Hi,

I believe there is only one partition, based on the log output below. I
didn't make any changes to the DataGenerator from the Flink playground.

2023-05-10 11:55:33,082 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - These
configurations '[key.deserializer, commit.offsets.on.checkpoint,
value.deserializer, enable.auto.commit, client.id.prefix,
partition.discovery.interval.ms, auto.offset.reset]' were supplied but are
not used yet.
2023-05-10 11:55:33,091 INFO  org.apache.kafka.common.utils.AppInfoParser
   [] - Kafka version: 3.4.0
2023-05-10 11:55:33,093 INFO  org.apache.kafka.common.utils.AppInfoParser
   [] - Kafka commitId: 2e1947d240607d53
2023-05-10 11:55:33,094 INFO  org.apache.kafka.common.utils.AppInfoParser
   [] - Kafka startTimeMs: 1683719733083
2023-05-10 11:55:33,180 INFO
 org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
[] - Starting the KafkaSourceEnumerator for consumer group null without
periodic partition discovery.
2023-05-10 11:55:33,682 INFO  org.apache.flink.configuration.Configuration
[] - Config uses fallback configuration key
'kubernetes.service-account' instead of key
'kubernetes.taskmanager.service-account'
2023-05-10 11:55:34,074 INFO
 org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating
new TaskManager pod with name flink-deploy-taskmanager-1-1 and resource
<2048,1.0>.
2023-05-10 11:55:36,367 INFO
 org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
flink-deploy-taskmanager-1-1 is created.
2023-05-10 11:55:36,767 INFO
 org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
new TaskManager pod: flink-deploy-taskmanager-1-1
2023-05-10 11:55:36,772 INFO
 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Requested worker flink-deploy-taskmanager-1-1 with resource spec
WorkerResourceSpec {cpuCores=1.0, taskHeapSize=537.600mb (563714445 bytes),
taskOffHeapSize=0 bytes, networkMemSize=158.720mb (166429984 bytes),
managedMemSize=634.880mb (665719939 bytes), numSlots=4}.
2023-05-10 11:55:38,006 INFO
 org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator
[] - Discovered new partitions: [transactions-0]


Thanks for the suggestion. I'll have a look into that.

Regards.


On Thu, May 11, 2023 at 10:55 AM Weihua Hu  wrote:

> Hi,
>
> How many partitions does your Kafka topic have?
>
> One possibility is that the Kafka topic has only one partition: when the
> source parallelism is set to 2, one of the source tasks cannot consume any
> data or generate a watermark, so the downstream operator cannot align
> watermarks and cannot produce data. [1]
>
> You can check the records sent by each source sub-task.
> If only one sub-task is outputting, you can set the source idle timeout [2]
> to avoid waiting for the watermark forever.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-idle-timeout
>
> Best,
> Weihua
>
>
> On Wed, May 10, 2023 at 8:20 PM Sharil Shafie  wrote:
>
>> Hi,
>>
>> I used the Real Time Reporting with the Table API
>> example and applied it on Kubernetes using Flink Kubernetes Operator 1.4.
>>
>> When I set the job parallelism to 2, the spend_report table is not
>> inserted into and stays empty. However, when I set parallelism to 1, the
>> table gets inserted.
>>
>> The problem is that there is no exception (that I can find) that
>> indicates why the table is not inserted. The job is shown to be running
>> fine, and there are bytes and records received (refer to the attached
>> file). There is, however, a difference: a 'Low Watermark' value is shown
>> when parallelism is set to 1 and none is shown when parallelism is 2.
>>
>> I have done this for both MySQL and Postgres and got the same outcome.
>>
>> I am fairly new to Kubernetes and Flink. Which part did I miss?
>>
>> The relevant files are below:
>>
>>- SpendReport.java  (taken from flink
>>playgrounds with modification)
>>- deployment_with_job.yaml (taken from
>>example in Kubernetes Operator repo with modification).
>>- Log when Parallelism = 2 
>>- Log when Parallelism = 1 
>>- Printscreen of task output for both settings (as attached).
>>
>>
>> Regards.
>>
>>


Re: flink1.16.1 jdbc-3.1.0-1.16 There is a problem trying left join

2023-05-10 Thread Hang Ruan
Hi, yangxueyong,

The filter (WHERE condition) will be pushed down to the source if the
connector implements the `SupportsFilterPushDown` interface.
In your case, the SQL planner determined that the records emitted by
`test_flink_res1` must satisfy the conditions (`name` =
'abc0.11317691217472489') and (`id` IS NULL). These filters were pushed down
to the source.

Best,
Hang
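
To see exactly which predicates the planner pushed into the JDBC source, the
optimized plan can be printed. A minimal sketch, assuming the DDL for both
tables has already been registered:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainPushdown {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // tEnv.executeSql("CREATE TABLE test_flink_res1 (...) WITH ('connector'='jdbc', ...)");
        // tEnv.executeSql("CREATE TABLE test_flink_res2 (...) WITH ('connector'='jdbc', ...)");
        System.out.println(tEnv.explainSql(
                "INSERT INTO test_flink_res2 (id, name, address) "
                        + "SELECT a.id, a.name, a.address FROM test_flink_res1 a "
                        + "LEFT JOIN test_flink_res2 b ON a.id = b.id "
                        + "WHERE a.name = 'abc0.11317691217472489' AND b.id IS NULL"));
    }
}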

Shammon FY wrote on Wed, May 10, 2023 at 14:15:

> Hi Yangxueyong,
>
> Are you sure this is your Flink SQL job? This SQL statement looks very
> strange: the table 'test_flink_res2' is both source and sink, and the join
> key is null.
>
> Best,
> Shammon FY
>
> On Wed, May 10, 2023 at 12:54 PM yangxueyong 
> wrote:
>
>> flink1.16.1
>>
>> mysql8.0.33
>>
>> jdbc-3.1.0-1.16
>>
>>
>> I have a SQL statement:
>>
>> insert into test_flink_res2(id,name,address)
>> select a.id,a.name,a.address from test_flink_res1 a left join
>> test_flink_res2 b on a.id=b.id where a.name='abc0.11317691217472489' and
>> b.id is null;
>>
>> *Why does Flink SQL convert this statement into the following statement?*
>>
>> SELECT `address` FROM `test_flink_res1` WHERE ((`name` =
>> 'abc0.11317691217472489')) AND ((`id` IS NULL))
>>
>> *As a result, there is no data in test_flink_res2. Why?*
>>
>>
>>
>>


Re: Flink Kubernetes Operator: Table Not Inserted When Using Parallel Job

2023-05-10 Thread Weihua Hu
Hi,

How many partitions does your Kafka topic have?

One possibility is that the Kafka topic has only one partition: when the
source parallelism is set to 2, one of the source tasks cannot consume any
data or generate a watermark, so the downstream operator cannot align
watermarks and cannot produce data. [1]

You can check the records sent by each source sub-task.
If only one sub-task is outputting, you can set the source idle timeout [2]
to avoid waiting for the watermark forever, as sketched after the links
below.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-idle-timeout
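
A minimal sketch of setting that option with the Table API (the rest of the
pipeline is omitted):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleSourceTimeout {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Let watermark alignment progress even when a source subtask owns no
        // Kafka partition and therefore never emits records or watermarks.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "30 s");
    }
}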

Best,
Weihua


On Wed, May 10, 2023 at 8:20 PM Sharil Shafie  wrote:

> Hi,
>
> I used the Real Time Reporting with the Table API
> example and applied it on Kubernetes using Flink Kubernetes Operator 1.4.
>
> When I set the job parallelism to 2, the spend_report table is not
> inserted into and stays empty. However, when I set parallelism to 1, the
> table gets inserted.
>
> The problem is that there is no exception (that I can find) that indicates
> why the table is not inserted. The job is shown to be running fine, and
> there are bytes and records received (refer to the attached file). There
> is, however, a difference: a 'Low Watermark' value is shown when
> parallelism is set to 1 and none is shown when parallelism is 2.
>
> I have done this for both MySQL and Postgres and got the same outcome.
>
> I am fairly new to Kubernetes and Flink. Which part did I miss?
>
> The relevant files are below:
>
>- SpendReport.java  (taken from flink
>playgrounds with modification)
>- deployment_with_job.yaml (taken from
>example in Kubernetes Operator repo with modification).
>- Log when Parallelism = 2 
>- Log when Parallelism = 1 
>- Printscreen of task output for both settings (as attached).
>
>
> Regards.
>
>


Re: flink 1.13 partition.time-extractor.timestamp-pattern format

2023-05-10 Thread Shammon FY
Hi,

As described in the documentation, if the partition is composed of multiple
fields, you can assemble them in the DDL into the partition format you need
via partition.time-extractor.timestamp-pattern:
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00'
);

If it is a single timestamp field that you want to convert into another time
format, you can follow the example in the documentation [1]: implement your
own PartitionTimeExtractor and specify it via partition.time-extractor.class
(see the sketch after the links below).

Since Flink 1.15 [2], partition.time-extractor.timestamp-formatter is also
supported, which converts the format of the partition timestamp assembled by
timestamp-pattern.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-time-extractor
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/filesystem/#%e5%88%86%e5%8c%ba%e6%97%b6%e9%97%b4%e6%8f%90%e5%8f%96%e5%99%a8
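
A minimal sketch of such an extractor for Flink 1.13, assuming the compact
date value (e.g. 20200520) is the first partition column; register it in the
DDL with 'partition.time-extractor.kind'='custom' and
'partition.time-extractor.class' set to this class name:

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import org.apache.flink.table.filesystem.PartitionTimeExtractor; // package location as of Flink 1.13

public class CompactDatePartitionTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // assumes partitions like dt='20200520'; adjust the index if dt is
        // not the first partition column
        return LocalDate.parse(partitionValues.get(0), FORMAT).atStartOfDay();
    }
}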

Best,
Shammon FY

On Wed, May 10, 2023 at 5:42 PM 莫失莫忘  wrote:

>
> My Hive partition format is dt='20200520', but a Flink SQL streaming job
> writing to Hive only supports the 'yyyy-MM-dd hh:mm:ss' pattern. How can I
> set partition.time-extractor.timestamp-pattern to the 'yyyyMMdd hh:mm:ss'
> format? My Flink version is 1.13.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-commit
>


[ANNOUNCE] Apache flink-connector-gcp-pubsub v3.0.1 released

2023-05-10 Thread Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-gcp-pubsub v3.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352770

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-elasticsearch v3.0.1 released

2023-05-10 Thread Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-elasticsearch v3.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352521

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-opensearch v1.0.1 released

2023-05-10 Thread Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-opensearch v1.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352686

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-pulsar v4.0.0 released

2023-05-10 Thread Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-pulsar v4.0.0. This release is compatible with Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352653

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-shaded v17.0 released

2023-05-10 Thread Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-shaded v17.0.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352445

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


[ANNOUNCE] Apache flink-connector-rabbitmq v3.0.1 released

2023-05-10 Thread Martijn Visser
The Apache Flink community is very happy to announce the release of Apache
flink-connector-rabbitmq v3.0.1. This release is compatible with Flink
1.16.x and Flink 1.17.x

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352699

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Release Manager


Re: Using different parallelism for different stages

2023-05-10 Thread 小昌同学
Great, thank you all for the guidance.


| |
小昌同学
|
|
ccc0606fight...@163.com
|
---- Original Message ----
| From | yidan zhao |
| Date | 2023-04-21 10:50 |
| To |  |
| Subject | Re: Using different parallelism for different stages |
As for what to consider: mainly the complexity of each operator's work; the
more complex an operator is, the higher its parallelism should be. At
runtime, you can also locate bottlenecks from backpressure and adjust
accordingly.

Shammon FY wrote on Fri, Apr 21, 2023 at 09:04:

Hi

There are two ways to set parallelism for a DataStream job:
1. Set the global parallelism via setParallelism on the ExecutionEnvironment.
2. Set the parallelism of a specific computation via setParallelism on that
DataStream, as sketched below.
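
A small self-contained sketch of both levels (the operators and values are
placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerStageParallelism {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // global default, e.g. matching four Kafka partitions

        env.fromElements("a", "b", "c")      // stand-in for the Kafka source
                .map((MapFunction<String, String>) String::toUpperCase)
                .setParallelism(8)           // heavier ETL stage gets more subtasks
                .print()
                .setParallelism(2);          // sink stage
        env.execute("per-stage parallelism sketch");
    }
}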

Best,
Shammon FY

On Fri, Apr 21, 2023 at 8:58 AM 小昌同学  wrote:



Hi all, I have a question about parallelism in Flink.
My upstream is Kafka (four partitions). After Flink ETL, the data is written
in real time to Kafka and MySQL. I would like to set different parallelism
at different stages; how can I do that with the DataStream API?
I would also like to ask what should be considered when choosing these
parallelism settings. Any guidance is appreciated.
| |
小昌同学
|
|
ccc0606fight...@163.com
|


Re: Unsubscribe

2023-05-10 Thread Hongshun Wang
To unsubscribe from the user-zh@flink.apache.org mailing list, please send
an email with any content to user-zh-unsubscr...@flink.apache.org; see [1].

[1] https://flink.apache.org/zh/community/

On Wed, May 10, 2023 at 1:38 AM Zhanshun Zou  wrote:

> Unsubscribe
>


flink 1.13 partition.time-extractor.timestamp-pattern format

2023-05-10 Thread 莫失莫忘
My Hive partition format is dt='20200520', but a Flink SQL streaming job
writing to Hive only supports the 'yyyy-MM-dd hh:mm:ss' pattern. How can I
set partition.time-extractor.timestamp-pattern to the 'yyyyMMdd hh:mm:ss'
format? My Flink version is 1.13.
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-commit

S3 Parquet files rolling on event not working because PartFileInfo.getSize() does not increase.

2023-05-10 Thread Vararu, Vadim
Hi all,

Trying to build an S3 Parquet bulk writer with a file roll policy based on a
size limit plus checkpoints. For that I've extended CheckpointRollingPolicy
and overridden shouldRollOnEvent to return true if the part size is greater
than the limit.

The problem is that the part size I get from PartFileInfo.getSize() is
always 4. It never changes.

Is that a misconfiguration somewhere, or is this not supported for S3
Parquet files?


@Slf4j
public class FileSizeAndOnCheckpointRollingPolicy<BucketID>
        extends CheckpointRollingPolicy<CloudEventAvro, BucketID> {

    private final long rollingFileSize;

    public FileSizeAndOnCheckpointRollingPolicy(long rollingFileSize) {
        this.rollingFileSize = rollingFileSize;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState,
                                     CloudEventAvro element) throws IOException {
        log.info("Part size: {}, rolling file size: {}",
                partFileState.getSize(), rollingFileSize);
        return partFileState.getSize() > rollingFileSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState,
                                              long currentTime) {
        return false;
    }
}


Re: How to implement a payment reconciliation timeout alert with Flink SQL?

2023-05-10 Thread Hongshun Wang
Hi casel.chen,

My understanding is: you want to *trigger the check* 30 minutes after a
PlatformPaymentStream record arrives; if the matching record has not
appeared in ThirdPartyPaymentStream by then, the payment has timed out and a
record should be emitted downstream. You do not want to wait for the
ThirdPartyPaymentStream record and only then judge the timeout, because even
if that record arrives late, the payment has still been made and there is no
need to raise an alert.

With the DataStream API, you can use a timer to trigger the delayed check; a
sketch follows below.
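
A minimal sketch of the timer approach, assuming both streams are keyed by
order id and mapped into a common PaymentEvent type (a hypothetical
placeholder; state cleanup is omitted for brevity):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PaymentTimeoutAlert
        extends KeyedProcessFunction<String, PaymentTimeoutAlert.PaymentEvent, String> {

    public static class PaymentEvent { // hypothetical unified event type
        public String orderId;
        public boolean fromThirdParty;
    }

    private transient ValueState<Boolean> confirmed;

    @Override
    public void open(Configuration parameters) {
        confirmed = getRuntimeContext().getState(
                new ValueStateDescriptor<>("confirmed", Boolean.class));
    }

    @Override
    public void processElement(PaymentEvent e, Context ctx, Collector<String> out)
            throws Exception {
        if (e.fromThirdParty) {
            confirmed.update(true); // reconciliation succeeded, no alert needed
        } else {
            // platform record arrived: check again in 30 minutes
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        if (confirmed.value() == null) {
            out.collect("payment timeout for order " + ctx.getCurrentKey());
        }
    }
}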

For SQL, a somewhat roundabout idea of mine (for reference only; it may not
be correct): use a Pulsar sink (or RocketMQ or another message queue that
supports delayed delivery) to write the PlatformPaymentStream data into a
delay queue (30 minutes) [1], then consume it with that delay as
PlatformPaymentStream2. Then LEFT JOIN PlatformPaymentStream2 with
ThirdPartyPaymentStream; if a joined row has no ThirdPartyPaymentStream
part, the payment was not made in time.

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/pulsar/#%e6%b6%88%e6%81%af%e5%bb%b6%e6%97%b6%e5%8f%91%e9%80%81

Best
Hongshun

On Wed, May 10, 2023 at 8:45 AM Shammon FY  wrote:

> Hi
>
> If you use CEP, you can union the two streams into one and then match the
> different event types via subtype when defining the CEP Pattern, for
> example:
> DataStream<Event> s1 = ...;
> DataStream<Event> s2 = ...;
> DataStream<Event> s = s1.union(s2)...;
> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
> .subtype(E1.class)
> .where(...)
> .followedBy("second")
> .subtype(E2.class)
> .where(...)
>
> With Flink SQL, you can implement this directly with a two-stream join
> plus windows.
>
> Best,
> Shammon FY
>
>
>
>
> On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:
>
> > 需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink
> > SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。
> > 请问这个双流实时对帐场景使用Flink CEP SQL要如何实现?
> >
> >
> 网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。
>


Re: StatsdMetricsReporter is emitting all metric types as gauges

2023-05-10 Thread Hang Ruan
Hi, Iris,

The Flink counter is cumulative; it has `inc` and `dec` methods. Since the
counter's value is already accumulated inside Flink, we do not need the
statsd counter type to accumulate it again.
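
A small standalone illustration with Flink's SimpleCounter (this is not
reporter code, just the accumulation behavior):

import org.apache.flink.metrics.SimpleCounter;

public class CounterSnapshots {
    public static void main(String[] args) {
        SimpleCounter counter = new SimpleCounter();
        counter.inc();
        System.out.println(counter.getCount()); // first report would send 1
        counter.inc();
        System.out.println(counter.getCount()); // second report would send 2
        // A statsd gauge set to the latest snapshot reads 2 (correct); a
        // statsd counter would sum the snapshots to 3 (wrong).
    }
}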

Best,
Hang

Iris Grace Endozo wrote on Wed, May 10, 2023 at 14:53:

> Hey Hang,
>
> Thanks for the prompt response. Does this mean the Flink counters emitted
> via statsd are cumulative then? From the spec
> https://github.com/b/statsd_spec#counters
>
>
> > A counter is a gauge calculated at the server. Metrics sent by the
> client increment or decrement the value of the gauge rather than giving its
> current value.
>
> This means that counters are not monotonic and work like deltas that are
> aggregated on the server side.
>
> Cheers, Iris.
>
> --
>
> *Iris Grace Endozo,* Senior Software Engineer
> *M *+61 435 108 697
> *E* iris.end...@gmail.com
> On 10 May 2023 at 1:02 PM +1000, Hang Ruan ,
> wrote:
>
> Hi, Iris,
>
> The metrics have already been calculated in Flink, so we only need to
> report these metrics as gauges.
> For example, the metric `metricA` is a Flink counter and is increased from
> 1 to 2. The statsd gauge will be 2 now. If we registered it as a statsd
> counter, we would send 1 and 2 to the statsd counter, and the statsd
> counter would be 3, which is a wrong result.
>
> Best,
> Hang
>
> Iris Grace Endozo wrote on Tue, May 9, 2023 at 19:19:
>
>> Hey folks trying to troubleshoot why counter metrics are appearing as
>> gauges on my end. Is it expected that the StatsdMetricsReporter is
>> reporting gauges for counters as well?
>>
>> Looking at this one:
>> https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207:
>> the statsd specifications state that counters need to be reported as
>> :|c[|@] but it seems it's defaulting to
>> "%s:%s|g" in the above. Ref: https://github.com/b/statsd_spec#counters
>>
>> Wondering if anyone else has hit this issue or there's an existing issue?
>>
>> Cheers, Iris.
>>
>> --
>>
>> *Iris Grace Endozo,* Senior Software Engineer
>> *M *+61 435 108 697
>> *E* iris.end...@gmail.com
>>
>


Re: StatsdMetricsReporter is emitting all metric types as gauges

2023-05-10 Thread Iris Grace Endozo
Hey Hang,

Thanks for the prompt response. Does this mean the Flink counters emitted via 
statsd are cumulative then? From the spec 
https://github.com/b/statsd_spec#counters


> A counter is a gauge calculated at the server. Metrics sent by the client 
> increment or decrement the value of the gauge rather than giving its current 
> value.

This means that counters are not monotonic and work like deltas that are 
aggregated on the server side.

Cheers, Iris.

--

Iris Grace Endozo, Senior Software Engineer
M +61 435 108 697
E iris.end...@gmail.com
On 10 May 2023 at 1:02 PM +1000, Hang Ruan , wrote:
> Hi, Iris,
>
> The metrics have already been calculated in Flink, so we only need to
> report these metrics as gauges.
> For example, the metric `metricA` is a Flink counter and is increased from
> 1 to 2. The statsd gauge will be 2 now. If we registered it as a statsd
> counter, we would send 1 and 2 to the statsd counter, and the statsd
> counter would be 3, which is a wrong result.
>
> Best,
> Hang
>
> > Iris Grace Endozo wrote on Tue, May 9, 2023 at 19:19:
> > > Hey folks trying to troubleshoot why counter metrics are appearing as 
> > > gauges on my end. Is it expected that the StatsdMetricsReporter is 
> > > reporting gauges for counters as well?
> > >
> > > Looking at this one: 
> > > https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L207:
> > >  the statsd specifications state that counters need to be reported as
> > > <metric_name>:<value>|c[|@<sample_rate>] but it seems it's defaulting to
> > > "%s:%s|g" in the above. Ref: https://github.com/b/statsd_spec#counters
> > >
> > > Wondering if anyone else has hit this issue or there's an existing issue?
> > >
> > > Cheers, Iris.
> > >
> > > --
> > >
> > > Iris Grace Endozo, Senior Software Engineer
> > > M +61 435 108 697
> > > E iris.end...@gmail.com


Re: flink1.16.1 jdbc-3.1.0-1.16 There is a problem trying left join

2023-05-10 Thread Shammon FY
Hi Yangxueyong,

Are you sure this is your Flink SQL job? This SQL statement looks very
strange: the table 'test_flink_res2' is both source and sink, and the join
key is null.

Best,
Shammon FY

On Wed, May 10, 2023 at 12:54 PM yangxueyong 
wrote:

> flink1.16.1
>
> mysql8.0.33
>
> jdbc-3.1.0-1.16
>
>
> I have a SQL statement:
>
> insert into test_flink_res2(id,name,address)
> select a.id,a.name,a.address from test_flink_res1 a left join
> test_flink_res2 b on a.id=b.id where a.name='abc0.11317691217472489' and
> b.id is null;
>
> *Why does Flink SQL convert this statement into the following statement?*
>
> SELECT `address` FROM `test_flink_res1` WHERE ((`name` =
> 'abc0.11317691217472489')) AND ((`id` IS NULL))
>
> *As a result, there is no data in test_flink_res2. Why?*
>
>
>
>