Hi David,

thank you for reporting the issue. I'll look into it. In the meantime, you
can set "flink.disable-metrics" to "true" in the properties. This way, you
disable the metrics.
I'll probably have to introduce something like a client id to differentiate
between the producers.

Robert

On Thu, Jan 21, 2016 at 11:51 PM, David Kim <david....@braintreepayments.com
> wrote:

> Hi Robert!
>
> Thanks for reaching out. I ran into an issue and wasn't sure if this was
> due to a misconfiguration on my end of if this is a real bug. I have one
> DataStream and I'm sinking to two different kafka sinks. When the job
> starts, I run into this error:
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.UnsupportedOperationException: The accumulator
> 'producer-record-retry-rate' already exists and cannot be added.
> at
> org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.addAccumulator(AbstractRuntimeUDFContext.java:121)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:204)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
> at java.lang.Thread.run(Thread.java:745)
>
>
> The particular accumulator the exception is complains about changes,
> meaning it's not always 'producer-record-retry-rate' -- most likely due to
> the non-deterministic ordering of the collection. Any guidance appreciated!
>
> I'm using 1.0-SNAPSHOT and my two sinks are FlinkKafkaProducer08.
>
> The flink code looks something like this:
>
>
> val stream: DataStream[Foo] = ...
>
> val kafkaA = new FlinkKafkaProducer08[Foo]...
>
> val kafkaB = new FlinkKafkaProducer08[Foo]...
>
>
> stream
>   .addSink(kafkaA)
>
> stream.
>   .addSink(kafkaB)
>
>
> Thanks,
> David
>
> On Wed, Jan 20, 2016 at 1:34 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> I've now merged the pull request. DeserializationSchema.isEndOfStream()
>> should now be evaluated correctly by the Kafka 0.9 and 0.8 connectors.
>>
>> Please let me know if the updated code has any issues. I'll fix the
>> issues asap.
>>
>> On Wed, Jan 13, 2016 at 5:06 PM, David Kim <
>> david....@braintreepayments.com> wrote:
>>
>>> Thanks Robert! I'll be keeping tabs on the PR.
>>>
>>> Cheers,
>>> David
>>>
>>> On Mon, Jan 11, 2016 at 4:04 PM, Robert Metzger <metrob...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> In theory isEndOfStream() is absolutely the right way to go for
>>>> stopping data sources in Flink.
>>>> That its not working as expected is a bug. I have a pending pull
>>>> request for adding a Kafka 0.9 connector, which fixes this issue as well
>>>> (for all supported Kafka versions).
>>>>
>>>> Sorry for the inconvenience. If you want, you can check out the branch
>>>> of the PR and build Flink yourself to get the fix.
>>>> I hope that I can merge the connector to master this week, then, the
>>>> fix will be available in 1.0-SNAPSHOT as well.
>>>>
>>>> Regards,
>>>> Robert
>>>>
>>>>
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On 11.01.2016, at 21:39, David Kim <david....@braintreepayments.com>
>>>> wrote:
>>>>
>>>> Hello all,
>>>>
>>>> I saw that DeserializationSchema has an API "isEndOfStream()".
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
>>>>
>>>> Can *isEndOfStream* be utilized to somehow terminate a streaming flink
>>>> job?
>>>>
>>>> I was under the impression that if we return "true" we can control when
>>>> a stream can close. The use case I had in mind was controlling when
>>>> unit/integration tests would terminate a flink job. We can rely on the fact
>>>> that a test/spec would know how many items it expects to consume and then
>>>> switch *isEndOfStream* to return true.
>>>>
>>>> Am I misunderstanding the intention for *isEndOfStream*?
>>>>
>>>> I also set a breakpoint on *isEndOfStream* and saw that it never was
>>>> hit when using "FlinkKafkaConsumer082" to pass in a DeserializationSchema
>>>> implementation.
>>>>
>>>> Currently testing on 1.0-SNAPSHOT.
>>>>
>>>> Cheers!
>>>> David
>>>>
>>>>
>>>
>>>
>>> --
>>> Note: this information is confidential. It is prohibited to share, post
>>> online or otherwise publicize without Braintree's prior written consent.
>>>
>>
>>
>
>
> --
> Note: this information is confidential. It is prohibited to share, post
> online or otherwise publicize without Braintree's prior written consent.
>

Reply via email to