Re: left join flink stream

2020-11-17 Thread tkg_cangkul

Hi Guowei Ma,

Thanks for your reply,
In my case, I have some data in a Kafka topic, and I want to get the details
for that data from my reference MySQL table.

For example:

My Kafka topic has these fields:

id, name, position, experience

My reference MySQL table has these fields:

id, name, age, sex

So I want to do a left join to get the detail data from my reference table.

How can I do this with Flink?
Please advise.

On 17/11/20 07:46, Guowei Ma wrote:

Hi, Youzha

In general `CoGroup` is for window-based operations. Whether it can 
satisfy your requirements depends on your specific scenario. But if 
you want to treat the MySQL table as a dimension table, there are 
two other ways:
1. Use the Table/SQL API. You can find a SQL example (a temporal join against 
the JDBC table as a dimension table) in the JDBC table connector docs [1] and 
more information about joins in [2]; a rough sketch is also included below.
2. Use the DataStream API. You could check whether the `AsyncIO` 
function satisfies your requirements; you can find an example in [3].
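
For illustration, here is a minimal Scala sketch of option 1. It is not from the original message; the topic, table, field names and connection settings are assumptions that need to be adapted:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object KafkaMysqlDimensionJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Kafka source table with a processing-time attribute for the temporal join.
    tEnv.executeSql(
      """CREATE TABLE events (
        |  id BIGINT,
        |  name STRING,
        |  position STRING,
        |  experience STRING,
        |  proctime AS PROCTIME()
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'events',
        |  'properties.bootstrap.servers' = 'kafka:9092',
        |  'properties.group.id' = 'events-demo',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json'
        |)""".stripMargin)

    // MySQL reference table used as a JDBC dimension (lookup) table.
    tEnv.executeSql(
      """CREATE TABLE user_details (
        |  id BIGINT,
        |  name STRING,
        |  age INT,
        |  sex STRING
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://mysql:3306/mydb',
        |  'table-name' = 'user_details'
        |)""".stripMargin)

    // Left join each Kafka record against the dimension table at processing time.
    val enriched = tEnv.sqlQuery(
      """SELECT e.id, e.name, e.position, e.experience, d.age, d.sex
        |FROM events AS e
        |LEFT JOIN user_details FOR SYSTEM_TIME AS OF e.proctime AS d
        |  ON e.id = d.id""".stripMargin)

    enriched.execute().print()
  }
}

The FOR SYSTEM_TIME AS OF clause looks up the MySQL row for each incoming record instead of keeping both sides as streaming state, which is usually what you want for a reference table.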



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html 


Best,
Guowei


On Mon, Nov 16, 2020 at 11:20 PM Youzha wrote:


Hi, I want to join a Kafka stream against a MySQL reference table.
How can I do this with a Flink stream? Can the coGroup function
handle this, or does anyone have Java sample code for this case?
I've read some articles saying that coGroup can do a left outer
join, but I'm still confused about how to implement it because
I have only just started learning Flink streaming.


Need advice, please.





Re: Re: Flink 1.11 not showing logs

2020-11-17 Thread Yang Wang
Hi Arnaud,

It seems that the TaskExecutor terminated exceptionally. I think you need
to check the logs of
container_e38_1604477334666_0960_01_04 to figure out why it crashed or
shut down.

Best,
Yang

LINZ, Arnaud wrote on Monday, Nov 16, 2020 at 7:11 PM:

> Hello,
>
> I'm running Flink 1.10 on a yarn cluster. I have a streaming application,
> that, when under heavy load, fails from time to time with this unique error
> message in the whole yarn log:
>
> (...)
> 2020-11-15 16:18:42,202 WARN
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received
> late message for now expired checkpoint attempt 63 from task
> 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at
> container_e38_1604477334666_0960_01_04 @ xxx (dataPort=33099).
> 2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager
>  - Closing TaskExecutor connection
> container_e38_1604477334666_0960_01_04 because: The TaskExecutor is
> shutting down.
> 2020-11-15 16:18:55,087 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (7/15)
> (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
>
>   at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
> at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-11-15 16:18:55,092 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - Calculating tasks to restart to recover the failed task
> 2f6467d98899e64a4721f0a7b6a059a8_6.
> 2020-11-15 16:18:55,101 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - 230 tasks should be restarted to recover the failed task
> 2f6467d98899e64a4721f0a7b6a059a8_6.
> (...)
>
> What could be the cause of this failure? Why is there no other error
> message?
>
> I've tried to increase the value of heartbeat.timeout, thinking that maybe
> it was due to a slow responding mapper, but it did not solve the issue.
>
> Best regards,
> Arnaud
>
> 
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>


Re: keystore location on EMR

2020-11-17 Thread Fanbin Bu
Trying to put the JKS on S3... unfortunately, no luck.
I have this property set up:
'properties.ssl.keystore.location'='s3://application-bucket/kafka.keystore.jks'


I got the following error message:
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore
.load(SslEngineBuilder.java:292)
at org.apache.kafka.common.security.ssl.SslEngineBuilder
.createSSLContext(SslEngineBuilder.java:144)
... 22 more
*Caused by: java.nio.file.NoSuchFileException:
s3:/application-bucket/kafka.keystore.jks*
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86
)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(
UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(
FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore
.load(SslEngineBuilder.java:285)
... 23 more
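
For what it's worth, the java.nio.file.NoSuchFileException above is thrown by the Kafka client itself (SslEngineBuilder$SecurityStore.load goes through the local filesystem API), so an s3:// URI cannot be opened there; the keystore has to be reachable as a local file on every node that runs a Kafka source or sink task. A minimal Scala sketch of that idea, not from the original thread; the path and the way the file gets onto the nodes (shipping it with the job, or installing it on every core node) are assumptions that depend on the deployment:

import java.nio.file.{Files, Paths}

// Assumption: the keystore has been placed on (or shipped to) every node that
// runs TaskManagers, so a plain local path resolves where the Kafka client runs.
val keystorePath = "/etc/flink/ssl/kafka.keystore.jks"

// Sanity check on the machine that will run the Kafka connector task:
// the Kafka client fails with NoSuchFileException when this is false.
require(Files.exists(Paths.get(keystorePath)),
  s"keystore not found at ${Paths.get(keystorePath).toAbsolutePath}")

// The connector option must then point at that local path, not an s3:// URI.
val keystoreOption = s"'properties.ssl.keystore.location' = '$keystorePath'"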

On Tue, Nov 17, 2020 at 10:01 PM Fanbin Bu  wrote:

> let me try to put it on s3 and change code like:
> 'properties.ssl.keystore.location'='s3://my-bucket/keystore.jks
>
> Thanks,
> Fanbin
>
> On Tue, Nov 17, 2020 at 6:43 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Try with an HDFS folder and a hard-coded value inside the code, and see what
>> happens.
>>
>> On Tue, 17 Nov 2020 at 18:42, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Can you use HDFS as the keystore location? Are you using Oozie to run your
>>> job?
>>>
>>> On Tue, 17 Nov 2020 at 17:54, Fanbin Bu  wrote:
>>>
 Hi Sri, my code is not on GitHub, but here is the skeleton.


 val stmt = s"""
   |create table ${table.name} (${schema}, ${watermark})
   |with(
   |'connector' = 'kafka',
   |'topic' = '${table.topic}',
   |'scan.startup.mode'= '${table.scanStartUpMode}',
   |'properties.zookeeper.connect'='${Globals.ZOOKEEPER_CONNECT}',
   |'properties.bootstrap.servers'='${Globals.BOOTSTRAP_SERVERS}',
   |'properties.ssl.keystore.location'='${Globals.SSL_KEYSTORE_LOCATION}',
   |'properties.ssl.keystore.password'='${Globals.KEYSTORE_PASS}',
   |'properties.ssl.key.password'='${Globals.KEYSTORE_PASS}',
   |'properties.security.protocol'='SSL',
   |'properties.ssl.keystore.type'='JKS',
   |'properties.ssl.truststore.type'='JKS',
   |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
   |'properties.group.id' = '${table.name}_group_id',
   |'format' = 'json',
   |'json.ignore-parse-errors' = 'true'
   |)
 """.stripMargin


 tEnv.executeSql(stmt)


 On Tue, Nov 17, 2020 at 5:40 PM sri hari kali charan Tummala <
 kali.tumm...@gmail.com> wrote:

> Hi Fanbin,
>
> Can you share your Flink code which reads from Kafka using SSL ?
>
> Is your code on GitHub ?
>
> Thanks
> Sri
>
>
> On Tue, 17 Nov 2020 at 17:14, Fanbin Bu 
> wrote:
>
>> Hi,
>>
>> I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I
>> tried to put keystore.jks location under /usr/lib/flink/... like:
>>
>> export
>> SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>
>> Note that this is on the EMR master node. Both the JM and the TMs are
>> on EMR core (worker) nodes.
>>
>> However, I got exception: *Caused by:
>> java.nio.file.NoSuchFileException:
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
>> even though the file is there
>>
>> [hadoop@ip-10-200-41-39 flink]$ ll
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> -rw-r--r-- 1 root root 5565 Nov 17 22:24
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>
>> Where should the keystore.jks be located?
>>
>> Thanks,
>> Fanbin
>>
>>
>> Here is the full log.
>> 2020-11-17 09:35:49
>> org.apache.kafka.common.KafkaException: Failed to construct kafka
>> consumer
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>> at
>> 

Re: Logs of JobExecutionListener

2020-11-17 Thread Flavio Pompermaier
Is this a bug or a documentation problem...?

On Sat, Nov 14, 2020 at 18:44, Flavio Pompermaier wrote:

> I've also verified that the problem persists when using a modified version
> of the WordCount class.
> If you add the code pasted at the end of this email at the end of its main
> method you can verify that the listener is called if you run the program
> from the IDE, but it's not called if you submit the job using the CLI
> client using the command
>
>- bin/flink run
>
> /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>
> Maybe this is an expected result, but I didn't find any documentation of
> this behaviour (neither in the Javadoc nor on the Flink web site, where I
> can't find any documentation about JobListener at all).
>
> [Code to add to main()]
> // emit result
> if (params.has("output")) {
>   counts.writeAsCsv(params.get("output"), "\n", " ");
>   // execute program
>   env.registerJobListener(new JobListener() {
>
> @Override
> public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>   System.out.println(" SUBMITTED");
> }
>
> @Override
> public void onJobExecuted(JobExecutionResult arg0, Throwable arg1)
> {
>   System.out.println(" EXECUTED");
> }
>   });
>   env.execute("WordCount Example");
> } else {
>   System.out.println("Printing result to stdout. Use --output to
> specify output path.");
>   counts.print();
> }
>
> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier 
> wrote:
>
>> see inline
>>
>> On Fri, Nov 13, 2020 at 14:31, Matthias Pohl wrote:
>>
>>> Hi Flavio,
>>> thanks for sharing this with the Flink community. Could you answer the
>>> following questions, please:
>>> - What's the code of your Job's main method?
>>>
>>
>> It's actually very simple... the main class creates a batch execution env
>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>> listener on the env, and I do some stuff before calling env.execute().
>> The listener is executed correctly, but if I use the RestClusterClient to
>> submit the JobGraph extracted from that main contained in a jar, the
>> program is executed as usual but the job listener is not called.
>>
>> - What cluster backend and application do you use to execute the job?
>>>
>>
>> I use a standalone session cluster for the moment
>>
>> - Is there anything suspicious you can find in the logs that might be
>>> related?
>>>
>>
>> No, unfortunately not.
>>
>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier 
>>> wrote:
>>>
 Actually what I'm experiencing is that the JobListener is executed
 successfully if I run my main class from the IDE, while the job listener is
 not fired at all if I submit the JobGraph of the application to a cluster
 using the RestClusterClient..
 Am I doing something wrong?

 My main class ends with env.execute(), and I call
 env.registerJobListener() when I create the execution env
 via ExecutionEnvironment.getExecutionEnvironment().

 Thanks in advance for any help,
 Flavio

 On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hello everybody,
> I'm trying to use the JobListener to track when a job finishes (with
> Flink 1.11.0).
> It works great, but I have the problem that log statements inside
> onJobExecuted are not logged anywhere... is that normal?
>
> Best,
> Flavio
>



Re: Upsert UDFs

2020-11-17 Thread Rex Fenley
Hi,

Does this seem like it would help?

Thanks!

On Tue, Nov 10, 2020 at 10:38 PM Rex Fenley  wrote:

> Thanks! We did give that a shot and ran into the bug that I reported here
> https://issues.apache.org/jira/browse/FLINK-20036 .
>
> I'm also seeing this function
>
>   public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
>
> and it says it's more performant in some cases vs
>
>   public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
>
> . I'm having some trouble understanding in which cases it benefits
> performance and if it would help our case. Would using
> `emitUpdateWithRetract` instead of `emitValue` reduce the number of
> retracts we're seeing yet preserve the same end results, where our
> Elasticsearch documents stay up to date?
>
> On Sun, Nov 8, 2020 at 6:43 PM Jark Wu  wrote:
>
>> Hi Rex,
>>
>> There is a similar question asked recently which I think has the same
>> cause [1], called retraction amplification.
>> You can try to turn on the mini-batch optimization to reduce the
>> retraction amplification.
>>
>> Best,
>> Jark
>>
>> [1]:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
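
For reference, the mini-batch optimization from [2] is switched on through the table configuration; a minimal Scala sketch, not part of the original reply (the latency and batch-size values are assumptions to be tuned):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

// Buffer input records briefly and update state once per batch, which reduces
// the number of intermediate retract/insert pairs (retraction amplification)
// that reach the sink.
val conf = tEnv.getConfig.getConfiguration
conf.setString("table.exec.mini-batch.enabled", "true")
conf.setString("table.exec.mini-batch.allow-latency", "5 s")
conf.setString("table.exec.mini-batch.size", "5000")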
>>
>> On Fri, 6 Nov 2020 at 03:56, Rex Fenley  wrote:
>>
>>> Also, just to be clear our ES connector looks like this:
>>>
>>> CREATE TABLE sink_es_groups (
>>> id BIGINT,
>>> //.. a bunch of scalar fields
>>> array_of_ids ARRAY,
>>> PRIMARY KEY (id) NOT ENFORCED
>>> ) WITH (
>>> 'connector' = 'elasticsearch-7',
>>> 'hosts' = '${env:ELASTICSEARCH_HOSTS}',
>>> 'index' = '${env:GROUPS_ES_INDEX}',
>>> 'format' = 'json',
>>> 'sink.bulk-flush.max-actions' = '512',
>>> 'sink.bulk-flush.max-size' = '1mb',
>>> 'sink.bulk-flush.interval' = '5000',
>>> 'sink.bulk-flush.backoff.delay' = '1000',
>>> 'sink.bulk-flush.backoff.max-retries' = '4',
>>> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>>> )
>>>
>>>
>>> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley  wrote:
>>>
 Hello,

 I'm using the Table API to do a bunch of stateful transformations on
 CDC Debezium rows and then insert final documents into Elasticsearch via
 the ES connector.

 I've noticed that Elasticsearch is constantly deleting and then
 inserting documents as they update. Ideally, there would be no delete
 operation for a row update, only for a delete. I'm using the Elasticsearch
 7 SQL connector, which I'm assuming uses `Elasticsearch7UpsertTableSink`
 under the hood, which implies upserts are actually what it's capable of.

 Therefore, I think it's possibly my table plan that's causing row
 upserts to turn into deletes + inserts. My plan is essentially a series of
 Joins and GroupBys + UDF Aggregates (aggregating arrays of data). I think,
 possibly the UDF Aggs following the Joins + GroupBys are causing the
 upserts to split into delete + inserts somehow. If this is correct, is it
 possible to make UDFs that preserve Upserts? Or am I totally off-base with
 my assumptions?

 Thanks!
 --

 Rex Fenley  |  Software Engineer - Mobile and Backend



>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend




execution.runtime-mode=BATCH when reading from Hive

2020-11-17 Thread Dongwon Kim
Hi,

Recently I've been working on a real-time data stream processing pipeline
with DataStream API while preparing for a new service to launch.
Now it's time to develop a back-fill job that produces the same result by
reading data stored in Hive, which we use for long-term storage.

Meanwhile, I watched Aljoscha's talk [1] and just wondered if I could reuse
major components of the pipeline written in DataStream API.
The pipeline conceptually looks as follows:
(A) reads input from Kafka
(B) performs AsyncIO to Redis in order to enrich the input data
(C) appends timestamps and emits watermarks before time-based window
(D) keyBy followed by a session window with a custom trigger for early
firing
(E) writes output to Kafka

I have simple (maybe stupid) questions on reusing components of the
pipeline written in DataStream API.
(1) By replacing (A) with a bounded source, can I execute the pipeline with
a new BATCH execution mode without modifying (B)~(E)?
(2) Is there a bounded source for Hive available for DataStream API?

Best,

Dongwon

[1] https://www.youtube.com/watch?v=z9ye4jzp4DQ
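
For question (1), the batch execution mode described in [1] is selected on the StreamExecutionEnvironment (or with the execution.runtime-mode setting from the subject line); a minimal Scala sketch, assuming a Flink version that already ships this API (1.12+):

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Run the same DataStream topology with batch semantics; all sources must be bounded.
env.setRuntimeMode(RuntimeExecutionMode.BATCH)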


Re: Flink CDC multi-table joins have very high latency

2020-11-17 Thread 丁浩浩
All of the joins I have set up are between CDC MySQL tables.


> On Nov 18, 2020, at 1:07 PM, hailongwang <18868816...@163.com> wrote:
> 
> Sorry, my description was wrong.
> Is your business requirement to join regular stream data with the CDC MySQL stream (both as streams), or to have the CDC MySQL stream look up a dimension table?
> 
> On 2020-11-18 11:59:52, "hailongwang" <18868816...@163.com> wrote:
>> From your SQL and the operator names in the screenshots, you seem to be using regular stream JOINs [1] rather than dimension-table JOINs [2]?
>> Is your business requirement to join the stream data and the CDC MySQL data with each other, or for the stream side only to look up the CDC MySQL data?
>> If it is a regular stream JOIN, also check whether a skewed join key is overloading a single task and causing checkpoints to fail.
>> 
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
>> On 2020-11-18 10:34:48, "Jark Wu" wrote:
>>> Also, you can increase the parallelism of the join operators to improve join throughput.
>>> 
>>> On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:
>>> 
 My guess is that your full snapshot data is too large and the join operators are too slow, so the snapshot is not finished within half an hour and the checkpoint times out.
 Note that no checkpoint can be completed during the snapshot phase. See point 4 of this article for details.
 https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
 
 The article also mentions the workaround:
 
 Workaround: in flink-conf.yaml, configure the tolerable number of failed checkpoints and a restart strategy, as follows:
 
 execution.checkpointing.interval: 10min   # checkpoint interval
 execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable failed checkpoints
 restart-strategy: fixed-delay  # restart strategy
 restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
 
 Best,
 Jark
 
 On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
 
> Even after I changed the NOT EXISTS into joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
> Screenshots are below (for some reason my uploaded images never show up):
> https://imgchr.com/i/DeqixU
> https://imgchr.com/i/DeqP2T
> 
>> On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
>> 
>> The bottleneck should be the two NOT EXISTS clauses: NOT EXISTS can currently only run with a parallelism of 1, so it cannot scale out.
>> Consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension-table join.
>> 
>> Best,
>> Jark
>> 
>> On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
>> 
>>> select
>>>   ri.sub_clazz_number,
>>>   prcrs.rounds,
>>>   count(*) as num
>>> from
>>>   subclazz gs
>>> JOIN
>>>   (SELECT gce.number, min( gce.extension_value ) AS grade FROM
>>> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
> GROUP
>>> BY gce.number) AS temp
>>> ON
>>>   temp.number = gs.course_number AND temp.grade>30
>>> JOIN
>>>   right_info ri
>>> ON
>>>   gs.number = ri.sub_clazz_number
>>> join
>>>   wide_subclazz ws
>>> on
>>>   ws.number = ri.sub_clazz_number
>>> join
>>>   course gc
>>> on
>>>   gc.number = ws.course_number and gc.course_category_id in (30,40)
>>> left join
>>>   performance_regular_can_renewal_sign prcrs
>>> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>>> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>>> and not exists (select 1 from internal_staff gis where gis.user_id =
>>> ri.user_id)
>>> and not exists (select 1 from clazz_extension ce where ws.clazz_number
> =
>>> ce.number
>>>   and ce.extension_type = 3 and ce.isdel = 0
>>>   and ce.extension_value in (1,3,4,7,8,11))
>>> group by ri.sub_clazz_number, prcrs.rounds
>>> That is the SQL code.
>>> The bottleneck is on all the join operators; the operators whose checkpoints fail to complete are always the join operators.
>>> 
 On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
 
 Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM TIME AS OF)?
 We need to pin down exactly which operator is slow.
 
 On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
 
> I use Flink CDC to run join queries on MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very large.
> Is there a good optimization that can alleviate this problem?
>>> 
>>> 
>>> 
> 
> 
> 




Reply: Flink CDC multi-table joins have very high latency

2020-11-17 Thread flink小猪
CDC joins.


18579099920
Email: 18579099...@163.com
(Signature generated by NetEase Mail Master)

On Nov 18, 2020 at 13:07, hailongwang wrote:
Sorry, my description was wrong.
Is your business requirement to join regular stream data with the CDC MySQL stream (both as streams), or to have the CDC MySQL stream look up a dimension table?

On 2020-11-18 11:59:52, "hailongwang" <18868816...@163.com> wrote:
>From your SQL and the operator names in the screenshots, you seem to be using regular stream JOINs [1] rather than dimension-table JOINs [2]?
>Is your business requirement to join the stream data and the CDC MySQL data with each other, or for the stream side only to look up the CDC MySQL data?
>If it is a regular stream JOIN, also check whether a skewed join key is overloading a single task and causing checkpoints to fail.
>
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
>On 2020-11-18 10:34:48, "Jark Wu" wrote:
>>Also, you can increase the parallelism of the join operators to improve join throughput.
>>
>>On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:
>>
>>> My guess is that your full snapshot data is too large and the join operators are too slow, so the snapshot is not finished within half an hour and the checkpoint times out.
>>> Note that no checkpoint can be completed during the snapshot phase. See point 4 of this article for details.
>>> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>>>
>>> The article also mentions the workaround:
>>>
>>> Workaround: in flink-conf.yaml, configure the tolerable number of failed checkpoints and a restart strategy, as follows:
>>>
>>> execution.checkpointing.interval: 10min   # checkpoint interval
>>> execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable failed checkpoints
>>> restart-strategy: fixed-delay  # restart strategy
>>> restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>>>
 Even after I changed the NOT EXISTS into joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
 Screenshots are below (for some reason my uploaded images never show up):
 https://imgchr.com/i/DeqixU
 https://imgchr.com/i/DeqP2T

 > On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
 >
 > The bottleneck should be the two NOT EXISTS clauses: NOT EXISTS can currently only run with a parallelism of 1, so it cannot scale out.
 > Consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension-table join.
 >
 > Best,
 > Jark
 >
 > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
 >
 >> select
 >>ri.sub_clazz_number,
 >>prcrs.rounds,
 >>count(*) as num
 >> from
 >>subclazz gs
 >> JOIN
 >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
 >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
 GROUP
 >> BY gce.number) AS temp
 >> ON
 >>temp.number = gs.course_number AND temp.grade>30
 >> JOIN
 >>right_info ri
 >> ON
 >>gs.number = ri.sub_clazz_number
 >> join
 >>wide_subclazz ws
 >> on
 >>ws.number = ri.sub_clazz_number
 >> join
 >>course gc
 >> on
 >>gc.number = ws.course_number and gc.course_category_id in (30,40)
 >> left join
 >>performance_regular_can_renewal_sign prcrs
 >> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
 >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
 >> and not exists (select 1 from internal_staff gis where gis.user_id =
 >> ri.user_id)
 >> and not exists (select 1 from clazz_extension ce where ws.clazz_number
 =
 >> ce.number
 >>and ce.extension_type = 3 and ce.isdel = 0
 >>and ce.extension_value in (1,3,4,7,8,11))
 >> group by ri.sub_clazz_number, prcrs.rounds
 >> That is the SQL code.
 >> The bottleneck is on all the join operators; the operators whose checkpoints fail to complete are always the join operators.
 >>
 >>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
 >>>
 >>> Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM TIME AS OF)?
 >>> We need to pin down exactly which operator is slow.
 >>>
 >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
 >>>
  I use Flink CDC to run join queries on MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very large.
  Is there a good optimization that can alleviate this problem?
 >>
 >>
 >>





Re:Re:Re: Flink CDC multi-table joins have very high latency

2020-11-17 Thread hailongwang
Sorry, my description was wrong.
Is your business requirement to join regular stream data with the CDC MySQL stream (both as streams), or to have the CDC MySQL stream look up a dimension table?

On 2020-11-18 11:59:52, "hailongwang" <18868816...@163.com> wrote:
>From your SQL and the operator names in the screenshots, you seem to be using regular stream JOINs [1] rather than dimension-table JOINs [2]?
>Is your business requirement to join the stream data and the CDC MySQL data with each other, or for the stream side only to look up the CDC MySQL data?
>If it is a regular stream JOIN, also check whether a skewed join key is overloading a single task and causing checkpoints to fail.
>
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
>On 2020-11-18 10:34:48, "Jark Wu" wrote:
>>Also, you can increase the parallelism of the join operators to improve join throughput.
>>
>>On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:
>>
>>> My guess is that your full snapshot data is too large and the join operators are too slow, so the snapshot is not finished within half an hour and the checkpoint times out.
>>> Note that no checkpoint can be completed during the snapshot phase. See point 4 of this article for details.
>>> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>>>
>>> The article also mentions the workaround:
>>>
>>> Workaround: in flink-conf.yaml, configure the tolerable number of failed checkpoints and a restart strategy, as follows:
>>>
>>> execution.checkpointing.interval: 10min   # checkpoint interval
>>> execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable failed checkpoints
>>> restart-strategy: fixed-delay  # restart strategy
>>> restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>>>
 Even after I changed the NOT EXISTS into joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
 Screenshots are below (for some reason my uploaded images never show up):
 https://imgchr.com/i/DeqixU
 https://imgchr.com/i/DeqP2T

 > On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
 >
 > The bottleneck should be the two NOT EXISTS clauses: NOT EXISTS can currently only run with a parallelism of 1, so it cannot scale out.
 > Consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension-table join.
 >
 > Best,
 > Jark
 >
 > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
 >
 >> select
 >>ri.sub_clazz_number,
 >>prcrs.rounds,
 >>count(*) as num
 >> from
 >>subclazz gs
 >> JOIN
 >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
 >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
 GROUP
 >> BY gce.number) AS temp
 >> ON
 >>temp.number = gs.course_number AND temp.grade>30
 >> JOIN
 >>right_info ri
 >> ON
 >>gs.number = ri.sub_clazz_number
 >> join
 >>wide_subclazz ws
 >> on
 >>ws.number = ri.sub_clazz_number
 >> join
 >>course gc
 >> on
 >>gc.number = ws.course_number and gc.course_category_id in (30,40)
 >> left join
 >>performance_regular_can_renewal_sign prcrs
 >> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
 >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
 >> and not exists (select 1 from internal_staff gis where gis.user_id =
 >> ri.user_id)
 >> and not exists (select 1 from clazz_extension ce where ws.clazz_number
 =
 >> ce.number
 >>and ce.extension_type = 3 and ce.isdel = 0
 >>and ce.extension_value in (1,3,4,7,8,11))
 >> group by ri.sub_clazz_number, prcrs.rounds
 >> That is the SQL code.
 >> The bottleneck is on all the join operators; the operators whose checkpoints fail to complete are always the join operators.
 >>
 >>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
 >>>
 >>> Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM TIME AS OF)?
 >>> We need to pin down exactly which operator is slow.
 >>>
 >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
 >>>
  I use Flink CDC to run join queries on MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very large.
  Is there a good optimization that can alleviate this problem?
 >>
 >>
 >>





Re:Re: Flink CDC multi-table joins have very high latency

2020-11-17 Thread hailongwang
From your SQL and the operator names in the screenshots, you seem to be using regular stream JOINs [1] rather than dimension-table JOINs [2]?
Is your business requirement to join the stream data and the CDC MySQL data with each other, or for the stream side only to look up the CDC MySQL data?
If it is a regular stream JOIN, also check whether a skewed join key is overloading a single task and causing checkpoints to fail.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
On 2020-11-18 10:34:48, "Jark Wu" wrote:
>Also, you can increase the parallelism of the join operators to improve join throughput.
>
>On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:
>
>> My guess is that your full snapshot data is too large and the join operators are too slow, so the snapshot is not finished within half an hour and the checkpoint times out.
>> Note that no checkpoint can be completed during the snapshot phase. See point 4 of this article for details.
>> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>>
>> The article also mentions the workaround:
>>
>> Workaround: in flink-conf.yaml, configure the tolerable number of failed checkpoints and a restart strategy, as follows:
>>
>> execution.checkpointing.interval: 10min   # checkpoint interval
>> execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable failed checkpoints
>> restart-strategy: fixed-delay  # restart strategy
>> restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
>>
>> Best,
>> Jark
>>
>> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>>
>>> Even after I changed the NOT EXISTS into joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
>>> Screenshots are below (for some reason my uploaded images never show up):
>>> https://imgchr.com/i/DeqixU
>>> https://imgchr.com/i/DeqP2T
>>>
>>> > On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
>>> >
>>> > The bottleneck should be the two NOT EXISTS clauses: NOT EXISTS can currently only run with a parallelism of 1, so it cannot scale out.
>>> > Consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension-table join.
>>> >
>>> > Best,
>>> > Jark
>>> >
>>> > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
>>> >
>>> >> select
>>> >>ri.sub_clazz_number,
>>> >>prcrs.rounds,
>>> >>count(*) as num
>>> >> from
>>> >>subclazz gs
>>> >> JOIN
>>> >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
>>> >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
>>> GROUP
>>> >> BY gce.number) AS temp
>>> >> ON
>>> >>temp.number = gs.course_number AND temp.grade>30
>>> >> JOIN
>>> >>right_info ri
>>> >> ON
>>> >>gs.number = ri.sub_clazz_number
>>> >> join
>>> >>wide_subclazz ws
>>> >> on
>>> >>ws.number = ri.sub_clazz_number
>>> >> join
>>> >>course gc
>>> >> on
>>> >>gc.number = ws.course_number and gc.course_category_id in (30,40)
>>> >> left join
>>> >>performance_regular_can_renewal_sign prcrs
>>> >> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>>> >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>>> >> and not exists (select 1 from internal_staff gis where gis.user_id =
>>> >> ri.user_id)
>>> >> and not exists (select 1 from clazz_extension ce where ws.clazz_number
>>> =
>>> >> ce.number
>>> >>and ce.extension_type = 3 and ce.isdel = 0
>>> >>and ce.extension_value in (1,3,4,7,8,11))
>>> >> group by ri.sub_clazz_number, prcrs.rounds
>>> >> That is the SQL code.
>>> >> The bottleneck is on all the join operators; the operators whose checkpoints fail to complete are always the join operators.
>>> >>
>>> >>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
>>> >>>
>>> >>> Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM TIME AS OF)?
>>> >>> We need to pin down exactly which operator is slow.
>>> >>>
>>> >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>>> >>>
>>>  I use Flink CDC to run join queries on MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very large.
>>>  Is there a good optimization that can alleviate this problem?
>>> >>
>>> >>
>>> >>
>>>
>>>
>>>


Merging small files in a custom partition commit policy

2020-11-17 Thread admin
Hi,
We have the following requirement: after streaming data into storage, partitions should be added and small files merged automatically.
I followed an online example of a custom partition commit policy that merges small files [1] and tested it.
When this custom policy is used with the filesystem connector, it merges the files correctly and produces the merged target file.

Since the built-in metastore policy can only be used on Hive tables, I also tested writing to a Hive table through the Hive catalog. In that test,
adding partitions automatically works fine, but merging the small files does not: no merged target file is produced, and there is no exception at all.

The strange thing is that exactly the same code works when writing to HDFS but not when writing to Hive, even though, from reading the source, the Hive sink is also based on StreamingFileSink underneath. I have been debugging this for two days without a clue. Has anyone run into this problem, or does anyone have ideas on how to troubleshoot it?

The policy code is as follows:
public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
private static final Logger LOGGER = 
LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

@Override
public void commit(Context context) throws Exception {
LOGGER.info("begin to merge files.partition path is {}.", 
context.partitionPath().toUri().toString());
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, 
context.partitionPath().toUri().getHost());
FileSystem fs = FileSystem.get(conf);
String partitionPath = context.partitionPath().getPath();


List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
LOGGER.info("{} files in path {}", files.size(), partitionPath); // the number of files to be merged is logged correctly here


MessageType schema = getParquetSchema(files, conf);
if (schema == null) {
return;
}
LOGGER.info("Fetched parquet schema: {}", 
schema.toString());//schema也正常输出


Path result = merge(partitionPath, schema, files, fs);
LOGGER.info("Files merged into {}", result.toString());
}


private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) 
throws IOException {
List<Path> result = new ArrayList<>();


RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
while (dirIterator.hasNext()) {
LocatedFileStatus fileStatus = (LocatedFileStatus) 
dirIterator.next();
Path filePath = fileStatus.getPath();
if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
result.add(filePath);
}
}


return result;
}


private MessageType getParquetSchema(List<Path> files, Configuration conf) 
throws IOException {
if (files.size() == 0) {
return null;
}


HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), 
conf);
ParquetFileReader reader = ParquetFileReader.open(inputFile);
ParquetMetadata metadata = reader.getFooter();
MessageType schema = metadata.getFileMetaData().getSchema();


reader.close();
return schema;
}


private Path merge(String partitionPath, MessageType schema, List<Path> 
files, FileSystem fs) throws IOException {
Path mergeDest = new Path(partitionPath + "/result-" + 
System.currentTimeMillis() + ".parquet");

ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
.withType(schema)
.withConf(fs.getConf())
.withWriteMode(Mode.OVERWRITE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();


for (Path file : files) {
ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), file)
.withConf(fs.getConf())
.build();
Group data;
while ((data = (Group) reader.read()) != null) {
writer.write(data);
}
reader.close();
}
LOGGER.info("data size is [{}]", writer.getDataSize());//数据大小也正常输出

try {
writer.close();
} catch (Exception e) {
LOGGER.error("flush failed", e);//没有异常
}

if (!fs.exists(mergeDest)) {
LOGGER.warn("Fuck! result file not exist.");
}

for (Path file : files) {
fs.delete(file, false);
}
return mergeDest;
}
}
I skimmed the ParquetWriter source code:
ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
.withType(schema)
.withConf(fs.getConf())
.withWriteMode(Mode.CREATE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build()
The file is created in the final build() call, so file creation must already be failing at this step.
I also tried creating the file with FileSystem.create: the file can be created, but the writer still does not write anything into it.

The to-HDFS code:
CREATE TABLE test_kafka (
tuid STRING,
device STRING,
active_time BIGINT,
process_time BIGINT,
pkg_cn_name STRING,
pkg_en_name STRING,
os STRING,
appid INT,
dtu STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test_kafka',
'properties.bootstrap.servers' = ‘xxx:9092',
'properties.group.id' = 'test-1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '30'
);

CREATE TABLE test_hdfs (
`day` STRING,
`hour` STRING,
tuid STRING,
device 

Re: Flink CDC multi-table joins have very high latency

2020-11-17 Thread Jark Wu
My guess is that your full snapshot data is too large and the join operators are too slow, so the snapshot is not finished within half an hour and the checkpoint times out.
Note that no checkpoint can be completed during the snapshot phase. See point 4 of this article for details.
https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ

The article also mentions the workaround:

Workaround: in flink-conf.yaml, configure the tolerable number of failed checkpoints and a restart strategy, as follows:

execution.checkpointing.interval: 10min   # checkpoint interval
execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable failed checkpoints
restart-strategy: fixed-delay  # restart strategy
restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
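
For reference, the same settings can also be applied programmatically on the execution environment; a minimal Scala sketch, not part of the original reply (the restart delay value is an assumption):

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpoint every 10 minutes and tolerate failed checkpoints instead of failing the job.
env.enableCheckpointing(10 * 60 * 1000L)
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(100)

// Fixed-delay restart strategy with a practically unlimited number of attempts.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Int.MaxValue, Time.seconds(10)))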

Best,
Jark

On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:

> Even after I changed the NOT EXISTS into joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
> Screenshots are below (for some reason my uploaded images never show up):
> https://imgchr.com/i/DeqixU
> https://imgchr.com/i/DeqP2T
>
> > On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
> >
> > The bottleneck should be the two NOT EXISTS clauses: NOT EXISTS can currently only run with a parallelism of 1, so it cannot scale out.
> > Consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension-table join.
> >
> > Best,
> > Jark
> >
> > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
> >
> >> select
> >>ri.sub_clazz_number,
> >>prcrs.rounds,
> >>count(*) as num
> >> from
> >>subclazz gs
> >> JOIN
> >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
> >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
> GROUP
> >> BY gce.number) AS temp
> >> ON
> >>temp.number = gs.course_number AND temp.grade>30
> >> JOIN
> >>right_info ri
> >> ON
> >>gs.number = ri.sub_clazz_number
> >> join
> >>wide_subclazz ws
> >> on
> >>ws.number = ri.sub_clazz_number
> >> join
> >>course gc
> >> on
> >>gc.number = ws.course_number and gc.course_category_id in (30,40)
> >> left join
> >>performance_regular_can_renewal_sign prcrs
> >> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
> >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
> >> and not exists (select 1 from internal_staff gis where gis.user_id =
> >> ri.user_id)
> >> and not exists (select 1 from clazz_extension ce where ws.clazz_number =
> >> ce.number
> >>and ce.extension_type = 3 and ce.isdel = 0
> >>and ce.extension_value in (1,3,4,7,8,11))
> >> group by ri.sub_clazz_number, prcrs.rounds
> >> That is the SQL code.
> >> The bottleneck is on all the join operators; the operators whose checkpoints fail to complete are always the join operators.
> >>
> >>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
> >>>
> >>> Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM TIME AS OF)?
> >>> We need to pin down exactly which operator is slow.
> >>>
> >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
> >>>
>  I use Flink CDC to run join queries on MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very large.
>  Is there a good optimization that can alleviate this problem?
> >>
> >>
> >>
>
>
>


Re: Flink CDC multi-table joins have very high latency

2020-11-17 Thread Jark Wu
Also, you can increase the parallelism of the join operators to improve join throughput.

On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:

> My guess is that your full snapshot data is too large and the join operators are too slow, so the snapshot is not finished within half an hour and the checkpoint times out.
> Note that no checkpoint can be completed during the snapshot phase. See point 4 of this article for details.
> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>
> The article also mentions the workaround:
>
> Workaround: in flink-conf.yaml, configure the tolerable number of failed checkpoints and a restart strategy, as follows:
>
> execution.checkpointing.interval: 10min   # checkpoint interval
> execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable failed checkpoints
> restart-strategy: fixed-delay  # restart strategy
> restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
>
> Best,
> Jark
>
> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>
>> Even after I changed the NOT EXISTS into joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
>> Screenshots are below (for some reason my uploaded images never show up):
>> https://imgchr.com/i/DeqixU
>> https://imgchr.com/i/DeqP2T
>>
>> > On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
>> >
>> > The bottleneck should be the two NOT EXISTS clauses: NOT EXISTS can currently only run with a parallelism of 1, so it cannot scale out.
>> > Consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension-table join.
>> >
>> > Best,
>> > Jark
>> >
>> > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
>> >
>> >> select
>> >>ri.sub_clazz_number,
>> >>prcrs.rounds,
>> >>count(*) as num
>> >> from
>> >>subclazz gs
>> >> JOIN
>> >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
>> >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
>> GROUP
>> >> BY gce.number) AS temp
>> >> ON
>> >>temp.number = gs.course_number AND temp.grade>30
>> >> JOIN
>> >>right_info ri
>> >> ON
>> >>gs.number = ri.sub_clazz_number
>> >> join
>> >>wide_subclazz ws
>> >> on
>> >>ws.number = ri.sub_clazz_number
>> >> join
>> >>course gc
>> >> on
>> >>gc.number = ws.course_number and gc.course_category_id in (30,40)
>> >> left join
>> >>performance_regular_can_renewal_sign prcrs
>> >> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>> >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>> >> and not exists (select 1 from internal_staff gis where gis.user_id =
>> >> ri.user_id)
>> >> and not exists (select 1 from clazz_extension ce where ws.clazz_number
>> =
>> >> ce.number
>> >>and ce.extension_type = 3 and ce.isdel = 0
>> >>and ce.extension_value in (1,3,4,7,8,11))
>> >> group by ri.sub_clazz_number, prcrs.rounds
>> >> That is the SQL code.
>> >> The bottleneck is on all the join operators; the operators whose checkpoints fail to complete are always the join operators.
>> >>
>> >>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
>> >>>
>> >>> Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM TIME AS OF)?
>> >>> We need to pin down exactly which operator is slow.
>> >>>
>> >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>> >>>
>>  I use Flink CDC to run join queries on MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very large.
>>  Is there a good optimization that can alleviate this problem?
>> >>
>> >>
>> >>
>>
>>
>>


Re: Flink CDC multi-table joins have very high latency

2020-11-17 Thread 丁浩浩
Even after I changed the NOT EXISTS into joins, the checkpoints of the join operators still cannot complete. Is my checkpoint interval too short? I trigger a checkpoint every half hour, and the timeout is also half an hour.
Screenshots are below (for some reason my uploaded images never show up):
https://imgchr.com/i/DeqixU
https://imgchr.com/i/DeqP2T

> On Nov 16, 2020, at 10:29 AM, Jark Wu wrote:
> 
> The bottleneck should be the two NOT EXISTS clauses: NOT EXISTS can currently only run with a parallelism of 1, so it cannot scale out.
> Consider replacing the NOT EXISTS with another approach, such as a UDF or a dimension-table join.
> 
> Best,
> Jark
> 
> On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
> 
>> select
>>ri.sub_clazz_number,
>>prcrs.rounds,
>>count(*) as num
>> from
>>subclazz gs
>> JOIN
>>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
>> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP
>> BY gce.number) AS temp
>> ON
>>temp.number = gs.course_number AND temp.grade>30
>> JOIN
>>right_info ri
>> ON
>>gs.number = ri.sub_clazz_number
>> join
>>wide_subclazz ws
>> on
>>ws.number = ri.sub_clazz_number
>> join
>>course gc
>> on
>>gc.number = ws.course_number and gc.course_category_id in (30,40)
>> left join
>>performance_regular_can_renewal_sign prcrs
>> on prcrs.order_number = ri.order_number and prcrs.rounds in (1,2)
>> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>> and not exists (select 1 from internal_staff gis where gis.user_id =
>> ri.user_id)
>> and not exists (select 1 from clazz_extension ce where ws.clazz_number =
>> ce.number
>>and ce.extension_type = 3 and ce.isdel = 0
>>and ce.extension_value in (1,3,4,7,8,11))
>> group by ri.sub_clazz_number, prcrs.rounds
>> That is the SQL code.
>> The bottleneck is on all the join operators; the operators whose checkpoints fail to complete are always the join operators.
>> 
>>> On Nov 14, 2020, at 5:53 PM, Jark Wu wrote:
>>> 
>>> Can you show your code? Are you using the dimension-table join syntax (FOR SYSTEM TIME AS OF)?
>>> We need to pin down exactly which operator is slow.
>>> 
>>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>>> 
 I use Flink CDC to run join queries on MySQL tables, and I found that Flink can only join two tables first and then join the result with the next table, which makes the final write-to-database latency very large.
 Is there a good optimization that can alleviate this problem?
>> 
>> 
>> 




Re: flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Thread lingchanhu
Thanks, it is solved now!

BR,
lingchanhu



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink web ui ????????????

2020-11-17 Thread ????
??flink dashboardcancel job??

Re: flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Thread Jark Wu
You can construct a StreamTableEnvironmentImpl directly via the StreamTableEnvironmentImpl
constructor with isStreamingMode = false.
Then you can call registerFunction on it.

On Wed, 18 Nov 2020 at 10:40, lingchanhu  wrote:

> Thanks a lot. If Flink 1.11 does not support this yet, what would you recommend for this scenario? I want to process the data in batch mode and I also need a custom agg function.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Thread lingchanhu
Thanks a lot. If Flink 1.11 does not support this yet, what would you recommend for this scenario? I want to process the data in batch mode and I also need a custom agg function.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Thread lingchanhu
*flink1.11*
When I register and use a custom Aggregate Function on a TableEnvironment, the error below is thrown.
The code is pasted further down. (Registering and using it on a StreamTableEnvironment works fine, which should mean the custom function itself is OK.)

org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
at
org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
at
org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
at
com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)

*// The code is below*
// main
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(envSettings);

// register the source table (JDBC table source)
tEnv.executeSql("CREATE TABLE wx_event_log () with
('connect.type'='jdbc'),");

// register the sink table (CSV table sink)
tEnv.executeSql("CREATE TABLE wx_data_statistics () with
('connect.type'='filesystem','format.type'='csv',.)");

// register the agg function
tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());

Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log
where msg_type='text' and create_time between '2020-03-20' and
'2020-03-21'");

table2.groupBy($("from_user"))
   
.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
.select($("from_user"),$("first_send_msg_today"))
.executeInsert("wx_data_statistics");


// the custom agg function class
public class FirstSendMsgFunc extends
AggregateFunction<LocalDateTime, CountDTO> {

public void accumulate(CountDTO acc, LocalDateTime createTime) {
if (acc.getDateTime() == null) {
acc.setDateTime(createTime);
} else if (acc.getDateTime().isAfter(createTime)) {
acc.setDateTime(createTime);
}
}

@Override
public LocalDateTime getValue(CountDTO acc) {
return acc.getDateTime();
}

@Override
public CountDTO createAccumulator() {
return new CountDTO();
}
}

// the accumulator POJO class
public class CountDTO implements Serializable {

private Integer count;

private LocalDateTime dateTime;

public Integer getCount() {
return count;
}

public void setCount(Integer count) {

Re: flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Thread Jark Wu
Btw, in version 1.12 the TableEnvironment#createTemporarySystemFunction interface supports
AggregateFunction.

On Wed, 18 Nov 2020 at 10:34, Jark Wu  wrote:

> In version 1.11 the TableEnvironment#createTemporarySystemFunction interface does not yet support
> AggregateFunction.
> You said it works with StreamTableEnvironment; I assume you used
> StreamTableEnvironment#registerFunction, which does support AggregateFunction.
>
> Best,
> Jark
>
>
> On Wed, 18 Nov 2020 at 09:49, lingchanhu  wrote:
>
>> *flink1.11*
>> When I register and use a custom Aggregate Function on a TableEnvironment, the error below is thrown.
>> The code is pasted further down. (Registering and using it on a StreamTableEnvironment
>> works fine, which should mean the custom function itself is OK.)
>>
>> org.apache.flink.table.api.TableException: Aggregate functions are not
>> updated to the new type system yet.
>> at
>>
>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
>> at
>> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
>> at
>>
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> at
>>
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
>> at java.util.function.Function.lambda$andThen$1(Function.java:88)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
>> at
>>
>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
>> at
>>
>> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
>> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>>
>> *// The code is below*
>> // main
>> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inBatchMode()
>> .build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>>
>> // register the source table (JDBC table source)
>> tEnv.executeSql("CREATE TABLE wx_event_log () with
>> ('connect.type'='jdbc'),");
>>
>> // register the sink table (CSV table sink)
>> tEnv.executeSql("CREATE TABLE wx_data_statistics () with
>> ('connect.type'='filesystem','format.type'='csv',.)");
>>
>> // register the agg function
>> tEnv.createTemporarySystemFunction("firSendMsgFunc",new
>> FirstSendMsgFunc());
>>
>> Table table2 = tEnv.sqlQuery("select from_user,create_time from
>> wx_event_log
>> where msg_type='text' and create_time between '2020-03-20' and
>> '2020-03-21'");
>>
>> table2.groupBy($("from_user"))
>>
>>
>> .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
>> .select($("from_user"),$("first_send_msg_today"))
>> .executeInsert("wx_data_statistics");
>>
>>
>> // the custom agg function class
>> public class FirstSendMsgFunc extends
>> 

Re: flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Thread Jark Wu
In version 1.11 the TableEnvironment#createTemporarySystemFunction interface does not yet support
AggregateFunction.
You said it works with StreamTableEnvironment; I assume you used
StreamTableEnvironment#registerFunction, which does support AggregateFunction.

Best,
Jark


On Wed, 18 Nov 2020 at 09:49, lingchanhu  wrote:

> *flink1.11*
> When I register and use a custom Aggregate Function on a TableEnvironment, the error below is thrown.
> The code is pasted further down. (Registering and using it on a StreamTableEnvironment
> works fine, which should mean the custom function itself is OK.)
>
> org.apache.flink.table.api.TableException: Aggregate functions are not
> updated to the new type system yet.
> at
>
> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
> at java.util.function.Function.lambda$andThen$1(Function.java:88)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
> at
>
> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
> at
>
> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>
> *// The code is below*
> // main
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>
> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>
> // register the source table (JDBC table source)
> tEnv.executeSql("CREATE TABLE wx_event_log () with
> ('connect.type'='jdbc'),");
>
> // register the sink table (filesystem/CSV table sink)
> tEnv.executeSql("CREATE TABLE wx_data_statistics () with
> ('connect.type'='filesystem','format.type'='csv',.)");
>
> // register the aggregate function
> tEnv.createTemporarySystemFunction("firSendMsgFunc",new
> FirstSendMsgFunc());
>
> Table table2 = tEnv.sqlQuery("select from_user,create_time from
> wx_event_log
> where msg_type='text' and create_time between '2020-03-20' and
> '2020-03-21'");
>
> table2.groupBy($("from_user"))
>
>
> .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
> .select($("from_user"),$("first_send_msg_today"))
> .executeInsert("wx_data_statistics");
>
>
> // custom aggregate function class
> public class FirstSendMsgFunc extends
> AggregateFunction<LocalDateTime, CountDTO> {
>
> public void accumulate(CountDTO acc, LocalDateTime createTime) {
> if (acc.getDateTime() == null) {
> acc.setDateTime(createTime);
> } else if (acc.getDateTime().isAfter(createTime)) {
> 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

Those 2 approaches all work in my local machine, this is my code:

Scala UDF:
package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
  * The scala UDF.
  */
class dummyMap extends ScalarFunction {

  // If the udf is registered via a SQL statement, you need to add this type hint.
  @DataTypeHint("ROW<s STRING, t STRING>")
  def eval(): Row = {

Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }

  // If the udf is registered via the method 'register_java_function', you need
  // to override this method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
// The type of the return values should be TypeInformation
Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
Types.STRING()))
  }
}
Python code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# load the scala udf jar file, the path should be modified to yours
# or your can also load the jar file via other approaches
st_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///Users/zhongwei/the-dummy-udf.jar")

# register the udf via 
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE 
SCALA")
# or register via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 
'c'])
st_env.execute_sql("""create table mySink (
output_of_my_scala_udf ROW<s STRING, t STRING>
) with (
'connector' = 'print'
)""")

# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()

Best,
Wei

> 在 2020年11月18日,03:28,Pierre Oberholzer  写道:
> 
> Hi Wei,
> 
> True, I'm using the method you mention, but glad to change. 
> I tried your suggestion instead, but got a similar error.
> 
> Thanks for your support. That is much more tedious than I thought.
> 
> Option 1 - SQL UDF
> 
> SQL UDF
> create_func_ddl = """
> CREATE FUNCTION dummyMap 
>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>   
> t_env.execute_sql(create_func_ddl)
> 
> Error
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match 
> requested type. Requested: Row(s: String, t: String); Actual: 
> GenericType<org.apache.flink.types.Row>
> 
> Option 2 - Overriding getResultType
> 
> Back to the old registering method, but overriding getResultType:
> 
> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
> 
> Scala UDF
> class dummyMap() extends ScalarFunction {
> 
>   def eval(): Row = {
> 
>   Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> 
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
> = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> }
> 
> Error (on compilation)
> 
> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>  
> [error]   ()org.apache.flink.table.types.DataType 
> [error]   (x$1: 
> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error]  cannot be applied to (org.apache.flink.table.types.DataType, 
> org.apache.flink.table.types.DataType)
> [error]   override def getResultType(signature: Array[Class[_]]): 
> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
> [error]   
>^
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
> 
> Le mar. 17 nov. 2020 à 14:01, Wei Zhong  > a écrit :
> Hi Pierre,
> 
> I guess your UDF is registered by the method 'register_java_function' which 
> uses the old type system. In this situation you need to override the 
> 'getResultType' method instead of adding type hint. 
> 
> You can also try to register your UDF via the "CREATE FUNCTION" sql 
> statement, which accepts the type hint.
> 
> Best,
> Wei
> 
>> 在 2020年11月17日,19:29,Pierre Oberholzer > > 写道:
>> 
>> Hi Wei,
>> 
>> Thanks for your suggestion. Same error.
>> 
>> Scala UDF
>> 
>> @FunctionHint(output = new DataTypeHint("ROW<s STRING, t STRING>"))
>> class dummyMap() extends ScalarFunction {
>>   def eval(): Row = {
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>> }
>> 
>> Best regards,
>> 
>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong 

Re: "stepless" sliding windows?

2020-11-17 Thread Alex Cruise
On Thu, Oct 22, 2020 at 11:08 AM Jacob Sevart  wrote:

> I think the issue is you have to specify a *time *interval for "step." It
> would be nice to consider the preceding N minutes as of every message. You
> can somewhat approximate that using a very small step.
>

Indeed, I want the window to slide continuously, not based on a time
interval. I think with the code I posted earlier I'd be creating too many
windows, and double-counting events. I might need to go with global +
evictor, since I want to age out each event.

-0xe1a

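(Not from the original thread: a minimal Java sketch of the "global window + evictor"
idea mentioned above, firing on every element and evicting events older than the window
size. The Tuple2 element type, the key selector and the 10-minute size are placeholders,
and event-time timestamps/watermarks are assumed to be assigned upstream.)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class TrailingWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in a real job, assign event-time timestamps/watermarks here.
        DataStream<Tuple2<String, Double>> events = env.fromElements(
                Tuple2.of("key-a", 1.0), Tuple2.of("key-a", 2.0), Tuple2.of("key-b", 3.0));

        events
            .keyBy(e -> e.f0)
            .window(GlobalWindows.create())
            .trigger(CountTrigger.of(1))                // fire on every incoming event
            .evictor(TimeEvictor.of(Time.minutes(10)))  // keep only the trailing 10 minutes
            .process(new ProcessWindowFunction<Tuple2<String, Double>, Double, String, GlobalWindow>() {
                @Override
                public void process(String key, Context ctx,
                                    Iterable<Tuple2<String, Double>> elements,
                                    Collector<Double> out) {
                    double sum = 0;
                    for (Tuple2<String, Double> e : elements) {
                        sum += e.f1;                    // aggregate whatever survived eviction
                    }
                    out.collect(sum);
                }
            })
            .print();

        env.execute("trailing-window-sketch");
    }
}

One consequence of this pattern is that the window contents are re-iterated on every
element, so it can get expensive for hot keys with many events inside the window size.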

> On Thu, Oct 22, 2020 at 2:29 AM Danny Chan  wrote:
>
>> The SLIDING window always triggers as of each step, what do you mean by
>> "stepless" ?
>>
>> Alex Cruise  于2020年10月21日周三 上午1:52写道:
>>
>>> whoops.. as usual, posting led me to find some answers myself. Does this
>>> make sense given my requirements?
>>>
>>> Thanks!
>>>
>>> private class MyWindowAssigner(val windowSize: Time) :
>>> WindowAssigner<Record, TimeWindow>() {
>>> private val trigger = CountTrigger.of(1) as Trigger<Record, TimeWindow>
>>>
>>> override fun assignWindows(
>>> element: Record,
>>> timestamp: Long,
>>> context: WindowAssignerContext
>>> ): MutableCollection<TimeWindow> {
>>> return mutableListOf(TimeWindow(timestamp - 
>>> windowSize.toMilliseconds(), timestamp))
>>> }
>>>
>>> override fun getDefaultTrigger(env: StreamExecutionEnvironment?): 
>>> Trigger<Record, TimeWindow> {
>>> return trigger
>>> }
>>>
>>> override fun getWindowSerializer(executionConfig: ExecutionConfig?): 
>>> TypeSerializer<TimeWindow> {
>>> return TimeWindow.Serializer()
>>> }
>>>
>>> override fun isEventTime(): Boolean {
>>> return true
>>> }
>>> }
>>>
>>>
>>> On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise  wrote:
>>>
 Hey folks!

 I have an application that wants to use "stepless" sliding windows,
 i.e. we produce aggregates on every event. The windows need to be of a
 fixed size, but to have their start and end times update continuously, and
 I'd like to trigger on every event. Is this a bad idea? I've googled and
 read the docs extensively and haven't been able to identify built-in
 functionality or examples that map cleanly to my requirements.

 OK, I just found DeltaTrigger, which looks promising... Does it make
 sense to write a WindowAssigner that makes a new Window on every event,
 allocation rates aside?

 Thanks!

 -0xe1a

>>>
>
> --
> Jacob Sevart
> Software Engineer, Safety
>


flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Thread lingchanhu
*flink1.11*
When registering and using a custom Aggregate Function in a TableEnvironment, the
following error is thrown. The code is pasted below. (Registering and using it in a
StreamTableEnvironment works fine, which should indicate that the custom function itself is OK.)

org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
at
org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
at
org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
at
com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)

*// The code is below*
// main
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(envSettings);

// register the source table (JDBC table source)
tEnv.executeSql("CREATE TABLE wx_event_log () with
('connect.type'='jdbc'),");

// register the sink table (filesystem/CSV table sink)
tEnv.executeSql("CREATE TABLE wx_data_statistics () with
('connect.type'='filesystem','format.type'='csv',.)");

// register the aggregate function
tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());

Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log
where msg_type='text' and create_time between '2020-03-20' and
'2020-03-21'");

table2.groupBy($("from_user"))
   
.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
.select($("from_user"),$("first_send_msg_today"))
.executeInsert("wx_data_statistics");


// custom aggregate function class
public class FirstSendMsgFunc extends
AggregateFunction<LocalDateTime, CountDTO> {

public void accumulate(CountDTO acc, LocalDateTime createTime) {
if (acc.getDateTime() == null) {
acc.setDateTime(createTime);
} else if (acc.getDateTime().isAfter(createTime)) {
acc.setDateTime(createTime);
}
}

@Override
public LocalDateTime getValue(CountDTO acc) {
return acc.getDateTime();
}

@Override
public CountDTO createAccumulator() {
return new CountDTO();
}
}

// accumulator POJO class
public class CountDTO implements Serializable {

private Integer count;

private LocalDateTime dateTime;

public Integer getCount() {
return count;
}

public void setCount(Integer count) {

Re: pyflink1.11 window groupby error

2020-11-17 Thread anfeng
I am running it in the playgrounds environment, but the Apache Flink version I checked is 1.11.2;
could that be related?




--
Sent from: http://apache-flink.147419.n8.nabble.com/


keystore location on EMR

2020-11-17 Thread Fanbin Bu
Hi,

I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried
to put keystore.jks location under /usr/lib/flink/... like:

export
SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Note that this file is on the EMR master node, while both the JM and the TMs
run on the EMR core (slave) nodes.

However, I got exception: *Caused by: java.nio.file.NoSuchFileException:
/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
even though the file is there

[hadoop@ip-10-200-41-39 flink]$ ll
/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
-rw-r--r-- 1 root root 5565 Nov 17 22:24
/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Where should the keystore.jks be located?

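(For what it's worth, a minimal Java sketch of how such a consumer is usually wired up;
the important detail is that the keystore path in the properties is opened by the Kafka
client inside each TaskManager JVM, so the file has to exist at that path on every node
that runs a TaskManager - the EMR core nodes - not only on the master node where the job
is submitted. The paths, topic and broker below are placeholders.)

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SslKafkaSourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9093");
        props.setProperty("group.id", "my-group");
        props.setProperty("security.protocol", "SSL");
        // Resolved on the TaskManager hosts, not on the submitting (master) host.
        props.setProperty("ssl.keystore.location", "/etc/flink/ssl/kafka.keystore.jks");
        props.setProperty("ssl.keystore.password", "secret");
        props.setProperty("ssl.truststore.location", "/etc/flink/ssl/kafka.truststore.jks");
        props.setProperty("ssl.truststore.password", "secret");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props));
        stream.print();

        env.execute("ssl-kafka-source-sketch");
    }
}
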
Thanks,
Fanbin


Here is the full log.
2020-11-17 09:35:49
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:820)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:666)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:646)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
of type JKS
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:741)
... 15 more
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
of type JKS
at
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
at
org.apache.kafka.common.security.ssl.SslEngineBuilder.(SslEngineBuilder.java:104)
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
keystore
/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
of type JKS
at
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
at
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
... 22 more
*Caused by: java.nio.file.NoSuchFileException:
/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at
java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)


Lateral join not finding correlate variable

2020-11-17 Thread Dylan Forciea
This may be due to not understanding  lateral joins in Flink – perhaps you can 
only do so on temporal variables – but I figured I’d ask since the error 
message isn’t intuitive.

I am trying to do a combination of a lateral join and a top N query. Part of my 
ordering is based upon whether a value from the left side of the query 
matches up. I’m trying to do this in the general form of:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
id,
attr2
  FROM (
SELECT
  id,
  attr2,
  ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY
  attr3 DESC,
  t1.attr4 = attr4 DESC
  ) AS row_num
FROM table2
WHERE row_num = 1) t2
ON (t1.id = t2.id)

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: 
unexpected correlate variable $cor2 in the plan
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
 at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
 at scala.collection.Iterator.foreach(Iterator.scala:943)
 at scala.collection.Iterator.foreach$(Iterator.scala:943)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
 at scala.collection.IterableLike.foreach(IterableLike.scala:74)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
 at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
 at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
 at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
 at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
 at 
org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
 at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
 at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other thing I can think of doing is creating a Table Aggregate 
function to pull this off. But, I wanted to check to make sure I wasn’t doing 
something wrong in the above first, or if there is something I’m not thinking 
of doing.

Regards,
Dylan Forciea


Job Manager is taking very long time to finalize the Checkpointing.

2020-11-17 Thread Slim Bouguerra
Originally posed to the dev list
-- Forwarded message -
From: Slim Bouguerra 
Date: Tue, Nov 17, 2020 at 8:09 AM
Subject: Job Manager is taking very long time to finalize the Checkpointing.
To: 


Hi Devs,
I am very new to the Flink code base and working on the evaluation of  the
Checkpointing strategy

In my current setup I am using an NFS based file system as a checkpoint
store. (NAS/NFS has a very high TP over 2GB/s on one node and I am using 12
NFS servers )
When pushing the system to some relatively medium scale, i.e. 120 subtasks
over 6 workers with a total state of 100 GB.
I observe that the Job Manager takes over 2 minutes to finalize the
checkpoint (observed in the UI and via CPU profiling of the JM; see the flame
graph of a 30-second sample).
As you can see from the attached flame graphs, the JM is very busy
serializing the metadata
(org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.serializeOperatorState,
2,875 samples, 99.65%).
Now the question is why this metadata file is so big: on the order of 3 GB
in my case.
How does this size scale? num_of_tasks * num_states?

/opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/chk-1
bash-4.2$ ls -all -h
-rw-r--r-- 1 flink flink 3.0G Nov 17 01:42 _metadata

The second question is how to better measure the time taken by the JM to
commit the transaction, i.e. time_done_checkpoint - time_got_all_acks_from_tm.
Is there a config flag I am missing to make this last step faster?

My current configs for Checkpoints
state.backend: rocksdb
# See the PV mount path need to be the same as  
state.checkpoints.dir: file:///opt/flink/pv/checkpoints
state.savepoints.dir: file:///opt/flink/pv/savepoints
state.backend.incremental: true
#
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#checkpointing
execution.checkpointing.interval: 6
execution.checkpointing.mode: AT_LEAST_ONCE
# hitting The rpc invocation size 19598830 exceeds the maximum akka
akka.framesize: 100485760b
#
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#heartbeat-timeout
heartbeat.timeout: 7
#
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout
execution.checkpointing.timeout: 15minutes


some metadata about the checkpoint
{"@class":"completed","id":1,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1605315046120,"latest_ack_timestamp":1605315093466,"state_size":12239786229,"end_to_end_duration":47346,"alignment_buffered":0,"num_subtasks":120,"num_acknowledged_subtasks":120,"tasks":{},"external_path":"file:/opt/flink/pv/checkpoints/7474752476036c14d7fdeb4e86af3638/chk-1"}


Re: IllegalStateException Printing Plan

2020-11-17 Thread Rex Fenley
So I tried userDocsTable.explain(); however, it doesn't give me the AST as
JSON that I could feed into the visualizer tool
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_plans.html
. Also, if I get rid of executeInsert or move it to after getExecutionPlan
I still end up with "Caused by: java.lang.IllegalStateException: No
operators defined in streaming topology. Cannot execute." Ordering doesn't
seem to make a difference here.

Anything else I can try to get the JSON?

Thanks!

On Tue, Nov 17, 2020 at 1:24 AM Dawid Wysakowicz 
wrote:

> Hi Rex,
>
> The executeInsert method as the name states executes the query. Therefore
> after the method there is nothing in the topology and thus you get the
> exception.
>
> You can either explain the userDocsTable:
>
> userDocsTable.explain()
>
> or you can explain a statement set if you want to postpone the execution:
>
> StatementSet set = tEnv.createStatementSet();
>
> set.addInsert(SINK_ES_PEOPLE, userDocsTable);
>
> set.explain()
>
> or you can explain SQL:
>
> String sqlQuery = ...
>
> tEnv.explainSql(sqlQuery);
>
> Best,
>
> Dawid
> On 17/11/2020 09:16, Khachatryan Roman wrote:
>
> Hello,
>
> Can you share the full program?
> getExecutionPlan call is probably misplaced.
>
> Regards,
> Roman
>
>
> On Tue, Nov 17, 2020 at 8:26 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I have the following code attempting to print the execution plan for my
>> job locally. The job runs fine and Flink UI displays so I'd expect this to
>> work.
>>
>> val tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE)
>> println(s"execution plan:\n${this.env.getExecutionPlan()}")
>>
>> but instead I end up with
>>
>> Caused by: java.lang.IllegalStateException: No operators defined in
>> streaming topology. Cannot execute.
>>
>> What am I doing wrong?
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: IllegalStateException Printing Plan

2020-11-17 Thread Rex Fenley
I don't think I can share the full program.
However, the program is a long series of joines and aggs against various
sources and that is the only sink.

Thanks!

On Tue, Nov 17, 2020 at 12:17 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hello,
>
> Can you share the full program?
> getExecutionPlan call is probably misplaced.
>
> Regards,
> Roman
>
>
> On Tue, Nov 17, 2020 at 8:26 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I have the following code attempting to print the execution plan for my
>> job locally. The job runs fine and Flink UI displays so I'd expect this to
>> work.
>>
>> val tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE)
>> println(s"execution plan:\n${this.env.getExecutionPlan()}")
>>
>> but instead I end up with
>>
>> Caused by: java.lang.IllegalStateException: No operators defined in
>> streaming topology. Cannot execute.
>>
>> What am I doing wrong?
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> 
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: Flink 1.11.2 could not create kafka table source on EMR.

2020-11-17 Thread Fanbin Bu
All of those are verified.

The issue was fixed by adding
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
to the META-INF/services/org.apache.flink.table.factories.Factory service file.

Thanks,
Fanbin



On Tue, Nov 17, 2020 at 7:29 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Please verify that:
> 1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf
> your-program.jar | grep KafkaDynamicTableFactory")
> 2. kafka-connector version matches the version of Flink distribution on
> EMR.
>
> Regards,
> Roman
>
>
> On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu  wrote:
>
>> Hi,
>>
>> I could not launch my flink 1.11.2 application on EMR with exception
>>
>> Caused by: org.apache.flink.table.api.ValidationException:
>> Could not find any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>>
>> I attached the full log at the end. After checking some other threads,
>> none applies to my case. Here are my observations:
>>
>> 1. dependency check: both flink-connector-kafka and flink-json are
>> included in the final fat jar.
>> 2.
>> resources/META-INF/services/org.apache.flink.table.factories.TableFactory
>> has the following and is included in the final fat jar.
>>   - org.apache.flink.formats.json.JsonRowFormatFactory
>>   -
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>   I also noticed that only the identifier 'datagen' is shown in the log; no
>> kafka or json in there.
>> 3. local IntelliJ running fine.
>> 4. same jar on EMR not working
>>
>> Please advise.
>> Thanks,
>> Fanbin
>>
>>
>>
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Unable to
>> create a source for reading table
>> 'default_catalog.default_database.analytics_service'.
>>
>> Table options are:
>>
>> 'connector'='kafka'
>> 'format'='json'
>> 'json.ignore-parse-errors'='true'
>> 'properties.bootstrap.servers'='localhost:9093'
>> 'properties.group.id'='xxx'
>> 'properties.security.protocol'='SSL'
>> 'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
>> 'properties.ssl.key.password'='secret'
>> 'properties.ssl.keystore.location'='xxx.jks'
>> 'properties.ssl.keystore.password'='secret'
>> 'properties.ssl.keystore.type'='JKS'
>> 'properties.ssl.truststore.location'='xxx.jks'
>> 'properties.ssl.truststore.password'='secret'
>> 'properties.ssl.truststore.type'='JKS'
>> 'properties.zookeeper.connect'='localhost:2181'
>> 'scan.startup.mode'='earliest-offset'
>> 'topic'='events'
>> at
>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>> at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
>> at 

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei,

True, I'm using the method you mention, but glad to change.
I tried your suggestion instead, but got a similar error.

Thanks for your support. That is much more tedious than I thought.

*Option 1 - SQL UDF*

*SQL UDF*
create_func_ddl = """
CREATE FUNCTION dummyMap
  AS 'com.dummy.dummyMap' LANGUAGE SCALA
"""

t_env.execute_sql(create_func_ddl)

*Error*
Py4JJavaError: An error occurred while calling o672.execute.
: org.apache.flink.table.api.TableException: Result field does not match
requested type. Requested: Row(s: String, t: String); Actual:
GenericType<org.apache.flink.types.Row>

*Option 2 *- *Overriding getResultType*

Back to the old registering method, but overriding getResultType:

t_env.register_java_function("dummyMap","com.dummy.dummyMap")

*Scala UDF*
class dummyMap() extends ScalarFunction {

  def eval(): Row = {

  Row.of(java.lang.String.valueOf("foo"),
java.lang.String.valueOf("bar"))

  }

  override def getResultType(signature: Array[Class[_]]):
TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
}

*Error (on compilation)*

[error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:

[error]   (x$1:
org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType


[error]   ()org.apache.flink.table.types.DataType 

[error]   (x$1:
org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType

[error]  cannot be applied to (org.apache.flink.table.types.DataType,
org.apache.flink.table.types.DataType)

[error]   override def getResultType(signature: Array[Class[_]]):
TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)

[error]
  ^

[error] one error found

[error] (Compile / compileIncremental) Compilation failed

[error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01

Le mar. 17 nov. 2020 à 14:01, Wei Zhong  a écrit :

> Hi Pierre,
>
> I guess your UDF is registered by the method 'register_java_function'
> which uses the old type system. In this situation you need to override the
> 'getResultType' method instead of adding type hint.
>
> You can also try to register your UDF via the "CREATE FUNCTION" sql
> statement, which accepts the type hint.
>
> Best,
> Wei
>
> 在 2020年11月17日,19:29,Pierre Oberholzer  写道:
>
> Hi Wei,
>
> Thanks for your suggestion. Same error.
>
> *Scala UDF*
>
> @FunctionHint(output = new DataTypeHint("ROW<s STRING, t STRING>"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
> Row.of(java.lang.String.valueOf("foo"),
> java.lang.String.valueOf("bar"))
>   }
> }
>
> Best regards,
>
> Le mar. 17 nov. 2020 à 10:04, Wei Zhong  a écrit :
>
>> Hi Pierre,
>>
>> You can try to replace the '@DataTypeHint("ROW<s STRING, t STRING>")' with
>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING, t STRING>"))'
>>
>> Best,
>> Wei
>>
>> 在 2020年11月17日,15:45,Pierre Oberholzer  写道:
>>
>> Hi Dian, Community,
>>
>> (bringing the thread back to wider audience)
>>
>> As you suggested, I've tried to use DataTypeHint with Row instead of Map but
>> also this simple case leads to a type mismatch between UDF and Table API.
>> I've also tried other Map objects from Flink (table.data.MapData,
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>> (java.util.Map) in combination with DataTypeHint, without success.
>> N.B. I'm using version 1.11.
>>
>> Am I doing something wrong or am I facing limitations in the toolkit ?
>>
>> Thanks in advance for your support !
>>
>> Best regards,
>>
>> *Scala UDF*
>>
>> class dummyMap() extends ScalarFunction {
>>
>>  @DataTypeHint("ROW<s STRING, t STRING>")
>>  def eval(): Row = {
>>
>> Row.of(java.lang.String.valueOf("foo"),
>> java.lang.String.valueOf("bar"))
>>
>>   }
>> }
>>
>> *Table DDL*
>>
>> my_sink_ddl = f"""
>> create table mySink (
>> output_of_my_scala_udf ROW<s STRING, t STRING>
>> ) with (
>> ...
>> )
>> """
>>
>> *Error*
>>
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query
>> result and registered TableSink
>> `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf:
>> GenericType<org.apache.flink.types.Row>]
>> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
>>
>>
>>
>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
>> pierre.oberhol...@gmail.com> a écrit :
>>
>>> Thanks Dian, but same error when using explicit returned type:
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>
>>> val states = Map("key1" -> "val1", "key2" -> "val2")
>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>
>>>   }
>>> }
>>>
>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu  a écrit :
>>>
 You need to explicitly defined the result type the UDF. You could refer
 to [1] for more details if you are using Flink 1.11. If you are using other
 versions of Flink, you need to refer to the corresponding documentation.

 [1]
 

Re: Force Join Unique Key

2020-11-17 Thread Rex Fenley
Ok, what are the performance consequences then of having a join with
NoUniqueKey if the left side's key actually is unique in practice?

Thanks!


On Tue, Nov 17, 2020 at 7:35 AM Jark Wu  wrote:

> Hi Rex,
>
> Currently, the unique key is inferred by the optimizer. However, the
> inference is not perfect.
> There are known issues that the unique key is not derived correctly, e.g.
> FLINK-20036 (is this opened by you?). If you think you have the same case,
> please open an issue.
>
> Query hint is a nice way for this, but it is not supported yet.
> We have an issue to track supporting query hint, see FLINK-17173.
>
> Best,
> Jark
>
>
> On Tue, 17 Nov 2020 at 15:23, Rex Fenley  wrote:
>
>> Hello,
>>
>> I have quite a few joins in my plan that have
>>
>> leftInputSpec=[NoUniqueKey]
>>
>> in Flink UI. I know this can't truly be the case that there is no unique
>> key, at least for some of these joins that I've evaluated.
>>
>> Is there a way to hint to the join what the unique key is for a table?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> 
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-17 Thread Kye Bae
It is possible, but I am not entirely sure about the load order affecting
the metaspace usage.

To find out why your taskmanager container is exceeding the metaspace, we
would need to know what value the max metaspace size is set to and then
find out how much of the metaspace is actually being used and what the
classloading stats are (before Yarn kills the container).

If possible, please share the current metaspace setting and the native
memory stats from one of your taskmanager instances (Java process as a Yarn
container). I think those run as the yarn user as opposed to the hadoop
user (it depends on your environment).

# enable the native memory tracking referenced earlier - this may need to
be passed in as the taskmanager JVM option parameter not the jobmanager.
# log onto one of the taskmanager HW instances
sudo su - yarn # or log on as the user that runs Yarn
jps -v # grab the PID for one of the processes named YarnTaskExecutorRunner
- this would be a taskmanager
jcmd TM_PID VM.native_memory summary # size information for "Class" -
metaspace
jcmd TM_PID VM.classloader_stats # how many classes were loaded by which
classloader, sizes, etc.

There is a gentleman who posts blogs on Flink, and he goes into a bit more
detail there. This was against Flink 1.9, but the foundational concepts
still apply to 1.10.
http://cloudsqale.com/2020/04/29/flink-1-9-off-heap-memory-on-yarn-troubleshooting-container-is-running-beyond-physical-memory-limits-errors/

-K

On Tue, Nov 17, 2020 at 12:47 PM Flavio Pompermaier 
wrote:

> Another big potential candidate is the fact that JDBC libs I use in my job
> are put into the Flink lib folder instead of putting them into the fat
> jar..tomorrow I'll try to see if the metaspace is getting cleared correctly
> after that change.
> Unfortunately our jobs were written before the child-first / parent-first
> classloading refactoring and at that time that was the way to go..but now
> it can cause this kind of problems if using child-first policy.
>
> On Mon, Nov 16, 2020 at 8:44 PM Flavio Pompermaier 
> wrote:
>
>> Thank you Kye for your insights...in my mind, if the job runs without
>> problems one or more times the heap size, and thus the medatadata-size, is
>> big enough and I should not increase it (on the same data of course).
>> So I'll try to understand who is leaking what..the advice to avoid the
>> dynamic class loading is just a workaround to me..there's something wrong
>> going on and tomorrow I'll try to understand the root cause of the
>> problem using -XX:NativeMemoryTracking=summary as you suggested.
>>
>> I'll keep you up to date with my findings..
>>
>> Best,
>> Flavio
>>
>> On Mon, Nov 16, 2020 at 8:22 PM Kye Bae  wrote:
>>
>>> Hello!
>>>
>>> The JVM metaspace is where all the classes (not class instances or
>>> objects) get loaded. jmap -histo is going to show you the heap space usage
>>> info not the metaspace.
>>>
>>> You could inspect what is happening in the metaspace by using jcmd (e.g.,
>>> jcmd JPID VM.native_memory summary) after restarting the cluster with
>>> "-XX:NativeMemoryTracking=summary"
>>>
>>> As the error message suggests, you may need to increase
>>> taskmanager.memory.jvm-metaspace.size,
>>> but you need to be slightly careful when specifying the memory parameters
>>> in flink-conf.yaml in Flink 1.10 (they have an issue with a confusing error
>>> message).
>>>
>>> Another thing to keep in mind is that you may want to avoid using
>>> dynamic classloading (
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#avoiding-dynamic-classloading-for-user-code
>>> ):
>>> when the job continuously fails for some temporary issues, it will load the
>>> same class files into the metaspace multiple times and could exceed
>>> whatever the limit you set it.
>>>
>>> -K
>>>
>>> On Mon, Nov 16, 2020 at 12:39 PM Jan Lukavský  wrote:
>>>
 The exclusions should not have any impact on that, because what defines
 which classloader will load which class is not the presence or particular
 class in a specific jar, but the configuration of parent-first-patterns 
 [1].

 If you don't use any flink internal imports, than it still might be the
 case, that a class in any of the packages defined by the
 parent-first-pattern to hold reference to your user-code classes, which
 would cause the leak. I'd recommend to inspect the heap dump after several
 restarts of the application and look for reference to Class objects from
 the root set.

 Jan

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#class-loading
 

Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-17 Thread Flavio Pompermaier
Another big potential candidate is the fact that JDBC libs I use in my job
are put into the Flink lib folder instead of putting them into the fat
jar..tomorrow I'll try to see if the metaspace is getting cleared correctly
after that change.
Unfortunately our jobs were written before the child-first / parent-first
classloading refactoring and at that time that was the way to go..but now
it can cause this kind of problems if using child-first policy.

On Mon, Nov 16, 2020 at 8:44 PM Flavio Pompermaier 
wrote:

> Thank you Kye for your insights...in my mind, if the job runs without
> problems one or more times the heap size, and thus the medatadata-size, is
> big enough and I should not increase it (on the same data of course).
> So I'll try to understand who is leaking what..the advice to avoid the
> dynamic class loading is just a workaround to me..there's something wrong
> going on and tomorrow I'll try to understand the root cause of the
> problem using -XX:NativeMemoryTracking=summary as you suggested.
>
> I'll keep you up to date with my findings..
>
> Best,
> Flavio
>
> On Mon, Nov 16, 2020 at 8:22 PM Kye Bae  wrote:
>
>> Hello!
>>
>> The JVM metaspace is where all the classes (not class instances or
>> objects) get loaded. jmap -histo is going to show you the heap space usage
>> info not the metaspace.
>>
>> You could inspect what is happening in the metaspace by using jcmd (e.g.,
>> jcmd JPID VM.native_memory summary) after restarting the cluster with
>> "-XX:NativeMemoryTracking=summary"
>>
>> As the error message suggests, you may need to increase
>> taskmanager.memory.jvm-metaspace.size,
>> but you need to be slightly careful when specifying the memory parameters
>> in flink-conf.yaml in Flink 1.10 (they have an issue with a confusing error
>> message).
>>
>> Another thing to keep in mind is that you may want to avoid using
>> dynamic classloading (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#avoiding-dynamic-classloading-for-user-code):
>> when the job continuously fails for some temporary issues, it will load the
>> same class files into the metaspace multiple times and could exceed
>> whatever the limit you set it.
>>
>> -K
>>
>> On Mon, Nov 16, 2020 at 12:39 PM Jan Lukavský  wrote:
>>
>>> The exclusions should not have any impact on that, because what defines
>>> which classloader will load which class is not the presence or particular
>>> class in a specific jar, but the configuration of parent-first-patterns [1].
>>>
>>> If you don't use any flink internal imports, than it still might be the
>>> case, that a class in any of the packages defined by the
>>> parent-first-pattern to hold reference to your user-code classes, which
>>> would cause the leak. I'd recommend to inspect the heap dump after several
>>> restarts of the application and look for reference to Class objects from
>>> the root set.
>>>
>>> Jan
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#class-loading
>>> 
>>> On 11/16/20 5:34 PM, Flavio Pompermaier wrote:
>>>
>>> I've tried to remove all possible imports of classes not contained in
>>> the fat jar but I still face the same problem.
>>> I've also tried to reduce as much as possible the exclude in the shade
>>> section of the maven plugin (I took the one at [1]) so now I exclude only
>>> few dependencies..could it be that I should include org.slf4j:* if I use
>>> static import of it?
>>>
>>> 
>>> 
>>>   com.google.code.findbugs:jsr305
>>>   org.slf4j:*
>>>   log4j:*
>>> 
>>> 
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies
>>> 
>>>
>>> On Mon, Nov 16, 2020 at 3:29 PM Jan Lukavský  wrote:
>>>
 Yes, that could definitely cause this. You should probably avoid using
 these flink-internal shaded classes and ship your own versions (not 
 shaded).

 Best,

  Jan
 On 11/16/20 3:22 PM, Flavio Pompermaier wrote:

 Thank you Jan for your valuable feedback.
 Could it be that I should not use import shaded-jackson classes in my
 user code?
 For example import
 org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?

 Bets,
 Flavio

 On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský  wrote:

> Hi Flavio,
>
> when I encountered quite similar problem that you describe, it was
> related to a static 

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-17 Thread Simone Cavallarin
Hi,

I have been working on the suggestion that you gave me, thanks! The first part
is to add the gap to the message: 1) I receive the event, 2) I take that event
and map it using StatefulSessionCalculator, which is where I put together
the message and the long that is my gap in millis.

DataStream<MyMessageType> source = ...

Operation in front of the window that keeps track of session gaps

DataStream<Tuple2<MyMessageType, Long>> enriched = source
   .keyBy(...)
   .map(new StatefulSessionCalculator()); // or process()

This is my StatefulSessionCalculator():

Tuple2<MyMessageType, Long> map(MyMessageType input) {
   ValueState<MyState> valueState = getState(myModelStateDescriptor);
   MyState state = valueState.value();
   state.update(input);
   long suggestedGap = state.getSuggestedGap();
   valueState.update(state);
   return Tuple2.of(input, suggestedGap);
}

If the "gap" calculated is "1234".
The result would be: [Tom, 1.70, 50, 1605612588995], [1234]>?


The second step is to use the gap calculated through  
DynamicWindowGapExtractor().

DataStream<...> result = enriched
   .keyBy(new MyKeySelector())
   .window(EventTimeSessionWindows.withDynamicGap(new 
DynamicWindowGapExtractor()))


The DynamicWindowGapExtractor() extracts the gap from the message and feeds it
back to Flink.
Could you please give me an example for this one as well?

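(For reference, a minimal Java sketch of what such an extractor could look like, assuming
the enriched element is a Tuple2<MyMessageType, Long> as above; MyMessageType is the POJO
from this thread and the field access below is illustrative.)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

public class DynamicWindowGapExtractor
        implements SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> {

    @Override
    public long extract(Tuple2<MyMessageType, Long> element) {
        // element.f0 is the original POJO, element.f1 is the gap in milliseconds
        // that the stateful map computed upstream.
        return element.f1;
    }
}

The key selector in front of the window can reach the POJO the same way, e.g.
keyBy(t -> t.f0.getSomeKeyField()), where getSomeKeyField() stands for whatever field of
the POJO you key on.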

One thing that I don't understand: after enriching the message, my event
(which contains a POJO) is nested inside a tuple. How can I access it?

This is my code,

[inline screenshot of the code omitted]


Before, the POJO was working fine using "stream", but now that I'm going through
a Tuple2 I have some issues.

[inline screenshot omitted]


The last point: when you said "I think, however, that it might be easier at
this point to just use a stateful ProcessFunction", did you mean that a completely
different approach would be better?

many thanks for the help.

s


From: Aljoscha Krettek 
Sent: 16 November 2020 16:22
To: user@flink.apache.org 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

Hi,

thanks for the pointer, I should have remembered that thread earlier!

I'll try and sketch what the pipeline might look like to show what I
mean by "enriching the message" and where the operations would sit.

DataStream<MyMessageType> source = ...

DataStream<Tuple2<MyMessageType, Long>> enriched = source
   .keyBy(...)
   .map(new StatefulSessionCalculator()); // or process()

DataStream<...> result = enriched
   .keyBy(new MyKeySelector())
   .window(EventTimeSessionWindows.withDynamicGap(
 new DynamicWindowGapExtractor()))
   .sideOutputLateData(lateDataSideOutputTag)
   .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
   .process(new ProcessWindowFunction(...));

The stateful map function could look something like this:

Tuple2<MyMessageType, Long> map(MyMessageType input) {
   ValueState<MyState> valueState = getState(myModelStateDescriptor);
   MyState state = valueState.value();
   state.update(input);
   long suggestedGap = state.getSuggestedGap();
   valueState.update(state);
   return Tuple2.of(input, suggestedGap);
}

The two operations have to be separate because the session gap extractor
cannot be stateful.

I think, however, that it might be easier at this point to just use a
stateful ProcessFunction to not have to deal with the somewhat finicky
setup of the stateful extractor just to force it into the requirements
of the session windows API.

Best,
Aljoscha

On 14.11.20 10:50, Simone Cavallarin wrote:
> Hi Aljoscha,
>
> I found a similar question of mine by 
> KristoffSC
>  Jan, 2020, called Session Windows with dynamic gap.
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-Window-with-dynamic-gap-td31893.html
>
> The idea is the same and at the end of the thread this was the solution that 
> you suggested: "There are no plans of adding state support to the gap 
> extractors but you could do this using a two-step approach, i.e. have an 
> operation in front of the window that keeps track of session gaps, enriches 
> the message with the gap that should be used and then the extractor extracts 
> that gap. This is a more modular approach compared to putting everything in 
> one operator/extractor."
>
>
> 1) Operation in front of the windows -> keep track of the session gaps (I 
> have been reading all around for this)
>
>*   
> (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.html)
>*   
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
>*   
> https://www.codota.com/code/java/classes/org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor
>
>
> 2) Enrich the message with the gap that should be use (this is a parameter 
> can be for example an 

Re: How to configure event time for data in this format?

2020-11-17 Thread Jark Wu
What error are you getting?


On Tue, 17 Nov 2020 at 23:43, 赵一旦  wrote:

> CREATE TABLE user_log
> (
> d MAP,
> process_time AS PROCTIME(),
> event_time AS
> TO_TIMESTAMP(FROM_UNIXTIME(COALESCE(d['server_time'], 0) / 1000)),
> WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> ) WITH (
> 'connector' = 'kafka',
>
>  ...
>
> );
>
> The above fails with an error; it doesn't seem to be supported this way. But my data is exactly in this format, for example:
>
> {
>
>   "d": {
>
> "name": "abc",
>
> "age": 12
>
>   }
>
> }
>


Re: Kafka SQL table Re-partition via Flink SQL

2020-11-17 Thread Jark Wu
Hi Slim,

In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and
set its fully qualified class name in the 'sink.partitioner' option.

In 1.12, you can re-partition the data by specifying the key field (Kafka
producer will partition data by the message key by default). You can do
this by adding some additional options in 1.12.

CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'key.fields' = 'item_id',  -- specify which columns will be written to the message key
 'key.format' = 'raw',
 'value.format' = 'json'
);


Best,
Jark

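(For the 1.11 route, a minimal Java sketch of such a partitioner. It assumes the SQL Kafka
sink hands the partitioner RowData records and that item_id is the second column, index 1,
of the sink schema - both assumptions should be checked against the actual table.)

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.RowData;

// Referenced from the DDL via 'sink.partitioner' = 'com.example.ItemIdPartitioner'
// (fully qualified class name; the class needs a public no-arg constructor and must
// be on the job's classpath).
public class ItemIdPartitioner extends FlinkKafkaPartitioner<RowData> {

    @Override
    public int partition(RowData record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Assumption: item_id is the second field (index 1) of the sink row.
        long itemId = record.getLong(1);
        return partitions[(int) Math.floorMod(itemId, (long) partitions.length)];
    }
}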


On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> I'm pulling in some Flink SQL experts (in CC) to help you with this one :)
>
> Cheers,
> Gordon
>
> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra 
> wrote:
>
>> Hi,
>> I am trying to author a SQL job that does repartitioning a Kafka SQL
>> table into another Kafka SQL table.
>> as example input/output tables have exactly the same SQL schema (see
>> below) and data the only difference is that the new kafka stream need to be
>> repartition using a simple project like item_id (input stream is
>> partitioned by user_id)
>> is there a way to do this via SQL only ? without using
>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
>>
>> In other words how can we express the stream key (keyedBy) via the SQL
>> layer ?
>>
>> For instance in Hive they expose a system column called  __key or
>> __partition that can be used to do this via SQL layer  (see
>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
>> )
>>
>> CREATE TABLE input_kafkaTable (
>>  user_id BIGINT,
>>  item_id BIGINT,
>>  category_id BIGINT,
>>  behavior STRING,
>>  ts TIMESTAMP(3)
>> ) WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'user_behavior_partition_by_uid',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>> )
>>
>> CREATE TABLE output_kafkaTable (
>>  user_id BIGINT,
>>  item_id BIGINT,
>>  category_id BIGINT,
>>  behavior STRING,
>>  ts TIMESTAMP(3)
>> ) WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'user_behavior_partition_by_iid',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>> )
>>
>>
>>
>> --
>>
>> B-Slim
>> ___/\/\/\___/\/\/\___/\/\/\___/\/\/\___/\/\/\___
>>
>


How to configure event time for data in this format

2020-11-17 Thread 赵一旦
CREATE TABLE user_log
(
d MAP,
process_time AS PROCTIME(),
event_time AS
TO_TIMESTAMP(FROM_UNIXTIME(COALESCE(d['server_time'], 0) / 1000)),
WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH (
'connector' = 'kafka',

 ...

);

As shown above, this fails with an error. It seems this usage is not supported, but my data really is in this format, for example:

{

  "d": {

"name": "abc",

"age": 12

  }

}


Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-17 Thread Dongwon Kim
Hi Aljoscha,

Thanks for the input.
The '-t' option seems to be available as of Flink 1.11, while the latest
FlinkRunner is based on Flink 1.10.
So I used the '-e' option, which is available in 1.10:

$ flink run -e yarn-per-job -d <...>


A short question here: this command ignores the *-yD* and *--yarnship*
options.
Are these options only for yarn session mode?

Best,

Dongwon




On Tue, Nov 17, 2020 at 5:16 PM Aljoscha Krettek 
wrote:

> Hi,
>
> to ensure that we really are using per-job mode, could you try and use
>
> $ flink run -t yarn-per-job -d <...>
>
> This will directly specify that we want to use the YARN per-job
> executor, which bypasses some of the logic in the older YARN code paths
> that differentiate between YARN session mode and YARN per-job mode.
>
> Best,
> Aljoscha
>
> On 17.11.20 07:02, Tzu-Li (Gordon) Tai wrote:
> > Hi,
> >
> > Not sure if this question would be more suitable for the Apache Beam
> > mailing lists, but I'm pulling in Aljoscha (CC'ed) who would know more
> > about Beam and whether or not this is an expected behaviour.
> >
> > Cheers,
> > Gordon
> >
> > On Mon, Nov 16, 2020 at 10:35 PM Dongwon Kim 
> wrote:
> >
> >> Hi,
> >>
> >> I'm trying to run a per-job cluster for a Beam pipeline w/ FlinkRunner
> on
> >> YARN as follows:
> >>
> >>> flink run -m yarn-cluster -d \
> >>
> >>  my-beam-pipeline.jar \
> >>>  --runner=FlinkRunner \
> >>>  --flinkMaster=[auto] \
> >>>  --parallelism=8
> >>
> >>
> >> Instead of creating a per-job cluster as wished, the above command seems
> >> to create a session cluster and then submit a job onto the cluster.
> >>
> >> I doubt it because
> >> (1) In the attached file, there's "Submit New Job" which is not shown in
> >> other per-job applications that are written in Flink APIs and submitted
> to
> >> YARN similar to the above command.
> >>
> >> [image: beam on yarn.png]
> >> (2) When the job is finished, the YARN application is still in its
> RUNNING
> >> state without being terminated. I had to kill the YARN application
> manually.
> >>
> >> FYI, I'm using
> >> - Beam v2.24.0 (Flink 1.10)
> >> - Hadoop v3.1.1
> >>
> >> Thanks in advance,
> >>
> >> Best,
> >>
> >> Dongwon
> >>
> >
>
>


Re: Force Join Unique Key

2020-11-17 Thread Jark Wu
Hi Rex,

Currently, the unique key is inferred by the optimizer. However, the
inference is not perfect.
There are known issues where the unique key is not derived correctly, e.g.
FLINK-20036 (was this opened by you?). If you think you have the same case,
please open an issue.

Query hint is a nice way for this, but it is not supported yet.
We have an issue to track supporting query hint, see FLINK-17173.

Best,
Jark


On Tue, 17 Nov 2020 at 15:23, Rex Fenley  wrote:

> Hello,
>
> I have quite a few joins in my plan that have
>
> leftInputSpec=[NoUniqueKey]
>
> in Flink UI. I know this can't truly be the case that there is no unique
> key, at least for some of these joins that I've evaluated.
>
> Is there a way to hint to the join what the unique key is for a table?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: Flink 1.11.2 could not create kafka table source on EMR.

2020-11-17 Thread Khachatryan Roman
Hi,

Please verify that:
1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf
your-program.jar | grep KafkaDynamicTableFactory")
2. kafka-connector version matches the version of Flink distribution on EMR.

Regards,
Roman


On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu  wrote:

> Hi,
>
> I could not launch my flink 1.11.2 application on EMR with exception
>
> Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
>
> I attached the full log at the end. After checking some other threads and
> none applies in my case. here is my observation:
>
> 1. dependency check: both flink-connector-kafka and flink-json are
> included in the final fat jar.
> 2.
> resources/META-INF/services/org.apache.flink.table.factories.TableFactory
> has the following and is included in the final fat jar.
>   - org.apache.flink.formats.json.JsonRowFormatFactory
>   - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>   also noticed that only identifier datagen is shown in the log. No
> kafka or json in there.
> 3. local IntelliJ running fine.
> 4. same jar on EMR not working
>
> Please advise.
> Thanks,
> Fanbin
>
>
>
>
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a source for reading table
> 'default_catalog.default_database.analytics_service'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'json.ignore-parse-errors'='true'
> 'properties.bootstrap.servers'='localhost:9093'
> 'properties.group.id'='xxx'
> 'properties.security.protocol'='SSL'
> 'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
> 'properties.ssl.key.password'='secret'
> 'properties.ssl.keystore.location'='xxx.jks'
> 'properties.ssl.keystore.password'='secret'
> 'properties.ssl.keystore.type'='JKS'
> 'properties.ssl.truststore.location'='xxx.jks'
> 'properties.ssl.truststore.password'='secret'
> 'properties.ssl.truststore.type'='JKS'
> 'properties.zookeeper.connect'='localhost:2181'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='events'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
> at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
> at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
> at
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
> at
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> 

Re: Flink JSON deserialization loses DECIMAL precision

2020-11-17 Thread Jark Wu
Let's continue the discussion in the issue. As suggested there, you could try
switching to the new connector by changing it to 'connector' = 'kafka'.

On Mon, 16 Nov 2020 at 18:39, Luna Wong  wrote:

> https://issues.apache.org/jira/browse/FLINK-20170
>
> This is the issue I filed today.
> When Jackson deserializes this way it turns BigDecimal into Double, and decimals with more than 12 digits lose precision. What should I do in this case? Do I have to treat the field as a STRING type for now, or modify the source code of the JSON format package and rebuild it?
> Are there any other best practices?
>


Re: How to convert Int to Date

2020-11-17 Thread Timo Walther

Hi Rex,

the classes mentioned in the documentation such as `int` and 
`java.lang.Integer` are only used when you leave the SQL world to a UDF 
or to a Java implementation in a sink.


But as a SQL user you only need to pay attention to the logical data 
type. Those must match entirely or be a supported implicit cast.
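
If the INT column really holds days since epoch, one possible way (just a
sketch under that assumption, the function name is made up) is a small scalar
UDF that declares DATE as its result type:

import java.time.LocalDate
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

// Converts a days-since-epoch INT into a DATE value.
class EpochDayToDate extends ScalarFunction {
  @DataTypeHint("DATE")
  def eval(daysSinceEpoch: Integer): LocalDate =
    if (daysSinceEpoch == null) null else LocalDate.ofEpochDay(daysSinceEpoch.longValue())
}

After registering it (e.g. with createTemporarySystemFunction) and applying it
to the column, the query schema should carry the DATE logical type and match
the sink schema.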


Regards,
Timo

On 17.11.20 09:51, Khachatryan Roman wrote:

Hello,

Do both of the types you use have the same nullability?
For a primitive int, the documentation you referred to says: "Output 
only if type is not nullable".


Regards,
Roman


On Tue, Nov 17, 2020 at 7:49 AM Rex Fenley > wrote:


Hello,

I'm using the Table API and I have a column which is an integer day
since epoch. According to the docs [1] both `int` and
`java.lang.Integer` are acceptable for DATE. However, if I try to
use the SQL API to write a DATE out to the Elasticsearch connector
for the INT column I receive an exception. How then should I go
about converting to DATE?

Exception:
Caused by: org.apache.flink.table.api.ValidationException: Field
types of query result and registered TableSink
default_catalog.default_database.sink_es_people do not match.
Query schema: [... column: INT, ...]
Sink schema: [... column: DATE, ...]

I know this column is the culprit because when I make it INT on both
ends it works.

How do I go about making my INT a DATE?

Thanks!

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#date-and-time



-- 


Rex Fenley|Software Engineer - Mobile and Backend


Remind.com | BLOG
 | FOLLOW US
 | LIKE US






Re:Re:Re:ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

2020-11-17 Thread hailongwang
1. If you only use one Kafka version, you can simply drop the flink-sql-connector-kafka-0.10_2.12-1.11.1.jar;
there is no need to shade the Kafka client at all.
As of 1.12 there is only one Kafka connector version anyway.
2. If you really do need the shaded jar, you can configure packages whose names start with `org.apache.kafka` to be loaded from the user classloader first.

On 2020-11-17 16:14:24, "kingdomad"  wrote:
>Awesome, thanks!
>I checked, and there is indeed a flink-sql-connector-kafka-0.10_2.12-1.11.1.jar on the cluster.
>How should I fix this problem?
>
>
>
>
>
>
>
>--
>
>kingdomad
>
>
>
>
>
>
>
>On 2020-11-17 17:08:10, "hailongwang" <18868816...@163.com> wrote:
>>Judging from your error, there should be a shaded Kafka 0.10 jar on your cluster, which causes the Kafka client classes under it to be loaded first.
>>The artifactId of the shaded Kafka 0.10 version is:
>>flink-sql-connector-kafka-0.10_${scala.binary.version}
>>
>>On 2020-11-17 15:47:08, "kingdomad"  wrote:
>>>Flink 1.11.1 (Scala 2.12) fails with an error when consuming from Kafka 0.10.1.1.
>>>It runs fine when debugging in IDEA, but fails once submitted to the YARN cluster. Please help.
>>>
>>>
>>>The consumer used is as follows:
>>>val logConsumer: FlinkKafkaConsumer010[String] = new 
>>>FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties)
>>>
>>>
>>>The dependency imported in the pom file is as follows:
>>>
>>>org.apache.flink
>>>flink-connector-kafka-0.10_2.12
>>>1.11.1
>>>
>>>
>>>
>>>
>>>The error is as follows:
>>>
>>>2020-11-17 16:39:37
>>>
>>>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>>>
>>>at 
>>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:718)
>>>
>>>at 
>>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:597)
>>>
>>>at 
>>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:579)
>>>
>>>at 
>>>org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58)
>>>
>>>at 
>>>org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>>
>>>at 
>>>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>>>
>>>at 
>>>org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>
>>>at 
>>>org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>
>>>at 
>>>org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>>>
>>>at 
>>>org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>>>
>>>at 
>>>org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>>
>>>at 
>>>org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>>>
>>>at 
>>>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>>>
>>>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>
>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>
>>>at java.lang.Thread.run(Thread.java:745)
>>>
>>>Caused by: org.apache.kafka.common.KafkaException: 
>>>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>>> is not an instance of org.apache.kafka.common.serialization.Deserializer
>>>
>>>at 
>>>org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
>>>
>>>at 
>>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:637)
>>>
>>>... 15 more
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>--
>>>
>>>kingdomad
>>>


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

I guess your UDF is registered via the method 'register_java_function', which
uses the old type system. In that case you need to override the
'getResultType' method instead of adding a type hint.

You can also try to register your UDF via the "CREATE FUNCTION" SQL statement,
which accepts the type hint.
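
A minimal, untested sketch of that override for your dummyMap function (old
type system; the field names simply mirror the sink DDL):

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class dummyMap extends ScalarFunction {

  def eval(): Row =
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  // Legacy type inference: declare the result type explicitly.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW_NAMED(Array("s", "t"), Types.STRING, Types.STRING)
}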

Best,
Wei

> On Nov 17, 2020, at 19:29, Pierre Oberholzer wrote:
> 
> Hi Wei,
> 
> Thanks for your suggestion. Same error.
> 
> Scala UDF
> 
> @FunctionHint(output = new DataTypeHint("ROW"))
> class dummyMap() extends ScalarFunction {
>   def eval(): Row = {
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>   }
> }
> 
> Best regards,
> 
> On Tue, Nov 17, 2020 at 10:04, Wei Zhong wrote:
> Hi Pierre,
> 
> You can try to replace the '@DataTypeHint("ROW")' with 
> '@FunctionHint(output = new DataTypeHint("ROW”))'
> 
> Best,
> Wei
> 
>> On Nov 17, 2020, at 15:45, Pierre Oberholzer wrote:
>> 
>> Hi Dian, Community,
>> 
>> (bringing the thread back to wider audience)
>> 
>> As you suggested, I've tried to use DataTypeHint with Row instead of Map but 
>> also this simple case leads to a type mismatch between UDF and Table API.
>> I've also tried other Map objects from Flink (table.data.MapData, 
>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
>> (java.util.Map) in combination with DataTypeHint, without success.
>> N.B. I'm using version 1.11.
>> 
>> Am I doing something wrong or am I facing limitations in the toolkit ?
>> 
>> Thanks in advance for your support !
>> 
>> Best regards,
>> 
>> Scala UDF
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>  @DataTypeHint("ROW")
>>  def eval(): Row = {
>> 
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> 
>>   }
>> }
>> 
>> Table DDL
>> 
>> my_sink_ddl = f"""
>> create table mySink (
>> output_of_my_scala_udf ROW
>> ) with (
>> ...
>> )
>> """
>> 
>> Error
>> 
>> Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.ValidationException: Field types of query 
>> result and registered TableSink 
>> `default_catalog`.`default_database`.`mySink` do not match.
>> Query result schema: [output_of_my_scala_udf: 
>> GenericType]
>> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
>> 
>> 
>> 
>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer wrote:
>> Thanks Dian, but same error when using explicit returned type:
>> 
>> class dummyMap() extends ScalarFunction {
>> 
>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>> 
>> val states = Map("key1" -> "val1", "key2" -> "val2")
>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>> 
>>   }
>> }
>> 
>> On Fri, Nov 13, 2020 at 10:34, Dian Fu wrote:
>> You need to explicitly defined the result type the UDF. You could refer to 
>> [1] for more details if you are using Flink 1.11. If you are using other 
>> versions of Flink, you need to refer to the corresponding documentation.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>  
>> 
>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer wrote:
>>> 
>>> ScalarFunction
>> 
>> 
>> 
>> -- 
>> Pierre
>> 
>> -- 
>> Pierre
> 
> 
> 
> -- 
> Pierre



Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei,

Thanks for your suggestion. Same error.

*Scala UDF*

@FunctionHint(output = new DataTypeHint("ROW"))
class dummyMap() extends ScalarFunction {
  def eval(): Row = {
Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }
}

Best regards,

On Tue, Nov 17, 2020 at 10:04, Wei Zhong wrote:

> Hi Pierre,
>
> You can try to replace the '@DataTypeHint("ROW")' with
> '@FunctionHint(output = new DataTypeHint("ROW”))'
>
> Best,
> Wei
>
> On Nov 17, 2020, at 15:45, Pierre Oberholzer wrote:
>
> Hi Dian, Community,
>
> (bringing the thread back to wider audience)
>
> As you suggested, I've tried to use DataTypeHint with Row instead of Map but
> also this simple case leads to a type mismatch between UDF and Table API.
> I've also tried other Map objects from Flink (table.data.MapData,
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
> (java.util.Map) in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
>
> Am I doing something wrong or am I facing limitations in the toolkit ?
>
> Thanks in advance for your support !
>
> Best regards,
>
> *Scala UDF*
>
> class dummyMap() extends ScalarFunction {
>
>  @DataTypeHint("ROW")
>  def eval(): Row = {
>
> Row.of(java.lang.String.valueOf("foo"),
> java.lang.String.valueOf("bar"))
>
>   }
> }
>
> *Table DDL*
>
> my_sink_ddl = f"""
> create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> ...
> )
> """
>
> *Error*
>
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink
> `default_catalog`.`default_database`.`mySink` do not match.
> Query result schema: [output_of_my_scala_udf:
> GenericType]
> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
>
>
>
> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <
> pierre.oberhol...@gmail.com> wrote:
>
>> Thanks Dian, but same error when using explicit returned type:
>>
>> class dummyMap() extends ScalarFunction {
>>
>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>
>> val states = Map("key1" -> "val1", "key2" -> "val2")
>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>
>>   }
>> }
>>
>> On Fri, Nov 13, 2020 at 10:34, Dian Fu wrote:
>>
>>> You need to explicitly defined the result type the UDF. You could refer
>>> to [1] for more details if you are using Flink 1.11. If you are using other
>>> versions of Flink, you need to refer to the corresponding documentation.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>
>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer wrote:
>>>
>>> ScalarFunction
>>>
>>>
>>>
>>
>> --
>> Pierre
>>
>
> --
> Pierre
>
>
>

-- 
Pierre


Re: why not flink delete the checkpoint directory recursively?

2020-11-17 Thread Khachatryan Roman
Hi,

I think Robert is right, state handles are deleted first, and then the
directory is deleted non-recursively.
If any exception occurs while removing the files, it will be combined with
the other exception (as suppressed).
So probably Flink failed to delete some files and then directory removal
failed because of that.
Can you share the full exception to check this?
And probably check what files exist there as Robert suggested.
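
To illustrate the order described above (just a sketch of the pattern, not
Flink's actual code, written against the Hadoop FileSystem API):

import org.apache.hadoop.fs.{FileSystem, Path}

// Delete the individual state files first, then remove the checkpoint
// directory non-recursively. If some file could not be deleted, the directory
// is not empty, the final delete fails, and the earlier failure is attached
// as a suppressed exception.
def discardCheckpoint(fs: FileSystem, checkpointDir: Path, stateFiles: Seq[Path]): Unit = {
  var fileError: Exception = null
  for (p <- stateFiles) {
    try fs.delete(p, false)
    catch {
      case e: Exception =>
        if (fileError == null) fileError = e else fileError.addSuppressed(e)
    }
  }
  try fs.delete(checkpointDir, false) // intentionally non-recursive
  catch {
    case e: Exception =>
      if (fileError != null) e.addSuppressed(fileError)
      throw e
  }
  if (fileError != null) throw fileError
}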

Regards,
Roman


On Tue, Nov 17, 2020 at 10:38 AM Joshua Fan  wrote:

> Hi Robert,
>
> When the `delete(Path f, boolean recursive)` recursive is false, hdfs
> will throw exception like below:
> [image: checkpoint-exception.png]
>
> Yours sincerely
> Josh
>
> On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger 
> wrote:
>
>> Hey Josh,
>>
>> As far as I understand the code CompletedCheckpoint.discard(), Flink is
>> removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then
>> deleting the directory.
>>
>> Which files are left over in your case?
>> Do you see any exceptions on the TaskManagers?
>>
>> Best,
>> Robert
>>
>> On Wed, Nov 11, 2020 at 12:08 PM Joshua Fan 
>> wrote:
>>
>>> Hi
>>>
>>> When a checkpoint should be deleted,
>>> FsCompletedCheckpointStorageLocation.disposeStorageLocation will be
>>> called.
>>> Inside it, fs.delete(exclusiveCheckpointDir, false) will do the delete
>>> action. I wonder why the recursive parameter is set to false? as the
>>> exclusiveCheckpointDir is truly a directory. in our hadoop, this causes
>>> the checkpoint cannot be removed.
>>> It is easy to change the recursive parameter to true, but is there any
>>> potential harm?
>>>
>>> Yours sincerely
>>> Josh
>>>
>>>


Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Kien Truong
Hi Yangze,

Thanks for checking.

I'm not using the new application mode, but the old single job yarn-cluster
mode.

I'll try to get some more logs tomorrow.

Regards,
Kien

On 17 Nov 2020 at 16:37, Yangze Guo  wrote:

Hi,

There is a login operation in
YarnEntrypointUtils.logYarnEnvironmentInformation without the keytab.
One suspect is that Flink may access the HDFS when it tries to build
the PackagedProgram.

Does this issue only happen in the application mode? If so, I would cc
@kkloudas.

Best,
Yangze Guo

On Tue, Nov 17, 2020 at 4:52 PM Yangze Guo  wrote:
>
> Hi,
>
> AFAIK, Flink does exclude the HDFS_DELEGATION_TOKEN in the
> HadoopModule when user provides the keytab and principal. I'll try to
> do a deeper investigation to figure out is there any HDFS access
> before the HadoopModule installed.
>
> Best,
> Yangze Guo
>
>
> On Tue, Nov 17, 2020 at 4:36 PM Kien Truong  wrote:
> >
> > Hi,
> >
> > Yes, I did. There're also logs about logging in using keytab successfully 
> > in both Job Manager and Task Manager.
> >
> > I found some YARN docs about token renewal on AM restart
> >
> >
> > > Therefore, to survive AM restart after token expiry, your AM has to get 
> > > the NMs to localize the keytab or make no HDFS accesses until (somehow) a 
> > > new token has been passed to them from a client.
> >
> > Maybe Flink did access HDFS with an expired token, before switching to use 
> > the localized keytab ?
> >
> > Regards,
> > Kien
> >
> >
> >
> > On 17 Nov 2020 at 15:14, Yangze Guo  wrote:
> >
> > Hi, Kien,
> >
> >
> >
> > Do you config the "security.kerberos.login.principal" and the
> >
> > "security.kerberos.login.keytab" together? If you only set the keytab,
> >
> > it will not take effect.
> >
> >
> >
> > Best,
> >
> > Yangze Guo
> >
> >
> >
> > On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
> >
> > >
> >
> > > Hi all,
> >
> > >
> >
> > > We are having an issue where Flink Application Master is unable to 
> > > automatically restart Flink job after its delegation token has expired.
> >
> > >
> >
> > > We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster 
> > > mode. We have also add valid keytab configuration and taskmanagers are 
> > > able to login with keytabs correctly. However, it seems YARN Application 
> > > Master still use delegation tokens instead of the keytab.
> >
> > >
> >
> > > Any idea how to resolve this would be much appreciated.
> >
> > >
> >
> > > Thanks
> >
> > > Kien
> >
> > >
> >
> > >
> >
> > >
> >
> > >
> >


Re: pyflink1.11 window groupby error

2020-11-17 Thread Xingbo Huang
Hi,

The issue you mentioned should already be fixed in 1.11 [1]; I could not reproduce your problem locally.

[1] https://issues.apache.org/jira/browse/FLINK-17753

Best,
Xingbo

anfeng wrote on Tue, Nov 17, 2020 at 5:31 PM:

> I used the DDL approach you suggested, but it still fails with an error
>
> kafka_source_ddl = """CREATE TABLE mysource (createTime STRING,
>  type BIGINT,
>  uid STRING,
>  countryId BIGINT,
>  data STRING,
>  rowtime as
> TO_TIMESTAMP(createTime),
>  WATERMARK FOR rowtime AS
> rowtime - INTERVAL '2' SECOND
>  ) WITH (...)
>
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. : org.apache.flink.table.api.ValidationException: Undefined function: createTime

2020-11-17 Thread Xingbo Huang
Hi,

As far as I know, a TimeWindow does not have a createTime attribute. What
semantics do you expect from createTime?

Best,
Xingbo

anfeng wrote on Tue, Nov 17, 2020 at 5:26 PM:

> st_env.from_path("mysource") \
>
>
> .window(Slide.over("10.minutes").every("1.minutes").on("createTime").alias("w"))
> \
>   .group_by("w") \
>   .select("w.createTime as a, w.start as b, w.end as c, uid.count
> as
> d") \
>   .insert_into("mysink")
>
>
> .select("w.createTime as a, w.start as b, w.end as c, uid.count as d")
> \
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/table/table.py",
> line 784, in select
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>   File
>
> "/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
> : org.apache.flink.table.api.ValidationException: Undefined function:
> createTime
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: why not flink delete the checkpoint directory recursively?

2020-11-17 Thread Joshua Fan
Hi Robert,

When the recursive parameter of `delete(Path f, boolean recursive)` is false, HDFS will
throw an exception like the one below:
[image: checkpoint-exception.png]

Yours sincerely
Josh

On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger  wrote:

> Hey Josh,
>
> As far as I understand the code CompletedCheckpoint.discard(), Flink is
> removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then
> deleting the directory.
>
> Which files are left over in your case?
> Do you see any exceptions on the TaskManagers?
>
> Best,
> Robert
>
> On Wed, Nov 11, 2020 at 12:08 PM Joshua Fan 
> wrote:
>
>> Hi
>>
>> When a checkpoint should be deleted, FsCompletedCheckpointStorageLocation
>> .disposeStorageLocation will be called.
>> Inside it, fs.delete(exclusiveCheckpointDir, false) will do the delete
>> action. I wonder why the recursive parameter is set to false, as the
>> exclusiveCheckpointDir is truly a directory. In our Hadoop setup, this means
>> the checkpoint cannot be removed.
>> It is easy to change the recursive parameter to true, but is there any
>> potential harm?
>>
>> Yours sincerely
>> Josh
>>
>>


Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi,

There is a login operation in
YarnEntrypointUtils.logYarnEnvironmentInformation without the keytab.
One suspect is that Flink may access the HDFS when it tries to build
the PackagedProgram.

Does this issue only happen in the application mode? If so, I would cc
@kkloudas.

Best,
Yangze Guo

On Tue, Nov 17, 2020 at 4:52 PM Yangze Guo  wrote:
>
> Hi,
>
> AFAIK, Flink does exclude the HDFS_DELEGATION_TOKEN in the
> HadoopModule when user provides the keytab and principal. I'll try to
> do a deeper investigation to figure out is there any HDFS access
> before the HadoopModule installed.
>
> Best,
> Yangze Guo
>
>
> On Tue, Nov 17, 2020 at 4:36 PM Kien Truong  wrote:
> >
> > Hi,
> >
> > Yes, I did. There're also logs about logging in using keytab successfully 
> > in both Job Manager and Task Manager.
> >
> > I found some YARN docs about token renewal on AM restart
> >
> >
> > > Therefore, to survive AM restart after token expiry, your AM has to get 
> > > the NMs to localize the keytab or make no HDFS accesses until (somehow) a 
> > > new token has been passed to them from a client.
> >
> > Maybe Flink did access HDFS with an expired token, before switching to use 
> > the localized keytab ?
> >
> > Regards,
> > Kien
> >
> >
> >
> > On 17 Nov 2020 at 15:14, Yangze Guo  wrote:
> >
> > Hi, Kien,
> >
> >
> >
> > Do you config the "security.kerberos.login.principal" and the
> >
> > "security.kerberos.login.keytab" together? If you only set the keytab,
> >
> > it will not take effect.
> >
> >
> >
> > Best,
> >
> > Yangze Guo
> >
> >
> >
> > On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
> >
> > >
> >
> > > Hi all,
> >
> > >
> >
> > > We are having an issue where Flink Application Master is unable to 
> > > automatically restart Flink job after its delegation token has expired.
> >
> > >
> >
> > > We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster 
> > > mode. We have also add valid keytab configuration and taskmanagers are 
> > > able to login with keytabs correctly. However, it seems YARN Application 
> > > Master still use delegation tokens instead of the keytab.
> >
> > >
> >
> > > Any idea how to resolve this would be much appreciated.
> >
> > >
> >
> > > Thanks
> >
> > > Kien
> >
> > >
> >
> > >
> >
> > >
> >
> > >
> >


Re: pyflink1.11 window groupby出错

2020-11-17 Thread anfeng
I used the DDL approach you suggested, but it still fails with an error

kafka_source_ddl = """CREATE TABLE mysource (createTime STRING,
 type BIGINT,
 uid STRING,
 countryId BIGINT,
 data STRING,
 rowtime as
TO_TIMESTAMP(createTime),
 WATERMARK FOR rowtime AS
rowtime - INTERVAL '2' SECOND
 ) WITH (...)


py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
: org.apache.flink.table.api.ValidationException: A group window expects a
time attribute for grouping in a stream environment.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


py4j.protocol.Py4JJavaError: An error occurred while calling o21.select. : org.apache.flink.table.api.ValidationException: Undefined function: createTime

2020-11-17 Thread anfeng
st_env.from_path("mysource") \
 
.window(Slide.over("10.minutes").every("1.minutes").on("createTime").alias("w"))
\
  .group_by("w") \
  .select("w.createTime as a, w.start as b, w.end as c, uid.count as
d") \
  .insert_into("mysink")


.select("w.createTime as a, w.start as b, w.end as c, uid.count as d") \
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/table/table.py",
line 784, in select
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/tmp/pyflink/ddbc1fae-54c4-4bd1-8d32-4df36151c419/83b8abce-eed8-4ab4-8ff6-8526c3e04c8fpy4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o21.select.
: org.apache.flink.table.api.ValidationException: Undefined function:
createTime





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: IllegalStateException Printing Plan

2020-11-17 Thread Dawid Wysakowicz
Hi Rex,

The executeInsert method, as the name states, executes the query.
Therefore after the method there is nothing in the topology and thus you
get the exception.

You can either explain the userDocsTable:

userDocsTable.explain()

or you can explain a statement set if you want to postpone the execution:

StatementSet set = tEnv.createStatementSet();
set.addInsert(SINK_ES_PEOPLE, userDocsTable);
set.explain()

or you can explain SQL:

String sqlQuery = ...
tEnv.explainSql(sqlQuery);

Best,

Dawid

On 17/11/2020 09:16, Khachatryan Roman wrote:
> Hello,
>
> Can you share the full program? 
> getExecutionPlan call is probably misplaced.
>
> Regards,
> Roman
>
>
> On Tue, Nov 17, 2020 at 8:26 AM Rex Fenley  > wrote:
>
> Hello,
>
> I have the following code attempting to print the execution plan
> for my job locally. The job runs fine and Flink UI displays so I'd
> expect this to work.
>
> val tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE)
> println(s"execution plan:\n${this.env.getExecutionPlan()}")
>
> but instead I end up with
>
> Caused by: java.lang.IllegalStateException: No operators defined
> in streaming topology. Cannot execute.
>
> What am I doing wrong?
>
> -- 
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
>
> Remind.com |  BLOG
>  |  FOLLOW US
>  |  LIKE US
> 
>




Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-11-17 Thread Till Rohrmann
Glad to hear that!

Cheers,
Till

On Tue, Nov 17, 2020 at 5:35 AM Eleanore Jin  wrote:

> Hi Till,
>
> Thanks for the response! I got the metrics from cAdvisor and visualized them
> via the dashboard shipped with Kubernetes. I have actually been running the Flink job for the
> past 2 weeks and the memory usage has stabilized. There is no issue so
> far. I still could not figure out why it was trending up
> initially.
>
> Thanks a lot for the help!
> Eleanore
>
> On Fri, Nov 13, 2020 at 7:01 AM Till Rohrmann 
> wrote:
>
>> Hi Eleanore,
>>
>> sorry for my late reply. The heap dump you have sent does not look
>> problematic. How do you measure the pod memory usage exactly? If you start
>> the Flink process with -Xms5120m -Xmx5120m then Flink should allocate 5120
>> MB of heap memory. Hence, this should be exactly what you are seeing in
>> your memory usage graph. This should actually happen independent of the
>> checkpointing.
>>
>> Maybe you can also share the debug logs with us. Maybe they contain some
>> more information.
>>
>> Cheers,
>> Till
>>
>> On Sat, Oct 24, 2020 at 12:24 AM Eleanore Jin 
>> wrote:
>>
>>> I also tried enable native memory tracking, via jcmd, here is the memory
>>> breakdown: https://ibb.co/ssrZB4F
>>>
>>> since job manager memory configuration for flink 1.10.2 only has
>>> jobmanager.heap.size, and it only translates to heap settings, should I
>>> also set -XX:MaxDirectMemorySize and -XX:MaxMetaspaceSize for job
>>> manager? And any recommendations?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> On Fri, Oct 23, 2020 at 9:28 AM Eleanore Jin 
>>> wrote:
>>>
 Hi Till,

 please see the screenshot of heap dump: https://ibb.co/92Hzrpr

 Thanks!
 Eleanore

 On Fri, Oct 23, 2020 at 9:25 AM Eleanore Jin 
 wrote:

> Hi Till,
> Thanks a lot for the prompt response, please see below information.
>
> 1. how much memory assign to JM pod?
> 6g for container memory limit, 5g for jobmanager.heap.size, I think
> this is the only available jm memory configuration for flink 1.10.2
>
> 2. Have you tried with newer Flink versions?
> I am actually using Apache Beam, so the latest version they support
> for Flink is 1.10
>
> 3. What statebackend is used?
> FsStateBackend, and the checkpoint size is around 12MB from checkpoint
> metrics, so I think it is not get inlined
>
> 4. What is state.checkpoints.num-retained?
> I did not configure this explicitly, so by default only 1 should be
> retained
>
> 5. Anything suspicious from JM log?
> There is no Exception nor Error, the only thing I see is the below
> logs keeps on repeating
>
> {"@timestamp":"2020-10-23T16:05:20.350Z","@version":"1","message":"Disabling
> threads for Delete operation as thread count 0 is <=
> 1","logger_name":"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor","thread_name":"jobmanager-future-thread-4","level":"WARN","level_value":3}
>
> 6. JVM args obtained vis jcmd
>
> -Xms5120m -Xmx5120m -XX:MaxGCPauseMillis=20
> -XX:-OmitStackTraceInFastThrow
>
>
> 7. Heap info returned by jcmd  GC.heap_info
>
> it suggested only about 1G of the heap is used
>
> garbage-first heap   total 5242880K, used 1123073K
> [0x0006c000, 0x0008)
>
>   region size 2048K, 117 young (239616K), 15 survivors (30720K)
>
>  Metaspace   used 108072K, capacity 110544K, committed 110720K,
> reserved 1146880K
>
>   class spaceused 12963K, capacity 13875K, committed 13952K,
> reserved 1048576K
>
>
> 8. top -p 
>
> it suggested for flink job manager java process 4.8G of physical
> memory is consumed
>
> PID USER  PR  NIVIRTRESSHR S  %CPU %MEM TIME+
> COMMAND
>
>
> 1 root  20   0 13.356g 4.802g  22676 S   6.0  7.6  37:48.62
> java
>
>
>
> Thanks a lot!
> Eleanore
>
>
> On Fri, Oct 23, 2020 at 4:19 AM Till Rohrmann 
> wrote:
>
>> Hi Eleanore,
>>
>> how much memory did you assign to the JM pod? Maybe the limit is so
>> high that it takes a bit of time until GC is triggered. Have you tried
>> whether the same problem also occurs with newer Flink versions?
>>
>> The difference between checkpoints enabled and disabled is that the
>> JM needs to do a bit more bookkeeping in order to track the completed
>> checkpoints. If you are using the HeapStateBackend, then all states 
>> smaller
>> than state.backend.fs.memory-threshold will get inlined, meaning that 
>> they
>> are sent to the JM and stored in the checkpoint meta file. This can
>> increase the memory usage of the JM process. Depending on
>> state.checkpoints.num-retained this can grow as large as number retained
>> checkpoints times the checkpoint size. 
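
For reference, the two options mentioned above would look roughly like this in
flink-conf.yaml (the values are my understanding of the defaults in 1.10/1.11,
so please double-check them for your version):

# state smaller than this threshold is inlined into the checkpoint metadata kept by the JM
state.backend.fs.memory-threshold: 1024
# number of completed checkpoints the JM retains
state.checkpoints.num-retained: 1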

Re:Re:ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

2020-11-17 Thread kingdomad
Awesome, thanks!
I checked, and there is indeed a flink-sql-connector-kafka-0.10_2.12-1.11.1.jar on the cluster.
How should I fix this problem?







--

kingdomad







On 2020-11-17 17:08:10, "hailongwang" <18868816...@163.com> wrote:
>Judging from your error, there should be a shaded Kafka 0.10 jar on your cluster, which causes the Kafka client classes under it to be loaded first.
>The artifactId of the shaded Kafka 0.10 version is:
>flink-sql-connector-kafka-0.10_${scala.binary.version}
>
>On 2020-11-17 15:47:08, "kingdomad"  wrote:
>>Flink 1.11.1 (Scala 2.12) fails with an error when consuming from Kafka 0.10.1.1.
>>It runs fine when debugging in IDEA, but fails once submitted to the YARN cluster. Please help.
>>
>>
>>The consumer used is as follows:
>>val logConsumer: FlinkKafkaConsumer010[String] = new 
>>FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties)
>>
>>
>>The dependency imported in the pom file is as follows:
>>
>>org.apache.flink
>>flink-connector-kafka-0.10_2.12
>>1.11.1
>>
>>
>>
>>
>>The error is as follows:
>>
>>2020-11-17 16:39:37
>>
>>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>>
>>at 
>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:718)
>>
>>at 
>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:597)
>>
>>at 
>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:579)
>>
>>at 
>>org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58)
>>
>>at 
>>org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>
>>at 
>>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>>
>>at 
>>org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>
>>at 
>>org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>
>>at 
>>org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>>
>>at 
>>org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>>
>>at 
>>org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>
>>at 
>>org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>>
>>at 
>>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>>
>>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>
>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>
>>at java.lang.Thread.run(Thread.java:745)
>>
>>Caused by: org.apache.kafka.common.KafkaException: 
>>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>> is not an instance of org.apache.kafka.common.serialization.Deserializer
>>
>>at 
>>org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
>>
>>at 
>>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:637)
>>
>>... 15 more
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>kingdomad
>>


Re:ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

2020-11-17 Thread hailongwang
Judging from your error, there should be a shaded Kafka 0.10 jar on your cluster, which causes the Kafka client classes under it to be loaded first.
The artifactId of the shaded Kafka 0.10 version is:
flink-sql-connector-kafka-0.10_${scala.binary.version}

On 2020-11-17 15:47:08, "kingdomad"  wrote:
>Flink 1.11.1 (Scala 2.12) fails with an error when consuming from Kafka 0.10.1.1.
>It runs fine when debugging in IDEA, but fails once submitted to the YARN cluster. Please help.
>
>
>The consumer used is as follows:
>val logConsumer: FlinkKafkaConsumer010[String] = new 
>FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties)
>
>
>The dependency imported in the pom file is as follows:
>
>org.apache.flink
>flink-connector-kafka-0.10_2.12
>1.11.1
>
>
>
>
>The error is as follows:
>
>2020-11-17 16:39:37
>
>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:718)
>
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:597)
>
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:579)
>
>at 
>org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58)
>
>at 
>org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>
>at 
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>
>at 
>org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>
>at 
>org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>
>at 
>org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>
>at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
>at java.lang.Thread.run(Thread.java:745)
>
>Caused by: org.apache.kafka.common.KafkaException: 
>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> is not an instance of org.apache.kafka.common.serialization.Deserializer
>
>at 
>org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)
>
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:637)
>
>... 15 more
>
>
>
>
>
>
>
>--
>
>kingdomad
>


Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre,

You can try to replace the '@DataTypeHint("ROW")' with 
'@FunctionHint(output = new DataTypeHint("ROW"))'

Best,
Wei

> On Nov 17, 2020, at 15:45, Pierre Oberholzer wrote:
> 
> Hi Dian, Community,
> 
> (bringing the thread back to wider audience)
> 
> As you suggested, I've tried to use DataTypeHint with Row instead of Map but 
> also this simple case leads to a type mismatch between UDF and Table API.
> I've also tried other Map objects from Flink (table.data.MapData, 
> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java 
> (java.util.Map) in combination with DataTypeHint, without success.
> N.B. I'm using version 1.11.
> 
> Am I doing something wrong or am I facing limitations in the toolkit ?
> 
> Thanks in advance for your support !
> 
> Best regards,
> 
> Scala UDF
> 
> class dummyMap() extends ScalarFunction {
> 
>  @DataTypeHint("ROW")
>  def eval(): Row = {
> 
> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
> 
>   }
> }
> 
> Table DDL
> 
> my_sink_ddl = f"""
> create table mySink (
> output_of_my_scala_udf ROW
> ) with (
> ...
> )
> """
> 
> Error
> 
> Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.ValidationException: Field types of query result 
> and registered TableSink `default_catalog`.`default_database`.`mySink` do not 
> match.
> Query result schema: [output_of_my_scala_udf: 
> GenericType]
> TableSink schema:[output_of_my_scala_udf: Row(s: String, t: String)]
> 
> 
> 
> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer wrote:
> Thanks Dian, but same error when using explicit returned type:
> 
> class dummyMap() extends ScalarFunction {
> 
>   def eval() : util.Map[java.lang.String,java.lang.String] = {
> 
> val states = Map("key1" -> "val1", "key2" -> "val2")
> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
> 
>   }
> }
> 
> On Fri, Nov 13, 2020 at 10:34, Dian Fu wrote:
> You need to explicitly defined the result type the UDF. You could refer to 
> [1] for more details if you are using Flink 1.11. If you are using other 
> versions of Flink, you need to refer to the corresponding documentation.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>  
> 
>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer wrote:
>> 
>> ScalarFunction
> 
> 
> 
> -- 
> Pierre
> 
> -- 
> Pierre



pyflink 1.11: error when running a PyFlink job

2020-11-17 Thread whh_960101
Hi everyone, with PyFlink 1.11 I submit a PyFlink job to run on a YARN cluster. The job fails when inserting the processed
main_table into the Kafka sink: File "/home/cdh272705/poc/T24_parse.py", line 179, in 
from_kafka_to_oracle_demo

main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
  File 
"/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", 
line 783, in execute_insert
return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File 
"/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py",
 line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Failed to execute sql'
org.apache.flink.client.program.ProgramAbortException
 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
 at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)是缺少jar包吗,我在opt、lib目录下都放了flink-sql-client_2.11-1.11.1.jar,'Failed
 to execute sql 是什么原因



 





 

Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi,

AFAIK, Flink does exclude the HDFS_DELEGATION_TOKEN in the
HadoopModule when the user provides the keytab and principal. I'll try to
do a deeper investigation to figure out whether there is any HDFS access
before the HadoopModule is installed.

Best,
Yangze Guo


On Tue, Nov 17, 2020 at 4:36 PM Kien Truong  wrote:
>
> Hi,
>
> Yes, I did. There're also logs about logging in using keytab successfully in 
> both Job Manager and Task Manager.
>
> I found some YARN docs about token renewal on AM restart
>
>
> > Therefore, to survive AM restart after token expiry, your AM has to get the 
> > NMs to localize the keytab or make no HDFS accesses until (somehow) a new 
> > token has been passed to them from a client.
>
> Maybe Flink did access HDFS with an expired token, before switching to use 
> the localized keytab ?
>
> Regards,
> Kien
>
>
>
> On 17 Nov 2020 at 15:14, Yangze Guo  wrote:
>
> Hi, Kien,
>
>
>
> Do you config the "security.kerberos.login.principal" and the
>
> "security.kerberos.login.keytab" together? If you only set the keytab,
>
> it will not take effect.
>
>
>
> Best,
>
> Yangze Guo
>
>
>
> On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
>
> >
>
> > Hi all,
>
> >
>
> > We are having an issue where Flink Application Master is unable to 
> > automatically restart Flink job after its delegation token has expired.
>
> >
>
> > We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster 
> > mode. We have also add valid keytab configuration and taskmanagers are able 
> > to login with keytabs correctly. However, it seems YARN Application Master 
> > still use delegation tokens instead of the keytab.
>
> >
>
> > Any idea how to resolve this would be much appreciated.
>
> >
>
> > Thanks
>
> > Kien
>
> >
>
> >
>
> >
>
> >
>


Re: How to convert Int to Date

2020-11-17 Thread Khachatryan Roman
Hello,

Do both of the types you use have the same nullability?
For a primitive int, the documentation you referred to says: "Output only
if type is not nullable".

Regards,
Roman


On Tue, Nov 17, 2020 at 7:49 AM Rex Fenley  wrote:

> Hello,
>
> I'm using the Table API and I have a column which is an integer day since
> epoch. According to the docs [1] both `int` and `java.lang.Integer` are
> acceptable for DATE. However, if I try to use the SQL API to write a DATE
> out to the Elasticsearch connector for the INT column I receive an
> exception. How then should I go about converting to DATE?
>
> Exception:
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink
> default_catalog.default_database.sink_es_people do not match.
> Query schema: [... column: INT, ...]
> Sink schema: [... column: DATE, ...]
>
> I know this column is the culprit because when I make it INT on both ends
> it works.
>
> How do I go about making my INT a DATE?
>
> Thanks!
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#date-and-time
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

2020-11-17 Thread kingdomad
Flink 1.11.1 (Scala 2.12) fails with an error when consuming from Kafka 0.10.1.1.
It runs fine when debugging in IDEA, but fails once submitted to the YARN cluster. Please help.


The consumer used is as follows:
val logConsumer: FlinkKafkaConsumer010[String] = new 
FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), properties)


The dependency imported in the pom file is as follows:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.11.1</version>
</dependency>



The error is as follows:

2020-11-17 16:39:37

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)

at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)

at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)

at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010PartitionDiscoverer.initializeConnections(Kafka010PartitionDiscoverer.java:58)

at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)

at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.kafka.common.KafkaException: 
org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
 is not an instance of org.apache.kafka.common.serialization.Deserializer

at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:205)

at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637)

... 15 more







--

kingdomad



Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Kien Truong
Hi,

Yes, I did. There're also logs about logging in using keytab successfully
in both Job Manager and Task Manager.

I found some YARN docs about token renewal on AM restart


> Therefore, to survive AM restart after token expiry, your AM has to get
the NMs to localize the keytab or make no HDFS accesses until (somehow) a
new token has been passed to them from a client.

Maybe Flink did access HDFS with an expired token, before switching to use
the localized keytab ?

Regards,
Kien



On 17 Nov 2020 at 15:14, Yangze Guo  wrote:

Hi, Kien,

Do you config the "security.kerberos.login.principal" and the
"security.kerberos.login.keytab" together? If you only set the keytab,
it will not take effect.

Best,
Yangze Guo

On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
>
> Hi all,
>
> We are having an issue where Flink Application Master is unable to 
> automatically restart Flink job after its delegation token has expired.
>
> We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster mode. 
> We have also add valid keytab configuration and taskmanagers are able to 
> login with keytabs correctly. However, it seems YARN Application Master still 
> use delegation tokens instead of the keytab.
>
> Any idea how to resolve this would be much appreciated.
>
> Thanks
> Kien
>
>
>
>


Re: IllegalStateException Printing Plan

2020-11-17 Thread Khachatryan Roman
Hello,

Can you share the full program?
getExecutionPlan call is probably misplaced.

Regards,
Roman


On Tue, Nov 17, 2020 at 8:26 AM Rex Fenley  wrote:

> Hello,
>
> I have the following code attempting to print the execution plan for my
> job locally. The job runs fine and Flink UI displays so I'd expect this to
> work.
>
> val tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE)
> println(s"execution plan:\n${this.env.getExecutionPlan()}")
>
> but instead I end up with
>
> Caused by: java.lang.IllegalStateException: No operators defined in
> streaming topology. Cannot execute.
>
> What am I doing wrong?
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-17 Thread Aljoscha Krettek

Hi,

to ensure that we really are using per-job mode, could you try and use

$ flink run -t yarn-per-job -d <...>

This will directly specify that we want to use the YARN per-job 
executor, which bypasses some of the logic in the older YARN code paths 
that differentiate between YARN session mode and YARN per-job mode.


Best,
Aljoscha

On 17.11.20 07:02, Tzu-Li (Gordon) Tai wrote:

Hi,

Not sure if this question would be more suitable for the Apache Beam
mailing lists, but I'm pulling in Aljoscha (CC'ed) who would know more
about Beam and whether or not this is an expected behaviour.

Cheers,
Gordon

On Mon, Nov 16, 2020 at 10:35 PM Dongwon Kim  wrote:


Hi,

I'm trying to run a per-job cluster for a Beam pipeline w/ FlinkRunner on
YARN as follows:


flink run -m yarn-cluster -d \


 my-beam-pipeline.jar \

 --runner=FlinkRunner \
 --flinkMaster=[auto] \
 --parallelism=8



Instead of creating a per-job cluster as wished, the above command seems
to create a session cluster and then submit a job onto the cluster.

I doubt it because
(1) In the attached file, there's "Submit New Job" which is not shown in
other per-job applications that are written in Flink APIs and submitted to
YARN similar to the above command.

[image: beam on yarn.png]
(2) When the job is finished, the YARN application is still in its RUNNING
state without being terminated. I had to kill the YARN application manually.

FYI, I'm using
- Beam v2.24.0 (Flink 1.10)
- Hadoop v3.1.1

Thanks in advance,

Best,

Dongwon







Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi, Kien,

Did you configure "security.kerberos.login.principal" and
"security.kerberos.login.keytab" together? If you only set the keytab,
it will not take effect.
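
For completeness, a minimal example of the two options set together in
flink-conf.yaml (the keytab path and principal below are placeholders):

security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM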

Best,
Yangze Guo

On Tue, Nov 17, 2020 at 3:03 PM Kien Truong  wrote:
>
> Hi all,
>
> We are having an issue where Flink Application Master is unable to 
> automatically restart Flink job after its delegation token has expired.
>
> We are using Flink 1.11 with YARN 3.1.1 in single job per yarn-cluster mode. 
> We have also add valid keytab configuration and taskmanagers are able to 
> login with keytabs correctly. However, it seems YARN Application Master still 
> use delegation tokens instead of the keytab.
>
> Any idea how to resolve this would be much appreciated.
>
> Thanks
> Kien
>
>
>
>