Thanks for the update. The Flink 1.7 support is still fresh, so we will keep that in mind.

Do you mind creating a JIRA issue?

Thanks,
Max

On 11.02.19 11:29, Kaymak, Tobias wrote:
Hi,

I did test this with 10 partitions per topic, 1 taskmanager (parallelism=1).

I tested with the 2.10-SNAPSHOT version, and this version was creating checkpoints with Flink 1.6.2 and Flink 1.7.1. However, I didn't see any metrics about how many elements per second were progressed with Flink 1.7.1, but they were shown in Flink 1.6.2.

Best,
Tobias

On Mon, Feb 4, 2019 at 10:56 AM Maximilian Michels <[email protected] <mailto:[email protected]>> wrote:

    Tobias, do I understand correctly that checkpointing works fine with
    2.10.0? The issues you reported are related to Metrics only?

    Juan, it is correct that the bug[1] is caused by a Problem in Flink[2].
    The bug occurred when a task had no work assigned, e.g. an empty
    partition list when parallelism > #partitions.

    The issue is resolved for 2.10.0 and will be fixed in 2.7.1 (LTS).

    [1] https://jira.apache.org/jira/browse/BEAM-5386
    [2] https://issues.apache.org/jira/browse/FLINK-2491

    On 01.02.19 20:21, Juan Carlos Garcia wrote:
     > Sorry i was on the phone, the Flink version is 1.5.4.
     >
     > Am Fr., 1. Feb. 2019, 20:19 hat Juan Carlos Garcia
    <[email protected] <mailto:[email protected]>
     > <mailto:[email protected] <mailto:[email protected]>>>
    geschrieben:
     >
     >     Hi Tobias
     >
     >     I would like to ask the following and see if this apply to you.
     >
     >     How many kafka partitions you have?
     >     How many Taskmanagers are you using? (parallelism)
     >
     >     There is bug in Flink, which is triggered as soon as you start
     >     playing around with a parallelism greater than the amount of
     >     partitions in your kafka topic.
     >
     >     If you were using Flink api directly you can control de
    parallelism
     >     on each operation (sources and sinks), however when using
    beam the
     >     parallelism is apply to all of the operator in the DAG..
     >
     >     I am using beam 2.9 with Flink 1.5.2 and just today we deployed a
     >     pipeline (OnPremise) reading from our kafka and publishing to
     >     bigquery, using hdfs as backing store for checkpoint and is
    working
     >     flawless.
     >
     >     Here is a link for the Flink bug
     >
     > https://issues.apache.org/jira/browse/FLINK-2491
     >
     >     Hope it helps.
     >
     >     JC
     >
     >     Am Do., 31. Jan. 2019, 15:51 hat Kaymak, Tobias
     >     <[email protected] <mailto:[email protected]>
    <mailto:[email protected] <mailto:[email protected]>>>
     >     geschrieben:
     >
     >         I should have drank the coffee before writing this ;)
     >         The end-to-end-duration of snapshots is fine, the
    snapshots were
     >         created at 10:00 in the morning and I thought they took
     >         increasing more time because of the clock ;)
     >         The rest of the findings are still valid.
     >
     >         On Thu, Jan 31, 2019 at 12:27 PM Kaymak, Tobias
     >         <[email protected]
    <mailto:[email protected]> <mailto:[email protected]
    <mailto:[email protected]>>> wrote:
     >
     >             Follow up to summarize my findings so far:
     >
     >             Beam 2.9.0 with Flink 1.5.5: No checkpoints are created
     >             Beam 2.10RC1 with Flink 1.6.2: Metrics shown in the
     >             webinterface of Flink's jobmanager, Checkpoints are
    created
     >             - but checkpoints take more than 10 minutes even if
     >             end-to-end duration is 10 seconds.
     >             Beam 2.10RC1 with Flink 1.7.1: No Metrics shown in the
     >             webinterface of Flink's jobmanager, Checkpoints are
    created
     >             - but checkpoints take more than 10 minutes even if
     >             end-to-end duration is 10 seconds
     >
     >             Attached is a screenshot from the 1.7.1 webinterface.
    So far
     >             using 1.6.2 with Beam 2.10RC1 seems to be the best
    option. I
     >             am continuing to investigate why the checkpoints take
    so long.
     >
     >             image.png
     >
     >
     >             On Wed, Jan 30, 2019 at 1:49 PM Maximilian Michels
     >             <[email protected] <mailto:[email protected]>
    <mailto:[email protected] <mailto:[email protected]>>> wrote:
     >
     >                 Thank you for verifying this. This is manifested in
     > https://jira.apache.org/jira/browse/BEAM-5386 and has
     >                 indeed been fixed already
     >                 for 2.10.0.
     >
     >                 This likely warrants a 2.9.1 release. I'll check
    on the
     >                 dev mailing list.
     >
     >                 Thanks,
     >                 Max
     >
     >                 On 30.01.19 10:27, Kaymak, Tobias wrote:
     >                  > Hi Maximilian,
     >                  >
     >                  > I can confirm that checkpoints work with Beam
     >                 2.10-SNAPSHOT and do not work with
     >                  > version 2.9. I am very sure it is related to
    this issue:
     >                  > https://issues.apache.org/jira/browse/FLINK-2491 -
     >                 which has been fixed in 2.10,
     >                  > since parts of the pipeline are FINISHED after a
     >                 couple of minutes and this then
     >                  > triggers the shutdown of the checkpoints. However,
     >                 executing the pipeline on a
     >                  > Flink 1.5.5 cluster yields no metrics about the
     >                 elements processed in the
     >                  > webinterface anymore:
     >                  >
     >                  > 2019-01-30 09:14:53,934 WARN
     >                 org.apache.beam.sdk.metrics.MetricsEnvironment -
     >                  > Reporting metrics are not supported in the current
     >                 execution environment.
     >                  >
     >                  > Is this a known issue? I want to change my Flink
     >                 version to 1.6 to see if this
     >                  > is fixed, but I am unsure at this point how to
     >                 achieve this. Is it something I
     >                  > can pass in my pom.xml?
     >                  >
     >                  >
     >                  >
     >                  >
     >                  > image.png
     >                  >
     >                  > Best,
     >                  > Tobi
     >                  >
     >                  >
     >                  >
     >                  > On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels
     >                 <[email protected] <mailto:[email protected]>
    <mailto:[email protected] <mailto:[email protected]>>
     >                  > <mailto:[email protected] <mailto:[email protected]>
    <mailto:[email protected] <mailto:[email protected]>>>> wrote:
     >                  >
     >                  >     Hi Tobias,
     >                  >
     >                  >     It is normal to see "No restore state for
     >                 UnbounedSourceWrapper" when not
     >                  >     restoring from a checkpoint/savepoint.
     >                  >
     >                  >     Just checking. You mentioned you set the
     >                 checkpoint interval via:
     >                  >     --checkpointingInterval=300000
     >                  >
     >                  >     That means you have to wait 5 minutes
    until the
     >                 first checkpoint will be taken.
     >                  >     You should be seeing an INFO message like
    this:
     >                 "INFO: Triggering checkpoint
     >                  >     1 @
     >                  >     1548775459114 for job
     >                 3b5bdb811f1923bf49db24403e9c1ae9."
     >                  >
     >                  >     Thanks,
     >                  >     Max
     >                  >
     >                  >     On 29.01.19 16:13, Kaymak, Tobias wrote:
     >                  >      > Even after altering the pipeline and
    making it
     >                 way more simple it still
     >                  >     does not
     >                  >      > checkpoint. (I used a single KafkaTopic
    as a
     >                 source and altered the IO
     >                  >     step the
     >                  >      > following way:
     >                  >      >
     >                  >      >       .apply(
     >                  >      >              BigQueryIO.<Event>write()
     >                  >      >
     >                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
     >                  >      >
     >                 .withTriggeringFrequency(refreshFrequency)
     >                  >      >                  .withNumFileShards(1)
     >                  >      >                  .to(projectId + ":" +
    dataset
     >                 + "." + tableName)
     >                  >      >                  .withTimePartitioning(new
     >                  >      > TimePartitioning().setField("event_date"))
     >                  >      >                  .withSchema(tableSchema)
     >                  >      >                  .withFormatFunction(
     >                  >      >
     >                 (SerializableFunction<Event, TableRow>)
     >                  >      >
     >                 KafkaToBigQuery::convertUserEventToTableRow)
     >                  >      >
     >                  >      >
>  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
     >                  >      >
     >                  >      >
>  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
     >                  >      >
     >                  >      > The graph that Flink 1.5.5 generated looked
     >                 exactly the same and
     >                  >     checkpointing
     >                  >      > did not work still:
     >                  >      > image.png
     >                  >      >
     >                  >      > On Tue, Jan 29, 2019 at 11:05 AM Kaymak,
     >                 Tobias <[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                  >     <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>
     >                  >      > <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>>> wrote:
     >                  >      >
     >                  >      >     If I have a pipeline running and I
    restart
     >                 the taskmanager on which it's
     >                  >      >     executing the log shows - I find
    the "No
     >                 restore state for
     >                  >      >     UnbounedSourceWrapper."
    interesting, as it
     >                 seems to indicate that the
     >                  >      >     pipeline never stored a state in
    the first
     >                 place?
     >                  >      >
     >                  >      >     Starting taskexecutor as a console
     >                 application on host
     >                  >      >     flink-taskmanager-5d85dd6854-pm5bl.
     >                  >      >     2019-01-29 09:20:48,706 WARN
     >                 org.apache.hadoop.util.NativeCodeLoader
     >                  >      >                     - Unable to load
     >                 native-hadoop library for your
     >                  >     platform...
     >                  >      >     using builtin-java classes where
    applicable
     >                  >      >     2019-01-29 09:20:51,253 WARN
     >                  >      >
>  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL
     >                  >      >     configuration failed:
     >                 javax.security.auth.login.LoginException: No JAAS
     >                  >      >     configuration section named
    'Client' was
     >                 found in specified JAAS
     >                  >      >     configuration file:
     >                 '/tmp/jaas-7768141350028767113.conf'. Will continue
     >                  >      >     connection to Zookeeper server without
     >                 SASL authentication, if Zookeeper
     >                  >      >     server allows it.
     >                  >      >     2019-01-29 09:20:51,281 ERROR
     >                  >      >
>  org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
     >                  >      >     Authentication failed
     >                  >      >     2019-01-29 09:21:53,814 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:53,828 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:53,834 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:53,917 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - No restore state for
    UnbounedSourceWrapper.
     >                  >      >     2019-01-29 09:21:53,929 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:53,937 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:53,978 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - Unbounded Flink Source 0/1 is reading
     >                 from sources:
     >                  >      >
     >                  >
>  [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@69908217]
     >                  >      >     2019-01-29 09:21:54,002 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,008 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,011 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,020 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,080 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,091 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,099 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,107 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,109 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,119 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,118 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,115 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,114 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,111 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,111 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,110 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,110 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,109 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,144 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,172 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,176 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,179 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,189 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,191 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,203 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,210 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,217 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,238 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,242 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                      - The operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
     >                  >      >     exceeded the 80 characters length limit
     >                 and was truncated.
     >                  >      >     2019-01-29 09:21:54,339 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - No restore state for
    UnbounedSourceWrapper.
     >                  >      >     2019-01-29 09:21:54,371 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - No restore state for
    UnbounedSourceWrapper.
     >                  >      >     2019-01-29 09:21:54,479 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - No restore state for
    UnbounedSourceWrapper.
     >                  >      >     2019-01-29 09:21:55,509 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - Unbounded Flink Source 0/1 is reading
     >                 from sources:
     >                  >      >
     >                  >
>  [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2a501a64]
     >                  >      >     2019-01-29 09:21:55,535 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - Unbounded Flink Source 0/1 is reading
     >                 from sources:
     >                  >      >
     >                  >
>  [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@60345813]
     >                  >      >     2019-01-29 09:21:55,770 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >     - Unbounded Flink Source 0/1 is reading
     >                 from sources:
     >                  >      >     [org.apache.beam.sdk.io
    <http://org.apache.beam.sdk.io>
>  <http://org.apache.beam.sdk.io>.kafka.KafkaUnboundedSource@75aab48]
     >                  >      >     2019-01-29 09:21:56,280 WARN
     >                  >      >
     >                   org.apache.kafka.clients.consumer.ConsumerConfig
     >                          - The
     >                  >      >     configuration
     >                 'metis.input.messages.config' was supplied but
    isn't a
     >                  >     known
     >                  >      >     config.
     >                  >      >     2019-01-29 09:21:57,387 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.gcp.bigquery.BatchLoads
     >                          - Writing
     >                  >      >     BigQuery temporary files to
     >                  >      >
     >                  >
>  gs://bucket/BigQueryWriteTemp/beam_load_ratingsflink01290921554a632112_85202e449b2d45729d384e3ac5f8cc2b/
     >                  >      >     before loading them.
     >                  >      >     2019-01-29 09:21:58,118 INFO
     >                  >      >
>  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
     >                         -
     >                  >     Waiting for
     >                  >      >     jobs to complete.
     >                  >      >     2019-01-29 09:21:58,118 INFO
     >                  >      >
>  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
     >                         -
     >                  >     Waiting for
     >                  >      >     jobs to complete.
     >                  >      >     2019-01-29 09:21:58,118 INFO
     >                  >      >
>  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
     >                         -
     >                  >     Waiting for
     >                  >      >     jobs to complete.
     >                  >      >     2019-01-29 09:21:58,140 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-0 starting at
    offset
     >                 13112
     >                  >      >     2019-01-29 09:21:58,141 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-1 starting at
    offset
     >                 13407
     >                  >      >     2019-01-29 09:21:58,142 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-2 starting at
    offset
     >                 13034
     >                  >      >     2019-01-29 09:21:58,142 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-3 starting at
    offset
     >                 13271
     >                  >      >     2019-01-29 09:21:58,142 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-4 starting at
    offset
     >                 12813
     >                  >      >     2019-01-29 09:21:58,142 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-5 starting at
    offset
     >                 13211
     >                  >      >     2019-01-29 09:21:58,144 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-6 starting at
    offset
     >                 13394
     >                  >      >     2019-01-29 09:21:58,145 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-7 starting at
    offset
     >                 13194
     >                  >      >     2019-01-29 09:21:58,145 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-8 starting at
    offset
     >                 13478
     >                  >      >     2019-01-29 09:21:58,145 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         - Reader-0:
     >                  >      >     reading from ratings-9 starting at
    offset
     >                 12966
     >                  >      >
     >                  >      >
     >                  >      >     On Mon, Jan 28, 2019 at 3:36 PM
    Kaymak, Tobias
     >                  >     <[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>
     >                  >      >     <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>>>
     >                  >     wrote:
     >                  >      >
     >                  >      >         Hi Maximilian,
     >                  >      >
     >                  >      >         yes, I've set the --runner to
     >                 FlinkRunner when launching the pipeline
     >                  >      >         and it does work for a GCS
    sink, but
     >                 it seems to be ignored for a
     >                  >      >         BigQuery sink somehow. Even
    though it
     >                 looks like the system magically
     >                  >      >         handles it itself.
     >                  >      >
     >                  >      >         This is the full command line to
     >                 launch the Beam 2.9.0 pipeline
     >                  >     on Flink
     >                  >      >         1.5.5:
     >                  >      >
     >                  >      >         bin/flink run -d -c
     >                 di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar
     >                  >      >         --runner=FlinkRunner
    --appName=ratings
     >                  >     --checkpointingMode=EXACTLY_ONCE
     >                  >      >         --checkpointingInterval=300000
     >                 --parallelism=1
     >                  >      >         --tempLocation=gs://somebucket
     >                  >      >
     >                  >      >         Here are the logs from the
     >                 taskmanager, I can share the full code
     >                  >     of the
     >                  >      >         pipeline if you want:
     >                  >      >
     >                  >      >         2019-01-28 14:33:31,287 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:31,911 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:31,976 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,217 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,227 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,228 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,276 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,282 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - No restore state for
     >                 UnbounedSourceWrapper.
     >                  >      >         2019-01-28 14:33:32,288 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - Unbounded Flink Source 0/1 is
     >                 reading from sources:
     >                  >      >
     >                  >
>  [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95]
     >                  >      >         2019-01-28 14:33:32,296 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - No restore state for
     >                 UnbounedSourceWrapper.
     >                  >      >         2019-01-28 14:33:32,318 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,321 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,324 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,329 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,357 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,482 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,483 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,493 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                          ��   - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,697 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/WriteGroupedRecords/ParMultiDo(WriteGroupedRecordsToFiles)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,782 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:32,789 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,093 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,122 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,162 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - No restore state for
     >                 UnbounedSourceWrapper.
     >                  >      >         2019-01-28 14:33:33,179 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,187 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,192 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ExpandIterable/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,218 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Values/Values/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,220 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,298 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Window.Into()/Window.Assign.out
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,304 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,323 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,326 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,357 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,377 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/Window.Into()/Window.Assign.out
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,395 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,477 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,487 WARN
     >                 org.apache.flink.metrics.MetricGroup
     >                  >      >                              - The
    operator name
     >                  >      >
     >                  >
>  BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/Window.Into()/Window.Assign.out
     >                  >      >         exceeded the 80 characters length
     >                 limit and was truncated.
     >                  >      >         2019-01-28 14:33:33,748 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - No restore state for
     >                 UnbounedSourceWrapper.
     >                  >      >         2019-01-28 14:33:34,577 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - Unbounded Flink Source 0/1 is
     >                 reading from sources:
     >                  >      >
     >                  >
>  [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@1892a35d]
     >                  >      >         2019-01-28 14:33:34,610 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - Unbounded Flink Source 0/1 is
     >                 reading from sources:
     >                  >      >
     >                  >
>  [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@7adcb05b]
     >                  >      >         2019-01-28 14:33:34,747 INFO
     >                  >      >
     > org.apache.beam.runners.flink.translation.wrappers.streaming.io
    <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
>  <http://org.apache.beam.runners.flink.translation.wrappers.streaming.io>
     >                  >     <http://streaming.io>.UnboundedSourceWrapper
     >                  >      >         - Unbounded Flink Source 0/1 is
     >                 reading from sources:
     >                  >      >         [org.apache.beam.sdk.io
    <http://org.apache.beam.sdk.io>
>  <http://org.apache.beam.sdk.io>.kafka.KafkaUnboundedSource@71389814]
     >                  >      >         2019-01-28 14:33:34,896 WARN
     >                  >      >
     >                   org.apache.kafka.clients.consumer.ConsumerConfig
     >                          - The
     >                  >      >         configuration
     >                 'metis.input.messages.config' was supplied but
    isn't a
     >                  >      >         known config.
     >                  >      >         2019-01-28 14:33:35,462 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.gcp.bigquery.BatchLoads
     >                          -
     >                  >     Writing
     >                  >      >         BigQuery temporary files to
     >                  >      >
     >                  >
>  gs://tempspace_eu_regional/dataflow_dev/BigQueryWriteTemp/beam_load_ratingsflink01281433342cd8f9b4_7ce229196b1d41e3ad63d0ef6234e0f6/
     >                  >      >         before loading them.
     >                  >      >         2019-01-28 14:33:35,544 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-0
     >                 starting at offset 2945
     >                  >      >         2019-01-28 14:33:35,544 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-1
     >                 starting at offset 3101
     >                  >      >         2019-01-28 14:33:35,544 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-2
     >                 starting at offset 3031
     >                  >      >         2019-01-28 14:33:35,545 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-3
     >                 starting at offset 3009
     >                  >      >         2019-01-28 14:33:35,545 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-4
     >                 starting at offset 2903
     >                  >      >         2019-01-28 14:33:35,545 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-5
     >                 starting at offset 3068
     >                  >      >         2019-01-28 14:33:35,545 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-6
     >                 starting at offset 3160
     >                  >      >         2019-01-28 14:33:35,545 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-7
     >                 starting at offset 3014
     >                  >      >         2019-01-28 14:33:35,546 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-8
     >                 starting at offset 3096
     >                  >      >         2019-01-28 14:33:35,546 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: reading from ratings-9
     >                 starting at offset 2885
     >                  >      >         2019-01-28 14:33:35,577 WARN
     >                  >      >
     >                   org.apache.kafka.clients.consumer.ConsumerConfig
     >                          - The
     >                  >      >         configuration
     >                 'metis.input.messages.config' was supplied but
    isn't a
     >                  >      >         known config.
     >                  >      >         2019-01-28 14:33:35,801 INFO
     >                  >      >
>  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
     >                         -
     >                  >     Waiting
     >                  >      >         for jobs to complete.
     >                  >      >         2019-01-28 14:33:35,803 INFO
     >                  >      >
>  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
     >                         -
     >                  >     Waiting
     >                  >      >         for jobs to complete.
     >                  >      >         2019-01-28 14:33:35,801 INFO
     >                  >      >
>  org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
     >                         -
     >                  >     Waiting
     >                  >      >         for jobs to complete.
     >                  >      >         2019-01-28 14:33:36,217 INFO
     >                  >      >
     >                   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource
     >                         -
     >                  >      >         Reader-0: first record offset 3014
     >                  >      >
     >                  >      >
     >                  >      >         Best,
     >                  >      >         Tobi
     >                  >      >
     >                  >      >
     >                  >      >         On Mon, Jan 28, 2019 at 11:52 AM
     >                 Maximilian Michels
     >                  >     <[email protected] <mailto:[email protected]>
    <mailto:[email protected] <mailto:[email protected]>>
     >                 <mailto:[email protected] <mailto:[email protected]>
    <mailto:[email protected] <mailto:[email protected]>>>
     >                  >      >         <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected] <mailto:[email protected]>>
    <mailto:[email protected] <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>>> wrote:
     >                  >      >
     >                  >      >             Hi Tobias,
     >                  >      >
     >                  >      >             Checkpointing should be enabled
     >                 when you set it in the Flink
     >                  >     config
     >                  >      >             or via the
     >                  >      >             Beam option
     >                 `checkpointingInterval`. Did you set `runner` to
     >                  >      >             `FlinkRunner`?
     >                  >      >
     >                  >      >             If possible, could you
    share parts
     >                 of the Flink logs?
     >                  >      >
     >                  >      >             Thanks,
     >                  >      >             Max
     >                  >      >
     >                  >      >             On 25.01.19 15:14, Kaymak,
    Tobias
     >                 wrote:
     >                  >      >              > Hi,
     >                  >      >              >
     >                  >      >              > I am trying to migrate my
     >                 existing KafkaToGCS pipeline to a
     >                  >      >             KafkaToBigQuery
     >                  >      >              > pipeline to skip the loading
     >                 step from GCS which is currently
     >                  >      >             handled externally
     >                  >      >              > from Beam.
     >                  >      >              >
     >                  >      >              > I noticed that the pipeline,
     >                 written in Beam 2.9.0 (Java) does
     >                  >      >             not trigger any
     >                  >      >              > checkpoint on Flink (1.5.5),
     >                 even though its configured to
     >                  >     do so
     >                  >      >             when I launch
     >                  >      >              > it. Is this normal? How does
     >                 Beam then guarantee exactly once
     >                  >      >             when there are no
     >                  >      >              > checkpoints in Flink?
    (It seems
     >                 to start from scratch when it
     >                  >      >             crashes, during my
     >                  >      >              > tests, but I am not 100%
    sure)
     >                  >      >              >
     >                  >      >              >
     >                  >      >              > This is my pipeline:
     >                  >      >              >
     >                  >      >              >   pipeline
     >                  >      >              >          .apply(
>                  >      >              > KafkaIO.<String,
     >                 String>read()
     >                  >      >              >
     >                 .withBootstrapServers(bootstrap)
     >                  >      >      ��       >
     >                 .withTopics(topics)
     >                  >      >              >
     >                  >     .withKeyDeserializer(StringDeserializer.class)
     >                  >      >              >
     >                  >      >
>  .withValueDeserializer(ConfigurableDeserializer.class)
     >                  >      >              >
     >                 .updateConsumerProperties(
     >                  >      >              >
     >                  >      >
>  ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
     >                  >      >              > inputMessagesConfig))
     >                  >      >              >
     >                  >      >
>  .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
     >                  >      >              > "earliest"))
     >                  >      >              >
     >                  >      >
>  .updateConsumerProperties(ImmutableMap.of("group.id <http://group.id>
     >                 <http://group.id>
     >                  >     <http://group.id> <http://group.id>
     >                  >      >              > <http://group.id>",
    groupId))
     >                  >      >              >
     >                  >      >
>  .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
     >                  >      >              > "true"))
     >                  >      >              >
     >                 .withReadCommitted()
     >                  >      >              >
     >                 .withTimestampPolicyFactory(withEventTs)
     >                  >      >              >
     >                 .commitOffsetsInFinalize())
     >                  >      >              >          .apply(ParDo.of(new
     >                 ToEventFn()))
     >                  >      >              >          .apply(
     >                  >      >              >              Window.into(new
     >                 ZurichTimePartitioningWindowFn())
     >                  >      >              >
>                  >      >              > .triggering(
     >                  >      >              >
     >                 Repeatedly.forever(
     >                  >      >              >
     >                 AfterFirst.of(
     >                  >      >              >
     >                  >      >
     >                   AfterPane.elementCountAtLeast(bundleSize),
     >                  >      >              >
     >                  >      >
     >                   AfterProcessingTime.pastFirstElementInPane()
     >                  >      >              >
     >                  >     .plusDelayOf(refreshFrequency))))
     >                  >      >              >
>                  >  .withAllowedLateness(Duration.standardDays(14))
     >                  >      >              >
     >                 .discardingFiredPanes())
     >                  >      >              >          .apply(
     >                  >      >              >
     >                 BigQueryIO.<Event>write()
     >                  >      >              >
>                  >  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
     >                  >      >              >
     >                 .withTriggeringFrequency(refreshFrequency)
     >                  >      >              >
     >                 .withNumFileShards(1)
     >                  >      >              >
     >                 .to(partitionedTableDynamicDestinations)
     >                  >      >              >
     >                 .withFormatFunction(
     >                  >      >              >
     >                 (SerializableFunction<Event, TableRow>)
     >                  >      >              >
     >                  >     KafkaToBigQuery::convertUserEventToTableRow)
     >                  >      >              >
     >                  >      >              >
     >                  >      >
     >                  >
>  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
     >                  >      >              >
     >                  >      >              >
     >                  >      >
     >                  >
>  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
     >                  >      >              >
     >                  >      >              >
     >                 pipeline.run().waitUntilFinish();
     >                  >      >              > It's launched like the other
     >                 (GCS) one via:
     >                  >      >              >
     >                  >      >              >
    ...--checkpointingMode=EXACTLY_ONCE
     >                  >     --checkpointingInterval=300000
     >                  >      >              > --parallelism=1
     >                 --tempLocation=gs://foo..
     >                  >      >              >
     >                  >      >              > Any idea why
    checkpointing does
     >                 not work here?
     >                  >      >              >
     >                  >      >              > Best,
     >                  >      >              > Tobias
     >                  >      >
     >                  >      >
     >                  >      >
     >                  >      >         --
     >                  >      >         Tobias Kaymak
     >                  >      >         Data Engineer
     >                  >      >
     >                  >      > [email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>
     >                  >     <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>>
     >                  >      > www.ricardo.ch <http://www.ricardo.ch>
    <http://www.ricardo.ch>
     >                 <http://www.ricardo.ch> <http://www.ricardo.ch/>
     >                  >      >
     >                  >      >
     >                  >      >
     >                  >      >     --
     >                  >      >     Tobias Kaymak
     >                  >      >     Data Engineer
     >                  >      >
     >                  >      > [email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>
     >                  >     <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>>
     >                  >      > www.ricardo.ch <http://www.ricardo.ch>
    <http://www.ricardo.ch>
     >                 <http://www.ricardo.ch> <http://www.ricardo.ch/>
     >                  >      >
     >                  >      >
     >                  >      >
     >                  >      > --
     >                  >      > Tobias Kaymak
     >                  >      > Data Engineer
     >                  >      >
     >                  >      > [email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>
     >                  >     <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>
     >                 <mailto:[email protected]
    <mailto:[email protected]>
     >                 <mailto:[email protected]
    <mailto:[email protected]>>>>
     >                  >      > www.ricardo.ch <http://www.ricardo.ch>
    <http://www.ricardo.ch>
     >                 <http://www.ricardo.ch> <http://www.ricardo.ch/>
     >                  >      >
     >                  >
     >

Reply via email to