Hi Maximilian,
I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not work with
version 2.9. I am very sure it is related to this issue:
https://issues.apache.org/jira/browse/FLINK-2491 - parts of the pipeline reach
the FINISHED state after a couple of minutes, which then shuts down
checkpointing; this appears to be addressed in Beam 2.10. However, when I
execute the pipeline on a Flink 1.5.5 cluster, the web interface no longer
shows any metrics about the processed elements:
2019-01-30 09:14:53,934 WARN org.apache.beam.sdk.metrics.MetricsEnvironment -
Reporting metrics are not supported in the current execution environment.
Is this a known issue? I want to switch my Flink version to 1.6 to see if this
is fixed there, but I am unsure at this point how to achieve that. Is it
something I can set in my pom.xml?
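
From what I can tell, the 2.10 development branch splits the Flink runner into
per-Flink-version artifacts, so I am guessing it would be something like this
in the pom.xml (artifact name and version are my guess, untested):

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink-1.6</artifactId>
      <version>2.10.0-SNAPSHOT</version>
    </dependency>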
Best,
Tobi
On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels <[email protected]> wrote:
Hi Tobias,
It is normal to see "No restore state for UnbounedSourceWrapper" when not
restoring from a checkpoint/savepoint.
Just checking: you mentioned you set the checkpoint interval via
--checkpointingInterval=300000
That means you have to wait 5 minutes (300,000 ms) until the first checkpoint
is taken.
You should be seeing an INFO message like this: "INFO: Triggering checkpoint 1 @
1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
Thanks,
Max
On 29.01.19 16:13, Kaymak, Tobias wrote:
> Even after altering the pipeline and making it much simpler, it still does not
> checkpoint. I used a single Kafka topic as a source and altered the IO step in
> the following way:
>
>     .apply(
>         BigQueryIO.<Event>write()
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(refreshFrequency)
>             .withNumFileShards(1)
>             .to(projectId + ":" + dataset + "." + tableName)
>             .withTimePartitioning(new TimePartitioning().setField("event_date"))
>             .withSchema(tableSchema)
>             .withFormatFunction(
>                 (SerializableFunction<Event, TableRow>)
>                     KafkaToBigQuery::convertUserEventToTableRow)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
> The graph that Flink 1.5.5 generated looked exactly the same, and
> checkpointing still did not work.
>
> On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <[email protected]> wrote:
>
> If I have a pipeline running and I restart the taskmanager on which it's
> executing, the log below is what I see. I find the "No restore state for
> UnbounedSourceWrapper." message interesting, as it seems to indicate that
> the pipeline never stored any state in the first place?
>
> Starting taskexecutor as a console application on host
> flink-taskmanager-5d85dd6854-pm5bl.
> 2019-01-29 09:20:48,706 WARN org.apache.hadoop.util.NativeCodeLoader
> - Unable to load native-hadoop library for your
platform...
> using builtin-java classes where applicable
> 2019-01-29 09:20:51,253 WARN
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn -
SASL
> configuration failed: javax.security.auth.login.LoginException: No
JAAS
> configuration section named 'Client' was found in specified JAAS
> configuration file: '/tmp/jaas-7768141350028767113.conf'. Will
continue
> connection to Zookeeper server without SASL authentication, if
Zookeeper
> server allows it.
> 2019-01-29 09:20:51,281 ERROR
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState -
> Authentication failed
> 2019-01-29 09:21:53,814 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:53,828 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:53,834 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:53,917 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-29 09:21:53,929 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:53,937 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:53,978 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
>
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
> 2019-01-29 09:21:54,002 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,008 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,011 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,020 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,080 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,091 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,099 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,107 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,119 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,118 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,115 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,114 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,111 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,110 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,109 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,144 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,172 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,176 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,179 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,189 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,191 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,203 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,210 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,217 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,238 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,242 WARN org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-29 09:21:54,339 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-29 09:21:54,371 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-29 09:21:54,479 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-29 09:21:55,509 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
>
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
> 2019-01-29 09:21:55,535 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
>
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
> 2019-01-29 09:21:55,770 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@75aab48]
> 2019-01-29 09:21:56,280 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig - The
> configuration 'metis.input.messages.config' was supplied but isn't a
known
> config.
> 2019-01-29 09:21:57,387 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads -
Writing
> BigQuery temporary files to
>
gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/
> before loading them.
> 2019-01-29 09:21:58,118 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers -
Waiting for
> jobs to complete.
> 2019-01-29 09:21:58,118 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers -
Waiting for
> jobs to complete.
> 2019-01-29 09:21:58,118 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers -
Waiting for
> jobs to complete.
> 2019-01-29 09:21:58,140 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-0 starting at offset 13112
> 2019-01-29 09:21:58,141 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-1 starting at offset 13407
> 2019-01-29 09:21:58,142 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-2 starting at offset 13034
> 2019-01-29 09:21:58,142 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-3 starting at offset 13271
> 2019-01-29 09:21:58,142 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-4 starting at offset 12813
> 2019-01-29 09:21:58,142 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-5 starting at offset 13211
> 2019-01-29 09:21:58,144 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-6 starting at offset 13394
> 2019-01-29 09:21:58,145 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-7 starting at offset 13194
> 2019-01-29 09:21:58,145 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-8 starting at offset 13478
> 2019-01-29 09:21:58,145 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
Reader-0:
> reading from ratings-9 starting at offset 12966
>
>
> On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias <[email protected]> wrote:
>
> Hi Maximilian,
>
> yes, I've set --runner to FlinkRunner when launching the pipeline, and
> checkpointing does work for a GCS sink, but it seems to be ignored for a
> BigQuery sink somehow - even though it looks like the system magically
> handles it itself.
>
> This is the full command line to launch the Beam 2.9.0 pipeline on Flink 1.5.5:
>
> bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar \
>     --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE \
>     --checkpointingInterval=300000 --parallelism=1 \
>     --tempLocation=gs://somebucket
>
> Here are the logs from the taskmanager; I can share the full code of the
> pipeline if you want:
>
> 2019-01-28 14:33:31,287 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:31,911 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:31,976 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,217 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,227 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,228 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,276 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,282 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-28 14:33:32,288 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
>
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
> 2019-01-28 14:33:32,296 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-28 14:33:32,318 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,321 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,324 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,329 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,357 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,482 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,483 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,493 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,697 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,782 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,789 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,093 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,122 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,162 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-28 14:33:33,179 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,187 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,192 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,218 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,220 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,298 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,304 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,323 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,326 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,357 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,377 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,395 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,477 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,487 WARN
org.apache.flink.metrics.MetricGroup
> - The operator name
>
BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:33,748 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-28 14:33:34,577 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
>
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
> 2019-01-28 14:33:34,610 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
>
[org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
> 2019-01-28 14:33:34,747 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@71389814]
> 2019-01-28 14:33:34,896 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig -
The
> configuration 'metis.input.messages.config' was supplied but
isn't a
> known config.
> 2019-01-28 14:33:35,462 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads -
Writing
> BigQuery temporary files to
>
gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/
> before loading them.
> 2019-01-28 14:33:35,544 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-0 starting at offset 2945
> 2019-01-28 14:33:35,544 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-1 starting at offset 3101
> 2019-01-28 14:33:35,544 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-2 starting at offset 3031
> 2019-01-28 14:33:35,545 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-3 starting at offset 3009
> 2019-01-28 14:33:35,545 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-4 starting at offset 2903
> 2019-01-28 14:33:35,545 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-5 starting at offset 3068
> 2019-01-28 14:33:35,545 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-6 starting at offset 3160
> 2019-01-28 14:33:35,545 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-7 starting at offset 3014
> 2019-01-28 14:33:35,546 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-8 starting at offset 3096
> 2019-01-28 14:33:35,546 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: reading from ratings-9 starting at offset 2885
> 2019-01-28 14:33:35,577 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig -
The
> configuration 'metis.input.messages.config' was supplied but
isn't a
> known config.
> 2019-01-28 14:33:35,801 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers -
Waiting
> for jobs to complete.
> 2019-01-28 14:33:35,803 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers -
Waiting
> for jobs to complete.
> 2019-01-28 14:33:35,801 INFO
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers -
Waiting
> for jobs to complete.
> 2019-01-28 14:33:36,217 INFO
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource -
> Reader-0: first record offset 3014
>
>
> Best,
> Tobi
>
>
> On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels <[email protected]> wrote:
>
> Hi Tobias,
>
> Checkpointing should be enabled when you set it in the Flink config or via the
> Beam option `checkpointingInterval`. Did you set `runner` to `FlinkRunner`?
>
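> A minimal sketch of what I mean, assuming the options are set programmatically
> (the option names mirror the command-line flags):
>
>     import org.apache.beam.runners.flink.FlinkPipelineOptions;
>     import org.apache.beam.runners.flink.FlinkRunner;
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
>     FlinkPipelineOptions options =
>         PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
>     options.setRunner(FlinkRunner.class);
>     // Checkpoint every 5 minutes, equivalent to --checkpointingInterval=300000
>     options.setCheckpointingInterval(300000L);
>     Pipeline pipeline = Pipeline.create(options);
>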
> If possible, could you share parts of the Flink logs?
>
> Thanks,
> Max
>
> On 25.01.19 15:14, Kaymak, Tobias wrote:
> > Hi,
> >
> > I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery
> > pipeline, to skip the loading step from GCS, which is currently handled
> > externally from Beam.
> >
> > I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger
> > any checkpoint on Flink (1.5.5), even though it's configured to do so when I
> > launch it. Is this normal? How does Beam then guarantee exactly-once
> > semantics when there are no checkpoints in Flink? (It seems to start from
> > scratch when it crashes, during my tests, but I am not 100% sure.)
> >
> >
> > This is my pipeline:
> >
> > pipeline
> >     .apply(
> >         KafkaIO.<String, String>read()
> >             .withBootstrapServers(bootstrap)
> >             .withTopics(topics)
> >             .withKeyDeserializer(StringDeserializer.class)
> >             .withValueDeserializer(ConfigurableDeserializer.class)
> >             .updateConsumerProperties(
> >                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> >                     inputMessagesConfig))
> >             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
> >             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
> >             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
> >             .withReadCommitted()
> >             .withTimestampPolicyFactory(withEventTs)
> >             .commitOffsetsInFinalize())
> >     .apply(ParDo.of(new ToEventFn()))
> >     .apply(
> >         Window.into(new ZurichTimePartitioningWindowFn())
> >             .triggering(
> >                 Repeatedly.forever(
> >                     AfterFirst.of(
> >                         AfterPane.elementCountAtLeast(bundleSize),
> >                         AfterProcessingTime.pastFirstElementInPane()
> >                             .plusDelayOf(refreshFrequency))))
> >             .withAllowedLateness(Duration.standardDays(14))
> >             .discardingFiredPanes())
> >     .apply(
> >         BigQueryIO.<Event>write()
> >             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >             .withTriggeringFrequency(refreshFrequency)
> >             .withNumFileShards(1)
> >             .to(partitionedTableDynamicDestinations)
> >             .withFormatFunction(
> >                 (SerializableFunction<Event, TableRow>)
> >                     KafkaToBigQuery::convertUserEventToTableRow)
> >             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >
> > pipeline.run().waitUntilFinish();
> >
> > It's launched like the other (GCS) one via:
> >
> > ... --checkpointingMode=EXACTLY_ONCE --checkpointingInterval=300000 \
> >     --parallelism=1 --tempLocation=gs://foo..
> >
> > Any idea why checkpointing does not work here?
> >
> > Best,
> > Tobias
>
>
>
> --
> Tobias Kaymak
> Data Engineer
>
> [email protected]
> www.ricardo.ch