Re: left join flink stream

2020-11-17 Thread tkg_cangkul
Hi Guowei Ma, Thanks for your reply. In my case, I have some data in my Kafka topic, and I want to get the details of that data from my reference MySQL table. For example: in my Kafka topic I have these fields: id, name, position, experience; in my reference MySQL table I have these fields: id, name

Re: Re: Flink 1.11 not showing logs

2020-11-17 Thread Yang Wang
Hi Arnaud, It seems that the TaskExecutor terminated exceptionally. I think you need to check the logs of container_e38_1604477334666_0960_01_04 to figure out why it crashed or shut down. Best, Yang On Mon, Nov 16, 2020 at 7:11 PM, LINZ, Arnaud wrote: > Hello, > > I'm running Flink 1.10 on a yarn cluster

Re: keystore location on EMR

2020-11-17 Thread Fanbin Bu
Trying to put the jks on S3... unfortunately, no luck. I have the property set up: 'properties.ssl.keystore.location'='s3://application-bucket/kafka.keystore.jks' and got the following error message: at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:

Re: Logs of JobExecutionListener

2020-11-17 Thread Flavio Pompermaier
Is this a bug or is it a documentation problem...? On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier wrote: > I've also verified that the problem persists when using a modified version > of the WordCount class. > If you add the code pasted at the end of this email at the end of its main > method

Re: Upsert UDFs

2020-11-17 Thread Rex Fenley
Hi, Does this seem like it would help? Thanks! On Tue, Nov 10, 2020 at 10:38 PM Rex Fenley wrote: > Thanks! We did give that a shot and ran into the bug that I reported here > https://issues.apache.org/jira/browse/FLINK-20036 . > > I'm also seeing this function > > public void emitUpdateWith

execution.runtime-mode=BATCH when reading from Hive

2020-11-17 Thread Dongwon Kim
Hi, Recently I've been working on a real-time data stream processing pipeline with DataStream API while preparing for a new service to launch. Now it's time to develop a back-fill job to produce the same result by reading data stored on Hive which we use for long-term storage. Meanwhile, I watche

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre, Both approaches work on my local machine. This is my code: Scala UDF: package com.dummy import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.api.Types import org.apache.flink.table.functi

Re: "stepless" sliding windows?

2020-11-17 Thread Alex Cruise
On Thu, Oct 22, 2020 at 11:08 AM Jacob Sevart wrote: > I think the issue is you have to specify a *time* interval for "step." It > would be nice to consider the preceding N minutes as of every message. You > can somewhat approximate that using a very small step. > Indeed, I want the window to sl
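For readers following this thread, here is a minimal sketch of the idea of "the preceding N minutes as of every message". It is plain Python and does not use any Flink API. It assumes events arrive in timestamp order and ignores watermarks and late data; the function name `trailing_window` and the `(timestamp_ms, value)` event shape are made up for illustration.

```python
from collections import deque


def trailing_window(events, window_ms):
    """For every event, report the count and sum of the values seen in the
    half-open interval (ts - window_ms, ts], i.e. the window "slides" on
    each message instead of on a fixed time step.

    `events` is an iterable of (timestamp_ms, value) pairs, assumed to be
    sorted by timestamp (an assumption of this sketch).
    """
    buffer = deque()  # events currently inside the window
    total = 0
    results = []
    for ts, value in events:
        buffer.append((ts, value))
        total += value
        # Evict everything that is window_ms or more older than this event.
        while buffer[0][0] <= ts - window_ms:
            _, old_value = buffer.popleft()
            total -= old_value
        results.append((ts, len(buffer), total))
    return results
```

The buffer only ever holds the events of one window, so the cost per message is amortized constant, at the price of keeping every in-window event in state.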

keystore location on EMR

2020-11-17 Thread Fanbin Bu
Hi, I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried to put keystore.jks location under /usr/lib/flink/... like: export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks Notice that this is on EMR master(master) node. Bo

Lateral join not finding correlate variable

2020-11-17 Thread Dylan Forciea
This may be due to not understanding lateral joins in Flink – perhaps you can only do so on temporal variables – but I figured I’d ask since the error message isn’t intuitive. I am trying to do a combination of a lateral join and a top N query. Part of my ordering is based upon whether the a v

Job Manager is taking very long time to finalize the Checkpointing.

2020-11-17 Thread Slim Bouguerra
Originally posted to the dev list -- Forwarded message - From: Slim Bouguerra Date: Tue, Nov 17, 2020 at 8:09 AM Subject: Job Manager is taking very long time to finalize the Checkpointing. To: Hi Devs, I am very new to the Flink code base and working on the evaluation of the Ch

Re: IllegalStateException Printing Plan

2020-11-17 Thread Rex Fenley
So I tried userDocsTable.explain() however it doesn't give me the AST as JSON so that I can use the visualizer tool https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_plans.html . Also, if I get rid of executeInsert or move it to after getExecutionPlan I still end up with "Caused

Re: IllegalStateException Printing Plan

2020-11-17 Thread Rex Fenley
I don't think I can share the full program. However, the program is a long series of joins and aggs against various sources and that is the only sink. Thanks! On Tue, Nov 17, 2020 at 12:17 AM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hello, > > Can you share the full program? >

Re: Flink 1.11.2 could not create kafka table source on EMR.

2020-11-17 Thread Fanbin Bu
All of those are verified. The issue was fixed by adding org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory to org.apache.flink.table.factories.Factory. Thanks, Fanbin On Tue, Nov 17, 2020 at 7:29 AM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hi, > > Please
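A small sketch for checking this kind of fix, assuming the message refers to the Java service-loader file `META-INF/services/org.apache.flink.table.factories.Factory` inside the job jar (the message names only the file, not its directory). The helper names `registered_factories` and `has_kafka_factory` are made up for illustration; only the Python standard library is used.

```python
import zipfile

# Assumed location of the service file inside the jar (standard Java SPI layout).
SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"
KAFKA_FACTORY = (
    "org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory"
)


def registered_factories(jar_path):
    """Return the factory class names listed in the jar's service file,
    or an empty list if the jar has no such file."""
    with zipfile.ZipFile(jar_path) as jar:
        try:
            raw = jar.read(SERVICE_FILE)
        except KeyError:  # the service file is not in the archive
            return []
    names = []
    for line in raw.decode("utf-8").splitlines():
        # Service files allow '#' comments; drop them and blank lines.
        name = line.split("#", 1)[0].strip()
        if name:
            names.append(name)
    return names


def has_kafka_factory(jar_path):
    """True if the Kafka table factory is registered in the jar."""
    return KAFKA_FACTORY in registered_factories(jar_path)
```

Calling `has_kafka_factory("your-program.jar")` complements the `jar vtf ... | grep KafkaDynamicTableFactory` check suggested earlier in the thread: that check shows the class is packaged, while this one shows it is also registered for discovery.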

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei, True, I'm using the method you mention, but glad to change. I tried your suggestion instead, but got a similar error. Thanks for your support. That is much more tedious than I thought. *Option 1 - SQL UDF* *SQL UDF* create_func_ddl = """ CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap

Re: Force Join Unique Key

2020-11-17 Thread Rex Fenley
Ok, what are the performance consequences then of having a join with NoUniqueKey if the left side's key actually is unique in practice? Thanks! On Tue, Nov 17, 2020 at 7:35 AM Jark Wu wrote: > Hi Rex, > > Currently, the unique key is inferred by the optimizer. However, the > inference is not p

Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-17 Thread Kye Bae
It is possible, but I am not entirely sure about the load order affecting the metaspace usage. To find out why your taskmanager container is exceeding the metaspace, we would need to know what value the max metaspace size is set to and then find out how much of the metaspace is actually being used

Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-17 Thread Flavio Pompermaier
Another big potential candidate is the fact that the JDBC libs I use in my job are put into the Flink lib folder instead of into the fat jar. Tomorrow I'll try to see if the metaspace is getting cleared correctly after that change. Unfortunately our jobs were written before the child-first

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-17 Thread Simone Cavallarin
Hi, I have been working on the suggestion that you gave me, thanks! The first part is to add the gap to the message. 1) I receive the event, 2) I take that event and map it using StatefulsessionCalculator, which is where I put together "The message" and a "long" that is my gap in millis. DataSt
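To make the "message plus gap in millis" idea concrete, here is a rough plain-Python simulation of how per-event gaps turn into merged session windows. It is not the Flink API and it ignores watermarks, keys and late data. It assumes each event opens a window from `ts` to `ts + gap` and that windows which overlap or touch are merged. I believe that matches how Flink merges session windows, but treat it as an assumption. The function name `merge_sessions` is made up.

```python
def merge_sessions(events):
    """Merge per-event windows [ts, ts + gap) into sessions.

    `events` is an iterable of (timestamp_ms, gap_ms) pairs in any order.
    Returns a list of (start_ms, end_ms, event_count) tuples.
    """
    sessions = []
    for ts, gap in sorted(events):
        start, end = ts, ts + gap
        if sessions and start <= sessions[-1][1]:
            # Overlaps (or touches) the current session: extend it.
            prev_start, prev_end, count = sessions[-1]
            sessions[-1] = (prev_start, max(prev_end, end), count + 1)
        else:
            # Gap expired before this event: start a new session.
            sessions.append((start, end, 1))
    return sessions
```

For example, `merge_sessions([(0, 1000), (500, 5000), (7000, 1000)])` gives one session from 0 to 5500 with two events and a second one from 7000 to 8000 with one event, because the second event's larger gap keeps the first session open longer.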

Re: Kafka SQL table Re-partition via Flink SQL

2020-11-17 Thread Jark Wu
Hi Slim, In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and set its class name in the 'sink.partitioner' option. In 1.12, you can re-partition the data by specifying the key field (Kafka producer will partition data by the message key by default). You can do this by adding some

Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-17 Thread Dongwon Kim
Hi Aljoscha, Thanks for the input. The '-t' option seems to be available as of flink-1.11 while the latest FlinkRunner is based on flink-1.10. So I use '-e' option which is available in 1.10: $ flink run -e yarn-per-job -d <...> A short question here is that this command ignores *-yD* and *--ya

Re: Force Join Unique Key

2020-11-17 Thread Jark Wu
Hi Rex, Currently, the unique key is inferred by the optimizer. However, the inference is not perfect. There are known issues that the unique key is not derived correctly, e.g. FLINK-20036 (is this opened by you?). If you think you have the same case, please open an issue. Query hint is a nice wa

Re: Flink 1.11.2 could not create kafka table source on EMR.

2020-11-17 Thread Khachatryan Roman
Hi, Please verify that: 1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf your-program.jar | grep KafkaDynamicTableFactory") 2. kafka-connector version matches the version of Flink distribution on EMR. Regards, Roman On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu wrote: > Hi, > > I cou

Re: How to convert Int to Date

2020-11-17 Thread Timo Walther
Hi Rex, the classes mentioned in the documentation such as `int` and `java.lang.Integer` are only used when you leave the SQL world to a UDF or to a Java implementation in a sink. But as a SQL user you only need to pay attention to the logical data type. Those must match entirely or be a sup

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre, I guess your UDF is registered by the method 'register_java_function' which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding type hint. You can also try to register your UDF via the "CREATE FUNCTION" sql statement, which

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Pierre Oberholzer
Hi Wei, Thanks for your suggestion. Same error. *Scala UDF* @FunctionHint(output = new DataTypeHint("ROW")) class dummyMap() extends ScalarFunction { def eval(): Row = { Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar")) } } Best regards, On Tue, Nov 17, 2020 at 10

Re: why not flink delete the checkpoint directory recursively?

2020-11-17 Thread Khachatryan Roman
Hi, I think Robert is right, state handles are deleted first, and then the directory is deleted non-recursively. If any exception occurs while removing the files, it will be combined with the other exception (as suppressed). So probably Flink failed to delete some files and then directory removal

Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Kien Truong
Hi Yangze, Thanks for checking. I'm not using the new application mode, but the old single job yarn-cluster mode. I'll try to get some more logs tomorrow. Regards, Kien On 17 Nov 2020 at 16:37, Yangze Guo wrote: Hi, There is a login operation in YarnEntrypointUtils.logYarnEnvironmentInforma

Re: why not flink delete the checkpoint directory recursively?

2020-11-17 Thread Joshua Fan
Hi Robert, When the `recursive` argument of `delete(Path f, boolean recursive)` is false, HDFS throws an exception like the one below: [image: checkpoint-exception.png] Yours sincerely Josh On Thu, Nov 12, 2020 at 4:29 PM Robert Metzger wrote: > Hey Josh, > > As far as I understand the code CompletedCheckpoint.dis

Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi, There is a login operation in YarnEntrypointUtils.logYarnEnvironmentInformation without the keytab. One suspicion is that Flink may access HDFS when it tries to build the PackagedProgram. Does this issue only happen in the application mode? If so, I would cc @kkloudas. Best, Yangze Guo On

Re: IllegalStateException Printing Plan

2020-11-17 Thread Dawid Wysakowicz
Hi Rex, The executeInsert method, as the name states, executes the query. Therefore after the method there is nothing in the topology and thus you get the exception. You can either explain the userDocsTable: `userDocsTable.explain()` or you can explain a statement set if you want to postpone the

Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-11-17 Thread Till Rohrmann
Glad to hear that! Cheers, Till On Tue, Nov 17, 2020 at 5:35 AM Eleanore Jin wrote: > Hi Till, > > Thanks for the response! The metrics I got from cadvisor and visualized > via dashboard shipped by kubernetes. I actually run the flink job for the > past 2 weeks and the memory usage has been sta

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-17 Thread Wei Zhong
Hi Pierre, You can try to replace the '@DataTypeHint("ROW")' with '@FunctionHint(output = new DataTypeHint("ROW"))' Best, Wei > On Nov 17, 2020, at 15:45, Pierre Oberholzer wrote: > > Hi Dian, Community, > > (bringing the thread back to wider audience) > > As you suggested, I've tried to use DataType

Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi, AFAIK, Flink does exclude the HDFS_DELEGATION_TOKEN in the HadoopModule when the user provides the keytab and principal. I'll try to do a deeper investigation to figure out whether there is any HDFS access before the HadoopModule is installed. Best, Yangze Guo On Tue, Nov 17, 2020 at 4:36 PM Kien Truong

Re: How to convert Int to Date

2020-11-17 Thread Khachatryan Roman
Hello, Do both of the types you use have the same nullability? For a primitive int, the documentation you referred to says: "Output only if type is not nullable". Regards, Roman On Tue, Nov 17, 2020 at 7:49 AM Rex Fenley wrote: > Hello, > > I'm using the Table API and I have a column which is

Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Kien Truong
Hi, Yes, I did. There are also logs about logging in using keytab successfully in both Job Manager and Task Manager. I found some YARN docs about token renewal on AM restart > Therefore, to survive AM restart after token expiry, your AM has to get the NMs to localize the keytab or make no HDFS a

Re: IllegalStateException Printing Plan

2020-11-17 Thread Khachatryan Roman
Hello, Can you share the full program? getExecutionPlan call is probably misplaced. Regards, Roman On Tue, Nov 17, 2020 at 8:26 AM Rex Fenley wrote: > Hello, > > I have the following code attempting to print the execution plan for my > job locally. The job runs fine and Flink UI displays so I

Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-17 Thread Aljoscha Krettek
Hi, to ensure that we really are using per-job mode, could you try and use $ flink run -t yarn-per-job -d <...> This will directly specify that we want to use the YARN per-job executor, which bypasses some of the logic in the older YARN code paths that differentiate between YARN session mode

Re: Flink on YARN: delegation token expired prevent job restart

2020-11-17 Thread Yangze Guo
Hi, Kien, Did you configure "security.kerberos.login.principal" and "security.kerberos.login.keytab" together? If you only set the keytab, it will not take effect. Best, Yangze Guo On Tue, Nov 17, 2020 at 3:03 PM Kien Truong wrote: > > Hi all, > > We are having an issue where Flink Applicat