Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-16 Thread Pierre Oberholzer
Hi Dian, Community, (bringing the thread back to a wider audience) As you suggested, I've tried to use DataTypeHint with Row instead of Map, but this simple case also leads to a type mismatch between the UDF and the Table API. I've also tried other Map objects from Flink (table.data.MapData, flink.types.Map
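
For reference, a minimal Java sketch of a scalar UDF that declares a Row result type via DataTypeHint (the function and field names are made up; the thread itself concerns a Scala UDF called from PyFlink):

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;

    public class ParseLocation extends ScalarFunction {
        // the hint tells the planner the exact ROW type produced by eval()
        public @DataTypeHint("ROW<country STRING, city STRING>") Row eval(String raw) {
            String[] parts = raw.split(";");
            return Row.of(parts[0], parts[1]);
        }
    }

    // registration (tEnv is an assumed TableEnvironment):
    // tEnv.createTemporarySystemFunction("parse_location", ParseLocation.class);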

IllegalStateException Printing Plan

2020-11-16 Thread Rex Fenley
Hello, I have the following code attempting to print the execution plan for my job locally. The job runs fine and the Flink UI displays it, so I'd expect this to work. val tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE) println(s"execution plan:\n${this.env.getExecutionPlan()}") but instead I
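
For reference, a hedged Java sketch: getExecutionPlan() only describes operators added to the StreamExecutionEnvironment, so for a pure Table API pipeline something like Table#explain (available in 1.11) may be what is wanted. The table and sink names come from the message above, the rest is assumed:

    // print the optimized plan of the Table pipeline itself
    System.out.println(userDocsTable.explain());

    // executeInsert() submits the job on its own; ask for the plan before (or instead of)
    // querying the StreamExecutionEnvironment
    TableResult tableResult = userDocsTable.executeInsert(SINK_ES_PEOPLE);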

Force Join Unique Key

2020-11-16 Thread Rex Fenley
Hello, I have quite a few joins in my plan that have leftInputSpec=[NoUniqueKey] in Flink UI. I know this can't truly be the case that there is no unique key, at least for some of these joins that I've evaluated. Is there a way to hint to the join what the unique key is for a table? Thanks! -

Flink on YARN: delegation token expired prevent job restart

2020-11-16 Thread Kien Truong
Hi all, We are having an issue where the Flink Application Master is unable to automatically restart the Flink job after its delegation token has expired. We are using Flink 1.11 with YARN 3.1.1 in single-job-per-yarn-cluster mode. We have also added a valid keytab configuration and taskmanagers are able to
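
For reference, the keytab-related options usually involved for long-running jobs on YARN (the values below are placeholders); with a keytab configured, Flink can re-login instead of relying on delegation-token renewal alone:

    # flink-conf.yaml
    security.kerberos.login.use-ticket-cache: false
    security.kerberos.login.keytab: /path/to/flink.keytab
    security.kerberos.login.principal: flink-user@EXAMPLE.COM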

How to convert Int to Date

2020-11-16 Thread Rex Fenley
Hello, I'm using the Table API and I have a column which is an integer number of days since the epoch. According to the docs [1] both `int` and `java.lang.Integer` are acceptable for DATE. However, if I try to use the SQL API to write a DATE out to the Elasticsearch connector for the INT column I receive an exce
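
For reference, a hedged sketch of one way to turn such a column into a DATE with a scalar UDF (the function name is made up); java.time.LocalDate maps to DATE in the new type system:

    import org.apache.flink.table.functions.ScalarFunction;
    import java.time.LocalDate;

    public class EpochDaysToDate extends ScalarFunction {
        public LocalDate eval(Integer daysSinceEpoch) {
            return daysSinceEpoch == null ? null : LocalDate.ofEpochDay(daysSinceEpoch);
        }
    }

    // tEnv.createTemporarySystemFunction("EPOCH_DAYS_TO_DATE", EpochDaysToDate.class);
    // SELECT EPOCH_DAYS_TO_DATE(day_col) AS dt FROM my_table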

Re: Flink State Processor API - Bootstrap One state

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi, Using the State Processor API, modifying the state in an existing savepoint results in a new savepoint (new directory) with the new modified state. The original savepoint remains intact. The API allows you to only touch certain operators, without having to touch any other state and have them r
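
For reference, a rough sketch of the flow described here (paths, the "o1" uid and the bootstrap transformation are placeholders):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;

    ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

    // load the existing savepoint; it is only read, never modified in place
    ExistingSavepoint existing = Savepoint.load(
            bEnv, "hdfs:///savepoints/savepoint-abc", new MemoryStateBackend());

    // bootstrapTransformation is assumed to have been built beforehand via
    // OperatorTransformation.bootstrapWith(...)
    existing
        .removeOperator("o1")                        // drop the old state of operator o1
        .withOperator("o1", bootstrapTransformation) // write freshly bootstrapped state for o1
        .write("hdfs:///savepoints/savepoint-abc-modified");

    bEnv.execute("rewrite savepoint");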

Flink State Processor API - Bootstrap One state

2020-11-16 Thread ApoorvK
Currently my Flink application has a state size of 160 GB (around 50 operators), where a few operators' state is much larger. I am planning to use the State Processor API to bootstrap, let's say, one particular state having operator id o1 with a ValueState s1 as ID inside. Following steps I have planned to

Re: split avro kafka field

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi, 1. You'd have to configure your Kafka connector source to use a DeserializationSchema that deserializes the Kafka record bytes into your generated Avro type. You can use the shipped `AvroDeserializationSchema` for that. 2. After your Kafka connector source, you can use a flatMap transformation t
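
For reference, a hedged sketch of both steps, assuming a SpecificRecord class `Event` generated from the Avro schema with a `location` field as in Youzha's original message (env and kafkaProps are the usual StreamExecutionEnvironment and Kafka Properties):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    FlinkKafkaConsumer<Event> source = new FlinkKafkaConsumer<>(
            "events",
            AvroDeserializationSchema.forSpecific(Event.class),
            kafkaProps);

    env.addSource(source)
       .flatMap((Event e, Collector<Tuple3<String, String, String>> out) -> {
           String[] parts = e.getLocation().split(";");
           out.collect(Tuple3.of(parts[0], parts[1], parts[2]));
       })
       .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));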

Re: Kafka SQL table Re-partition via Flink SQL

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi, I'm pulling in some Flink SQL experts (in CC) to help you with this one :) Cheers, Gordon On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra wrote: > Hi, > I am trying to author a SQL job that does repartitioning a Kafka SQL table > into another Kafka SQL table. > as example input/output table

Re: Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi, Both the data and the metadata have been stored in the savepoint directory since Flink 1.3. The metadata in the savepoint directory does not reference any checkpoint data files. In 1.11, what changed is that the savepoint metadata uses relative paths to point to the data files in the savepoi

Flink 1.11.2 could not create kafka table source on EMR.

2020-11-16 Thread Fanbin Bu
Hi, I could not launch my Flink 1.11.2 application on EMR; it fails with: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. I attached the full

Re: Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Congxian Qiu
Hi Bajaj, A savepoint does contain the metadata and data in Flink 1.10; it does not need to reference any checkpoint data. Best, Congxian Bajaj, Abhinav wrote on Tue, Nov 17, 2020 at 8:58 AM: > Hi, > > > > I am trying to understand the Flink 1.10 savepoints related documentation >

Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-11-16 Thread Eleanore Jin
Hi Till, Thanks for the response! I got the metrics from cAdvisor and visualized them via the dashboard shipped with Kubernetes. I have actually run the Flink job for the past 2 weeks and the memory usage has stabilized. There is no issue so far. I still could not figure out the mystery of why it was trending u

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-11-16 Thread Hector He
May I ask about deprecating readFileStream(...): is there an alternative to this method? The source code leads me to use readFile instead, but it does not behave like readFileStream. readFileStream can read file content incrementally, but readFile with FileProcessingMode.PROCESS_CONTINUOUSLY argum
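
For reference, a hedged sketch of the readFile variant (path and interval are placeholders; env is the StreamExecutionEnvironment). Note that with PROCESS_CONTINUOUSLY a modified file is re-processed in full, which is the behavioural difference raised here:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    String path = "hdfs:///data/in";
    DataStream<String> lines = env.readFile(
            new TextInputFormat(new Path(path)),
            path,
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            10_000L); // re-scan the path every 10 seconds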

Re: Flink AutoScaling EMR

2020-11-16 Thread Rex Fenley
Thanks for all the input! On Sun, Nov 15, 2020 at 6:59 PM Xintong Song wrote: > Is there a way to make the new yarn job only on the new hardware? > > I think you can simply decommission the nodes from Yarn, so that new > containers will not be allocated from those nodes. You might also need a >

Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Bajaj, Abhinav
Hi, I am trying to understand the Flink 1.10 savepoints related documentation that mentions - “When triggering a savepoint, a new savepoint directory is created where the data as well a

Re: left join flink stream

2020-11-16 Thread Guowei Ma
Hi, Youzha. In general `CoGroup` is for window-based operations. Whether it can satisfy your requirements depends on your specific scenario. But if you want to treat the MySQL table as a dimension table, there might be two other ways: 1. Using the Table/SQL SDK. You could find a sql example(tempora
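
For reference, a hedged sketch of option 1 using the Table/SQL API with a JDBC lookup (temporal) join; every table name, column and connection string below is made up, and tEnv is an assumed StreamTableEnvironment:

    tEnv.executeSql("CREATE TABLE orders (user_id BIGINT, amount DOUBLE, proctime AS PROCTIME()) "
        + "WITH ('connector'='kafka','topic'='orders',"
        + "'properties.bootstrap.servers'='kafka:9092','properties.group.id'='orders-job','format'='json')");

    tEnv.executeSql("CREATE TABLE users (id BIGINT, name STRING) "
        + "WITH ('connector'='jdbc','url'='jdbc:mysql://mysql:3306/db','table-name'='users')");

    // left outer join against the MySQL dimension table, looked up at processing time
    Table joined = tEnv.sqlQuery("SELECT o.user_id, o.amount, u.name "
        + "FROM orders AS o "
        + "LEFT JOIN users FOR SYSTEM_TIME AS OF o.proctime AS u "
        + "ON o.user_id = u.id");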

Re: Random Task executor shutdown

2020-11-16 Thread Guowei Ma
Hi Arnaud, Would you like to share the log of the task executor that shut down? By the way, could you check the GC log of the task executor? Best, Guowei On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud wrote: > (reposted with proper subject line -- sorry for the copy/paste) > -Original message- > Hello,

Kafka SQL table Re-partition via Flink SQL

2020-11-16 Thread Slim Bouguerra
Hi, I am trying to author a SQL job that repartitions a Kafka SQL table into another Kafka SQL table. As an example, the input/output tables have exactly the same SQL schema (see below) and data; the only difference is that the new Kafka stream needs to be repartitioned using a simple projection like item_
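
For reference, a hedged sketch of that kind of job; in 1.11 the Kafka SQL sink chooses the partition via the 'sink.partitioner' option ('fixed', 'round-robin', or a custom FlinkKafkaPartitioner class), so a custom partitioner is one way to partition by an item key. The schema, topic names and partitioner class below are placeholders, and tEnv is an assumed TableEnvironment:

    tEnv.executeSql("CREATE TABLE input_topic (item_id STRING, price DOUBLE) "
        + "WITH ('connector'='kafka','topic'='input','properties.bootstrap.servers'='kafka:9092',"
        + "'properties.group.id'='repartition-job','format'='json')");

    tEnv.executeSql("CREATE TABLE output_topic (item_id STRING, price DOUBLE) "
        + "WITH ('connector'='kafka','topic'='output','properties.bootstrap.servers'='kafka:9092',"
        + "'format'='json','sink.partitioner'='com.example.ItemIdPartitioner')");

    tEnv.executeSql("INSERT INTO output_topic SELECT item_id, price FROM input_topic");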

Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
Thank you Kye for your insights... In my mind, if the job runs without problems one or more times, the heap size, and thus the metadata size, is big enough and I should not increase it (on the same data of course). So I'll try to understand who is leaking what... The advice to avoid the dynamic class

Re: [External Sender] Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Kye Bae
Hello! The JVM metaspace is where all the classes (not class instances or objects) get loaded. jmap -histo is going to show you the heap space usage info, not the metaspace. You could inspect what is happening in the metaspace by using jcmd (e.g., jcmd JPID VM.native_memory summary) after restarti

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský
The exclusions should not have any impact on that, because what determines which classloader will load which class is not the presence of a particular class in a specific jar, but the configuration of parent-first-patterns [1]. If you don't use any flink internal imports, then it still might be the
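
For reference, the option referred to here (the pattern value is just an illustration):

    # flink-conf.yaml
    classloader.parent-first-patterns.additional: com.example.shared.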

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
I've tried to remove all possible imports of classes not contained in the fat jar but I still face the same problem. I've also tried to reduce as much as possible the excludes in the shade section of the maven plugin (I took the one at [1]), so now I exclude only a few dependencies... could it be that I

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-16 Thread Aljoscha Krettek
Hi, thanks for the pointer, I should have remembered that thread earlier! I'll try and sketch what the pipeline might look like to show what I mean by "enriching the message" and where the operations would sit. DataStream source = DataStream> enriched = source .keyBy() .map(new Stateful
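
For reference, a hedged Java sketch of the windowing step, assuming the stateful enrichment map has already attached a per-element gap and that event-time timestamps/watermarks were assigned upstream (the Enriched type is made up):

    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

    // an assumed POJO produced by the stateful enrichment map
    public static class Enriched {
        public String key;
        public long value;
        public long gapMillis;
    }

    // enriched is the DataStream<Enriched> coming out of source.keyBy(...).map(new StatefulEnricher())
    enriched
        .keyBy(e -> e.key)
        .window(EventTimeSessionWindows.withDynamicGap(
            new SessionWindowTimeGapExtractor<Enriched>() {
                @Override
                public long extract(Enriched element) {
                    return element.gapMillis;   // per-element session gap, in milliseconds
                }
            }))
        .reduce((a, b) -> b)   // placeholder window function
        .print();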

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-16 Thread 김동원
Hi Ingo, Thank you for letting me know! I didn't know that was already discussed. Best, Dongwon > On Nov 17, 2020, at 1:12 AM, Ingo Bürk wrote: > > > Hi, > > I ran into the same issue today. This is fixed in 1.11.3, the corresponding > bug was FLINK-19281. > > A workaround is to switch the curren
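
For reference, a hedged sketch of the workaround (catalog, database and table names are placeholders; tEnv is an assumed TableEnvironment):

    // switch the current catalog/database to where the source table lives,
    // so the LIKE clause resolves there, and fully qualify the new table instead
    tEnv.useCatalog("hive");
    tEnv.useDatabase("default");
    tEnv.executeSql(
        "CREATE TABLE `default_catalog`.`default_database`.`t_copy` "
        + "WITH ('connector' = 'print') "
        + "LIKE t_origin (EXCLUDING OPTIONS)");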

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-16 Thread Dongwon Kim
Hi Danny~ Sorry for the late reply. Let's take a look at a running example: > EnvironmentSettings settings = EnvironmentSettings.newInstance() > .inBatchMode() > .build(); > > TableEnvironment tEnv = TableEnvironment.create(settings); > > HiveCatalog hiveCatalog = new HiveCatalog("hive",null, arg

left join flink stream

2020-11-16 Thread Youzha
Hi, I want to join a Kafka stream with a MySQL reference table. How can I do this with Flink streaming? Can the coGroup function handle this? Or does anyone have Java sample code for this case? I've read an article that said the coGroup function can do a left outer join, but I'm still conf

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský
Yes, that could definitely cause this. You should probably avoid using these flink-internal shaded classes and ship your own versions (not shaded). Best,  Jan On 11/16/20 3:22 PM, Flavio Pompermaier wrote: Thank you Jan for your valuable feedback. Could it be that I should not use import shad
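
For reference, what "ship your own version" might look like: depend on com.fasterxml.jackson.core:jackson-databind directly in the job jar and use the unshaded package (the helper class is just an illustration):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonUtil {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static String toJson(Object value) throws Exception {
            return MAPPER.writeValueAsString(value);
        }
    }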

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
Thank you Jan for your valuable feedback. Could it be that I should not import shaded-jackson classes in my user code? For example import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper? Best, Flavio On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský wrote: > Hi Flavi

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský
Hi Flavio, when I encountered a problem quite similar to the one you describe, it was related to static storage located in a class that was loaded "parent-first". In my case it was in java.lang.ClassValue, but it might be (and probably will be) different in your case. The problem is that if user-

Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Flavio Pompermaier
Hello everybody, I was writing this email when a similar thread appeared on this mailing list... The difference is that the other problem seems to be related to Flink 1.10 on YARN and does not output anything helpful for debugging the cause of the problem. Indeed, in my use case I use Flink 1.11.0

Re: PyFlink Table API and UDF Limitations

2020-11-16 Thread Dian Fu
Hi Niklas, > How can I ingest data in a batch table from Kafka or even better > Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch > isn't offering a source at all. > The only workaround which comes to my mind is to use the Kafka streaming > source and to apply a single

Re: Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.

2020-11-16 Thread Tim Josefsson
To add to this, setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead of EXACTLY_ONCE makes the problem go away so I imagine there is something wrong with my setup. I'm using Kafka 2.2 and I have the following things set on the cluster: transaction.max.timeout.ms=360 transaction.state.log.
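
For reference, a hedged sketch of an exactly-once producer whose transaction timeout stays within the broker's transaction.max.timeout.ms (topic, servers and the 15-minute value are assumptions):

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("transaction.timeout.ms", "900000"); // must not exceed the broker's transaction.max.timeout.ms

    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            "output-topic",
            (KafkaSerializationSchema<String>) (element, timestamp) ->
                    new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)),
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);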

Random Task executor shutdown

2020-11-16 Thread LINZ, Arnaud
(reposted with proper subject line -- sorry for the copy/paste) -Original message- Hello, I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, when under heavy load, fails from time to time with this unique error message in the whole yarn log: (...) 2020-11-15

split avro kafka field

2020-11-16 Thread Youzha
Hi, I'm a newcomer learning Flink streaming, and I want to try to split a Kafka Avro field into multiple fields. For example I have this one in my Kafka topic: { id : 12345, name : “john”, location : “indonesia;jakarta;south jakarta” } and then I want to split the location value by “;” in

RE: Re: Flink 1.11 not showing logs

2020-11-16 Thread LINZ, Arnaud
Hello, I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, when under heavy load, fails from time to time with this unique error message in the whole yarn log: (...) 2020-11-15 16:18:42,202 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Receiv

Re: Job crash in job cluster mode

2020-11-16 Thread Tim Eckhardt
Hi Robert, hi Matthias, the job is doing some stateful stream processing (reading data from Kafka) and it should run endlessly, so ideally no restarts from time to time. The TaskManager is the one that ends up crashing with this kind of exception: org.apache.kafka.common.errors.Di

Re:Re:Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread 马阳阳
Hi Yang, After I copied the logic from `YarnLogConfigUtil` into my own deployer (maybe calling its logic instead of copying it is a better option), the logs now show up normally. Thanks again for the kind help. At 2020-11-16 17:28:47, "马阳阳" wrote: Hi Yang, I checked the `YarnLogConfigUtil`, i

Re:Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread 马阳阳
Hi Yang, I checked `YarnLogConfigUtil`; it does some work to set the logging configuration. Should I copy the logic into my deployer? At 2020-11-16 17:21:07, "马阳阳" wrote: Hi Yang, Thank you for your reply. I set the value for "$internal.deployment.config-dir" to the Flink configura

Re:Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread 马阳阳
Hi Yang, Thank you for your reply. I set the value for "$internal.deployment.config-dir" to the Flink configuration directory, and the configuration showed up on the Flink web UI, but it still did not work. So I wonder what I should set as the value for "$internal.deployment.config-dir"? At 2020-11

Re: Re: Flink 1.11 not showing logs

2020-11-16 Thread Yang Wang
If you are using your own deployer (i.e., a Java program that calls the Flink client API to submit Flink jobs), you need to check in the jobmanager configuration in the web UI whether "$internal.yarn.log-config-file" is correctly set. If not, maybe you need to set "$internal.deployment.config-dir" in your deploy
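
For reference, a hedged sketch of what a custom deployer might set programmatically, using the keys from this thread (the paths are placeholders):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;

    Configuration flinkConfig = GlobalConfiguration.loadConfiguration("/path/to/flink/conf");
    flinkConfig.setString("$internal.deployment.config-dir", "/path/to/flink/conf");
    flinkConfig.setString("$internal.yarn.log-config-file", "/path/to/flink/conf/log4j.properties");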