Re: Storing offsets in Kafka

2020-01-13 Thread Marvin777
Hi Jiangjie,

Yes, I am using the second case (Flink 1.7.1, Kafka 0.10.2, FlinkKafkaConsumer010).
But now there is a problem: the data is consumed normally, but offsets are no
longer being committed. The following exception is found:

[image: image.png]



Becket Qin wrote on Thu, Sep 5, 2019 at 11:32 AM:

> Hi Dominik,
>
> There has not been any change to the offset committing logic in
> KafkaConsumer for a while. But the logic is a little complicated. The
> offset commit to Kafka is only enabled in the following two cases:
>
> 1. Flink checkpointing is enabled AND commitOffsetsOnCheckpoints is true
>    (the default value is true).
> 2. Flink checkpointing is disabled AND the vanilla KafkaConsumer has
>    a) enable.auto.commit=true (the default value is true) and
>    b) auto.commit.interval.ms > 0 (the default value is 5000).
>
> Note that in case 1, if the job exits before the first checkpoint takes
> place, then there will be no offset committed.
>
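> For illustration, a minimal sketch of how this looks on the user side
> (broker address, topic, group id and checkpoint interval below are
> placeholders, not taken from the job in question):
>
> import java.util.Properties
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val props = new Properties()
> props.setProperty("bootstrap.servers", "broker:9092")
> props.setProperty("group.id", "my-group")
>
> val consumer =
>   new FlinkKafkaConsumer010[String]("my-topic", new SimpleStringSchema(), props)
>
> // Case 1: checkpointing enabled; offsets are committed when a checkpoint completes.
> env.enableCheckpointing(60000)
> consumer.setCommitOffsetsOnCheckpoints(true) // true is the default
>
> // Case 2 (no checkpointing) would instead rely on enable.auto.commit=true and
> // auto.commit.interval.ms > 0 being set in the properties above.
>
> env.addSource(consumer)
>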
> Can you check if your setting falls in one of the two cases?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński  wrote:
>
> > Hey,
> > I was wondering whether something has changed for the KafkaConsumer, since
> > I am using Kafka 2.0.0 with Flink and I wanted to use group offsets, but
> > there seems to be no change in the topic where Kafka stores its offsets.
> > After a restart, Flink uses `auto.offset.reset`, so it seems that no offset
> > commit is happening. The checkpoints are properly configured and I am able
> > to restore from a savepoint, but the group offsets are not working
> > properly. Is there anything that has changed in this regard?
> >
> > Best Regards,
> > Dom.
> >
>


Re: [DISCUSS] Adding new interfaces in [Stream]ExecutionEnvironment

2018-06-25 Thread Marvin777
Hi Shuyi,

What is the current status of this discussion? We are also looking forward to
this feature.
Thanks.

Shuyi Chen wrote on Fri, Jun 8, 2018 at 3:04 PM:

> Thanks a lot for the comments, Till and Fabian.
>
> The RemoteEnvironment does provide a way to specify jar files at
> construction time, but we want the jar files to be specified dynamically in
> the user code, e.g. in a DDL statement, and the jar files might be in a
> remote DFS. As we discussed, I think there are 2 approaches:
>
> 1) Add a new interface env.registerJarFile(jarFiles...), which ships the
> JAR files using JobGraph.addJar(). In this case, all jars will be loaded by
> default at runtime. This approach is the same as how the SQL client ships
> UDF jars now.
> 2) Add a new interface env.registerJarFile(name, jarFiles...). It will do
> something similar to env.registerCachedFile(): it registers a set of JAR
> files under a key name, and we can add a new interface in RuntimeContext as
> Fabian suggests, i.e., RuntimeContext.getClassloaderWithJar(). Users will
> then be able to load the functions in the remote jar dynamically using the
> returned ClassLoader.
>
> Comparing the 2 approaches:
>
>    - Approach 1) will be simpler for the user to use.
>    - Approach 2) will allow us to use different versions of a class in the
>    same code, and might solve some dependency conflict issues. Also, in 2)
>    we can load jars on demand, while in 1) all jars will be loaded by default.
>
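> For illustration, the two proposed registration calls might look roughly
> like this from user code (a sketch of the proposal only; neither interface
> exists in the current API, and the jar paths are placeholders):
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // Approach 1): all registered jars are shipped and loaded by default.
> env.registerJarFile("hdfs:///udfs/my-udfs.jar")
> // Approach 2): jars are registered under a key and loaded on demand later
> // via RuntimeContext.getClassloaderWithJar("myUdfs").
> env.registerJarFile("myUdfs", "hdfs:///udfs/my-udfs.jar")
>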
> I think we can support both interfaces. For the SQL DDL implementation, both
> will work; approach 2) will be more complicated, but with the nice benefits
> stated above. However, the implementation choice should be transparent to the
> end user. Also, I am wondering whether this new functionality/interface would
> be helpful in other scenarios outside of the SQL DDL. Maybe that will help
> make the interface better and more generic. Thanks a lot.
>
> Shuyi
>
> On Tue, Jun 5, 2018 at 1:47 AM Fabian Hueske  wrote:
>
> > We could also offer a feature that lets users request classloaders with
> > additional jars.
> > This could work as follows:
> >
> > 1) Users register jar files in the ExecutionEnvironment (similar to cached
> > files) with a name, e.g., env.registerJarFile("~/myJar.jar", "myName");
> > 2) In a function, the user can request a user classloader with the
> > additional classes, e.g., RuntimeContext.getClassloaderWithJar("myName");
> > This could also support loading multiple jar files in the same classloader.
> >
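> > As a rough sketch of step 2) (again, getClassloaderWithJar is only a
> > proposal, not an existing API, and the class name below is illustrative):
> >
> > import org.apache.flink.api.common.functions.RichMapFunction
> >
> > class MyUdfCaller extends RichMapFunction[String, String] {
> >   override def map(value: String): String = {
> >     // classloader that also contains the classes of the registered jar "myName"
> >     val cl = getRuntimeContext.getClassloaderWithJar("myName")
> >     val udfClass = Class.forName("com.example.MyUdf", true, cl)
> >     // ... instantiate and invoke the UDF via reflection ...
> >     value
> >   }
> > }
> >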
> > IMO, the interesting part of Shuyi's proposal is to be able to dynamically
> > load code from remote locations without fetching it to the client first.
> >
> > Best, Fabian
> >
> >
> > 2018-05-29 12:42 GMT+02:00 Till Rohrmann :
> >
> > > I see Shuyi's point that it would be nice to allow programmatically adding
> > > jar files which should be part of the user code classloader. Actually, we
> > > expose this functionality in the `RemoteEnvironment`, where you can specify
> > > additional jars in the constructor which shall be shipped to the cluster. I
> > > assume that is exactly the functionality you are looking for. In that
> > > sense, it might be an API inconsistency that we allow it for some cases and
> > > not for others.
> > >
> > > But I could also see that the whole functionality of dynamically loading
> > > jars at runtime could live entirely in the `UdfSqlOperator`. This, of
> > > course, would entail that one has to take care of cleaning up the
> > > downloaded resources. But it should be possible to first download the
> > > resources and create a custom URLClassLoader at startup, and then use this
> > > class loader when calling into the UDF.
> > >
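> > > A rough sketch of that last step (the class name and local path are
> > > illustrative, and the actual download of the remote jar is omitted):
> > >
> > > import java.net.{URL, URLClassLoader}
> > >
> > > // local copy of the remote jar, downloaded beforehand (placeholder path)
> > > val localJar = new java.io.File("/tmp/downloaded-udf.jar")
> > > // delegate to the current user-code classloader as the parent
> > > val udfClassLoader =
> > >   new URLClassLoader(Array[URL](localJar.toURI.toURL), getClass.getClassLoader)
> > >
> > > // load and instantiate the UDF class through the custom classloader
> > > val udfClass = Class.forName("com.example.MyScalarFunction", true, udfClassLoader)
> > > val udf = udfClass.getDeclaredConstructor().newInstance()
> > >
> > > // close the classloader when done so the downloaded resources can be cleaned up
> > > udfClassLoader.close()
> > >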
> > > Cheers,
> > > Till
> > >
> > > On Wed, May 16, 2018 at 9:28 PM, Shuyi Chen wrote:
> > >
> > > > Hi Aljoscha, Fabian, Rong, Ted and Timo,
> > > >
> > > > Thanks a lot for the feedback. Let me clarify the usage scenario in a bit
> > > > more detail. The context is that we want to add support for a SQL DDL to
> > > > load UDFs from external JARs located either in the local filesystem, in
> > > > HDFS, or at an HTTP endpoint in Flink SQL. The local FS option is mainly
> > > > for debugging, so a user can submit the job jar locally; the latter two
> > > > are for production use. Below is an example user application with the
> > > > CREATE FUNCTION DDL (note: grammar and interface are not finalized yet).
> > > >
> > > >
> > > > ----------------------------------------------------------------------
> > > >
> > > > val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > val tEnv = TableEnvironment.getTableEnvironment(env)
> > > >
> > > > // setup the DataStream
> > > > // ...
> > > >
> > > > // register the DataStream under the name "OrderA"
> > > > tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
> > > > tEnv.s

Re: [1.4.2] mvn clean package command takes too much time

2018-06-07 Thread Marvin777
>
> Thanks for the help!
>


Re: [1.4.2] mvn clean package command takes too much time

2018-06-06 Thread Marvin777
Thank you for your reply. If I modify the flink-runtime module, the following
command is executed for compilation:

'mvn clean package -pl flink-runtime,flink-dist -am'

The '-am' parameter is necessary, but it makes the build take a long time;
otherwise the build fails with an error like 'Failed to execute goal on project
flink-dist_2.11: Could not resolve dependencies for project
org.apache.flink:flink-dist_2.11:jar:1.4.2: Could not find artifact
org.apache.flink:flink-shaded-hadoop2-uber:jar:1.4.2'.

Am I missing something? Waiting for your reply.

Best regards.


Chesnay Schepler wrote on Wed, Jun 6, 2018 at 4:29 PM:

> you only have to compile the module that you changed along with
> flink-dist to test things locally.
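>
> Concretely, that can look like this (a sketch, assuming a full 'mvn clean
> install -DskipTests' has been run once so that artifacts such as
> flink-shaded-hadoop2-uber are already in the local Maven repository):
> run 'mvn clean install -pl flink-runtime -DskipTests' and then
> 'mvn clean package -pl flink-dist -DskipTests'.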
>
> On 06.06.2018 10:27, Marvin777 wrote:
> > Hi, all.
> > It takes a long time to modify some of the code and recompile it. The
> > process is painful.
> > Is there any way I can save time?
> >
> > Thanks!
> >
>
>


[1.4.2] mvn clean package command takes too much time

2018-06-06 Thread Marvin777
Hi, all.
It takes a long time to modify some of the code and recompile it. The
process is painful.
Is there any way I can save time?

Thanks!


Re: [jira] [Created] (FLINK-7608) LatencyGauge change to histogram metric

2018-01-09 Thread Marvin777
I had an offline discussion with @zentol; he said:

"I hope to get this change in for 1.5; the core implementation is done, but
some details must be ironed out first."
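
For context, a user-defined histogram can already be registered through
Flink's metric system; a minimal sketch (using the Dropwizard wrapper from
the flink-metrics-dropwizard module; the metric name, operator and window
size below are illustrative):

import com.codahale.metrics.SlidingWindowReservoir
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
import org.apache.flink.metrics.Histogram

class LatencyHistogramMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    // Wrap a Dropwizard histogram so quantiles (p50, p95, p99) are tracked.
    val dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
    histogram = getRuntimeContext.getMetricGroup
      .histogram("latencyHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }

  override def map(value: Long): Long = {
    histogram.update(value) // record one observed latency value
    value
  }
}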

Best.

2018-01-03 11:25 GMT+08:00 Marvin777 :

> Hi all,
>
> I have some questions about changing LatencyGauge to a histogram metric.
> Is such a scheme feasible?
>
> I want to know the latest progress on FLINK-7608.
>
> @zentol, you suggested that we should delay merging this PR by a week or
> two; what should I do now on my version, 1.3.1?
>
> Sorry to disturb you. Regards,
>
> thanks all.
>
> 2017-09-09 4:36 GMT+08:00 Hai Zhou (JIRA) :
>
>> Hai Zhou created FLINK-7608:
>> ---
>>
>>  Summary: LatencyGauge change to  histogram metric
>>  Key: FLINK-7608
>>  URL: https://issues.apache.org/jira/browse/FLINK-7608
>>  Project: Flink
>>   Issue Type: Bug
>>   Components: Metrics
>> Reporter: Hai Zhou
>> Assignee: Hai Zhou
>>  Fix For: 1.4.0, 1.3.3
>>
>>
>> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831]
>> to export metrics to the log file.
>> I found:
>>
>>
>> {noformat}
>> -- Gauges 
>> -
>> ..
>> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
>> Job.Map.0.latency: value={LatencySourceDescriptor{vertexID=1,
>> subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0,
>> mean=61.836}}
>> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
>> Job.Sink- Unnamed.0.latency: value={LatencySourceDescriptor{vertexID=1,
>> subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0,
>> mean=161.0}}
>> ..
>> {noformat}
>>
>>
>>
>>
>>
>> --
>> This message was sent by Atlassian JIRA
>> (v6.4.14#64029)
>>
>
>


Re: [jira] [Created] (FLINK-7608) LatencyGauge change to histogram metric

2018-01-02 Thread Marvin777
Hi all,

I have some questions about changing LatencyGauge to a histogram metric.
Is such a scheme feasible?

I want to know the latest progress on FLINK-7608.

@zentol, you suggested that we should delay merging this PR by a week or
two; what should I do now on my version, 1.3.1?

Sorry to disturb you. Regards,

thanks all.

2017-09-09 4:36 GMT+08:00 Hai Zhou (JIRA) :

> Hai Zhou created FLINK-7608:
> ---
>
>  Summary: LatencyGauge change to  histogram metric
>  Key: FLINK-7608
>  URL: https://issues.apache.org/jira/browse/FLINK-7608
>  Project: Flink
>   Issue Type: Bug
>   Components: Metrics
> Reporter: Hai Zhou
> Assignee: Hai Zhou
>  Fix For: 1.4.0, 1.3.3
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831]
> to export metrics to the log file.
> I found:
>
>
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
> Job.Map.0.latency: value={LatencySourceDescriptor{vertexID=1,
> subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0,
> mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming
> Job.Sink- Unnamed.0.latency: value={LatencySourceDescriptor{vertexID=1,
> subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0,
> mean=161.0}}
> ..
> {noformat}
>
>
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.4.14#64029)
>


Re: Remove the HDFS directory in org.apache.flink.util.FileUtils.deletePathIfEmpty

2017-11-08 Thread Marvin777
That has been solved; it was because of the Hadoop version issue.
Thanks.

2017-11-08 17:54 GMT+08:00 Chesnay Schepler :

> For me they showed up in the user mailing list, but not in dev (or maybe the
> reverse, not quite sure...).
>
> On 08.11.2017 10:47, Aljoscha Krettek wrote:
>
>> Hi,
>>
>> Your images did not make it through to the mailing list.
>>
>> Best,
>> Aljoscha
>>
>> On 8. Nov 2017, at 05:25, 马庆祥  wrote:
>>>
>>> Hi all,
>>>
>>> I enabled checkpointing with the configuration shown in the figure below.
>>>
>>> It works, but I keep getting the exception below:
>>>
>>> I want to know whether the commit below is meant to resolve the above
>>> problem, but the exception still appears:
>>> [hotfix] [core] Fix FileUtils.deletePathIfEmpty
>>>
>>> Flink version: 1.3.1
>>> Hadoop version: 1.x
>>>
>>> thanks~
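>>>
>>> For reference, a minimal checkpoint setup of the kind described here might
>>> look like the following (the interval and the HDFS path are illustrative,
>>> not the actual configuration from the missing figure):
>>>
>>> import org.apache.flink.runtime.state.filesystem.FsStateBackend
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> // take a checkpoint every 60 seconds
>>> env.enableCheckpointing(60000)
>>> // keep checkpoint data on HDFS (placeholder path)
>>> env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"))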
>>>
>>
>>
>