Re: How to fully refresh join results in Flink SQL when a dimension table changes

2022-03-22 Thread wushijjian5
1. Write the data of both streams into HBase;
2. Then start new consumers and do an outer join of the two streams with a suitable TTL:
A. If the join matches, emit the result directly.
B. If the right side is missing, join the left stream against HBase to fill in the right-side data.
C. If the left side is missing, join the right stream against HBase to fill in the left-side data.
3. The full data set can then be sorted and deduplicated by a version or ts field, as sketched below.
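
For step 3, a minimal sketch of the top-1 deduplication in Flink SQL via the Java Table API; the table joined_wide and its columns are hypothetical stand-ins for the joined output, not names from this thread:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Minimal sketch: keep only the latest row per key, ordered by a version/ts column.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql(
    "SELECT name, gender, age FROM ("
  + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY ts DESC) AS rn"
  + "  FROM joined_wide"
  + ") WHERE rn = 1").print();

This is the standard Flink SQL deduplication pattern: downstream results retract and re-emit whenever a newer version arrives for the same key.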


> On 22 Mar 2022, at 17:07, Michael Ran wrote:
> 
> You could consider doing partial updates at the storage layer.
> On 2022-03-21 17:00:31, "zns" <865094...@qq.com.INVALID> wrote:
>> CDC join
>> 
>>> On 21 Mar 2022, at 14:01, JianWen Huang wrote:
>>> 
>>> Fact stream A needs to be joined with dimension table B to widen the data.
>>> The requirement is that when dimension table B changes, all previously
>>> joined results must be updated to the latest values.
>>> Example:
>>> Before the change:
>>> Stream A:
>>> name   gender
>>> a      male
>>> b      male
>>> c      female
>>> 
>>> Dimension table B:
>>> name   age
>>> a      16
>>> b      17
>>> 
>>> Result:
>>> name   gender   age
>>> a      male     16
>>> b      male     17
>>> 
>>> After the change:
>>> Dimension table B:
>>> name   age
>>> a      16->17
>>> b      17->18
>>> 
>>> Result:
>>> name   gender   age
>>> a      male     17
>>> b      male     18
>>> 
>>> The approach I have come up with so far: turn the dimension table into a
>>> stream, join it with the fact table, and finally take the top-1 latest
>>> record by update time and sink it to storage. Does anyone have a better approach?



FileSystem format

2022-03-22 Thread lan tran
Hi team,

When I use the Flink Table API to generate files and store them in S3, the file names look like part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (for example, by adding the last_modified_value)?

Best,
Quynh
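
For reference, the closest knob I know of is in the DataStream API rather than the Table API: the FileSink accepts an OutputFileConfig that controls the part-file prefix and suffix (the UUID and subtask/counter parts stay Flink-generated, so a per-file last-modified value cannot be injected this way). A sketch with placeholder paths and values:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// Sketch: only the prefix and suffix of part files are configurable.
OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("events-" + System.currentTimeMillis()) // job start time, not a per-file mtime
        .withPartSuffix(".txt")
        .build();

FileSink<String> sink = FileSink
        .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withOutputFileConfig(fileConfig)
        .build();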


Re: Flink plugin can't get access to some of the classes in /opt/flink/lib folder

2022-03-22 Thread Caizhi Weng
Hi!

You can set the classloader.parent-first-patterns.additional configuration
to load additional classes from the lib directory. See [1] for more details.

What classes are causing the ClassNotFoundException? Judging from the default value
of classloader.parent-first-patterns.default, all classes in the
org.apache.flink package should be loaded correctly.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order
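
For example, if the missing classes lived under a package like com.example.metrics (a hypothetical name; substitute the package from your stack trace), the flink-conf.yaml entry would be:

# Appends to the parent-first patterns; the package prefix is a placeholder.
classloader.parent-first-patterns.additional: com.example.metrics.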

Fan Xie  于2022年3月23日周三 07:43写道:

> Hi Flink Community,
>
> Recently I implemented a new metrics reporter to report Flink runtime
> metrics to a Kafka topic. After building the project, I moved my reporter
> jar file from my build folder to the plugin folder:
>
> COPY build/flink-diagnostics-message-reporter-${VERSION}.jar 
> ${FLINK_HOME}/plugins/flink-diagnostics-message-reporter/flink-diagnostics-message-reporter.jar
>
>
> But then I saw lots of ClassNotFoundExceptions when I tried to start a test
> job with this new metrics reporter. It looks like the problem is that the
> metrics reporter plugin cannot access some of the jars in the
> /opt/flink/lib folder. According to this document:
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/,
> there seems to be a whitelist that defines which packages can be accessed by
> the plugins. Is there any way to configure this whitelist so that the plugin
> jar can get access to classes in this folder? If this doesn't work, are
> there other solutions for this problem, like changing the classpath for the
> plugin?
>
> Best,
> Fan
>


Flink plugin can't get access to some of the classes in /opt/flink/lib folder

2022-03-22 Thread Fan Xie
Hi Flink Community,

Recently I implemented a new metrics reporter to report Flink runtime metrics 
to a Kafka topic. After building the project, I moved my reporter jar file from 
my build folder to the plugin folder:

COPY build/flink-diagnostics-message-reporter-${VERSION}.jar 
${FLINK_HOME}/plugins/flink-diagnostics-message-reporter/flink-diagnostics-message-reporter.jar


But then I saw lots of ClassNotFoundExceptions when I tried to start a test job 
with this new metrics reporter. It looks like the problem is that the metrics 
reporter plugin cannot access some of the jars in the /opt/flink/lib 
folder. According to this document: 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/,
there seems to be a whitelist that defines which packages can be accessed by the 
plugins. Is there any way to configure this whitelist so that the plugin jar can 
get access to classes in this folder? If this doesn't work, are there other 
solutions for this problem, like changing the classpath for the plugin?

Best,
Fan


Cluster configuration for network-intensive Flink job

2022-03-22 Thread Vasileva, Valeriia
Hi, folks!

I am running a Flink Streaming job in Batch execution mode on EMR.

The job has the following stages:
1.  Read from MySQL
2.  KeyBy user_id
3.  Reduce by user_id
4.  Async I/O enriching from Redis
5.  Async I/O enriching from other Redis
6.  Async I/O enriching from REST #1
7.  Async I/O enriching from REST #2
8.  Async I/O enriching from REST #2
9.  Write to Elasticsearch



As you can see, it is a very network-intensive job with a lot of async 
operators that we currently cannot get rid of. As this is not a typical use case, 
I wanted to ask for advice: what cluster configuration specifics should we take 
into account to run this kind of job as efficiently as possible?

I have already tried several configurations:

#1

master: r6g.xlarge
core: r6g.xlarge (CPU: 4; RAM: 32 GiB; Disk: EBS 128 GB, network: 1.25 Gigabit 
baseline with burst up to 10 Gigabit)
problems: works with sort-based shuffling enabled, but very slowly (~36h). This 
instance type has baseline & burst network performance; once the burst credits 
are exhausted it degrades to the 1.25 Gigabit baseline, which slows down I/O. 
With hash-based shuffling it fails on KeyBy -> Reduce with "Connection reset by 
peer": a Task Manager fails -> the job fails -> the Job Manager is not able to 
restart it.

#2

master: m5.xlarge
core: r6g.12xlarge (CPU: 48; RAM: 384 GiB; Disk: EBS 1.5 TB, network: 20 
Gigabit)
problems: the job fails. With sort-based shuffling it fails in the writing phase 
with the exception "Failed to transfer file from TaskExecutor". With hash-based 
shuffling it fails at the same stage with "Connection reset by peer".

#3

master: m5.xlarge
core: c6gn.4xlarge (CPU: 16; RAM: 32 GiB; Disk: EBS 128 GB, network: 25 Gigabit)
summary: this configuration actually works fine, but it is a bit expensive, so I 
would like to find a cheaper solution.
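
For context, sort-based blocking shuffle is typically switched on in flink-conf.yaml along these lines; the values are illustrative, not tuned for this job:

# Use sort-based blocking shuffle on all batch edges (illustrative values).
taskmanager.network.sort-shuffle.min-parallelism: 1
# Compressing shuffle data usually lowers network and disk pressure in batch mode.
taskmanager.network.blocking-shuffle.compression.enabled: true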

I would appreciate any help!

Kind Regards,
Valeriia



Using Amazon EC2 Spot instances with Flink

2022-03-22 Thread Vasileva, Valeriia
Hello, folks!

I was wondering if there are some good articles on how to use EC2 Spot 
instances with Flink?

I would appreciate your help! Thank you!

Kind Regards,
Valeriia


Flink OOM issue

2022-03-22 Thread Phoebe Kwok
Hello,

We consistently encounter an OOM in the pipeline's sink step. How can
we debug this?

Below is the error:

java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
should be increased. If the error persists (usually in cluster after
several job (re-)submissions) then there is probably a class loading leak
in user code or some of its dependencies which has to be investigated and
fixed. The task executor has to be shutdown...
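
As a first step, the option named in the error can be raised in flink-conf.yaml; the value below is an illustrative guess, useful to tell an undersized metaspace apart from a classloading leak (a leak will hit the new limit again after repeated job submissions):

# The default is 256m in recent Flink versions; 512m is an illustrative increase.
taskmanager.memory.jvm-metaspace.size: 512m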

Thanks!


Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-22 Thread Binil Benjamin
Here are some more findings as I was debugging this. I peeked into the
snapshot to see the current values in
"_timer_state/processing_user-timers" and here is how they look:

Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22), namespace=VoidNamespace}
Timer{timestamp=1644897305245, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1644998232084, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1645730447266, key=(FFX1...), namespace=VoidNamespace}
Timer{timestamp=1645742358651, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1645743288774, key=(FFX22...), namespace=VoidNamespace}
...

As you can see, the priorityQueue has some negative values (there was a bug
in our code at some point that added these negative values). Could this be
the root cause of why the timer is not getting triggered?
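
Worth noting: -9223372036854715808 is exactly Long.MIN_VALUE + 60000, i.e. what you would get by adding a 60s step duration to an uninitialized sentinel value. A defensive sketch (the class shape and types are hypothetical stand-ins for the real function) that purges the corrupted timers and guards new registrations:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: purge the known-bad timer and refuse suspicious registrations.
public class WfProcessFunction extends KeyedProcessFunction<String, String, String> {
    private static final long STEP_DURATION_MS = 60_000L;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Remove the corrupted timer left behind by the earlier bug, if present for this key.
        ctx.timerService().deleteProcessingTimeTimer(Long.MIN_VALUE + STEP_DURATION_MS);

        long now = ctx.timerService().currentProcessingTime();
        long trigger = now + STEP_DURATION_MS;
        if (trigger > now) { // guards against overflowed or negative timestamps
            ctx.timerService().registerProcessingTimeTimer(trigger);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // existing timer logic goes here
    }
}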

Thanks!

On Fri, Mar 18, 2022 at 6:50 PM Binil Benjamin  wrote:

> Hi,
>
> Parallelism is currently set to 9 and it appears to be occurring for all
> subtasks.
>
> We did put logs to see the various timestamps. The following logs are from
> the last 5 days.
>
> - logs from processElement() - logged immediately after timer registration:
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229483281, Current
> step duration=6, Current processing time=1647229483281, Next trigger
> time=1647229543281,
> CurrentKey=(FFX22OJAEAA,d7d337b6-9dbc-4898-9001-4d10f2cd9796e1b9ba96-b4b6-3472-94ae-8a1d3e3d9ce1:::d7d337b6-9dbc-4898-9001-4d10f2cd9796:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (7/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229512107, Current
> step duration=6, Current processing time=1647229512107, Next trigger
> time=1647229572107,
> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a44bbe719-2ed4-33c9-aeb7-e7cb5f61dd91:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (2/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229543475, Current
> step duration=6, Current processing time=1647229543475, Next trigger
> time=1647229603475,
> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a0bea4f14-9f23-3368-b74f-531a67fbe9f0:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::d11f4623-dab0-3232-92aa-341628e96330)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (8/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229633747, Current
> step duration=6, Current processing time=1647229633747, Next trigger
> time=1647229693746,
> CurrentKey=(FFX22OJAEAA,0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd50fe795a7-64db-3350-b56e-37400b19ae07:::0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd5:::743f32f2-4a6c-315b-9850-9992b88f2b67)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (9/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236501795, Current
> step duration=6, Current processing time=1647236501795, Next trigger
> time=1647236561795,
> CurrentKey=(FFX22OJAEAA,4b6fbc31-5f41-45c3-aa08-4f865062e2a2dae46709-ff86-35d1-a830-1c01888a4cde:::4b6fbc31-5f41-45c3-aa08-4f865062e2a2:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (3/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236513004, Current
> step duration=6, Current processing time=1647236513004, Next trigger
> time=1647236573004,
> CurrentKey=(FFX22OJAEAA,90ba0c88-0a1e-43b5-8e3a-65613ccd7943e4c9234f-5f83-3ef6-8b22-28224d070404:::90ba0c88-0a1e-43b5-8e3a-65613ccd7943:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (2/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236561848, Current
> step duration=6, Current processing time=1647236561848, Next trigger
> time=1647236621848,
> CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9d0a3e195-56b2-3242-9d03-326ccbfbc040:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (4/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236591875, Current
> step duration=6, 

Re: scala shell not part of 1.14.4 download

2022-03-22 Thread Georg Heiler
Many thanks.

In the linked discussion it sounds like a move - not a delete. However, no
destination is named. Is there currently any moved version of the scala
shell available elsewhere?

Best,
Georg

On Mon, 21 Mar 2022 at 09:26, Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Georg,
>
> You can check out the discussion thread for the motivation [1].
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://lists.apache.org/thread/pojsrrdckjwow5186nd7hn9y5j9t29ov
>
> On Sun, 20 Mar 2022 at 08:13, Georg Heiler 
> wrote:
>
>> Many thanks.
>> I will try your suggestions in the coming days.
>> Why was support for the scala-shell dropped for 2.12? Other projects, e.g.
>> Spark, also managed to keep a spark-shell (a REPL like Flink's current
>> scala-shell) working for the 2.12 release.
>>
>> Best,
>> Georg
>>
>> On Fri, 18 Mar 2022 at 13:33, Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> The Scala Shell only works with Scala 2.11. You will need to use the
>>> Scala 2.11 Flink distribution.
>>>
>>> On 18/03/2022 12:42, Georg Heiler wrote:
>>>
>>> Hi,
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/repls/scala_shell/
>>> mentions:
>>>
>>> bin/start-scala-shell.sh local
>>>
>>>
>>> a script to start a scala REPL shell.
>>>
>>>
>>> But the download for Flink 
>>> https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
>>>
>>> Does not seem to include this script anymore.
>>>
>>>
>>> Am I missing something?
>>>
>>> How can I still start a scala repl?
>>>
>>> Best,
>>>
>>> Georg
>>>
>>>
>>>


Connection warnings when running Flink statefun

2022-03-22 Thread John Kattukudiyil
Hi team,

I'm trying to run a flink-statefun application (version 3.2.0) on my local
machine. The application is a pipeline consisting of multiple services that
communicate with each other by sending HTTP requests served by aiohttp. I am
using a single job manager and a single task manager. When I run the
application, I see these warnings multiple times in the worker logs:

2022-03-18 17:35:43,315 WARN
org.apache.flink.statefun.flink.core.nettyclient.NettyRequest
[] - Exception caught while trying to deliver a message: (attempt
#0)ToFunctionRequestSummary(address=Address(analytics-transformer,
dispatch, 77ce0dcb-347c-4c03-bc32-f7ebb734b930), batchSize=1,
totalSizeInBytes=1323, numberOfStates=0)


org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException:
Disconnected




18:25:27,594 WARN
org.apache.flink.statefun.flink.core.nettyclient.NettyRequest
[] - Exception caught while trying to deliver a message: (attempt
#0)ToFunctionRequestSummary(address=Address(web, statefun,
82936819-b3d9-4a24-b4eb-81a189d6306c), batchSize=1, totalSizeInBytes=1434,
numberOfStates=0)


org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection reset by peer




2022-03-18 18:06:44,848 WARN
org.apache.flink.statefun.flink.core.nettyclient.NettyRequest
[] - Exception caught while trying to deliver a message: (attempt
#0)ToFunctionRequestSummary(address=Address(web, statefun,
f004409f-77be-433c-8ab1-ae5f9dad605c), batchSize=1, totalSizeInBytes=1172,
numberOfStates=0)

java.lang.IllegalStateException: FixedChannelPool was closed




Then after some time I see that the master has crashed due to a request
timeout:


org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException:
An error occurred when attempting to invoke function
FunctionType(analytics-transformer, dispatch).

at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
~[statefun-flink-core.jar:3.2.0]

at
org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74)
~[statefun-flink-core.jar:3.2.0]

at
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60)
~[statefun-flink-core.jar:3.2.0]

at
org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
~[statefun-flink-core.jar:3.2.0]

at
org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119)
~[statefun-flink-core.jar:3.2.0]

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.3.jar:1.14.3]

at java.lang.Thread.run(Unknown Source) ~[?:?]

Caused by: java.lang.IllegalStateException: Failure forwarding a message to
a remote function Address(analytics-transformer, dispatch,
77d07eb3-f499-4265-a456-b0f75d738830)

at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170)
~[statefun-flink-core.jar:3.2.0]

at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124)
~[statefun-flink-core.jar:3.2.0]

at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
~[statefun-flink-core.jar:3.2.0]

... 16 more

Caused by:
org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException



Could someone tell me why these warnings are occurring and how to fix them?
I'm assuming it is a load-related issue due to the number of incoming

Python UDF Gauge Metrics not working & error log on Vectorized UDF

2022-03-22 Thread Jesry Pandawa
Hello,

I am trying to run my Flink Java job with Python UDFs, but I have some
doubts about gauge metrics and vectorized UDFs.

   1. I tried to create a simple UDF like this example in the docs:
   https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/table/metrics/#gauge.
   I have also set up the metric reporter, but I still cannot get the gauge metric;
   when I use a counter, I can see the metrics.
   2. To use a vectorized UDF, you just need to add the parameter
   *func_type="pandas"* to the udf decorator. When I tried it with a simple scalar
   udf like the example in the documentation plus that parameter, an error is
   logged in my application even though the job keeps running fine.

   This is the error message:

   10:35:09,378 ERROR
   
/Users/jpandawa/.pyenv/versions/3.8.5/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35
   -
   
/Users/jpandawa/.pyenv/versions/3.8.5/lib/python3.8/site-packages/pyflink/table/utils.py:55:
   FutureWarning: Schema passed to names= option, please pass schema=
   explicitly. Will raise exception in future

     return pa.RecordBatch.from_arrays(arrays, schema)

   Is this expected?


Regards,

Jesry


Re: [External] Re: Potential Bug with Date Serialization for Table Stream

2022-03-22 Thread Tom Thornton
Hi Martijn,

Do you know what could be causing this issue given our Flink version? Is
this possibly a bug with that version?

Thanks,
Tom

On Thu, Mar 17, 2022 at 9:59 AM Tom Thornton  wrote:

> Hi Martijn,
>
> We are using 1.11.6.
>
> Thank you for the help.
>
> On Thu, Mar 17, 2022 at 1:37 AM Martijn Visser 
> wrote:
>
>> Hi Tom,
>>
>> Which version of Flink are you using?
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>>
>>
>> On Wed, 16 Mar 2022 at 23:59, Tom Thornton  wrote:
>>
>>> Per the docs,
>>> I'm hoping to confirm whether or not an error we are seeing is a bug in
>>> Flink. We have a job that uses a Kafka source to read Avro records. The
>>> Kafka source is converted into a StreamTableSource. We are using the
>>> new Blink table planner to execute SQL on the table stream. The output is
>>> then sunk back to Kafka as Avro records. Whenever a query selects
>>> a column that has an Avro logicalType of date, we get this error (link to
>>> full stack trace).
>>>
>>> Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast 
>>> to class java.time.LocalDate (java.sql.Date is in module java.sql of loader 
>>> 'platform'; java.time.LocalDate is in module java.base of loader 
>>> 'bootstrap')
>>> at 
>>> org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:128)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:61)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:755)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
>>> at 
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>> at 
>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158)
>>> at 
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191)
>>> at 
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162)
>>> at 
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
>>> at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>>
>>> The avro schema definition for a date field is as follows:
>>>
>>> {
>>>   "name": "date",
>>>   "type": {
>>>     "type": "int",
>>>     "logicalType": "date"
>>>   },
>>>   "doc": "date"
>>> },
>>>
>>> Any query that selects a date column would produce the error (and any query 
>>> without a column with type date will work). Example of a query that causes 
>>> the error:
>>>
>>> select `date` from table1
>>>
>>> As suggested in the docs, I also tried this with parent-first loading and 
>>> got the same error. When we run the same job without the Blink table 
>>> planner, i.e., useOldPlanner(), we do not get this error. Is this a bug 
>>> with Flink? Or is there something we can change in the application code to 
>>> prevent this error? Any help/suggestions would be appreciated.
>>>
>>>
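
A workaround that may be worth trying while staying on 1.11, sketched purely as an assumption (the stream variable, column index, and type info are placeholders): convert the java.sql.Date to java.time.LocalDate before the stream is registered as a table, so the Blink planner's LocalDateSerializer never sees a java.sql.Date.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Hypothetical sketch: normalize the date column before table registration.
DataStream<Row> fixed = avroRows.map(row -> {
    Object d = row.getField(3); // the index of the date column is an assumption
    if (d instanceof java.sql.Date) {
        row.setField(3, ((java.sql.Date) d).toLocalDate());
    }
    return row;
}).returns(rowTypeInfo); // a RowTypeInfo declaring LocalDate for that field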


[ANNOUNCE] Call for Presentations is open for Flink Forward San Francisco 2022 in-person!

2022-03-22 Thread Timo Walther

Hi everyone,

We’re very excited to announce our Call for Presentations for Flink 
Forward San Francisco 2022! If you have an inspiring Apache Flink use 
case, real-world application, or best practice, Flink Forward is the 
platform for you to share your experiences.


https://www.flink-forward.org/sf-2022/call-for-presentations

After a couple of years in a virtual format, we're excited to announce 
that this year's event will be held in person at the Hyatt Regency, 
August 1-3, with two days of training and one day of conference (August 
3rd).


We look forward to receiving your submissions on:
- Flink Use Cases
- Flink Operations
- Technology Deep Dives
- Ecosystem
- Community

Why speak at Flink Forward San Francisco 2022?
- Expand your network and raise your profile in the Apache Flink community
- Share your experiences and engage with the audience in the Q&A following 
your session

- Your talk will be promoted to the Flink community
- Commemorate Flink Forward with us with the return of Flink Fest at our 
in-person event!

- Get exclusive Flink Forward speaker swag ;-)

Our program committee can’t wait to review your talk ideas. Be sure to 
submit your talk by May 2, 11:59 pm PDT!


See you there!

Timo Walther
Program Committee Chair

PS: Regarding Covid-19 regulations, we are following the CDC guidelines 
closely. As we near closer to the event, we will update our policy 
accordingly.


Re: Re: flink-sql, how to configure some extra Kafka parameters

2022-03-22 Thread Michael Ran






Because KafkaConnectorOptions does not include it, I don't know how to add it to the WITH options.








On 2022-03-22 18:22:44, "写虫师" wrote:
>-- Original message --
>From: "user-zh"
>Sent: Tuesday, 22 March 2022, 6:21 PM
>To: "user-zh@flink.apache.org"
>Subject: Re: flink-sql, how to configure some extra Kafka parameters
>
>
>Hi,
>partition.discovery.interval.ms is added by the Flink Kafka connector and
>is defined in KafkaSourceOptions.
>Check your kafka-client version; the official one is 2.4.1. If the versions
>match, you can only ignore the warning for now.
>
>
>On 2022-03-22 17:10:52, "Michael Ran" wrote:
>dear all:
>Currently, using the Flink 1.14 Table API + Kafka, I get various warnings, such as:
>The configuration 'partition.discovery.interval.ms' was supplied but isn't a
>known config.
>These extra parameters are not defined among the SQL WITH options; where do
>you all add such configuration?
>Any suggestions?
>Thanks!


Asking about the partition files

2022-03-22 Thread lan tran
Hi team,

When I use the Flink Table API to generate files and store them in S3, the file names look like part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (for example, by adding the last_modified_value)?

Best,
Quynh



Re: flink-sql, how to configure some extra Kafka parameters

2022-03-22 Thread 写虫师
-- Original message --
From: "user-zh"


Re: flink-sql, how to configure some extra Kafka parameters

2022-03-22 Thread RS
Hi,
partition.discovery.interval.ms is added by the Flink Kafka connector and
is defined in KafkaSourceOptions.
Check your kafka-client version; the official one is 2.4.1. If the versions
match, you can only ignore the warning for now.

On 2022-03-22 17:10:52, "Michael Ran" wrote:
>dear all:
> Currently, using the Flink 1.14 Table API + Kafka, I get various warnings, such as:
> The configuration 'partition.discovery.interval.ms' was supplied
> but isn't a known config.
> These extra parameters are not defined among the SQL WITH options; where do
> you all add such configuration?
> Any suggestions?
>Thanks!


flink-sql, how to configure some extra Kafka parameters

2022-03-22 Thread Michael Ran
dear all:
 Currently, using the Flink 1.14 Table API + Kafka, I get various warnings, such as:
 The configuration 'partition.discovery.interval.ms' was supplied
but isn't a known config.
 These extra parameters are not defined among the SQL WITH options; where do
you all add such configuration?
 Any suggestions?
Thanks!
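
For reference, the Kafka SQL connector forwards any WITH option prefixed with 'properties.' to the underlying Kafka client, and partition discovery also has a dedicated connector option; a sketch via the Java Table API, with placeholder topic, servers, and schema:

// Sketch: extra Kafka client options go through the 'properties.' prefix in WITH.
tableEnv.executeSql(
    "CREATE TABLE demo_source ("
  + "  name STRING,"
  + "  ts TIMESTAMP(3)"
  + ") WITH ("
  + "  'connector' = 'kafka',"
  + "  'topic' = 'demo-topic',"
  + "  'properties.bootstrap.servers' = 'host:9092',"
  + "  'scan.topic-partition-discovery.interval' = '10 min'," // the connector-level option
  + "  'properties.group.id' = 'demo-group'"                  // raw client pass-through
  + ")");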

Re: Re: How to fully refresh join results in Flink SQL when a dimension table changes

2022-03-22 Thread Michael Ran
You could consider doing partial updates at the storage layer.
On 2022-03-21 17:00:31, "zns" <865094...@qq.com.INVALID> wrote:
>CDC join
>
>> On 21 Mar 2022, at 14:01, JianWen Huang wrote:
>> 
>> Fact stream A needs to be joined with dimension table B to widen the data.
>> The requirement is that when dimension table B changes, all previously
>> joined results must be updated to the latest values.
>> Example:
>> Before the change:
>> Stream A:
>> name   gender
>> a      male
>> b      male
>> c      female
>> 
>> Dimension table B:
>> name   age
>> a      16
>> b      17
>> 
>> Result:
>> name   gender   age
>> a      male     16
>> b      male     17
>> 
>> After the change:
>> Dimension table B:
>> name   age
>> a      16->17
>> b      17->18
>> 
>> Result:
>> name   gender   age
>> a      male     17
>> b      male     18
>> 
>> The approach I have come up with so far: turn the dimension table into a
>> stream, join it with the fact table, and finally take the top-1 latest
>> record by update time and sink it to storage. Does anyone have a better approach?


Re: how to set kafka sink ssl properties

2022-03-22 Thread HG
Hello Matthias and others

I am trying to configure a Kafka Sink with SSL properties as shown further
below.

But in the logs I see warnings:

2022-03-21 12:30:17,108 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'group.id' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'partition.discovery.interval.ms' was supplied but isn't a
known config.
2022-03-21 12:30:17,109 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'auto.commit.interval.ms' was supplied but isn't a known
config.
2022-03-21 12:30:17,109 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.type' was supplied but isn't a known config.
2022-03-21 12:30:17,111 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.location' was supplied but isn't a known
config.
2022-03-21 12:30:17,115 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.password' was supplied but isn't a known
config.
2022-03-21 12:30:17,115 WARN
 org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'auto.offset.reset' was supplied but isn't a known config.

It seems that they are bogus.

Regards Hans-Peter

Properties sinkkafkaProps = new Properties();
sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs);
sinkkafkaProps.setProperty("commit.offsets.on.checkpoint",
commitOffsetsOnCheckpoint);

if (kafkaOutputDisabled.equals("false")) {
    KafkaSink<String> kSink = KafkaSink.<String>builder()
            .setBootstrapServers(outputBrokers)
            .setKafkaProducerConfig(sinkkafkaProps)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(kafkaOutputTopic)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
}
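
A plausible explanation, offered as an assumption rather than a confirmed diagnosis: partition.discovery.interval.ms and commit.offsets.on.checkpoint are Flink KafkaSource settings, and group.id, auto.commit.interval.ms, and auto.offset.reset are consumer settings, so the producer/admin clients that receive them report them as unknown; the warnings should be harmless. A sketch that hands the sink only producer-relevant keys:

// Sketch: keep source- and Flink-level keys out of the producer properties.
Properties producerProps = new Properties();
producerProps.setProperty("ssl.truststore.type", outputTrustStoreType);
producerProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
producerProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
producerProps.setProperty("security.protocol", outputSecurityProtocol);
producerProps.setProperty("max.request.size", maxRequestSize);
// partition.discovery.interval.ms and commit.offsets.on.checkpoint configure the
// KafkaSource, not the sink, so they are omitted here on purpose.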



On Thu, 17 Mar 2022 at 17:29, Matthias Pohl wrote:

> Could you share more details on what's not working? Is the
> ssl.truststore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG  wrote:
>
>> Hi all,
>> I am probably not the smartest but I cannot find how to set
>> ssl-properties for a Kafka Sink.
>> My assumption was that it would be just like the Kafka Consumer
>>
>> KafkaSource source = KafkaSource.builder()
>> .setProperties(kafkaProps)
>> .setProperty("ssl.truststore.type", trustStoreType)
>> .setProperty("ssl.truststore.password", trustStorePassword)
>> .setProperty("ssl.truststore.location", trustStoreLocation)
>> .setProperty("security.protocol", securityProtocol)
>> .setProperty("partition.discovery.interval.ms", 
>> partitionDiscoveryIntervalMs)
>> .setProperty("commit.offsets.on.checkpoint", 
>> commitOffsetsOnCheckpoint)
>> .setGroupId(inputGroupId)
>> .setClientIdPrefix(clientId)
>> .setTopics(kafkaInputTopic)
>> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
>> JSONKeyValueDeserializationSchema(fetchMetadata)))
>> 
>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>> .build();
>>
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
>>
>


Re: How to find out the current leader of a Flink cluster

2022-03-22 Thread yidan zhao
Hmm, OK. What I was actually asking is whether there is a simpler way to check this, e.g. from the UI. If it comes down to reading logs, I'd still rather look at the ZK info, which is quicker.
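
For the ZK route, a minimal decoding sketch, assuming (based on how Flink's ZooKeeper HA services serialize leader info) that the znode payload is an ObjectOutputStream holding the leader address followed by the session UUID; the Curator client setup and the znode path are placeholders:

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.UUID;
import org.apache.curator.framework.CuratorFramework;

// Hypothetical sketch: read and decode a Flink HA leader znode.
CuratorFramework zk = /* an already-started Curator client */ null;
byte[] data = zk.getData().forPath("/flink/default/leader/resource_manager_lock"); // path is an assumption
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
    String leaderAddress = ois.readUTF();
    UUID leaderSessionId = (UUID) ois.readObject();
    System.out.println(leaderAddress + " " + leaderSessionId);
}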

On Tue, 22 Mar 2022 at 01:13, yu'an huang wrote:

> Hi, you can get the current leader's address from the "{ip:port} was granted leadership with …"
> line in the logs; by comparing the IP address and port you can tell which process is the
> active JM. Hope this helps.
>
> > On 18 Mar 2022, at 2:06 PM, yidan zhao  wrote:
> >
> > When looking at an error I don't know which JM process's log to check. Also, when I want
> > to restart a JM, I need to confirm whether the restart will affect the cluster (restarting
> > a non-leader has no impact, while restarting the leader makes all jobs fail
> > over).
> >
> > On Fri, 18 Mar 2022 at 00:05, yu'an huang wrote:
> >
> >> In what scenario do you need to know the leader of the RM and other components? You can
> >> view logs from the Flink UI, and the logs also show the current leader; I'm not sure
> >> whether that covers your use case.
> >>
> >>
> >>> On 17 Mar 2022, at 10:38 AM, yidan zhao  wrote:
> >>>
> >>> As the subject says: can the leader of roles such as the cluster's resourceManager,
> >>> and the leader of each individual job, be seen from the UI?
> >>> Getting it from ZK is one option, but it's not pretty; the data has to be decoded programmatically.
> >>
> >>
>
>