My pipeline uses the Combine.perKey transform, and I would like to add 
withHotKeyFanout so that the combine does not become a bottleneck. To test this, 
I made sure that my mergeAccumulators function was correct and added 
withHotKeyFanout(2) to my Combine transform.
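
For context, here is a minimal plain-Java sketch of how I understand hot key 
fanout conceptually: values for a key are first pre-combined per (key, shard), 
and the partial accumulators are then merged per key. This is only a simulation 
under my own assumptions, not Beam code and not my actual CombineFn. The names 
HotKeyFanoutSketch, MeanAccum, kv and combineWithFanout are hypothetical, and 
the round-robin shard assignment is a simplification chosen to keep the example 
deterministic.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Conceptual simulation of a two-stage combine; this is NOT Beam code. */
class HotKeyFanoutSketch {

    /** Hypothetical accumulator for a mean: a running sum and a count. */
    static final class MeanAccum {
        long sum;
        long count;

        void addInput(long value) {
            sum += value;
            count++;
        }

        /** Plays the role of mergeAccumulators: folds partial results together. */
        static MeanAccum merge(Iterable<MeanAccum> accums) {
            MeanAccum merged = new MeanAccum();
            for (MeanAccum a : accums) {
                merged.sum += a.sum;
                merged.count += a.count;
            }
            return merged;
        }

        double extractOutput() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    /** Hypothetical helper that builds one key/value pair. */
    static Map.Entry<String, Long> kv(String key, long value) {
        return new AbstractMap.SimpleEntry<>(key, Long.valueOf(value));
    }

    static Map<String, Double> combineWithFanout(
            List<Map.Entry<String, Long>> input, int fanout) {
        // Stage 1: pre-combine per (key, shard). Shards are created lazily, so a
        // key with a single value ends up with exactly one partial accumulator.
        Map<String, Map<Integer, MeanAccum>> partials = new HashMap<>();
        int position = 0;
        for (Map.Entry<String, Long> element : input) {
            int shard = position++ % fanout; // assumption: round-robin sharding
            partials
                .computeIfAbsent(element.getKey(), k -> new HashMap<>())
                .computeIfAbsent(shard, s -> new MeanAccum())
                .addInput(element.getValue());
        }

        // Stage 2: merge the partial accumulators of each key and extract the output.
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Map<Integer, MeanAccum>> e : partials.entrySet()) {
            result.put(e.getKey(), MeanAccum.merge(e.getValue().values()).extractOutput());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input = Arrays.asList(
            kv("hot", 1), kv("hot", 2), kv("hot", 3), kv("hot", 4), kv("cold", 10));
        Map<String, Double> means = combineWithFanout(input, 2);
        System.out.println(means.get("hot"));  // prints 2.5
        System.out.println(means.get("cold")); // prints 10.0
    }
}
```

In this sketch the partial accumulators stay in memory between the two stages. 
In a real runner they are passed between operators, which is why the 
accumulator type also has to be encodable by its coder.
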
When I launch the pipeline on Flink (v1.3.2), it runs for only a 
minute or so before failing with this stack trace:

java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:
    at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    ... 7 more
Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
    at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at 
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
    at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at 
org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74)
    at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758)
    at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496)
    at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    ... 15 more
Caused by: org.apache.avro.AvroRuntimeException: Array data must be a 
Collection or Array
    at 
org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:70)
    at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
    at 
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at 
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at 
org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at 
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at 
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
    at 
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
    at 
org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:156)
    at 
org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:64)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:879)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:856)
    at 
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
    at 
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87)
    at 
org.apache.beam.runners.core.ReduceFnRunner$2.output(ReduceFnRunner.java:1061)
    at 
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:428)
    at 
org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:124)
    at 
org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1066)
    at 
org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:779)
    at 
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:134)
    at 
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
    at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at 
org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74)
    at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758)
    at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527)
    at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496)
    at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

Is this occurring because a key only has one value associated with it?

