does mapFunction need to implement CheckpointedFunction?

2019-07-04 Thread liu ze
Hi, I want to update a third-party system in the mapFunction. Does mapFunction need to implement CheckpointedFunction? For example, in the mapFunction I want to update MySQL; do I need to implement CheckpointedFunction and manage the state myself? stream=env.addSource() stream.map( "insert update mysq
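A Flink-free sketch of the pattern the question is about: instead of writing to MySQL on every map() call, buffer the intended writes and flush them when a snapshot is taken, mirroring what CheckpointedFunction.snapshotState() is typically used for. The class and helper names below are invented for illustration and are not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Flink's API): buffer external writes in map()
// and only flush them when a checkpoint is taken, so a restore never
// replays already-acknowledged writes.
class BufferingMapper {
    private final List<String> pendingUpserts = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    // Analogous to MapFunction.map(): record the intended write instead
    // of issuing it immediately.
    String map(String value) {
        pendingUpserts.add("UPSERT " + value);
        return value;
    }

    // Analogous to CheckpointedFunction.snapshotState(): flush buffered
    // writes together with the checkpoint.
    void snapshotState() {
        committed.addAll(pendingUpserts); // stand-in for a JDBC batch execute
        pendingUpserts.clear();
    }

    List<String> committed() { return committed; }

    int pendingCount() { return pendingUpserts.size(); }
}
```

In real Flink code the buffer would additionally be stored in operator state inside snapshotState() so it survives a restart.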

Re:Tracking message processing in my application

2019-07-04 Thread Haibo Sun
Hi, Roey > What do you think about that? I would have some concerns about throughput and latency, so I think that the operators should report state data asynchronously and in batches to minimize the impact of monitoring on the normal business processing. In addition, if the amount of busin
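The batched-reporting idea above can be sketched without any Flink classes: operators append per-message status records to a buffer, and the buffer is handed to the monitoring sink only when it is full, so tracking costs at most one remote call per batch. All names here (BatchingReporter, the sink callback) are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch: report message statuses in batches so monitoring
// adds at most one downstream call per `batchSize` messages.
class BatchingReporter {
    private final int batchSize;
    private final Consumer<List<String>> sink; // e.g. an async HTTP client
    private final List<String> buffer = new ArrayList<>();

    BatchingReporter(int batchSize, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    void report(String messageStatus) {
        buffer.add(messageStatus);
        if (buffer.size() >= batchSize) {
            sink.accept(new ArrayList<>(buffer)); // hand off a copy
            buffer.clear();
        }
    }
}
```

A production version would also flush on a timer so a half-full batch does not delay status updates indefinitely.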

Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-04 Thread Hao Sun
Personally I prefer 3) to keep split/select and correct the behavior. I feel side output is kind of overkill for such a primitive function, and I prefer simple APIs like split/select. Hao Sun On Thu, Jul 4, 2019 at 11:20 AM Xingcan Cui wrote: > Hi folks, > > Two weeks ago, I started a thread [

[VOTE] How to Deal with Split/Select in DataStream API

2019-07-04 Thread Xingcan Cui
Hi folks, Two weeks ago, I started a thread [1] discussing whether we should discard the split/select methods (which have been marked as deprecated since v1.7) in the DataStream API. The fact is, these methods will cause "unexpected" results when used consecutively (e.g., ds.split(a).select(b).spli

Re: Apache Flink - How to find the number of window instances in an application

2019-07-04 Thread Chesnay Schepler
This is unfortunately not possible. On 04/07/2019 19:40, M Singh wrote: Hi: I wanted to find out if there is a metric for the number of global or non-global window instances in a Flink application. Thanks Mans

Apache Flink - How to find the number of window instances in an application

2019-07-04 Thread M Singh
Hi: I wanted to find out if there is a metric for the number of global or non-global window instances in a Flink application. Thanks Mans

Re: Source Kafka and Sink Hive managed tables via Flink Job

2019-07-04 Thread Bowen Li
Thanks Youssef. The context makes more sense to me now. Just from your description, I doubt it might be because of upsert - the sink's throughput in step 1 is high but may get stuck in step 2. AFAIK, Hive ACID/UPSERT is not really scalable; it's ok for rare, occasional usage but cannot scale well to m

RE: Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat

2019-07-04 Thread Hailu, Andreas
Very well - thank you both. // ah From: Haibo Sun Sent: Wednesday, July 3, 2019 9:37 PM To: Hailu, Andreas [Tech] Cc: Yitzchak Lieberman ; user@flink.apache.org Subject: Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat Hi, Andreas I'm glad you have had a solution. If you're intereste

Re: Source Kafka and Sink Hive managed tables via Flink Job

2019-07-04 Thread Youssef Achbany
Thank you Li for your answer and sorry for the dev mistake :). *To be more clear:* We write multiple events, assigned via a Flink tumbling window, to Hive in one JDBC INSERT statement. We wrote a Hive sink function for that, using only JDBC. We do not use partitions yet, but the table is clustere
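The "multiple events in one JDBC INSERT" approach described above can be sketched as a small SQL builder that collapses all rows of one tumbling window into a single multi-row statement. Table and column values are made up for the example; real code should prefer a PreparedStatement with placeholders over string concatenation.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch: build one multi-row INSERT for all events of a
// window, instead of one statement per event.
class MultiRowInsert {
    static String build(String table, List<String> rows) {
        String values = rows.stream()
                .map(r -> "(" + r + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " VALUES " + values;
    }
}
```

Batching this way keeps the number of Hive transactions proportional to windows, not to events, which matters given Hive's limited transaction throughput discussed in the reply above.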

Re: UnsupportedOperationException from org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental using Java 11

2019-07-04 Thread Chesnay Schepler
Flink only supports Java 8. On 04/07/2019 15:34, Rauch, Jochen wrote: Hi all, I have implemented the following code snippet with Apache Flink 1.8: flinkConfiguration.getEnvironment().readTextFile(outputFile.getAbsolutePath(), "ISO-8859-1") .flatMap(new FlatMapFunction<String, Tuple2<Map<String, Object>, Integer>>() {

UnsupportedOperationException from org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental using Java 11

2019-07-04 Thread Rauch, Jochen
Hi all, I have implemented the following code snippet with Apache Flink 1.8: flinkConfiguration.getEnvironment().readTextFile(outputFile.getAbsolutePath(), "ISO-8859-1") .flatMap(new FlatMapFunction<String, Tuple2<Map<String, Object>, Integer>>() { }) It works fine with Java 8, but using Java 11 I get this

Tracking message processing in my application

2019-07-04 Thread Halfon, Roey
Hi, We are looking for a monitoring solution for our dataflow - to track the progress of incoming messages while they are processed. I'll clarify: we want to build a service which will show the status of each incoming message and, in case of failures, give some detailed information. I thought ab

Re: Can Flink infers the table columns type

2019-07-04 Thread Dawid Wysakowicz
Hi, Unfortunately the automatic schema inference of the JDBC source is not supported yet. There is also no JDBC TableSource yet, but you should be able to write one yourself that reuses the JDBCInputFormat. You may take a look at the BatchTableSource/StreamTableSource interfaces and corresponding methods
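When writing such a source yourself, the "schema inference" part boils down to mapping the java.sql.Types codes reported by ResultSetMetaData to your source's column types. A minimal sketch, using placeholder type-name strings rather than Flink's TypeInformation classes:

```java
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: translate JDBC column type codes (as returned by
// ResultSetMetaData.getColumnType) into a source's own column types.
// The string names on the right are placeholders for the example.
class JdbcTypeMapper {
    private static final Map<Integer, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put(Types.VARCHAR, "STRING");
        MAPPING.put(Types.INTEGER, "INT");
        MAPPING.put(Types.BIGINT, "LONG");
        MAPPING.put(Types.DOUBLE, "DOUBLE");
        MAPPING.put(Types.TIMESTAMP, "TIMESTAMP");
    }

    static String toColumnType(int sqlType) {
        String t = MAPPING.get(sqlType);
        if (t == null) {
            throw new IllegalArgumentException("unsupported SQL type: " + sqlType);
        }
        return t;
    }
}
```

Failing fast on unmapped types is deliberate: silently defaulting a column type tends to surface later as a hard-to-debug serialization error.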

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Fabian Wollert
*@Fabian do you register any types / serializers via ExecutionConfig.registerKryoType(...) / ExecutionConfig.registerTypeWithKryoSerializer(...)?* Nope, not at all. The word "Kryo" appears nowhere in our Flink job code. Thanks for looking into it ... -- *Fabian Wollert, Zalando SE* E-Mail: fab...@

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Tzu-Li (Gordon) Tai
I quickly checked the implementation of duplicate() for both the KryoSerializer and StreamElementSerializer (which are the only serializers involved here). They seem to be correct; especially for the KryoSerializer, since FLINK-8836 [1] we now always perform a deep copy of the KryoSerializer when d
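The deep-copy requirement on duplicate() mentioned above can be shown without any Flink classes: a serializer that keeps mutable scratch state is only thread-safe if each duplicate gets its own copy of that state. The class below is an invented stand-in, not the KryoSerializer itself.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: a serializer with internal mutable state must
// deep-copy that state in duplicate(), so per-thread copies never share
// a buffer (the essence of the FLINK-8836 fix discussed above).
class StatefulSerializer {
    private final List<String> scratchBuffer;

    StatefulSerializer() {
        this(new ArrayList<>());
    }

    private StatefulSerializer(List<String> buffer) {
        this.scratchBuffer = buffer;
    }

    // Correct duplicate(): the copy gets its OWN buffer, never a shared one.
    StatefulSerializer duplicate() {
        return new StatefulSerializer(new ArrayList<>());
    }

    void serialize(String value) {
        scratchBuffer.add(value); // mutates internal state
    }

    int bufferedCount() {
        return scratchBuffer.size();
    }
}
```

If duplicate() returned `this` (or copied the buffer reference), two task threads would corrupt each other's scratch state, which is exactly the class of bug the thread is chasing.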

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Fabian Wollert
No, not yet. We lack some knowledge in understanding this. The only thing we found out is that it most probably happens in the Elasticsearch sink, because: - some error messages have the sink in their stack trace. - when bumping the ES node specs on AWS, the error happens less often (we haven't bumpe

Re: Debug Kryo.Serialization Exception

2019-07-04 Thread Flavio Pompermaier
Any news on this? Have you found the cause of the error? On Fri, Jun 28, 2019 at 10:10 AM Flavio Pompermaier wrote: > Indeed looking at StreamElementSerializer the duplicate() method could be > bugged: > > @Override > public StreamElementSerializer duplicate() { > TypeSerializer copy = typ

Re: Providing Custom Serializer for Generic Type

2019-07-04 Thread Tzu-Li (Gordon) Tai
Hi Andrea, Is there a specific reason you want to use a custom TypeInformation / TypeSerializer for your type? From the description in the original post, this part wasn't clear to me. If the only reason is because it is generally suggested to avoid generic type serialization via Kryo, both for p
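The trade-off behind this question can be modeled as a small registry: use a specific serializer when one is registered for a class, otherwise fall back to a slower generic one (the role Kryo plays for Flink's generic types). Every name in this sketch is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch: specific serializers where registered, a generic
// fallback (Kryo's role in Flink) everywhere else.
class SerializerRegistry {
    private final Map<Class<?>, Function<Object, String>> specific = new HashMap<>();
    private final Function<Object, String> genericFallback =
            o -> "generic:" + o; // stand-in for a slow reflective serializer

    <T> void register(Class<T> type, Function<Object, String> serializer) {
        specific.put(type, serializer);
    }

    String serialize(Object value) {
        return specific.getOrDefault(value.getClass(), genericFallback).apply(value);
    }
}
```

This is why registering a serializer (e.g. via ExecutionConfig.registerTypeWithKryoSerializer) is usually enough: the dispatch happens per class, without writing a full custom TypeInformation.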

Re: Providing Custom Serializer for Generic Type

2019-07-04 Thread Andrea Spina
Hi JingsongLee, thank you for your answer. Honestly, I wanted to explore it only as a last resort. Anyway, if defining custom serializers and type information involves quite a big effort, I will reconsider my approach. Cheers, Il giorno gio 4 lug 2019 alle ore 08:46 JingsongLee ha scritto: > Hi Andre

Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

2019-07-04 Thread Anyang Hu
Thanks for your replies. To Peter: The heartbeat.timeout had already been increased to 3 minutes, but the JobManager timeout still occurs. At present, the following logic is added: when the JM times out, onFatalError is called, which ensures that the failing job exits quickly. Does the method
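For reference, the timeout being discussed is configured in flink-conf.yaml; a sketch of the 3-minute setting mentioned above (values in milliseconds, the interval shown is assumed, not taken from the thread):

```yaml
# flink-conf.yaml
heartbeat.interval: 10000    # how often heartbeats are sent (ms)
heartbeat.timeout: 180000    # 3 minutes, as mentioned in the thread (ms)
```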