Re: Storing offsets in Kafka
Hi Jiangjie,

Yeah, I am using the second case (Flink 1.7.1, Kafka 0.10.2, FlinkKafkaConsumer010). But now there is a problem: the data is consumed normally, but the offsets are no longer committed. The following exception is found: [image: image.png]

On Thu, Sep 5, 2019 at 11:32 AM, Becket Qin wrote:

> Hi Dominik,
>
> There has not been any change to the offset-committing logic in
> KafkaConsumer for a while, but the logic is a little complicated. The
> offset commit to Kafka is only enabled in the following two cases:
>
> 1. Flink checkpointing is enabled AND commitOffsetsOnCheckpoint is true
> (default value is true).
> 2. Flink checkpointing is disabled AND the vanilla KafkaConsumer has a)
> enable.auto.commit=true (default value is true) and b)
> auto.commit.interval.ms > 0 (default value is 5000).
>
> Note that in case 1, if the job exits before the first checkpoint takes
> place, then no offset will be committed.
>
> Can you check whether your setting falls into one of the two cases?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 4, 2019 at 9:03 PM, Dominik Wosiński wrote:
>
> > Hey,
> > I was wondering whether something has changed for KafkaConsumer: I am
> > using Kafka 2.0.0 with Flink and I wanted to use the group offsets, but
> > there seems to be no change in the topic where Kafka stores its
> > offsets, and after a restart Flink falls back to `auto.offset.reset`,
> > so it seems that no offset commit is happening. The checkpoints are
> > properly configured and I am able to restore from a savepoint, but the
> > group offsets are not working properly. Is there anything that has
> > changed in this regard?
> >
> > Best Regards,
> > Dom.
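For reference, the two cases map onto the consumer setup roughly as follows. This is a minimal sketch for Flink 1.7 / FlinkKafkaConsumer010; the topic, group id, and bootstrap servers are placeholders:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "my-group");

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

    // Case 1: checkpointing enabled -> offsets are committed back to Kafka
    // when a checkpoint completes (committing on checkpoints is on by
    // default; shown explicitly here).
    env.enableCheckpointing(60_000L);
    consumer.setCommitOffsetsOnCheckpoints(true);

    // Case 2: checkpointing disabled -> the wrapped Kafka client itself
    // auto-commits, driven purely by its own properties:
    //   props.setProperty("enable.auto.commit", "true");
    //   props.setProperty("auto.commit.interval.ms", "5000");

    env.addSource(consumer).print();

Note that in case 1 the first commit only happens once the first checkpoint completes, which matches Becket's caveat above.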
Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment
Hi Shuyi,

What is the progress of this discussion? We are also looking forward to this feature. Thanks.

On Fri, Jun 8, 2018 at 3:04 PM, Shuyi Chen wrote:

> Thanks a lot for the comments, Till and Fabian.
>
> The RemoteEnvironment does provide a way to specify jar files at
> construction, but we want the jar files to be specified dynamically in
> the user code, e.g. in a DDL statement, and the jar files might be in a
> remote DFS. As we discussed, I think there are 2 approaches:
>
> 1) Add a new interface env.registerJarFile(jarFiles...), which ships the
> JAR files using JobGraph.addJar(). In this case, all jars will be loaded
> by default at runtime. This approach is the same as how the SQL client
> ships UDF jars now.
> 2) Add a new interface env.registerJarFile(name, jarFiles...). It will do
> something similar to env.registerCachedFile(): register a set of JAR
> files under a key name, and we can add a new interface in RuntimeContext
> as Fabian suggests, i.e., RuntimeContext.getClassloaderWithJar(). Users
> will then be able to load the functions in the remote jar dynamically
> through the returned ClassLoader.
>
> Comparing the 2 approaches:
>
> - Approach 1) is simpler for users.
> - Approach 2) allows us to use different versions of a class in the same
> code, which might solve some dependency-conflict issues. Also, in 2) we
> can load jars on demand, while in 1) all jars are loaded by default.
>
> I think we can support both interfaces. For the SQL DDL implementation,
> both will work; approach 2) is more complicated, but has the nice
> benefits stated above. However, the implementation choice should be
> transparent to the end user. Also, I am wondering: outside of the SQL
> DDL, would these new functionalities/interfaces be helpful in other
> scenarios? Maybe that would help make the interface better and more
> generic. Thanks a lot.
>
> Shuyi
>
> On Tue, Jun 5, 2018 at 1:47 AM, Fabian Hueske wrote:
>
> > We could also offer a feature that lets users request classloaders with
> > additional jars. This could work as follows:
> >
> > 1) Users register jar files in the ExecutionEnvironment (similar to
> > cached files) with a name, e.g.,
> > env.registerJarFile("~/myJar.jar", "myName");
> > 2) In a function, the user can request a user classloader with the
> > additional classes, e.g.,
> > RuntimeContext.getClassloaderWithJar("myName");
> > This could also support loading multiple jar files in the same
> > classloader.
> >
> > IMO, the interesting part of Shuyi's proposal is being able to
> > dynamically load code from remote locations without fetching it to the
> > client first.
> >
> > Best, Fabian
> >
> > 2018-05-29 12:42 GMT+02:00 Till Rohrmann:
> >
> > > I see Shuyi's point that it would be nice to allow adding jar files
> > > that should be part of the user-code classloader programmatically.
> > > Actually, we expose this functionality in the `RemoteEnvironment`,
> > > where you can specify in the constructor additional jars which shall
> > > be shipped to the cluster. I assume that is exactly the functionality
> > > you are looking for. In that sense, it might be an API inconsistency
> > > that we allow it for some cases and not for others.
> > >
> > > But I could also see the whole functionality of dynamically loading
> > > jars at runtime living perfectly well in the `UdfSqlOperator`. This,
> > > of course, would entail that one has to take care of cleaning up the
> > > downloaded resources. But it should be possible to first download the
> > > resources and create a custom URLClassLoader at startup, and then use
> > > this class loader when calling into the UDF.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen wrote:
> > >
> > > > Hi Aljoscha, Fabian, Rong, Ted and Timo,
> > > >
> > > > Thanks a lot for the feedback. Let me clarify the usage scenario in
> > > > a bit more detail. The context is that we want to add support in
> > > > Flink SQL for a SQL DDL that loads UDFs from external JARs located
> > > > either on the local filesystem, in HDFS, or at an HTTP endpoint.
> > > > The local FS option is more for debugging purposes, for users to
> > > > submit the job jar locally; the latter two are for production use.
> > > > Below is an example user application with the CREATE FUNCTION DDL
> > > > (note: grammar and interface are not finalized yet).
> > > >
> > > > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > val tEnv = TableEnvironment.getTableEnvironment(env)
> > > > // setup the DataStream
> > > > // ...
> > > >
> > > > // register the DataStream under the name "OrderA"
> > > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
> > > > tEnv.s
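For illustration, a minimal sketch of the download-then-URLClassLoader approach Till describes above. The jar path and UDF class name are hypothetical placeholders, and error handling is omitted:

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    // Assume the jar has already been fetched from DFS/HTTP to a local file.
    File localJar = new File("/tmp/my-udf.jar");

    // Parent is the current user-code classloader, so the UDF classes can
    // still see Flink's API classes.
    URLClassLoader udfLoader = new URLClassLoader(
        new URL[] { localJar.toURI().toURL() },
        Thread.currentThread().getContextClassLoader());

    // Load and instantiate the UDF reflectively; calls into it then go
    // through an interface that the parent classloader already knows.
    Class<?> udfClass = Class.forName("com.example.MyScalarFunction", true, udfLoader);
    Object udf = udfClass.getDeclaredConstructor().newInstance();

Closing the URLClassLoader and deleting the downloaded file when the operator shuts down is the cleanup Till mentions.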
Re: [1.4.2] mvn clean package command takes too much time
Thanks for the help!
Re: [1.4.2] mvn clean package command takes too much time
Thank you for your reply.

If I modify the flink-runtime module, then the following command is executed for compilation:

mvn clean package -pl flink-runtime,flink-dist -am

The '-am' parameter is necessary but makes the build take a long time; without it, the build fails with an error like:

Failed to execute goal on project flink-dist_2.11: Could not resolve dependencies for project org.apache.flink:flink-dist_2.11:jar:1.4.2: Could not find artifact org.apache.flink:flink-shaded-hadoop2-uber:jar:1.4.2

Am I missing something? Looking forward to your reply.

Best regards.

On Wed, Jun 6, 2018 at 4:29 PM, Chesnay Schepler wrote:

> You only have to compile the module that you changed, along with
> flink-dist, to test things locally.
>
> On 06.06.2018 10:27, Marvin777 wrote:
> > Hi, all.
> > It takes a long time to modify some of the code and recompile it. The
> > process is painful.
> > Is there any way I can save time?
> >
> > Thanks!
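A possible workflow, assuming the "Could not find artifact" error simply means the 1.4.2 artifacts were never installed into the local Maven repository: install everything once, after which only the changed module needs rebuilding and '-am' can be dropped.

    # One-time full build, installing all modules (including
    # flink-shaded-hadoop2-uber) into the local Maven repository:
    mvn clean install -DskipTests

    # Afterwards, rebuild only what changed; flink-dist resolves the other
    # modules from the local repository:
    mvn clean install -pl flink-runtime -DskipTests
    mvn clean package -pl flink-dist -DskipTests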
[1.4.2] mvn clean package command takes too much time
Hi, all.

It takes a long time to modify some of the code and recompile it. The process is painful. Is there any way I can save time?

Thanks!
Re: [jira] [Created] (FLINK-7608) LatencyGauge change to histogram metric
I had an offline discussion with @zentol; he said: "I hope to get this change in for 1.5; the core implementation is done but some details must be ironed out first."

Best.

2018-01-03 11:25 GMT+08:00 Marvin777:

> Hi, all:
>
> I have some questions about changing LatencyGauge to a histogram metric.
> Is such a scheme feasible?
>
> I want to know the latest progress on FLINK-7608.
>
> @zentol, you suggested that we should delay merging this PR by a week or
> two. What should I do for now on my version, 1.3.1?
>
> Sorry to disturb. Regards,
>
> Thanks all.
>
> 2017-09-09 4:36 GMT+08:00 Hai Zhou (JIRA):
>
> > Hai Zhou created FLINK-7608:
> > ----------------------------
> >
> > Summary: LatencyGauge change to histogram metric
> > Key: FLINK-7608
> > URL: https://issues.apache.org/jira/browse/FLINK-7608
> > Project: Flink
> > Issue Type: Bug
> > Components: Metrics
> > Reporter: Hai Zhou
> > Assignee: Hai Zhou
> > Fix For: 1.4.0, 1.3.3
> >
> > I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831]
> > to export metrics to the log file. I found:
> >
> > {noformat}
> > -- Gauges ---------------------------------------------------------
> > ..
> > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
> > Job.Map.0.latency: value={LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0,
> > mean=61.836}}
> > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
> > Job.Sink- Unnamed.0.latency: value={LatencySourceDescriptor{vertexID=1,
> > subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0,
> > mean=161.0}}
> > ..
> > {noformat}
> >
> > --
> > This message was sent by Atlassian JIRA
> > (v6.4.14#64029)
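For context, a proper histogram can be registered through Flink's metric system with the Dropwizard wrapper from flink-metrics-dropwizard. This is a sketch of the general technique, not the actual FLINK-7608 patch; the function and metric names are made up:

    import com.codahale.metrics.SlidingWindowReservoir;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
    import org.apache.flink.metrics.Histogram;

    public class LatencyTrackingMap extends RichMapFunction<Long, Long> {
        private transient Histogram latency;

        @Override
        public void open(Configuration parameters) {
            // Percentiles (p50, p95, p99, ...) are computed over a sliding
            // window of the last 500 samples.
            latency = getRuntimeContext().getMetricGroup().histogram(
                "latency",
                new DropwizardHistogramWrapper(
                    new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
        }

        @Override
        public Long map(Long eventTimestamp) {
            // Record the age of each element as a latency sample.
            latency.update(System.currentTimeMillis() - eventTimestamp);
            return eventTimestamp;
        }
    }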
Re: [jira] [Created] (FLINK-7608) LatencyGauge change to histogram metric
Hi, all:

I have some questions about changing LatencyGauge to a histogram metric. Is such a scheme feasible?

I want to know the latest progress on FLINK-7608.

@zentol, you suggested that we should delay merging this PR by a week or two. What should I do for now on my version, 1.3.1?

Sorry to disturb. Regards,

Thanks all.

2017-09-09 4:36 GMT+08:00 Hai Zhou (JIRA):

> Hai Zhou created FLINK-7608:
> ----------------------------
>
> Summary: LatencyGauge change to histogram metric
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
> Issue Type: Bug
> Components: Metrics
> Reporter: Hai Zhou
> Assignee: Hai Zhou
> Fix For: 1.4.0, 1.3.3
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831]
> to export metrics to the log file. I found:
>
> {noformat}
> -- Gauges ---------------------------------------------------------
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
> Job.Map.0.latency: value={LatencySourceDescriptor{vertexID=1,
> subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0,
> mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
> Job.Sink- Unnamed.0.latency: value={LatencySourceDescriptor{vertexID=1,
> subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0,
> mean=161.0}}
> ..
> {noformat}
>
> --
> This message was sent by Atlassian JIRA
> (v6.4.14#64029)
Re: Remove the HDFS directory in org.apache.flink.util.FileUtils.deletePathIfEmpty
That has been solved; it was caused by a Hadoop version issue. Thanks.

2017-11-08 17:54 GMT+08:00 Chesnay Schepler:

> For me they showed up in the user mailing list, but not in dev (or maybe
> the reverse, not quite sure...).
>
> On 08.11.2017 10:47, Aljoscha Krettek wrote:
>
> > Hi,
> >
> > Your images did not make it through to the mailing list.
> >
> > Best,
> > Aljoscha
> >
> > On 8. Nov 2017, at 05:25, 马庆祥 wrote:
> >
> > > Hi, all,
> > >
> > > I enabled checkpointing with the configuration in the figure below.
> > > [image missing]
> > >
> > > It works, but I keep getting the exception below:
> > > [image missing]
> > >
> > > I want to know whether the commit below was meant to resolve this
> > > problem, but the exception still appears:
> > > [hotfix] [core] Fix FileUtils.deletePathIfEmpty
> > >
> > > Flink version: 1.3.1
> > > Hadoop version: 1.x
> > >
> > > Thanks~
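Since the screenshots did not survive, here is a rough sketch of the kind of checkpoint configuration being described, assuming the Flink 1.3 API; the HDFS path and interval are placeholders:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 60 seconds, writing state to HDFS; cleaning up empty
    // checkpoint directories there is where FileUtils.deletePathIfEmpty is
    // involved.
    env.enableCheckpointing(60000L);
    env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));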