Hi David,

Thank you for reporting the issue. I'll look into it. In the meantime, you can work around it by setting "flink.disable-metrics" to "true" in the producer properties; that disables the metrics entirely. Longer term, I'll probably have to introduce something like a client id to differentiate between the producers.
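Setting it would look something like this (a minimal sketch assuming String records and SimpleStringSchema; the broker address and topic names are placeholders, adjust to your types):

    import java.util.Properties

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    // Workaround: skip registering the producer metrics as accumulators,
    // which avoids the duplicate-accumulator clash between the two sinks.
    props.setProperty("flink.disable-metrics", "true")

    val kafkaA = new FlinkKafkaProducer08[String]("topic-a", new SimpleStringSchema, props)
    val kafkaB = new FlinkKafkaProducer08[String]("topic-b", new SimpleStringSchema, props)

With the metrics disabled, both sinks can be added to the same stream as in your snippet.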
Robert

On Thu, Jan 21, 2016 at 11:51 PM, David Kim <david....@braintreepayments.com> wrote:

> Hi Robert!
>
> Thanks for reaching out. I ran into an issue and wasn't sure if this was
> due to a misconfiguration on my end or if this is a real bug. I have one
> DataStream and I'm sinking to two different Kafka sinks. When the job
> starts, I run into this error:
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.UnsupportedOperationException: The accumulator 'producer-record-retry-rate' already exists and cannot be added.
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.addAccumulator(AbstractRuntimeUDFContext.java:121)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:204)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>     at java.lang.Thread.run(Thread.java:745)
>
> The particular accumulator the exception complains about changes, meaning
> it's not always 'producer-record-retry-rate' -- most likely due to the
> non-deterministic ordering of the collection. Any guidance appreciated!
>
> I'm using 1.0-SNAPSHOT and my two sinks are FlinkKafkaProducer08.
>
> The Flink code looks something like this:
>
>     val stream: DataStream[Foo] = ...
>
>     val kafkaA = new FlinkKafkaProducer08[Foo]...
>     val kafkaB = new FlinkKafkaProducer08[Foo]...
>
>     stream
>       .addSink(kafkaA)
>
>     stream
>       .addSink(kafkaB)
>
> Thanks,
> David
>
> On Wed, Jan 20, 2016 at 1:34 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> I've now merged the pull request. DeserializationSchema.isEndOfStream()
>> should now be evaluated correctly by the Kafka 0.9 and 0.8 connectors.
>>
>> Please let me know if the updated code has any issues. I'll fix the
>> issues asap.
>>
>> On Wed, Jan 13, 2016 at 5:06 PM, David Kim <david....@braintreepayments.com> wrote:
>>
>>> Thanks Robert! I'll be keeping tabs on the PR.
>>> Cheers,
>>> David
>>>
>>> On Mon, Jan 11, 2016 at 4:04 PM, Robert Metzger <metrob...@gmail.com> wrote:
>>>
>>>> Hi David,
>>>>
>>>> In theory, isEndOfStream() is absolutely the right way to go for
>>>> stopping data sources in Flink. That it's not working as expected is
>>>> a bug. I have a pending pull request for adding a Kafka 0.9
>>>> connector, which fixes this issue as well (for all supported Kafka
>>>> versions).
>>>>
>>>> Sorry for the inconvenience. If you want, you can check out the
>>>> branch of the PR and build Flink yourself to get the fix. I hope
>>>> that I can merge the connector to master this week; then the fix
>>>> will be available in 1.0-SNAPSHOT as well.
>>>>
>>>> Regards,
>>>> Robert
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On 11.01.2016, at 21:39, David Kim <david....@braintreepayments.com> wrote:
>>>>
>>>> Hello all,
>>>>
>>>> I saw that DeserializationSchema has an API "isEndOfStream()":
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
>>>>
>>>> Can *isEndOfStream* be utilized to somehow terminate a streaming
>>>> Flink job?
>>>>
>>>> I was under the impression that if we return "true" we can control
>>>> when a stream closes. The use case I had in mind was controlling
>>>> when unit/integration tests would terminate a Flink job. We can rely
>>>> on the fact that a test/spec knows how many items it expects to
>>>> consume and can then switch *isEndOfStream* to return true.
>>>>
>>>> Am I misunderstanding the intention of *isEndOfStream*?
>>>>
>>>> I also set a breakpoint on *isEndOfStream* and saw that it was never
>>>> hit when using "FlinkKafkaConsumer082" to pass in a
>>>> DeserializationSchema implementation.
>>>>
>>>> Currently testing on 1.0-SNAPSHOT.
>>>>
>>>> Cheers!
>>>> David
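For reference, a minimal sketch of the bounded-schema idea discussed above, usable once isEndOfStream() is actually evaluated by the connector. The BoundedStringSchema name and the counting approach are illustrative only; the count is tracked per parallel subtask, so this assumes a parallelism-1 test setup:

    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    // Signals end-of-stream after `expectedCount` records have been
    // deserialized by this subtask.
    class BoundedStringSchema(expectedCount: Long) extends SimpleStringSchema {
      private var seen = 0L

      override def deserialize(message: Array[Byte]): String = {
        seen += 1
        super.deserialize(message)
      }

      // Returning true tells the consumer to stop fetching.
      override def isEndOfStream(nextElement: String): Boolean =
        seen >= expectedCount
    }

A test would then pass it to the consumer in place of a plain SimpleStringSchema, e.g. new FlinkKafkaConsumer08[String]("topic", new BoundedStringSchema(100), props).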