My pipeline uses the Combine.perKey transform, and I would like to add withHotKeyFanout to keep the combine from becoming a bottleneck. To test this, I made sure that my mergeAccumulators function was correct and added withHotKeyFanout(2) to my Combine transform. When I launch the pipeline with Flink (v1.3.2), it runs for only a minute or so before failing with this stack trace:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    ... 7 more
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    ... 15 more
Caused by: org.apache.avro.AvroRuntimeException: Array data must be a Collection or Array
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:70)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:156)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:64)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:879)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:856)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87)
    at org.apache.beam.runners.core.ReduceFnRunner$2.output(ReduceFnRunner.java:1061)
    at org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:428)
    at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:124)
    at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1066)
    at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:779)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:134)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:74)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:113)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:758)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:527)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:496)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

Is this occurring because a key only has one value associated with it?
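For reference, the idea behind withHotKeyFanout(2) can be modelled in plain Java. This is a simplified, single-process sketch, not Beam's implementation: each key is spread over `fanout` sub-keys (round-robin here, which is an assumption of the sketch), each (key, shard) pair is pre-combined into its own accumulator, and the partial accumulators for a key are then merged with mergeAccumulators before extractOutput runs. The mean combiner, `MeanAccum`, `kv` and the class name are all hypothetical. In this model a key with only one value (`single`) still gives the right answer, because merging a single partial accumulator is valid. The sketch never encodes anything, whereas in the trace the failure is raised inside AvroCoder.encode, reached from KvCoder.encode while Flink copies an element between chained operators.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-process model of hot-key fanout; NOT Beam's implementation.
public class HotKeyFanoutSketch {

    // Hypothetical accumulator for a mean: running sum and count.
    static final class MeanAccum {
        long sum;
        long count;
    }

    // CombineFn-style operations for the hypothetical mean combiner.
    static MeanAccum createAccumulator() {
        return new MeanAccum();
    }

    static MeanAccum addInput(MeanAccum acc, int value) {
        acc.sum += value;
        acc.count += 1;
        return acc;
    }

    static MeanAccum mergeAccumulators(Iterable<MeanAccum> accs) {
        MeanAccum merged = createAccumulator();
        for (MeanAccum a : accs) {
            merged.sum += a.sum;
            merged.count += a.count;
        }
        return merged;
    }

    static double extractOutput(MeanAccum acc) {
        return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
    }

    // Hypothetical helper that builds a key/value pair.
    static Map.Entry<String, Integer> kv(String key, int value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Plain per-key combine: exactly one accumulator per key.
    static Map<String, Double> combinePerKey(List<Map.Entry<String, Integer>> input) {
        Map<String, MeanAccum> accs = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : input) {
            addInput(accs.computeIfAbsent(e.getKey(), k -> createAccumulator()), e.getValue());
        }
        Map<String, Double> out = new LinkedHashMap<>();
        accs.forEach((k, a) -> out.put(k, extractOutput(a)));
        return out;
    }

    // Fanout combine: pre-combine per (key, shard), then merge the partials per key.
    static Map<String, Double> combinePerKeyWithFanout(List<Map.Entry<String, Integer>> input, int fanout) {
        if (fanout < 1) {
            throw new IllegalArgumentException("fanout must be >= 1");
        }
        // Phase 1: round-robin shard assignment (an assumption of this sketch).
        Map<String, Map<Integer, MeanAccum>> partials = new LinkedHashMap<>();
        int counter = 0;
        for (Map.Entry<String, Integer> e : input) {
            int shard = counter++ % fanout;
            MeanAccum acc = partials
                .computeIfAbsent(e.getKey(), k -> new HashMap<>())
                .computeIfAbsent(shard, s -> createAccumulator());
            addInput(acc, e.getValue());
        }
        // Phase 2: a real pipeline would encode and shuffle the partial
        // accumulators here; this sketch just merges them in memory.
        Map<String, Double> out = new LinkedHashMap<>();
        for (Map.Entry<String, Map<Integer, MeanAccum>> e : partials.entrySet()) {
            List<MeanAccum> shardAccs = new ArrayList<>(e.getValue().values());
            out.put(e.getKey(), extractOutput(mergeAccumulators(shardAccs)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = Arrays.asList(
            kv("hot", 1), kv("hot", 2), kv("hot", 3), kv("hot", 6), kv("single", 10));
        System.out.println(combinePerKey(input));             // prints {hot=3.0, single=10.0}
        System.out.println(combinePerKeyWithFanout(input, 2)); // prints {hot=3.0, single=10.0}
    }
}
```

If the two methods agree for your own combine logic, the combine itself is probably not the problem. As far as I understand, with fanout enabled the partial accumulators become intermediate values passed between steps, so whatever coder encodes them (an AvroCoder, judging by the trace) is exercised on paths it may not have been before.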