The pipeline code did not change and looks like the following:
pipeline
    .apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withReadCommitted()
            .withTimestampPolicyFactory(withEventTs)
            .commitOffsetsInFinalize())
    .apply(ParDo.of(new ToEventFn()))
    .apply(
        Window.into(new ZurichTimePartitioningWindowFn())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(bundleSize),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(refreshFrequency))))
            .withAllowedLateness(Duration.standardDays(14))
            .discardingFiredPanes())
    .apply(
        BigQueryIO.<Event>write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(refreshFrequency)
            .withNumFileShards(1)
            .to(partitionedTableDynamicDestinations)
            .withFormatFunction(
                (SerializableFunction<Event, TableRow>)
                    KafkaToBigQuery::convertUserEventToTableRow)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
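
For context: beam:transform:create_view:v1 (from the stack trace below) is the URN of the CreatePCollectionView primitive that View.* transforms - and therefore side inputs - expand to, and BigQueryIO's FILE_LOADS path presumably pulls it in via its internal side inputs. Below is a minimal, stand-alone sketch that I would expect to hit the same translation path on the Flink streaming runner - the class name and the repro itself are my assumption, not something I have verified:

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

public class CreateViewRepro {
  public static void main(String[] args) {
    // Run with --runner=FlinkRunner --streaming=true to mirror the setup above.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // View.asList() expands to the CreatePCollectionView primitive,
    // i.e. the beam:transform:create_view:v1 URN from the stack trace.
    final PCollectionView<List<String>> sideView =
        p.apply("SideInput", Create.of("a", "b")).apply(View.asList());

    p.apply("Main", Create.of("x", "y"))
        .apply(
            "UseSideInput",
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        // Reading the side input forces the view to be materialized.
                        c.output(c.element() + "/" + c.sideInput(sideView).size());
                      }
                    })
                .withSideInputs(sideView));

    p.run().waitUntilFinish();
  }
}

If this fails the same way on Beam 2.11 but runs on 2.10, that would point at the Flink streaming translator rather than anything specific to the Kafka or BigQuery parts of the pipeline.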
On Thu, Mar 28, 2019 at 5:13 PM Kaymak, Tobias <[email protected]>
wrote:
> Hello,
>
> I just upgraded my dev cluster from Flink 1.6.2 to 1.7.2 and from Beam
> 2.10 to 2.11, and I am seeing this error when starting my pipelines:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.UnsupportedOperationException: The transform beam:transform:create_view:v1 is currently not supported.
>     at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>     at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:68)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:111)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>     at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:175)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>     ... 9 more
>
> I found this open issue while googling:
> https://jira.apache.org/jira/browse/BEAM-4301 - but it seems unrelated.
> What makes me wonder is the type of error message I am seeing.
> I tried Flink 1.7.2 with Scala 2.11 and 2.12, without luck.
> I also tried deleting all of Flink's state information (ha/ and
> snapshots/). In the end I downgraded to Beam 2.10 - and that worked.
> Could it be that a bug was introduced in 2.11?
>
> Best,
> Tobi