Re: Starting Flink cluster and running a job

2019-02-20 Thread Konstantin Knauf
Hi Boris,

can you share the relevant parts (dependencies) of your pom.xml? Did you also try
running without pinning the Kafka version, i.e. with the Kafka client
version provided by Flink's Kafka connector? Gordon (cc) dealt with
FLINK-8741.

@Gordon: have you seen this issue with 1.6/1.7 before?

Cheers,

Konstantin

On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> I found some more details on this
> The same error for the same application was reported about a year ago
> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CCAE7GCT4pF74LwyY=tivzhquq50tkjjawfhaw+5phcsx+vos...@mail.gmail.com%3E
> And was due to https://issues.apache.org/jira/browse/FLINK-8741
>
> It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both
> the latest Kafka connector and the Kafka connector 011.
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Feb 19, 2019, at 7:02 PM, Ken Krugler wrote:
>
> Hi Boris,
>
> I haven’t seen this exact error, but I have seen similar errors caused by
> multiple versions of jars on the classpath.
>
> When I’ve run into this particular "XXX is not an instance of YYY"
> problem, it often seems to be caused by a jar that I should have marked as
> provided in my pom.
>
> Though I’m typically running on a YARN cluster, not w/K8s, so maybe this
> doesn’t apply.
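>
> A minimal sketch of one way to check is to print where the suspect class is
> loaded from, both on the client and inside the running job, e.g. for the Kafka
> producer class:
>
>     Class<?> clazz = org.apache.kafka.clients.producer.KafkaProducer.class;
>     System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());
>     System.out.println(clazz.getClassLoader());
>
> If two different locations show up (e.g. your fat jar and a jar in flink/lib),
> that is usually the dependency that should have been marked as provided.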
>
> — Ken
>
> PS - I assume you’ve been reading
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>
>
> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Konstantin,
> After experimenting with this for a while, I got to the root cause of the
> problem.
> I am running a version of a taxi ride travel time prediction as my sample.
> It works fine in IntelliJ,
> but when I try to run it in Docker (the standard Debian 1.7 image)
> it fails with the following error:
>
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at
> com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
> at
> com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> ... 19 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct
> kafka producer
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
> at
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:696)
> at
> 

Re: How to debug difference between Kinesis and Kafka

2019-02-20 Thread Congxian Qiu
Hi Stephen,

If the window has never been triggered, it may be worth investigating the
watermark; the docs [1][2] may help.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
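
For example, a minimal debugging sketch (WatermarkLogger is a hypothetical helper
name, and the generic type stands in for your event class) that passes elements
through unchanged while printing the operator's current watermark, so you can see
whether it ever advances past the end of your windows:

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class WatermarkLogger<T> extends ProcessFunction<T, T> {
        @Override
        public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
            // Print the element timestamp and the current watermark of this operator.
            System.out.println("element ts=" + ctx.timestamp()
                    + ", current watermark=" + ctx.timerService().currentWatermark());
            out.collect(value);
        }
    }

    // usage: insert it after assignTimestampsAndWatermarks and before the window:
    // stream.process(new WatermarkLogger<>()).keyBy(...).window(...)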

Best, Congxian
On Feb 19, 2019, 21:31 +0800, Stephen Connolly wrote:
> Hmmm my suspicions are now quite high. I created a file source that just
> replays the events directly, and then I get more results.
>
> > On Tue, 19 Feb 2019 at 11:50, Stephen Connolly wrote:
> > > Hmmm after expanding the dataset such that there was additional data that 
> > > ended up on shard-0 (everything in my original dataset was coincidentally 
> > > landing on shard-1) I am now getting output... should I expect this kind 
> > > of behaviour if no data arrives at shard-0 ever?
> > >
> > > > On Tue, 19 Feb 2019 at 11:14, Stephen Connolly wrote:
> > > > > Hi, I’m having a strange situation and I would like to know where I 
> > > > > should start trying to debug.
> > > > >
> > > > > I have set up a configurable swap in source, with three 
> > > > > implementations:
> > > > >
> > > > > 1. A mock implementation
> > > > > 2. A Kafka consumer implementation
> > > > > 3. A Kinesis consumer implementation
> > > > >
> > > > > From injecting a log and no-op map function I can see that all three 
> > > > > sources pass through the events correctly.
> > > > >
> > > > > I then have a window based on event timestamps… and from inspecting 
> > > > > the aggregation function I can see that the data is getting 
> > > > > aggregated… I’m using the 
> > > > > `.aggregate(AggregateFunction, WindowFunction)` variant so that I can 
> > > > > retrieve the key.
> > > > >
> > > > > Here’s the strange thing, I only change the source (and each source 
> > > > > uses the same deserialization function) but:
> > > > >
> > > > >
> > > > > • When I use either Kafka or my Mock source, the WindowFunction gets 
> > > > > called as events pass the end of the window
> > > > > • When I use the Kinesis source, however, the window function never 
> > > > > gets called. I have even tried injecting events into kinesis with 
> > > > > really high timestamps to flush the watermarks in my 
> > > > > BoundedOutOfOrdernessTimestampExtractor... but nothing
> > > > >
> > > > > I cannot see how this source switching could result in such a 
> > > > > different behaviour:
> > > > >
> > > > >         Properties sourceProperties = new Properties();
> > > > >         ConsumerFactory sourceFactory;
> > > > >         String sourceName = configParams.getRequired("source");
> > > > >         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
> > > > >             case "kinesis":
> > > > >                 sourceFactory = FlinkKinesisConsumer::new;
> > > > >                 copyOptionalArg(configParams, "aws-region", 
> > > > > sourceProperties, AWSConfigConstants.AWS_REGION);
> > > > >                 copyOptionalArg(configParams, "aws-endpoint", 
> > > > > sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
> > > > >                 copyOptionalArg(configParams, "aws-access-key", 
> > > > > sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
> > > > >                 copyOptionalArg(configParams, "aws-secret-key", 
> > > > > sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
> > > > >                 copyOptionalArg(configParams, "aws-profile", 
> > > > > sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
> > > > >                 break;
> > > > >             case "kafka":
> > > > >                 sourceFactory = FlinkKafkaConsumer010::new;
> > > > >                 copyRequiredArg(configParams, "bootstrap-server", 
> > > > > sourceProperties, "bootstrap.servers");
> > > > >                 copyOptionalArg(configParams, "group-id", 
> > > > > sourceProperties, "group.id");
> > > > >                 break;
> > > > >             case "mock":
> > > > >                 sourceFactory = MockSourceFunction::new;
> > > > >                 break;
> > > > >             default:
> > > > >                 throw new RuntimeException("Unknown source '" + 
> > > > > sourceName + '\'');
> > > > >         }
> > > > >
> > > > >         // set up the streaming execution environment
> > > > >         final StreamExecutionEnvironment env = 
> > > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > >
> > > > >         // poll watermark every second because using 
> > > > > BoundedOutOfOrdernessTimestampExtractor
> > > > >         env.getConfig().setAutoWatermarkInterval(1000L);
> > > > >         env.enableCheckpointing(5000);
> > > > >
> > > > >         SplitStream eventsByType = 
> > > > > env.addSource(sourceFactory.create(
> > > > >                 configParams.getRequired("topic"),
> > > > >                 new ObjectNodeDeserializationSchema(),
> > > > >                 

Re: Starting Flink cluster and running a job

2019-02-20 Thread Boris Lublinsky
I found some more details on this
The same error for the same application was reported about a year ago:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CCAE7GCT4pF74LwyY=tivzhquq50tkjjawfhaw+5phcsx+vos...@mail.gmail.com%3E
and was due to https://issues.apache.org/jira/browse/FLINK-8741

It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both
the latest Kafka connector and the Kafka connector 011.

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 19, 2019, at 7:02 PM, Ken Krugler wrote:
> 
> Hi Boris,
> 
> I haven’t seen this exact error, but I have seen similar errors caused by 
> multiple versions of jars on the classpath.
> 
> When I’ve run into this particular "XXX is not an instance of YYY" problem, 
> it often seems to be caused by a jar that I should have marked as provided in 
> my pom.
> 
> Though I’m typically running on a YARN cluster, not w/K8s, so maybe this 
> doesn’t apply.
> 
> — Ken
> 
> PS - I assume you’ve been reading 
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>  
> 
> 
> 
>> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky wrote:
>> 
>> Konstantin,
>> After experimenting with this for a while, I got to the root cause of the 
>> problem.
>> I am running a version of a taxi ride travel time prediction as my sample.
>> It works fine in IntelliJ,
>> but when I try to run it in Docker (the standard Debian 1.7 image)
>> it fails with the following error:
>> 
>> 
>> The program finished with the following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
>> (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>>  at 
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>  at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>  at 
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>  at 
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>  at 
>> com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>>  at 
>> com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>  at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>  at 
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
>> execution failed.
>>  at 
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>  at 
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>>  ... 19 more
>> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
>> producer
>>  at 
>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
>>  at 
>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:116)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:944)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:940)
>>  at 
>> 

Re: How to use my custom log4j.properties when running minicluster in idea

2019-02-20 Thread Yun Tang
Hi Peibin

I believe you could refer to the directory structure of
flink/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java,
and its log4j.properties file located at
flink/flink-examples/flink-examples-streaming/src/main/resources/log4j.properties.

You could compare your directory structure and file contents with those mentioned
above, and I believe you will finally make it work.

Best
Yun Tang

From: peibin wang 
Sent: Tuesday, February 19, 2019 12:04
To: user@flink.apache.org
Subject: How to use my custom log4j.properties when running minicluster in idea


Hi,

   I am running a Flink job in IntelliJ IDEA with the mini cluster (not
submitting it to a Flink cluster) for convenience.

Now I have put my custom log config files (both log4j.properties and
logback.xml) in src/main/resources/, but they do not work. Are there any
solutions?


Re: Jira issue Flink-11127

2019-02-20 Thread Boris Lublinsky
Also, the suggested workaround does not quite work.
2019-02-20 15:27:43,928 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system 
[akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now 
gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: 
[flink-taskmanager-1: No address associated with hostname]
2019-02-20 15:27:48,750 ERROR 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler  - 
Caught exception

I think the problem is that it is trying to connect to flink-taskmanager-1.

Using busybox to experiment with nslookup, I can see:
/ # nslookup flink-taskmanager-1.flink-taskmanager
Server:10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:  flink-taskmanager-1.flink-taskmanager
Address 1: 10.131.2.136 
flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-1
Server:10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-1'
/ # nslookup flink-taskmanager-0.flink-taskmanager
Server:10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

Name:  flink-taskmanager-0.flink-taskmanager
Address 1: 10.131.0.111 
flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
/ # nslookup flink-taskmanager-0
Server:10.0.11.151
Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal

nslookup: can't resolve 'flink-taskmanager-0'
/ # 

So the name should be suffixed with the service name. How do I force that? I
suspect I am missing a config parameter.

 
Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 19, 2019, at 4:33 AM, Konstantin Knauf wrote:
> 
> Hi Boris, 
> 
> the solution is actually simpler than it sounds from the ticket. The only 
> thing you need to do is to set the "taskmanager.host" to the Pod's IP address 
> in the Flink configuration. The easiest way to do this is to pass this config 
> dynamically via a command-line parameter. 
> 
> The Deployment spec could look something like this:
> containers:
> - name: taskmanager
>   [...]
>   args:
>   - "taskmanager.sh"
>   - "start-foreground"
>   - "-Dtaskmanager.host=$(K8S_POD_IP)"
>   [...]
>   env:
>   - name: K8S_POD_IP
>     valueFrom:
>       fieldRef:
>         fieldPath: status.podIP
> 
> Hope this helps and let me know if this works. 
> 
> Best, 
> 
> Konstantin
> 
> On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> I was looking at this issue https://issues.apache.org/jira/browse/FLINK-11127 
> 
> Apparently there is a workaround for it.
> Is it possible to provide the complete Helm chart for it?
> Bits and pieces are in the ticket, but it would be nice to see the full chart.
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
> 
> 
> -- 
> Konstantin Knauf | Solutions Architect
> +49 160 91394525
> 
>  
> Follow us @VervericaData
> --
> Join Flink Forward  - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen



Metrics are not continuous

2019-02-20 Thread Yaoting Gong
Hi all,

  I deploy Flink in Flink-on-YARN mode, and metrics are collected via the
PrometheusPushGateway reporter. I have found that the metrics are not continuous;
some have not reported any data for a long time.
My configuration is as follows:

#==
## Metrics Configuration
#==

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: 10.201.3.156
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: FlinkJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: true
# metrics.reporter.promgateway.interval: 5 SECONDS

If anyone knows what might cause this, please let me know.


Re: Test FileUtilsTest.testDeleteDirectory failed when building Flink

2019-02-20 Thread Chesnay Schepler
I ran into a similar issue when I looked at other CI solutions; IMO we 
shouldn't rely on the result of setWritable but instead actually verify 
whether the forbidden operation (i.e. creating/writing files) throws an 
error.


Back then I also created a JIRA: https://issues.apache.org/jira/browse/FLINK-5970
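
Roughly what I have in mind, as a sketch only (the probe helper is hypothetical,
not the current FileUtilsTest code; imports: java.io.File, java.io.IOException,
org.junit.Assume):

    // Instead of trusting the boolean returned by setWritable(false), probe the
    // directory: if a file can still be created in it (e.g. because the JVM runs
    // as root in a container), skip the permission-based assertions.
    private static boolean writeAccessRevoked(File directory) {
        try {
            File probe = new File(directory, "probe");
            if (probe.createNewFile()) {
                probe.delete();
                return false; // writing still works, restriction not effective
            }
            return true;
        } catch (IOException e) {
            return true; // creating the file was forbidden, as intended
        }
    }

    // in the test:
    // directory.setWritable(false, false);
    // Assume.assumeTrue("could not revoke write access", writeAccessRevoked(directory));
    // ... then assert that the operation under test throws ...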


On 18.02.2019 14:10, Fabian Hueske wrote:

Hi Paul,

Which components (Flink, JDK, Docker base image, ...) are you 
upgrading, and which versions are you coming from?
I think it would be good to check how (and with which options) the JVM 
in the container is started.


Best, Fabian


On Fri, 15 Feb 2019 at 09:50, Paul Lam <paullin3...@gmail.com> wrote:


Hi all,

Recently we migrated the Flink build to a new Docker image, after which
the build job always fails with test errors related to local file system
permissions.

For example: FileUtilsTest.testDeleteDirectory:129 ("this should
fail with an exception").
I noticed the following statement in the javadoc of
`java.io.File.setWritable`:
> On some platforms it may be possible to start the Java virtual
machine with special privileges that allow it to modify files that
disallow write operations.

I think this is what the test relies on, and where the problem
lies.

Could anyone help me with this? Thanks a lot!

WRT the environment:

- Flink version: 1.7.1
- JDK: open jdk 1.8.0_111
- OS version: debian 8

Best,
Paul Lam





Re: Assigning timestamps and watermarks several times, several datastreams?

2019-02-20 Thread Fabian Hueske
Hi,

Watermarks of streams are independent as long as the streams are not
connected with each other.
When you union, join, or connect two streams in any other way, their
watermarks are fused, which means that they are synced to the "slower"
stream, i.e., the stream with the earlier watermarks.
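
As a small sketch (reusing POJO, the window definition, and someApplyFunction from
your snippet below; union is just one way of combining the streams):

    // Watermarks of ds1 and ds2 advance independently while the streams stay
    // separate. Once combined, downstream operators track the minimum of the
    // incoming watermarks, i.e. they follow the "slower" stream.
    DataStream<POJO> combined = ds1.union(ds2);

    combined.keyBy("id")
            .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
            .apply(someApplyFunction); // fires once the fused watermark passes the window end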

Best, Fabian

On Tue, 19 Feb 2019 at 23:34, Aakarsh Madhavan <aakar...@uber.com> wrote:

> Hi!
>
> Currently I am using Flink 1.4.2.
>
> class TSWM implements AssignerWithPunctuatedWatermarks<POJO> {
>   long maxTS = Long.MIN_VALUE;
>
>   @Override
>   public Watermark checkAndGetNextWatermark(POJO event, long l) {
>     maxTS = Math.max(maxTS, event.TS);
>     return new Watermark(maxTS);
>   }
>
>   @Override
>   public long extractTimestamp(POJO event, long l) {
>     maxTS = Math.max(maxTS, event.TS);
>     return event.TS;
>   }
> }
>
> DataStream<POJO> ds1 = ... .assignTimestampsAndWatermarks(new TSWM());
>
> DataStream<POJO> ds2 = ... .assignTimestampsAndWatermarks(new TSWM());
> Suppose I run the code above; what I am confused about is the overall
> watermarking behaviour.
>
> Now I want to do the following:
>
> ds1.keyBy("id").window(SlidingEventTimeWindows.of(Time.minutes(1),
> Time.seconds(30))).trigger(EventTimeTrigger.create()).apply(someApplyFunction);
>
> ds2.keyBy("id").window(SlidingEventTimeWindows.of(Time.minutes(1),
> Time.seconds(30))).trigger(EventTimeTrigger.create()).apply(someApplyFunction);
>
> The main doubt I am having is how this works with the watermarks. Do
> `ds1` and `ds2` have separate watermarks that don't concern each other? I.e.
> do they operate separately?
>
> I am just not sure how the window trigger would work, for example, or how
> the watermarks would advance. Do the watermarks reset and advance for each
> stream separately so that no data is lost?
>
> Thanks!
>


Re:kafka consumer exception

2019-02-20 Thread 董鹏
Thanks very much. We are currently taking the same approach,
but unfortunately it has not been resolved yet.
 
 
-- Original --
From:  "ForwardXu";
Date:  Wed, Feb 20, 2019 03:23 PM
To:  "user-zh"; 

Subject:  Re: kafka consumer exception

 
Hi 董鹏,

This problem is most likely caused by setting a client-id in your Kafka consumer
configuration: when Flink runs multiple consumer threads, they all use the same
client-id when requesting data from Kafka. For details see this JIRA:
https://issues.apache.org/jira/browse/KAFKA-3992. If you have configured a
client-id, remove it and leave it empty; Kafka will then generate a new client id
for each thread, "consumer-" plus an auto-incremented id.
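
A rough sketch of what the consumer setup then looks like (FlinkKafkaConsumer09
matches the Kafka09PartitionDiscoverer in your stack trace; topic name and
bootstrap servers are placeholders):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");
    props.setProperty("group.id", "my-group");
    // Do NOT set "client.id" here. Left unset, the Kafka client generates
    // "consumer-1", "consumer-2", ... per instance, so the JMX app-info MBean
    // names no longer collide across parallel subtasks.
    // props.setProperty("client.id", "my-client");   // remove a line like this

    FlinkKafkaConsumer09<String> consumer =
            new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);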




前进


-- Original message --
From: "董鹏";
Sent: Wednesday, Feb 20, 2019, 3:02 PM
To: "user-zh";

Subject: kafka consumer exception



Hi Flink experts, when using Flink with Kafka (version 1.0) we hit the following exception.
It does not affect the job or the results, and it is only printed occasionally. Have you run into this problem?


[org.apache.kafka.common.utils.AppInfoParser] [AppInfoParser.java:60] - Error 
registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=consumer-31
  at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
  at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
  at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
  at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
  at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
  at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)