Hi,
I tested this with 10 partitions per topic and 1 taskmanager
(parallelism=1).
With the 2.10-SNAPSHOT version, checkpoints were created on both
Flink 1.6.2 and Flink 1.7.1.
However, with Flink 1.7.1 I didn't see any metrics about how many
elements per second were processed, while they were shown in Flink 1.6.2.
Best,
Tobias
On Mon, Feb 4, 2019 at 10:56 AM Maximilian Michels <[email protected]> wrote:
Tobias, do I understand correctly that checkpointing works fine with
2.10.0, and the issues you reported are related to metrics only?
Juan, it is correct that the bug [1] is caused by a problem in Flink [2].
The bug occurred when a task had no work assigned, e.g. an empty
partition list when parallelism > #partitions.
The issue is resolved for 2.10.0 and will be fixed in 2.7.1 (LTS).
[1] https://jira.apache.org/jira/browse/BEAM-5386
[2] https://issues.apache.org/jira/browse/FLINK-2491
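A minimal sketch of keeping the parallelism at or below the partition count
via the Flink runner's pipeline options (the values and the args handling here
are only illustrative, not taken from the pipeline discussed in this thread):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Keep parallelism <= number of Kafka partitions (10 here) so that no source
// subtask ends up with an empty partition assignment and finishes early,
// which is what stops checkpointing (FLINK-2491 / BEAM-5386 on 2.9.x).
options.setParallelism(10);
// Same effect as passing --checkpointingInterval=300000 (5 minutes).
options.setCheckpointingInterval(300000L);
Pipeline pipeline = Pipeline.create(options);
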
On 01.02.19 20:21, Juan Carlos Garcia wrote:
> Sorry, I was on the phone, the Flink version is 1.5.4.
>
> On Fri, Feb 1, 2019, 20:19 Juan Carlos Garcia <[email protected]> wrote:
>
> Hi Tobias
>
> I would like to ask the following and see if this applies to you.
>
> How many Kafka partitions do you have?
> How many taskmanagers are you using? (parallelism)
>
> There is a bug in Flink which is triggered as soon as you start
> playing around with a parallelism greater than the number of
> partitions in your Kafka topic.
>
> If you were using the Flink API directly you could control the
> parallelism of each operation (sources and sinks); however, when using
> Beam the parallelism is applied to all of the operators in the DAG.
>
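> For illustration, a minimal sketch of per-operator parallelism in the native
> Flink API (the topic name, properties, and the FlinkKafkaConsumer011 connector
> used here are only placeholders):
>
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
>
> public class PerOperatorParallelism {
>   public static void main(String[] args) throws Exception {
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers", "localhost:9092");
>     props.setProperty("group.id", "example");
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.addSource(new FlinkKafkaConsumer011<>("ratings", new SimpleStringSchema(), props))
>         .setParallelism(10)   // source parallelism matching the partition count
>         .filter(value -> !value.isEmpty())
>         .setParallelism(4)    // a downstream operator may use a different value
>         .print();
>     env.execute("per-operator-parallelism");
>   }
> }
>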
> I am using Beam 2.9 with Flink 1.5.2, and just today we deployed a
> pipeline (on-premise) reading from our Kafka and publishing to
> BigQuery, using HDFS as the backing store for checkpoints, and it is
> working flawlessly.
>
> Here is a link for the Flink bug
>
> https://issues.apache.org/jira/browse/FLINK-2491
>
> Hope it helps.
>
> JC
>
> On Thu, Jan 31, 2019, 15:51 Kaymak, Tobias <[email protected]> wrote:
>
> I should have drunk the coffee before writing this ;)
> The end-to-end duration of snapshots is fine; the snapshots were
> created at 10:00 in the morning and I thought they took increasingly
> more time because of the clock ;)
> The rest of the findings are still valid.
>
> On Thu, Jan 31, 2019 at 12:27 PM Kaymak, Tobias <[email protected]> wrote:
>
> Follow-up to summarize my findings so far:
>
> Beam 2.9.0 with Flink 1.5.5: no checkpoints are created.
> Beam 2.10RC1 with Flink 1.6.2: metrics are shown in the webinterface of
> Flink's jobmanager, checkpoints are created - but checkpoints take more
> than 10 minutes even if the end-to-end duration is 10 seconds.
> Beam 2.10RC1 with Flink 1.7.1: no metrics are shown in the webinterface of
> Flink's jobmanager, checkpoints are created - but checkpoints take more
> than 10 minutes even if the end-to-end duration is 10 seconds.
>
> Attached is a screenshot from the 1.7.1 webinterface. So far, using 1.6.2
> with Beam 2.10RC1 seems to be the best option. I am continuing to
> investigate why the checkpoints take so long.
>
> image.png
>
>
> On Wed, Jan 30, 2019 at 1:49 PM Maximilian Michels <[email protected]> wrote:
>
> Thank you for verifying this. This is manifested in
> https://jira.apache.org/jira/browse/BEAM-5386 and has indeed been fixed
> already for 2.10.0.
>
> This likely warrants a 2.9.1 release. I'll check on the dev mailing list.
>
> Thanks,
> Max
>
> On 30.01.19 10:27, Kaymak, Tobias wrote:
> > Hi Maximilian,
> >
> > I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not
> > work with version 2.9. I am very sure it is related to this issue:
> > https://issues.apache.org/jira/browse/FLINK-2491 - which has been fixed
> > in 2.10, since parts of the pipeline are FINISHED after a couple of
> > minutes and this then triggers the shutdown of the checkpoints. However,
> > executing the pipeline on a Flink 1.5.5 cluster yields no metrics about
> > the elements processed in the webinterface anymore:
> >
> > 2019-01-30 09:14:53,934 WARN org.apache.beam.sdk.metrics.MetricsEnvironment - Reporting metrics are not supported in the current execution environment.
> >
> > Is this a known issue? I want to change my Flink version to 1.6 to see if
> > this is fixed, but I am unsure at this point how to achieve this. Is it
> > something I can pass in my pom.xml?
> >
> > image.png
> >
> > Best,
> > Tobi
> >
> > On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels <[email protected]> wrote:
> >
> > Hi Tobias,
> >
> > It is normal to see "No restore state for UnbounedSourceWrapper" when not
> > restoring from a checkpoint/savepoint.
> >
> > Just checking. You mentioned you set the checkpoint interval via:
> > --checkpointingInterval=300000
> >
> > That means you have to wait 5 minutes until the first checkpoint will be
> > taken. You should be seeing an INFO message like this: "INFO: Triggering
> > checkpoint 1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
> >
> > Thanks,
> > Max
> >
> > On 29.01.19 16:13, Kaymak, Tobias wrote:
> > > Even after altering the pipeline and making it much simpler, it still
> > > does not checkpoint. (I used a single Kafka topic as a source and
> > > altered the IO step the following way:
> > >
> > > .apply(
> > >     BigQueryIO.<Event>write()
> > >         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> > >         .withTriggeringFrequency(refreshFrequency)
> > >         .withNumFileShards(1)
> > >         .to(projectId + ":" + dataset + "." + tableName)
> > >         .withTimePartitioning(new TimePartitioning().setField("event_date"))
> > >         .withSchema(tableSchema)
> > >         .withFormatFunction(
> > >             (SerializableFunction<Event, TableRow>)
> > >                 KafkaToBigQuery::convertUserEventToTableRow)
> > >         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> > >         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> > >
> > > The graph that Flink 1.5.5 generated looked exactly the same and
> > > checkpointing still did not work:
> > > image.png
> > >
> > > On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected]> wrote:
> > >
> > > If I have a pipeline running and I restart the taskmanager on which it's
> > > executing, the log shows the following - I find the "No restore state for
> > > UnbounedSourceWrapper." interesting, as it seems to indicate that the
> > > pipeline never stored a state in the first place?
> > >
> > > Starting taskexecutor as a console application on host flink-taskmanager-5d85dd6854-pm5bl.
> > > 2019-01-29 09:20:48,706 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> > > 2019-01-29 09:20:51,253 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7768141350028767113.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> > > 2019-01-29 09:20:51,281 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
> > > 2019-01-29 09:21:53,814 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:53,828 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:53,834 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:53,917 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-29 09:21:53,929 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:53,937 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:53,978 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
> > > 2019-01-29 09:21:54,002 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,008 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,011 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,020 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,080 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,091 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,099 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,107 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,119 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,118 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,115 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,114 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,144 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,172 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,176 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,189 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,191 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,203 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,210 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,238 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,242 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-29 09:21:54,339 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-29 09:21:54,371 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-29 09:21:54,479 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-29 09:21:55,509 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
> > > 2019-01-29 09:21:55,535 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
> > > 2019-01-29 09:21:55,770 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
> > > 2019-01-29 09:21:56,280 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> > > 2019-01-29 09:21:57,387 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/ before loading them.
> > > 2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > > 2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > > 2019-01-29 09:21:58,118 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > > 2019-01-29 09:21:58,140 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 13112
> > > 2019-01-29 09:21:58,141 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 13407
> > > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 13034
> > > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 13271
> > > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 12813
> > > 2019-01-29 09:21:58,142 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 13211
> > > 2019-01-29 09:21:58,144 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 13394
> > > 2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 13194
> > > 2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 13478
> > > 2019-01-29 09:21:58,145 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 12966
> > >
> > >
> > > On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]> wrote:
> > >
> > > Hi Maximilian,
> > >
> > > yes, I've set the --runner to FlinkRunner when launching the pipeline
> > > and it does work for a GCS sink, but it seems to be ignored for a
> > > BigQuery sink somehow. Even though it looks like the system magically
> > > handles it itself.
> > >
> > > This is the full command line to launch the Beam 2.9.0 pipeline on
> > > Flink 1.5.5:
> > >
> > > bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar
> > > --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE
> > > --checkpointingInterval=300000 --parallelism=1
> > > --tempLocation=gs://somebucket
> > >
> > > Here are the logs from the taskmanager, I can share the full code of
> > > the pipeline if you want:
> > >
> > > 2019-01-28 14:33:31,287 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:31,911 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:31,976 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,217 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,227 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,228 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,276 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,282 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-28 14:33:32,288 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
> > > 2019-01-28 14:33:32,296 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-28 14:33:32,318 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,321 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,324 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,329 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,482 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,483 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,493 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,697 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,782 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:32,789 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,093 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,122 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,162 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-28 14:33:33,179 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,187 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,192 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,218 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,220 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,298 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,304 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,323 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,326 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,357 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,377 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,395 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,477 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous) exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,487 WARN org.apache.flink.metrics.MetricGroup - The operator name BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out exceeded the 80 characters length limit and was truncated.
> > > 2019-01-28 14:33:33,748 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - No restore state for UnbounedSourceWrapper.
> > > 2019-01-28 14:33:34,577 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
> > > 2019-01-28 14:33:34,610 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
> > > 2019-01-28 14:33:34,747 INFO org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper - Unbounded Flink Source 0/1 is reading from sources: [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
> > > 2019-01-28 14:33:34,896 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> > > 2019-01-28 14:33:35,462 INFO org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/ before loading them.
> > > 2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-0 starting at offset 2945
> > > 2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-1 starting at offset 3101
> > > 2019-01-28 14:33:35,544 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-2 starting at offset 3031
> > > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-3 starting at offset 3009
> > > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-4 starting at offset 2903
> > > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-5 starting at offset 3068
> > > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-6 starting at offset 3160
> > > 2019-01-28 14:33:35,545 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-7 starting at offset 3014
> > > 2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-8 starting at offset 3096
> > > 2019-01-28 14:33:35,546 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: reading from ratings-9 starting at offset 2885
> > > 2019-01-28 14:33:35,577 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'metis.input.messages.config' was supplied but isn't a known config.
> > > 2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > > 2019-01-28 14:33:35,803 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > > 2019-01-28 14:33:35,801 INFO org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers - Waiting for jobs to complete.
> > > 2019-01-28 14:33:36,217 INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0: first record offset 3014
> > >
> > >
> > > Best,
> > > Tobi
> > >
> > >
> > > On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:
> > >
> > > Hi Tobias,
> > >
> > > Checkpointing should be enabled when you set it in the Flink config or
> > > via the Beam option `checkpointingInterval`. Did you set `runner` to
> > > `FlinkRunner`?
> > >
> > > If possible, could you share parts of the Flink logs?
> > >
> > > Thanks,
> > > Max
> > >
> > > On 25.01.19 15:14, Kaymak, Tobias wrote:
> > > > Hi,
> > > >
> > > > I am trying to migrate my existing KafkaToGCS pipeline to a
> > > > KafkaToBigQuery pipeline to skip the loading step from GCS, which is
> > > > currently handled externally from Beam.
> > > >
> > > > I noticed that the pipeline, written in Beam 2.9.0 (Java), does not
> > > > trigger any checkpoint on Flink (1.5.5), even though it is configured
> > > > to do so when I launch it. Is this normal? How does Beam then
> > > > guarantee exactly-once when there are no checkpoints in Flink? (It
> > > > seems to start from scratch when it crashes, during my tests, but I
> > > > am not 100% sure.)
> > > >
> > > >
> > > > This is my pipeline:
> > > >
> > > > pipeline
> > > >     .apply(
> > > >         KafkaIO.<String, String>read()
> > > >             .withBootstrapServers(bootstrap)
> > > >             .withTopics(topics)
> > > >             .withKeyDeserializer(StringDeserializer.class)
> > > >             .withValueDeserializer(ConfigurableDeserializer.class)
> > > >             .updateConsumerProperties(
> > > >                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
> > > >             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
> > > >             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
> > > >             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
> > > >             .withReadCommitted()
> > > >             .withTimestampPolicyFactory(withEventTs)
> > > >             .commitOffsetsInFinalize())
> > > >     .apply(ParDo.of(new ToEventFn()))
> > > >     .apply(
> > > >         Window.into(new ZurichTimePartitioningWindowFn())
> > > >             .triggering(
> > > >                 Repeatedly.forever(
> > > >                     AfterFirst.of(
> > > >                         AfterPane.elementCountAtLeast(bundleSize),
> > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > >                             .plusDelayOf(refreshFrequency))))
> > > >             .withAllowedLateness(Duration.standardDays(14))
> > > >             .discardingFiredPanes())
> > > >     .apply(
> > > >         BigQueryIO.<Event>write()
> > > >             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> > > >             .withTriggeringFrequency(refreshFrequency)
> > > >             .withNumFileShards(1)
> > > >             .to(partitionedTableDynamicDestinations)
> > > >             .withFormatFunction(
> > > >                 (SerializableFunction<Event, TableRow>)
> > > >                     KafkaToBigQuery::convertUserEventToTableRow)
> > > >             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> > > >             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> > > >
> > > > pipeline.run().waitUntilFinish();
> > > >
> > > > It's launched like the other (GCS) one via:
> > > >
> > > > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000
> > > > --parallelism=1 --tempLocation=gs://foo..
> > > >
> > > > Any idea why checkpointing does not work here?
> > > >
> > > > Best,
> > > > Tobias
> > >
> > >
> > >
> > > --
> > > Tobias Kaymak
> > > Data Engineer
> > >
> > > [email protected]
> > > www.ricardo.ch
> > >
> > >
> > >